Updated to Ray 2.7

This commit is contained in:
GokuMohandas
2023-09-18 22:03:20 -07:00
parent 71b3d50a05
commit b98bd5b1ae
15 changed files with 3484 additions and 2086 deletions

View File

@@ -5,7 +5,6 @@ import sys
from pathlib import Path
import mlflow
import pretty_errors # NOQA: F401 (imported but unused)
# Directories
ROOT_DIR = Path(__file__).parent.parent.absolute()

View File

@@ -5,7 +5,6 @@ import numpy as np
import pandas as pd
import ray
from ray.data import Dataset
from ray.data.preprocessor import Preprocessor
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer
@@ -135,13 +134,18 @@ def preprocess(df: pd.DataFrame, class_to_index: Dict) -> Dict:
return outputs
class CustomPreprocessor:
    """Custom preprocessor that maps tag names to integer class indices.

    Plain class exposing ``fit`` / ``transform`` (replaces the Ray
    ``Preprocessor`` subclass API removed in Ray 2.7).
    """

    def __init__(self, class_to_index=None):
        # Use a None sentinel instead of a mutable default argument ({}),
        # which would be shared across all calls. `or {}` keeps the old
        # behavior for callers that passed a dict (or nothing).
        self.class_to_index = class_to_index or {}
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}

    def fit(self, ds):
        """Build tag -> index (and inverse) mappings from the dataset's unique tags.

        Returns self so calls can be chained (fit(...).transform(...)).
        """
        tags = ds.unique(column="tag")
        self.class_to_index = {tag: i for i, tag in enumerate(tags)}
        self.index_to_class = {v: k for k, v in self.class_to_index.items()}
        return self

    def transform(self, ds):
        """Apply `preprocess` to each pandas-format batch of the dataset."""
        return ds.map_batches(preprocess, fn_kwargs={"class_to_index": self.class_to_index}, batch_format="pandas")

View File

@@ -8,13 +8,13 @@ import ray
import ray.train.torch # NOQA: F401 (imported but unused)
import typer
from ray.data import Dataset
from ray.train.torch.torch_predictor import TorchPredictor
from sklearn.metrics import precision_recall_fscore_support
from snorkel.slicing import PandasSFApplier, slicing_function
from typing_extensions import Annotated
from madewithml import predict, utils
from madewithml.config import logger
from madewithml.predict import TorchPredictor
# Initialize Typer CLI app
app = typer.Typer()
@@ -133,8 +133,8 @@ def evaluate(
y_true = np.stack([item["targets"] for item in values])
# y_pred
z = predictor.predict(data=ds.to_pandas())["predictions"]
y_pred = np.stack(z).argmax(1)
predictions = preprocessed_ds.map_batches(predictor).take_all()
y_pred = np.array([d["output"] for d in predictions])
# Metrics
metrics = {

View File

@@ -1,13 +1,20 @@
import json
import os
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import BertModel
class FinetunedLLM(nn.Module): # pragma: no cover, torch model
"""Model architecture for a Large Language Model (LLM) that we will fine-tune."""
class FinetunedLLM(nn.Module):
def __init__(self, llm, dropout_p, embedding_dim, num_classes):
super(FinetunedLLM, self).__init__()
self.llm = llm
self.dropout_p = dropout_p
self.embedding_dim = embedding_dim
self.num_classes = num_classes
self.dropout = torch.nn.Dropout(dropout_p)
self.fc1 = torch.nn.Linear(embedding_dim, num_classes)
@@ -17,3 +24,36 @@ class FinetunedLLM(nn.Module): # pragma: no cover, torch model
z = self.dropout(pool)
z = self.fc1(z)
return z
@torch.inference_mode()
def predict(self, batch):
    """Return hard label predictions (argmax over logits) as a numpy array."""
    self.eval()  # disable dropout for deterministic inference
    logits = self(batch)
    labels = torch.argmax(logits, dim=1)
    return labels.cpu().numpy()
@torch.inference_mode()
def predict_proba(self, batch):
    """Return per-class probabilities (softmax over logits) as a numpy array."""
    self.eval()  # disable dropout for deterministic inference
    logits = self(batch)
    probs = F.softmax(logits, dim=1)
    return probs.cpu().numpy()
def save(self, dp):
    """Save the model's constructor args (args.json) and weights (model.pt) under directory `dp`.

    Args:
        dp: destination directory path (str or Path); must already exist.
    """
    # Build the payload outside the file context — it doesn't need the handle.
    contents = {
        "dropout_p": self.dropout_p,
        "embedding_dim": self.embedding_dim,
        "num_classes": self.num_classes,
    }
    with open(Path(dp, "args.json"), "w") as fp:
        json.dump(contents, fp, indent=4, sort_keys=False)
    # Use Path consistently (original mixed Path and os.path.join).
    torch.save(self.state_dict(), Path(dp, "model.pt"))
@classmethod
def load(cls, args_fp, state_dict_fp):
    """Alternate constructor: rebuild the model from saved args and a state-dict file.

    Args:
        args_fp: path to the JSON file of constructor kwargs written by `save`.
        state_dict_fp: path to the saved state dict (loaded onto CPU).
    """
    kwargs = json.loads(Path(args_fp).read_text())
    # NOTE(review): pretrained backbone is re-downloaded/loaded here before
    # the fine-tuned weights overwrite it via load_state_dict.
    llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
    model = cls(llm=llm, **kwargs)
    state = torch.load(state_dict_fp, map_location=torch.device("cpu"))
    model.load_state_dict(state)
    return model

View File

@@ -1,19 +1,20 @@
import json
from pathlib import Path
from typing import Any, Dict, Iterable, List
from urllib.parse import urlparse
import numpy as np
import pandas as pd
import ray
import torch
import typer
from numpyencoder import NumpyEncoder
from ray.air import Result
from ray.train.torch import TorchPredictor
from ray.train.torch.torch_checkpoint import TorchCheckpoint
from typing_extensions import Annotated
from madewithml.config import logger, mlflow
from madewithml.data import CustomPreprocessor
from madewithml.models import FinetunedLLM
from madewithml.utils import collate_fn
# Initialize Typer CLI app
app = typer.Typer()
@@ -48,25 +49,51 @@ def format_prob(prob: Iterable, index_to_class: Dict) -> Dict:
return d
def predict_with_proba(
df: pd.DataFrame,
predictor: ray.train.torch.torch_predictor.TorchPredictor,
class TorchPredictor:
    """Pairs a fitted preprocessor with a trained model for batch inference."""

    def __init__(self, preprocessor, model):
        self.preprocessor = preprocessor
        self.model = model
        # Put the model into eval mode once, up front.
        self.model.eval()

    def __call__(self, batch):
        """Hard-label predictions for one collated batch."""
        return {"output": self.model.predict(collate_fn(batch))}

    def predict_proba(self, batch):
        """Class-probability predictions for one collated batch."""
        return {"output": self.model.predict_proba(collate_fn(batch))}

    def get_preprocessor(self):
        return self.preprocessor

    @classmethod
    def from_checkpoint(cls, checkpoint):
        """Rebuild a predictor (preprocessor + model) from a Ray checkpoint directory."""
        class_to_index = checkpoint.get_metadata()["class_to_index"]
        preprocessor = CustomPreprocessor(class_to_index=class_to_index)
        ckpt_dir = checkpoint.path
        model = FinetunedLLM.load(Path(ckpt_dir, "args.json"), Path(ckpt_dir, "model.pt"))
        return cls(preprocessor=preprocessor, model=model)
def predict_proba(
    ds: ray.data.dataset.Dataset,
    predictor: TorchPredictor,
) -> List:  # pragma: no cover, tested with inference workload
    """Predict tags (with probabilities) for input data from a dataset.

    Args:
        ds (ray.data.dataset.Dataset): dataset with input features.
        predictor (TorchPredictor): loaded predictor from a checkpoint.

    Returns:
        List: one dict per row with the predicted tag and per-class probabilities.
    """
    preprocessor = predictor.get_preprocessor()
    preprocessed_ds = preprocessor.transform(ds)
    outputs = preprocessed_ds.map_batches(predictor.predict_proba)
    y_prob = np.array([d["output"] for d in outputs.take_all()])
    results = []
    # plain iteration: the enumerate index was unused
    for prob in y_prob:
        tag = preprocessor.index_to_class[prob.argmax()]
        results.append({"prediction": tag, "probabilities": format_prob(prob, preprocessor.index_to_class)})
    return results
@@ -125,11 +152,10 @@ def predict(
# Load components
best_checkpoint = get_best_checkpoint(run_id=run_id)
predictor = TorchPredictor.from_checkpoint(best_checkpoint)
# preprocessor = predictor.get_preprocessor()
# Predict
sample_df = pd.DataFrame([{"title": title, "description": description, "tag": "other"}])
results = predict_with_proba(df=sample_df, predictor=predictor)
sample_ds = ray.data.from_items([{"title": title, "description": description, "tag": "other"}])
results = predict_proba(ds=sample_ds, predictor=predictor)
logger.info(json.dumps(results, cls=NumpyEncoder, indent=2))
return results

View File

@@ -3,11 +3,9 @@ import os
from http import HTTPStatus
from typing import Dict
import pandas as pd
import ray
from fastapi import FastAPI
from ray import serve
from ray.train.torch import TorchPredictor
from starlette.requests import Request
from madewithml import evaluate, predict
@@ -21,7 +19,7 @@ app = FastAPI(
)
@serve.deployment(route_prefix="/", num_replicas="1", ray_actor_options={"num_cpus": 8, "num_gpus": 0})
@serve.deployment(num_replicas="1", ray_actor_options={"num_cpus": 8, "num_gpus": 0})
@serve.ingress(app)
class ModelDeployment:
def __init__(self, run_id: str, threshold: int = 0.9):
@@ -30,8 +28,7 @@ class ModelDeployment:
self.threshold = threshold
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI) # so workers have access to model registry
best_checkpoint = predict.get_best_checkpoint(run_id=run_id)
self.predictor = TorchPredictor.from_checkpoint(best_checkpoint)
self.preprocessor = self.predictor.get_preprocessor()
self.predictor = predict.TorchPredictor.from_checkpoint(best_checkpoint)
@app.get("/")
def _index(self) -> Dict:
@@ -55,11 +52,10 @@ class ModelDeployment:
return {"results": results}
@app.post("/predict/")
async def _predict(self, request: Request) -> Dict:
# Get prediction
async def _predict(self, request: Request):
data = await request.json()
df = pd.DataFrame([{"title": data.get("title", ""), "description": data.get("description", ""), "tag": ""}])
results = predict.predict_with_proba(df=df, predictor=self.predictor)
sample_ds = ray.data.from_items([{"title": data.get("title", ""), "description": data.get("description", ""), "tag": ""}])
results = predict.predict_proba(ds=sample_ds, predictor=self.predictor)
# Apply custom logic
for i, result in enumerate(results):

View File

@@ -1,6 +1,7 @@
import datetime
import json
import os
import tempfile
from typing import Tuple
import numpy as np
@@ -10,21 +11,23 @@ import torch
import torch.nn as nn
import torch.nn.functional as F
import typer
from ray.air import session
from ray.air.config import (
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.data import Dataset
from ray.train import (
Checkpoint,
CheckpointConfig,
DatasetConfig,
DataConfig,
RunConfig,
ScalingConfig,
)
from ray.air.integrations.mlflow import MLflowLoggerCallback
from ray.data import Dataset
from ray.train.torch import TorchCheckpoint, TorchTrainer
from ray.train.torch import TorchTrainer
from torch.nn.parallel.distributed import DistributedDataParallel
from transformers import BertModel
from typing_extensions import Annotated
from madewithml import data, models, utils
from madewithml import data, utils
from madewithml.config import EFS_DIR, MLFLOW_TRACKING_URI, logger
from madewithml.models import FinetunedLLM
# Initialize Typer CLI app
app = typer.Typer()
@@ -106,18 +109,18 @@ def train_loop_per_worker(config: dict) -> None: # pragma: no cover, tested via
lr = config["lr"]
lr_factor = config["lr_factor"]
lr_patience = config["lr_patience"]
batch_size = config["batch_size"]
num_epochs = config["num_epochs"]
batch_size = config["batch_size"]
num_classes = config["num_classes"]
# Get datasets
utils.set_seeds()
train_ds = session.get_dataset_shard("train")
val_ds = session.get_dataset_shard("val")
train_ds = train.get_dataset_shard("train")
val_ds = train.get_dataset_shard("val")
# Model
llm = BertModel.from_pretrained("allenai/scibert_scivocab_uncased", return_dict=False)
model = models.FinetunedLLM(llm=llm, dropout_p=dropout_p, embedding_dim=llm.config.hidden_size, num_classes=num_classes)
model = FinetunedLLM(llm=llm, dropout_p=dropout_p, embedding_dim=llm.config.hidden_size, num_classes=num_classes)
model = train.torch.prepare_model(model)
# Training components
@@ -126,7 +129,8 @@ def train_loop_per_worker(config: dict) -> None: # pragma: no cover, tested via
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=lr_factor, patience=lr_patience)
# Training
batch_size_per_worker = batch_size // session.get_world_size()
num_workers = train.get_context().get_world_size()
batch_size_per_worker = batch_size // num_workers
for epoch in range(num_epochs):
# Step
train_loss = train_step(train_ds, batch_size_per_worker, model, num_classes, loss_fn, optimizer)
@@ -134,9 +138,14 @@ def train_loop_per_worker(config: dict) -> None: # pragma: no cover, tested via
scheduler.step(val_loss)
# Checkpoint
metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
checkpoint = TorchCheckpoint.from_model(model=model)
session.report(metrics, checkpoint=checkpoint)
with tempfile.TemporaryDirectory() as dp:
if isinstance(model, DistributedDataParallel): # cpu
model.module.save(dp=dp)
else:
model.save(dp=dp)
metrics = dict(epoch=epoch, lr=optimizer.param_groups[0]["lr"], train_loss=train_loss, val_loss=val_loss)
checkpoint = Checkpoint.from_directory(dp)
train.report(metrics, checkpoint=checkpoint)
@app.command()
@@ -183,7 +192,6 @@ def train_model(
num_workers=num_workers,
use_gpu=bool(gpu_per_worker),
resources_per_worker={"CPU": cpu_per_worker, "GPU": gpu_per_worker},
_max_cpu_fraction_per_node=0.8,
)
# Checkpoint config
@@ -201,7 +209,7 @@ def train_model(
)
# Run config
run_config = RunConfig(callbacks=[mlflow_callback], checkpoint_config=checkpoint_config, storage_path=EFS_DIR)
run_config = RunConfig(callbacks=[mlflow_callback], checkpoint_config=checkpoint_config, storage_path=EFS_DIR, local_dir=EFS_DIR)
# Dataset
ds = data.load_data(dataset_loc=dataset_loc, num_samples=train_loop_config["num_samples"])
@@ -210,14 +218,13 @@ def train_model(
train_loop_config["num_classes"] = len(tags)
# Dataset config
dataset_config = {
"train": DatasetConfig(fit=False, transform=False, randomize_block_order=False),
"val": DatasetConfig(fit=False, transform=False, randomize_block_order=False),
}
options = ray.data.ExecutionOptions(preserve_order=True)
dataset_config = DataConfig(datasets_to_split=["train"], execution_options=options)
# Preprocess
preprocessor = data.CustomPreprocessor()
train_ds = preprocessor.fit_transform(train_ds)
preprocessor = preprocessor.fit(train_ds)
train_ds = preprocessor.transform(train_ds)
val_ds = preprocessor.transform(val_ds)
train_ds = train_ds.materialize()
val_ds = val_ds.materialize()
@@ -230,7 +237,7 @@ def train_model(
run_config=run_config,
datasets={"train": train_ds, "val": val_ds},
dataset_config=dataset_config,
preprocessor=preprocessor,
metadata={"class_to_index": preprocessor.class_to_index},
)
# Train

View File

@@ -73,7 +73,6 @@ def tune_models(
num_workers=num_workers,
use_gpu=bool(gpu_per_worker),
resources_per_worker={"CPU": cpu_per_worker, "GPU": gpu_per_worker},
_max_cpu_fraction_per_node=0.8,
)
# Dataset
@@ -90,7 +89,8 @@ def tune_models(
# Preprocess
preprocessor = data.CustomPreprocessor()
train_ds = preprocessor.fit_transform(train_ds)
preprocessor = preprocessor.fit(train_ds)
train_ds = preprocessor.transform(train_ds)
val_ds = preprocessor.transform(val_ds)
train_ds = train_ds.materialize()
val_ds = val_ds.materialize()
@@ -102,7 +102,7 @@ def tune_models(
scaling_config=scaling_config,
datasets={"train": train_ds, "val": val_ds},
dataset_config=dataset_config,
preprocessor=preprocessor,
metadata={"class_to_index": preprocessor.class_to_index},
)
# Checkpoint configuration
@@ -118,7 +118,7 @@ def tune_models(
experiment_name=experiment_name,
save_artifact=True,
)
run_config = RunConfig(callbacks=[mlflow_callback], checkpoint_config=checkpoint_config, storage_path=EFS_DIR)
run_config = RunConfig(callbacks=[mlflow_callback], checkpoint_config=checkpoint_config, storage_path=EFS_DIR, local_dir=EFS_DIR)
# Hyperparameters to start with
initial_params = json.loads(initial_params)