diff --git a/pecos/xmr/reranker/README.md b/pecos/xmr/reranker/README.md
new file mode 100644
index 0000000..450d83e
--- /dev/null
+++ b/pecos/xmr/reranker/README.md
@@ -0,0 +1,91 @@
+# PECOS XMR Reranker
+
+This is a reranker for the PECOS XMR model, built on Hugging Face's Transformers library. It can be run in both
+single-process and distributed modes, and follows the approach of the paper [Fine-Tuning LLaMA for Multi-Stage Text Retrieval](https://arxiv.org/abs/2310.08319).
+
+## How to run
+### Single process
+To run the reranker in single-process mode, use the following command:
+
+```bash
+python -m pecos.xmr.reranker.train --config_json_path <path_to_config_json>
+```
+
+### Distributed mode
+To run the reranker in distributed mode, first initialize the distributed configuration:
+```bash
+accelerate config
+```
+
+Then launch the reranker with:
+```bash
+accelerate launch -m pecos.xmr.reranker.train --config_json_path <path_to_config_json>
+```
+
+## Configuration file
+Here is an example of the configuration file:
+```json
+{
+    "train_params": {
+        "__meta__": {
+            "class_fullname": "pecos.xmr.reranker.model###RankingModel.TrainParams"
+        },
+        "target_data_folder": "/home/ec2-user/docker_disk/datasets/ms_marco_partitioned/target",
+        "input_data_folder": "/home/ec2-user/docker_disk/datasets/ms_marco_partitioned/input",
+        "label_data_folder": "/home/ec2-user/docker_disk/datasets/ms_marco_partitioned/label",
+        "training_args": {
+            "__meta__": {
+                "class_fullname": "pecos.xmr.reranker.trainer###RankLlamaTrainer.TrainingArgs"
+            },
+            "learning_rate": 1e-4,
+            "output_dir": "./ds_model",
+            "per_device_train_batch_size": 8,
+            "gradient_accumulation_steps": 8,
+            "max_steps": -1,
+            "logging_strategy": "steps",
+            "logging_first_step": false,
+            "logging_steps": 10,
+            "save_strategy": "steps",
+            "save_steps": 50,
+            "save_total_limit": 5,
+            "seed": 42,
+            "data_seed": 42,
+            "bf16": true,
+            "dataloader_num_workers": 2,
+            "dataloader_prefetch_factor": 10,
+            "gradient_checkpointing": true,
+            "train_group_size": 16
+        }
+    },
+    "model_params": {
+        "__meta__": {
+            "class_fullname": "pecos.xmr.reranker.model###RankingModel.ModelParams"
+        },
+        "encoder_args": {
+            "__meta__": {
+                "class_fullname": "pecos.xmr.reranker.model###CrossEncoder.Config"
+            },
+            "model_shortcut": "meta-llama/Llama-2-7b-hf",
+            "model_init_kwargs": {},
+            "model_modifier": {
+                "modifier_type": "peft",
+                "config_type": "LoraConfig",
+                "config": {
+                    "r": 8,
+                    "lora_alpha": 64,
+                    "target_modules": ["q_proj", "v_proj"],
+                    "modules_to_save": ["score", "classifier"],
+                    "lora_dropout": 0.1
+                }
+            }
+        },
+        "positive_passage_no_shuffle": false,
+        "negative_passage_no_shuffle": false,
+        "rerank_max_len": 196,
+        "query_prefix": "query: ",
+        "passage_prefix": "document: ",
+        "append_eos_token": false,
+        "pad_to_multiple_of": 16
+    }
+}
+```
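+
+## Data format
+The three `*_data_folder` entries in `train_params` point to folders of parquet files. The column names below are the
+ones read by the training code (`inp_id`/`keywords` for inputs, `lbl_id`/`title`/`contents` for labels, and
+`inp_id`/`ret_idxs`/`rel` for targets); the folder layout, file names and toy values in this sketch are only
+illustrative:
+```python
+import os
+
+import pandas as pd
+
+for folder in ("input", "label", "target"):
+    os.makedirs(folder, exist_ok=True)
+
+# input/: one row per query
+pd.DataFrame(
+    {"inp_id": [0, 1], "keywords": ["what is pecos", "llama fine-tuning"]}
+).to_parquet("input/part-00000.parquet")
+
+# label/: one row per passage
+pd.DataFrame(
+    {"lbl_id": [0, 1, 2], "title": ["t0", "t1", "t2"], "contents": ["text 0", "text 1", "text 2"]}
+).to_parquet("label/part-00000.parquet")
+
+# target/: one row per query with its retrieved passages and relevance scores
+pd.DataFrame(
+    {"inp_id": [0, 1], "ret_idxs": [[0, 2], [1, 2]], "rel": [[1.0, 0.2], [0.7, 0.1]]}
+).to_parquet("target/part-00000.parquet")
+```
+Here `ret_idxs` are row positions in the label table and `rel` holds the corresponding relevance scores; for each query
+the collator splits the retrieved passages into positives and negatives around the mean score and samples
+`train_group_size` of them.
diff --git a/pecos/xmr/reranker/data_utils.py b/pecos/xmr/reranker/data_utils.py
new file mode 100644
index 0000000..a445995
--- /dev/null
+++ b/pecos/xmr/reranker/data_utils.py
@@ -0,0 +1,141 @@
+import os
+import random
+from collections import OrderedDict
+from typing import List, Tuple, Callable
+
+import numpy as np
+import pyarrow.parquet as pq
+from datasets import load_dataset
+
+import pecos
+
+
+class RankingDataUtils(pecos.BaseClass):
+    """
+    Utility class for handling data related tasks
+    """
+
+    @classmethod
+    def remap_ordereddict(cls, od: OrderedDict, keymap_fn: Callable):
+        """
+        Function to remap the keys of an ordered Dictionary
+        Args:
+            od: The ordered dictionary to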
remap + keymap_fn: The function to map the keys + """ + new_od = OrderedDict() + for k, v in od.items(): + new_od[keymap_fn(k)] = v + return new_od + + @classmethod + def _format_sample( + cls, + inp_text: str, + lbl_text: str, + lbl_title: str, + inp_prefix: str = "...", + passage_prefix: str = "...", + ) -> str: + """ + Function to convert the text fields into a formatted string + that the model understands. + """ + lbl_title = lbl_title.replace("-", " ").strip() + return f"{inp_prefix} {inp_text} {passage_prefix} {lbl_title} {lbl_text}".strip() + + @classmethod + def _create_sample( + cls, + inp_id: int, + ret_idxs: List[int], + scores: List[float], + table_stores, + train_group_size: int, + inp_prefix: str, + passage_prefix: str, + ) -> Tuple[List[str], List[float]]: + """ + Function to create a sample for training. + Args: + inp_id: The input id + ret_idxs: The retrieved indices + scores: Scores for the retrieved indices + table_stores: Dictionary of table stores for input and label data + train_group_size: The number of passages used to train for each query + inp_prefix: The input prefix + passage_prefix: The passage prefix + + Returns: A tuple of formatted samples and scores + + """ + qid = inp_id + pidxs = ret_idxs + + input_store = table_stores["input"] + label_store = table_stores["label"] + + # get the values of the query + query = input_store[qid]["keywords"] + mean_score = np.mean(scores) + + # get idxs for positive items + pos_idxs = [(x, pid) for x, pid in zip(scores, pidxs) if x > mean_score] + neg_idxs = [(x, pid) for x, pid in zip(scores, pidxs) if x <= mean_score] + random.shuffle(pos_idxs) + random.shuffle(neg_idxs) + + num_positives = train_group_size // 2 + + all_selections = pos_idxs[:num_positives] + num_positives = len(all_selections) + num_negatives = train_group_size - num_positives + all_selections.extend(neg_idxs[:num_negatives]) + + if len(all_selections) < train_group_size: + all_selections.extend( + random.choices(neg_idxs, k=train_group_size - len(all_selections)) + ) + + all_scores = [s for s, _ in all_selections] + all_pids = [pid for _, pid in all_selections] + + # get the values for the retrieved items + ret_info = [label_store[i] for i in all_pids] + + formated_pair = [] + for info in ret_info: + formated_pair.append( + cls._format_sample( + query, info["contents"], info["title"], inp_prefix, passage_prefix + ) + ) + return formated_pair, all_scores + + @classmethod + def get_parquet_rows(cls, folder_path: str) -> int: + """ + Returns the count of rows in parquet files by reading the + metadata + """ + file_list = os.listdir(folder_path) + file_list = [os.path.join(folder_path, x) for x in file_list] + cumulative_rowcount = sum([pq.read_metadata(fp).num_rows for fp in file_list]) + + return cumulative_rowcount + + @classmethod + def get_sorted_data_files(cls, filenames: List[str], idx_colname) -> List[str]: + """ + Returns the list of files sorted by the id in the first row of each file + """ + # Load the datasets in streaming format and read the first id + fn_ordered = [] # this containes tuples with (idx, filename) + for fn in filenames: + tmp_ds = load_dataset("parquet", data_files=fn, streaming=True, split="train") + row = next(iter(tmp_ds.take(1))) + fn_ordered.append((row[idx_colname], fn)) + del tmp_ds + fn_ordered = sorted(fn_ordered, key=lambda x: x[0]) + + return [x[1] for x in fn_ordered] diff --git a/pecos/xmr/reranker/model.py b/pecos/xmr/reranker/model.py new file mode 100644 index 0000000..ee955b9 --- /dev/null +++ 
b/pecos/xmr/reranker/model.py @@ -0,0 +1,420 @@ +import dataclasses as dc +import json +import logging +import os +from dataclasses import dataclass, field +from functools import partial +from typing import Dict, List, Tuple, Any, Optional, Union + +import peft +import torch +from datasets import IterableDataset, Dataset +from peft import AutoPeftModelForSequenceClassification, get_peft_model +from peft.config import PeftConfig +from peft.mixed_model import PeftMixedModel +from peft.peft_model import PeftModel +from transformers import AutoModelForSequenceClassification, PreTrainedModel +from transformers import AutoTokenizer, PreTrainedTokenizer, PretrainedConfig + +import pecos +from pecos.xmr.reranker.trainer import RankLlamaTrainer, PARAM_FILENAME +from .data_utils import RankingDataUtils + +logger = logging.getLogger(__name__) + + +class CrossEncoderConfig(PretrainedConfig): + """ + The configuration class for the cross encoder model. This class contains the model shortcut, model modifier and + model initialization arguments for the model. The model shortcut is the name of the huggingface model. The model + modifier is the type of the modifier and the model initialization arguments are the arguments for the model. + """ + + model_type = "reranker_crossencoder" + + def __init__( + self, + model_shortcut: str = "", + model_modifier: Dict = {}, + model_init_kwargs: dict = {}, + **kwargs, + ): + super().__init__(**kwargs) + + self.model_shortcut = model_shortcut + self.model_modifier = model_modifier + self.model_init_kwargs = model_init_kwargs + + +class CrossEncoder(PreTrainedModel): + """ + The cross encoder model for ranking tasks (retrieval-based). This model is used for training and evaluation. + It is a wrapper around the huggingface transformer model. 
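+
+    Illustrative usage sketch (the model shortcut and the query/passage text below are placeholders,
+    not defaults shipped with this package; the "query: "/"document: " prefixes mirror the
+    ModelParams defaults):
+
+        >>> from transformers import AutoTokenizer
+        >>> config = CrossEncoderConfig(model_shortcut="cross-encoder/ms-marco-MiniLM-L-6-v2")
+        >>> model = CrossEncoder(config)
+        >>> tokenizer = AutoTokenizer.from_pretrained(config.model_shortcut)
+        >>> batch = tokenizer(["query: what is pecos document: a passage about PECOS"], return_tensors="pt")
+        >>> scores = model(**batch).logits.squeeze(-1)  # one relevance score per query/passage pair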
+ """ + + TRANSFORMER_CLS = AutoModelForSequenceClassification + TRANSFORMER_PEFT_CLS = AutoPeftModelForSequenceClassification + + @dataclass + class Config(pecos.BaseParams): + """Encoder configuration + model_shortcut (str): the model shortcut of the HuggingFace model + model_init_kwargs (dict): model initialization kwargs + """ + + model_shortcut: str = "" + model_init_kwargs: dict = dc.field(default_factory=lambda: dict()) + model_modifier: dict = dc.field(default_factory=lambda: dict()) + + config_class = CrossEncoderConfig + + def __init__(self, config: CrossEncoderConfig): + """ + Initialize the cross encoder model + Args: + hf_model: The huggingface model + train_group_size: The group size for training (number of documents per query) + """ + super().__init__(config) + base_model = AutoModelForSequenceClassification.from_pretrained( + config.model_shortcut, num_labels=1, **config.model_init_kwargs + ) + base_model.config.pad_token_id = ( + 0 if base_model.config.pad_token_id is None else base_model.config.pad_token_id + ) + self.hf_model = base_model + + @classmethod + def from_pretrained( + cls, + pretrained_model_name_or_path: Union[str, os.PathLike], + *model_args, + config: Optional[Union[PretrainedConfig, str, os.PathLike]] = None, + cache_dir: Optional[Union[str, os.PathLike]] = None, + ignore_mismatched_sizes: bool = False, + force_download: bool = False, + local_files_only: bool = False, + token: Optional[Union[str, bool]] = None, + revision: str = "main", + use_safetensors: Optional[bool] = None, + **kwargs, + ): + """ + Load the model from the pretrained model name or path + """ + is_local = os.path.isdir(pretrained_model_name_or_path) + param_folder = pretrained_model_name_or_path + + def super_return(): + return PreTrainedModel.from_pretrained( + pretrained_model_name_or_path, + *model_args, + config, + cache_dir, + ignore_mismatched_sizes, + force_download, + local_files_only, + token, + revision, + use_safetensors, + **kwargs, + ) + + if not is_local: + raise NotImplementedError(f"{cls} can only load local models") + + with open(os.path.join(param_folder, PARAM_FILENAME), "r") as param_file: + params = json.load(param_file) + + xe_config = CrossEncoder.Config.from_dict(params["model_params"]["encoder_args"]) + xe_config = CrossEncoderConfig(**xe_config.to_dict()) + model = CrossEncoder(xe_config) + + try: + if xe_config.model_modifier["modifier_type"] == "peft": + model = PeftModel.from_pretrained(model, param_folder) + else: + super_return() + except KeyError: + logger.info("No peft configuration found") + + return model + + def forward(self, *args, **kwargs): + """ + Returns the forward output of the huggingface model + """ + return self.hf_model(*args, **kwargs) + + def gradient_checkpointing_enable(self, **kwargs): + """ + Enable gradient checkpointing for the model + """ + self.hf_model.enable_input_require_grads() + self.hf_model.gradient_checkpointing_enable(**kwargs) + + +class RankingModel(pecos.BaseClass): + """ + The ranking model class for training and evaluation of the cross encoder model. This class is used for training + and evaluation of the cross encoder model. It is a wrapper around the cross encoder model. It also contains the + parameters for the model. The model can be used for training and evaluation. 
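+
+    Illustrative training entry point (mirrors train.py; `param` is the parsed JSON configuration
+    shown in the README, and `train_dataset`/`table_stores` are assumed to be prepared as in that script):
+
+        >>> model_params = RankingModel.ModelParams.from_dict(param["model_params"], recursive=True)
+        >>> train_params = RankingModel.TrainParams.from_dict(param["train_params"], recursive=True)
+        >>> RankingModel.train(train_dataset, table_stores, model_params, train_params)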
+ """ + + @dataclass + class TrainParams(pecos.BaseParams): + training_args: RankLlamaTrainer.TrainingArgs + target_data_folder: str = field( + metadata={ + "help": "Path to folder containing target parquet files (inp_id, [lbl_id], [rel_val])" + } + ) + input_data_folder: str = field( + metadata={"help": "Path to folder containing input parquet files (inp_id, keywords)"} + ) + label_data_folder: str = field( + metadata={ + "help": "Path to folder containing label parquet files (lbl_id, title, contents)" + } + ) + + @dataclass + class ModelParams(pecos.BaseParams): + """ + The parameters for the ranking model. This class contains the data, encoder and training arguments for the model. + """ + + encoder_args: CrossEncoder.Config + + positive_passage_no_shuffle: bool = False + negative_passage_no_shuffle: bool = False + rerank_max_len: int = 20000 + query_prefix: str = "query: " + passage_prefix: str = "document: " + append_eos_token: bool = False + pad_to_multiple_of: Optional[int] = 8 + + def __init__( + self, + encoder: Union[CrossEncoder, PeftModel, PeftMixedModel], + tokenizer: AutoTokenizer, + model_params: ModelParams, + train_params: Optional[TrainParams] = None, + ): + """ + Initialize the ranking model. The model contains the encoder, tokenizer, model parameters and training parameters. + """ + self.tokenizer = tokenizer + self.cross_encoder = encoder + + self.model_params = self.ModelParams.from_dict(model_params) + self.train_params = self.TrainParams.from_dict(train_params) if train_params else None + + @classmethod + def get_modified_model(cls, model: CrossEncoder, mod_config: Dict): + """ + Takes a pretrained Huggingface model and modifies it to include new features. Currently, the `modifier_type` + supported by this method is limited to the `peft` package. + + Args: + model (PreTrainedModel): A PreTrainedModel from the transformers package. + """ + if mod_config["modifier_type"] == "peft": + config_type = getattr(peft, mod_config["config_type"]) + peft_config: PeftConfig = config_type(**mod_config["config"]) + + # IMPORTANT This is needed for the peft adapters to train the classification head + # peft_config.modules_to_save = ["classifier", "score"] + # peft_config.task_type = TaskType.SEQ_CLS + # CrossEncoder -> (PeftModel, MLP, PeftModelVision) + model = get_peft_model(model, peft_config) + + return model + else: + logger.warn("Using model without modifiers (e.g. 
LoRA)") + return model + + @classmethod + def init_model(cls, model_params: ModelParams, train_params: TrainParams): + """Initiate a model with training parameters + + Args: + model_params (RankingModel.ModelParams): the model parameters + train_params (RankingModel.TrainParams): the training parameters + Returns: + An instance of UberGlobalModel + """ + hf_trainer_args = train_params.training_args + if hf_trainer_args.local_rank > 0: + torch.distributed.barrier() + + config = model_params.encoder_args.to_dict() + config = CrossEncoderConfig(**config) + encoder = CrossEncoder( + config=config, + ) + + if hf_trainer_args.bf16: + encoder = encoder.bfloat16() + + # TODO create peft model + if config.model_modifier: + encoder = cls.get_modified_model(model=encoder, mod_config=config.model_modifier) + + tokenizer = AutoTokenizer.from_pretrained( + model_params.encoder_args.model_shortcut, + ) + if tokenizer.pad_token_id is None: + tokenizer.pad_token_id = tokenizer.unk_token_id + tokenizer.padding_side = "right" + + if torch.distributed.is_initialized(): + if hf_trainer_args.local_rank == 0: + torch.distributed.barrier() + + return cls(encoder, tokenizer, model_params, train_params=train_params) + + @classmethod + def _collate_sharded( + cls, + tokenizer: Union[PreTrainedTokenizer, AutoTokenizer], + model_params: ModelParams, + train_params: TrainParams, + table_stores: Dict[str, Dataset], + data: List[Dict[str, Any]], + ) -> Dict[str, Any]: + """ + Collate function for training. Tokenizes the input and return features and returns the collated batch. + Args: + tokenizer: The huggingface tokenizer + params: The model parameters + table_stores: The table stores for the input and label data + data: The data to be collated + """ + fts_w_scores = [] + for s in data: + inp_id = s["inp_id"] + retr_idxs = s["ret_idxs"] + scores = s["rel"] + + fts_w_scores.append( + RankingDataUtils._create_sample( + inp_id, + retr_idxs, + scores, + table_stores, + train_params.training_args.train_group_size, + model_params.query_prefix, + model_params.passage_prefix, + ) + ) + + return cls._collate(tokenizer, model_params, fts_w_scores) + + @classmethod + def _collate( + cls, + tokenizer: Union[PreTrainedTokenizer, AutoTokenizer], + model_params: ModelParams, + features_w_scores: List[Tuple], + ): + """ + Collate function for training. Tokenizes the input and return features and returns the collated batch. 
+ Args: + tokenizer: The huggerface tokenizer + params: The model parameters + features_w_scores: Tuple of features list and scores list + + Returns: The collated batch in the form of a dictionary with input and scores + + """ + features = [f for f, _ in features_w_scores] + scores = [s for _, s in features_w_scores] + + all_pairs = [] + for pairs in features: + all_pairs.extend(pairs) + + tokenized_pairs = tokenizer( + all_pairs, + padding=False, + truncation=True, + max_length=( + model_params.rerank_max_len - 1 + if model_params.append_eos_token + else model_params.rerank_max_len + ), + return_attention_mask=False, + return_token_type_ids=False, + add_special_tokens=True, + ) + + if model_params.append_eos_token: + tokenized_pairs["input_ids"] = [ + p + [tokenizer.eos_token_id] for p in tokenized_pairs["input_ids"] + ] + + pairs_collated = tokenizer.pad( + tokenized_pairs, + padding=True, + pad_to_multiple_of=model_params.pad_to_multiple_of, + return_attention_mask=True, + return_tensors="pt", + ) + # NOTE: Here scores has to be flattened, otherwise the huggingface trainer will distribute it + # incorrectly across devices in distributed training. + m_scores = torch.tensor(scores, dtype=torch.float).flatten() + + return {"input": pairs_collated, "scores": m_scores} + + @classmethod + def train( + cls, + train_dataset: IterableDataset, + table_stores: Dict[str, Dataset], + model_params: ModelParams, + train_params: TrainParams, + ): + """ + Train the ranking model + Args: + train_dataset: The training dataset (IterableDataset) + params: The model parameters (RankingModelParams) + """ + training_args = train_params.training_args + training_args.remove_unused_columns = False + outer_model = cls.init_model(model_params, train_params) + inner_model = outer_model.cross_encoder + + logger.info("Model loading...") + if torch.distributed.is_initialized(): + torch.distributed.barrier() + else: + # NOTE This is needed for the case where the program is run in a single process mode + if training_args.bf16 and not torch.distributed.is_initialized(): + inner_model = inner_model.bfloat16() + + logger.info("=" * 50) + logger.info( + f"Memory used by model: {round(inner_model.get_memory_footprint() / 1024 / 1024 / 1024, 2)} GB" + ) + + trainer = RankLlamaTrainer( + model=inner_model, + args=training_args, + train_dataset=train_dataset, + data_collator=partial( + cls._collate_sharded, + outer_model.tokenizer, + model_params, + train_params, + table_stores, + ), + outer_model=outer_model, + ) + + # NOTE: in the huggingface trainers `_prepare_input` method, the inputs are converted from + # mps device to cpu. To run on Apple Silicon, the method should be overridden. It is not + # clear if training is supported for Apple Silicon devices. 
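+        # (RankLlamaTrainer._prepare_inputs in trainer.py overrides this to move the prepared
+        # inputs to the MPS device when it is available.)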
+ trainer.train() + trainer.save_model() diff --git a/pecos/xmr/reranker/train.py b/pecos/xmr/reranker/train.py new file mode 100644 index 0000000..aaa1f01 --- /dev/null +++ b/pecos/xmr/reranker/train.py @@ -0,0 +1,94 @@ +import argparse +import json +import logging +import os + +import datasets.distributed +import torch +from datasets import load_dataset +from transformers import set_seed + +from .data_utils import RankingDataUtils +from .model import RankingModel + +logger = logging.getLogger(__name__) + +""" +Usage: +```bash +python -m pecos.xmr.reranker.train --config_json_path config.json +``` +""" + + +def main(config_json_path: str): + """ + Args: + config_json_path: JSON configuration for running the training + """ + # parse train_params and model_params from json + with open(config_json_path, "r") as fin: + param = json.load(fin) + model_params: RankingModel.ModelParams = RankingModel.ModelParams.from_dict( + param.get("model_params", None), + recursive=True, + ) + + train_params: RankingModel.TrainParams = RankingModel.TrainParams.from_dict( + param.get("train_params", None), + recursive=True, + ) + + set_seed(train_params.training_args.seed) + + # helper function for getting the list of filepaths in a folder + def construct_file_list(folder): + return [os.path.join(folder, x) for x in os.listdir(folder)] + + input_files = construct_file_list(train_params.input_data_folder) + label_files = construct_file_list(train_params.label_data_folder) + input_files, label_files = RankingDataUtils.get_sorted_data_files( + input_files, "inp_id" + ), RankingDataUtils.get_sorted_data_files(label_files, "lbl_id") + + train_dataset = load_dataset( + "parquet", data_dir=train_params.target_data_folder, streaming=True, split="train" + ) + train_dataset_rows = RankingDataUtils.get_parquet_rows(train_params.target_data_folder) + logger.info(f"total target inputs: {train_dataset_rows}") + + training_args = train_params.training_args + # set the max_steps in accordance with the number of num_rows + if training_args.max_steps <= 0: + ws = training_args.world_size + bs = training_args.per_device_train_batch_size + gas = training_args.gradient_accumulation_steps + batch_size = ws * bs * gas + max_steps = train_dataset_rows // batch_size + training_args.max_steps = max_steps + logger.info(f"total batch size: {batch_size}, train steps: {max_steps}") + else: + logger.info(f"max steps: {training_args.max_steps}") + + table_stores = { + "input": load_dataset("parquet", data_files=input_files, split="train"), + "label": load_dataset("parquet", data_files=label_files, split="train"), + } + + train_dataset = train_dataset.shuffle(buffer_size=5000, seed=training_args.data_seed) + train_dataset = datasets.distributed.split_dataset_by_node( + train_dataset, training_args.local_rank, training_args.world_size + ) + + logger.info("Waiting for main process to perform the mapping") + if torch.distributed.is_initialized(): + torch.distributed.barrier() + + RankingModel.train(train_dataset, table_stores, model_params, train_params) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--config_json_path", type=str, required=True) + args = parser.parse_args() + main(args.config_json_path) diff --git a/pecos/xmr/reranker/trainer.py b/pecos/xmr/reranker/trainer.py new file mode 100644 index 0000000..baa4b37 --- /dev/null +++ b/pecos/xmr/reranker/trainer.py @@ -0,0 +1,128 @@ +import copy +import json +import logging +import os +from dataclasses import dataclass +from typing import 
Optional, Any, Tuple, Dict + +import torch +from torch.utils.data import DataLoader +from transformers import Trainer, TrainingArguments, HfArgumentParser + +import pecos + +TIME_FORMAT_STR: str = "%b_%d_%H_%M_%S" +PARAM_FILENAME: str = "param.json" + +logger = logging.getLogger(__name__) + + +class RankLlamaTrainer(Trainer, pecos.BaseClass): + """ + Trainer class for the RankLlama model. This class extends the Trainer class. + """ + + loss_fn = torch.nn.CrossEntropyLoss(reduction="mean") + outer_model = None + + def __init__(self, *args, **kwargs): + self.outer_model = kwargs.pop("outer_model") + super(RankLlamaTrainer, self).__init__(*args, **kwargs) + + @dataclass + class TrainingArgs(TrainingArguments, pecos.BaseParams): + train_group_size: int = 8 + + @classmethod + def from_dict(cls, param=None): + if param is None: + return cls() + elif isinstance(param, cls): + return copy.deepcopy(param) + elif isinstance(param, dict): + parser = HfArgumentParser(cls) + return parser.parse_dict(param, allow_extra_keys=True)[0] + raise ValueError(f"{param} is not a valid parameter dictionary for {cls.name}") + + def to_dict(self, with_meta=True): + d = super().to_dict() + return self.append_meta(d) if with_meta else d + + def get_train_dataloader(self) -> DataLoader: + """ + Returns the training dataloader. This function is called by the Trainer class. + """ + prefetch_factor = self.args.dataloader_prefetch_factor + prefetch_factor = prefetch_factor if prefetch_factor else 10 + return DataLoader( + self.train_dataset, + batch_size=self.args.per_device_train_batch_size, + # Ensure that at least one worker is creating batches + # parallel to the model compute + num_workers=max(self.args.dataloader_num_workers, 1), + # To ensure efficiency we prefetch samples in parallel + prefetch_factor=prefetch_factor, + collate_fn=self.data_collator, + ) + + def _save(self, output_dir: Optional[str] = None, state_dict=None): + """ + Save the model and tokenizer to the output directory. Makes sure the huggingface model is saved correctly. + Args: + output_dir: The output directory to save the model and tokenizer. + state_dict: The state dictionary to save + """ + # If we are executing this function, we are the process zero, so we don't check for that. + if output_dir is not None: + os.makedirs(output_dir, exist_ok=True) + logger.info(f"Saving to {output_dir}") + + outer_model: Any = self.outer_model + super()._save(output_dir, state_dict) + + # save the config + param = { + "model": outer_model.__class__.__name__, + "model_params": outer_model.model_params.to_dict(), + "train_params": outer_model.train_params.to_dict(), + } + + output_dir = output_dir if output_dir is not None else self.args.output_dir + + param = outer_model.append_meta(param) + with open(os.path.join(output_dir, PARAM_FILENAME), "w", encoding="utf-8") as f: + f.write(json.dumps(param, indent=True)) + + def _prepare_inputs(self, inputs): + """ + Prepare the inputs for the model. This function is called by the Trainer class. Converts the inputs to mps + tensors if available. + """ + super_inputs = super(RankLlamaTrainer, self)._prepare_inputs(inputs) + if torch.backends.mps.is_available(): + super_inputs = {k: v.to("mps") for k, v in super_inputs.items()} + return super_inputs + + def compute_loss( + self, model, inputs: Dict[str, Any], return_outputs: bool = False + ) -> Tuple[torch.Tensor, torch.Tensor]: + """ + Compute the loss for the model. This function is called by the Trainer class. 
+ Args: + model: The model to compute the loss for + inputs: The inputs to the model + return_outputs: Whether to return the outputs + """ + self.args: RankLlamaTrainer.TrainingArgs + train_group_size = self.args.train_group_size + if not train_group_size: + raise NotImplementedError("Cannot perform ranking without train group") + gt_scores = inputs["scores"].reshape(-1, train_group_size) + ranker_logits = model(**inputs["input"], return_dict=True).logits + batch_size = gt_scores.shape[0] + + grouped_logits = ranker_logits.view(batch_size, -1) + assert grouped_logits.shape == gt_scores.shape + loss = self.loss_fn(grouped_logits, gt_scores) + + return (loss, ranker_logits) if return_outputs else loss diff --git a/setup.py b/setup.py index 45cbd58..07037f1 100644 --- a/setup.py +++ b/setup.py @@ -107,13 +107,13 @@ def get_blas_lib_dir(cls): # Requirements numpy_requires = [ 'numpy<1.20.0; python_version<"3.7"', # setup_requires needs correct version for <3.7 - 'numpy>=1.19.5; python_version>="3.7"' + 'numpy>=1.19.5,<2.0.0; python_version>="3.7"' ] setup_requires = numpy_requires + [ 'pytest-runner' ] install_requires = numpy_requires + [ - 'scipy>=1.4.1', + 'scipy>=1.4.1,<1.14.0', 'scikit-learn>=0.24.1', 'torch==1.13; python_version<"3.8"', 'torch>=2.0; python_version>="3.8"',