Source code for fairdiverse.search.ranker_model.ranklib_ranker

import os
import subprocess
from pathlib import Path
import pandas as pd

from .ranker import Ranker
from ..utils.process_tabular_data import norm_features, check_nan, writeToTXT


class RankLib(Ranker):
    """
    Wrapper class to run the available ranking models in the RankLib library.

    For more information about the available models and params, check the official documentation:
    https://sourceforge.net/p/lemur/wiki/RankLib%20How%20to%20use/
    """

    def __init__(self, configs, dataset):
        """
        Initialize the RankLib model with configuration settings and dataset.

        :param configs: dict
            The configuration dictionary that contains hyperparameters and paths needed for training.
        :param dataset: object
            The dataset object containing the necessary columns, like feature columns and the target variable.
        """
        super().__init__(configs, dataset)

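    # A minimal sketch of the configuration this class expects. The keys below are
    # exactly those read elsewhere in this file; the values are hypothetical
    # examples, not defaults shipped with fairdiverse.
    #
    #   configs = {
    #       "ranker": "LambdaMART",   # model name passed to RankLib
    #       "ranker_id": 6,           # RankLib ranker id
    #       "metric": "NDCG",         # optimization metric
    #       "top_k": 10,              # cutoff for the metric
    #       "rel_max": 5,             # maximum relevance judgement
    #       "lr": 0.1,                # learning rate
    #       "epochs": 100,            # training iterations
    #       "train_data": ["orig", "fair"],
    #       "test_data": ["orig", "fair"],
    #   }
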
    def train(self, data_train, data_test, run):
        """
        Trains ranking models using RankLib.

        This method generates RankLib-compatible training data and then runs the RankLib training script.

        :param data_train: pandas.DataFrame
            The training dataset to be used for training the ranking model.
        :param data_test: pandas.DataFrame
            The testing dataset to be used for evaluating the ranking model.
        :param run: str
            The identifier for the current training run.
        """
        self.generate_ranklib_data(data_train, data_test, run)
        for experiment in self._get_experiments(run):
            model_path = self.out_dir / str(run) / experiment / self.configs['ranker']
            if not model_path.exists():
                self._run_ranklib_training(experiment, run)

    def predict(self, data, run, file_name):
        """
        Generates predictions using the trained RankLib model.

        This method reads the predictions from the trained model and saves them as a CSV file.

        :param data: pandas.DataFrame
            The dataset on which predictions need to be made.
        :param run: str
            The identifier for the current run.
        :param file_name: str
            The file name to save the predictions as a CSV.
        :return: pandas.DataFrame
            A DataFrame containing the predictions.
        """
        predictions = self.read_predictions(data, run)
        pred_dir_path = os.path.join(self.out_dir, str(run), "predictions")
        os.makedirs(pred_dir_path, exist_ok=True)
        predictions.to_csv(os.path.join(pred_dir_path, f'{file_name}_pred.csv'))
        return predictions

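    # Usage sketch (hypothetical; the surrounding pipeline normally drives these calls):
    #
    #   ranker = RankLib(configs, dataset)
    #   ranker.train(data_train, data_test, run="0")
    #   preds = ranker.predict(data_test, run="0", file_name="test")
    #
    # Note that train() is idempotent per experiment: it skips any experiment whose
    # model file already exists under out_dir/<run>/<experiment>/<ranker>.
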
    def _get_experiments(self, run):
        """
        Fetches all experiment directories that contain '__' in the name.

        :param run: str
            The identifier for the run.
        :return: list
            A list of experiment directory names.
        """
        return [f for f in os.listdir(self.out_dir / str(run)) if "__" in f]

    def _run_ranklib_training(self, experiment, run):
        """
        Executes the RankLib training script.

        This method calls an external script to train the ranking model with the provided configurations.

        :param experiment: str
            The name of the current experiment.
        :param run: str
            The identifier for the current run.
        """
        project_dir = Path.cwd()
        try:
            subprocess.check_call([
                str(project_dir / self.ranker_path / "run-LTR-model.sh"),
                str(project_dir / self.ranker_path),
                str(self.configs['metric']),
                str(self.configs['top_k']),
                str(self.configs['rel_max']),
                str(self.configs['ranker']),
                str(self.configs['ranker_id']),
                str(self.out_dir / str(run) / experiment),
                str(self.configs['lr']),
                str(self.configs['epochs']),
                "none"
            ])
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")
            # check_call does not capture the child's output, so e.output is
            # typically None here; the script's own output goes to the console.
            print(f"Command output: {e.output}")

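    # For reference, the subprocess call above is roughly equivalent to running
    # (argument order taken from the code; the concrete values are hypothetical):
    #
    #   ./run-LTR-model.sh <ranker_path> NDCG 10 5 LambdaMART 6 \
    #       <out_dir>/<run>/<experiment> 0.1 100 none
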
    def generate_ranklib_data(self, data_train, data_test, run):
        """
        Generates data formatted for RankLib training and testing.

        This method prepares the training and testing data for RankLib by generating the required
        feature matrix and label information in a format that RankLib can process.

        :param data_train: pandas.DataFrame
            The training dataset.
        :param data_test: pandas.DataFrame
            The testing dataset.
        :param run: str
            The identifier for the current run.
        """
        experiments = list(zip(self.configs['train_data'], self.configs['test_data']))
        for train_name, test_name in experiments:
            data_train_copy = data_train.copy()
            data_test_copy = data_test.copy()
            out_dir = self.out_dir / str(run) / f"{train_name}__{test_name}"
            out_dir.mkdir(parents=True, exist_ok=True)

            cols_train = self._get_feature_columns(train_name)
            cols_test = self._get_feature_columns(test_name)

            self._validate_data(data_train_copy, cols_train, "train")
            self._validate_data(data_test_copy, cols_test, "test")

            data_train_copy = self._process_data(data_train_copy, cols_train)
            data_test_copy = self._process_data(data_test_copy, cols_test)

            self.create_ranklib_data(cols_train, data_train_copy, out_dir, "train")
            self.create_ranklib_data(cols_test, data_test_copy, out_dir, "test")

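    # Example of the experiment layout (hypothetical config values): with
    # train_data=["orig", "fair"] and test_data=["orig", "fair"], zip() yields the
    # pairs ("orig", "orig") and ("fair", "fair"), producing directories
    # out_dir/<run>/orig__orig and out_dir/<run>/fair__fair, each holding a
    # train_ranklib.txt and a test_ranklib.txt.
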
    def _get_feature_columns(self, mode):
        """
        Returns feature columns with the fairness suffix if applicable.

        :param mode: str
            The mode can be "fair" or another value. If "fair", fairness-related features will be included.
        :return: list
            A list of feature column names.
        """
        suffix = "_fair" if mode == "fair" else ""
        return [col + suffix for col in self.dataset.feature_cols] + [self.dataset.score_col + suffix]

    def _validate_data(self, data, cols, split):
        """
        Checks for NaN values in the dataset.

        This method ensures that there are no missing values in the data before further processing.

        :param data: pandas.DataFrame
            The dataset to be validated.
        :param cols: list
            A list of column names to check for NaN values.
        :param split: str
            The split type, either "train" or "test".
        :raises ValueError: if NaN values are detected.
        """
        if check_nan(data, cols):
            raise ValueError(f"NaN values detected in {split} data!")

    def _process_data(self, data, cols):
        """
        Assigns judgement scores and normalizes features.

        This method applies judgement scoring based on relevance and normalizes the features for RankLib.

        :param data: pandas.DataFrame
            The dataset to be processed.
        :param cols: list
            The list of feature column names.
        :return: pandas.DataFrame
            The processed dataset with normalized features.
        """
        data = data.groupby(self.dataset.query_col).apply(
            lambda x: self.assign_judgement(x, self.dataset.pos_th, cols)).reset_index(drop=True)
        return norm_features(cols, data)

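    # Illustration of _get_feature_columns (hypothetical dataset attributes): with
    # feature_cols=["exp", "edu"] and score_col="score",
    #
    #   self._get_feature_columns("orig")  ->  ["exp", "edu", "score"]
    #   self._get_feature_columns("fair")  ->  ["exp_fair", "edu_fair", "score_fair"]
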
    def create_ranklib_data(self, cols, data, out_dir, split):
        """
        Formats and writes data for RankLib.

        This method prepares the data by formatting it according to RankLib's required format
        and writes it to a text file.

        :param cols: list
            The list of feature columns.
        :param data: pandas.DataFrame
            The data to be written to a text file.
        :param out_dir: Path
            The output directory where the file will be saved.
        :param split: str
            The type of data, either "train" or "test".
        """
        data = self._format_ranklib_data(cols, data)
        output_f = out_dir / f"{split}_ranklib.txt"
        writeToTXT(output_f, data)

    def _format_ranklib_data(self, cols, data):
        """
        Formats features for RankLib.

        This method ensures that the features are formatted in a way RankLib can consume,
        and shuffles the data within query groups.

        :param cols: list
            The list of feature columns.
        :param data: pandas.DataFrame
            The dataset to be formatted.
        :return: pandas.DataFrame
            The formatted dataset ready for RankLib.
        """
        for idx, col in enumerate(cols):
            data[col] = data[col].apply(lambda x: f"{idx + 1}:{round(x, 4)}")
        data["QID"] = data["QID"].apply(lambda x: f"qid:{x}")
        data["UID"] = data[["UID", "judgement", cols[-1]]].astype(str).apply(
            lambda x: f"#docid={x.iloc[0]};rel={x.iloc[1]};{cols[-1]}={x.iloc[2]};", axis=1
        )
        # Shuffle within query groups
        data = pd.concat([g.sample(frac=1) for _, g in data.groupby("QID")]).reset_index(drop=True)
        return data[["judgement", "QID"] + cols + ["UID"]]

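    # Resulting line layout (standard RankLib / SVMlight-style; the values are
    # hypothetical): one row per query-document pair, e.g.
    #
    #   3 qid:12 1:0.2531 2:0.9177 3:0.8856 #docid=451;rel=3;score=3:0.8856;
    #
    # i.e. judgement, qid:<query>, <feature_index>:<value> pairs, then a trailing
    # comment carrying the document id and relevance. Note that the score column is
    # rewritten by the feature loop before the comment is built, so the comment
    # shows it in "<index>:<value>" form.
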
    def assign_judgement(self, x, th, cols):
        """
        Assigns judgement scores based on relevance ranking.

        This method assigns a judgement score to each document based on its relevance to a query.

        :param x: pandas.DataFrame
            The subset of data belonging to a single query.
        :param th: float
            The threshold for classifying relevance.
        :param cols: list
            The list of feature columns.
        :return: pandas.DataFrame
            The data with the assigned judgement scores.
        """
        mask_pos = x[self.dataset.score_col].apply(lambda s: round(s, 2) > th)
        # Copies avoid pandas' SettingWithCopyWarning on the assignments below.
        pos_x, neg_x = x[mask_pos].copy(), x[~mask_pos].copy()
        pos_x['judgement'] = pos_x[cols[-1]].rank(ascending=True, method='dense')
        pos_x['judgement'] = self._scale_judgements(pos_x)
        neg_x['judgement'] = 0
        return pd.concat([pos_x, neg_x])

    def _scale_judgements(self, pos_x):
        """
        Scales and rounds judgement values.

        This method scales the judgement values to a defined range and ensures all values
        are within the valid range.

        :param pos_x: pandas.DataFrame
            The data with the initial judgement scores.
        :return: pandas.Series
            The scaled judgement scores.
        """
        min_rank, max_rank = pos_x['judgement'].min(), pos_x['judgement'].max()
        pos_x['judgement'] = ((pos_x['judgement'] - min_rank) / (max_rank - min_rank + 1)) * self.configs['rel_max']
        pos_x['judgement'] = pos_x['judgement'].round().astype(int)
        # Ensure values are within valid range
        if pos_x['judgement'].max() < self.configs['rel_max']:
            pos_x['judgement'] += self.configs['rel_max'] - pos_x['judgement'].max()
        pos_x['judgement'] = pos_x['judgement'].clip(lower=1)
        return pos_x['judgement']

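    # Worked example (hypothetical values, rel_max=5): dense ranks [1, 2, 3, 4] give
    # min_rank=1 and max_rank=4, so ((r - 1) / 4) * 5 -> [0.0, 1.25, 2.5, 3.75],
    # which rounds (half to even) to [0, 1, 2, 4]. The max (4) is below rel_max, so
    # every value is shifted up by 1 -> [1, 2, 3, 5], and the lower clip at 1 then
    # leaves all values unchanged.
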
    def read_predictions(self, data, run):
        """
        Retrieves LTR predictions for the dataset.

        This method loads the predictions from the trained RankLib model.

        :param data: pandas.DataFrame
            The dataset for which predictions need to be made.
        :param run: str
            The identifier for the run.
        :return: pandas.DataFrame
            The dataset with predictions added.
        """
        pred_dir = self.out_dir / str(run)
        predictions = get_LTR_predict(data, pred_dir, self.configs['ranker'],
                                      self.dataset.score_col, self.dataset.query_col, self.dataset.ID)
        return predictions

def get_LTR_predict(data, out_dir, ranker, score_col, query_col, id_col):
    """
    Fetches RankLib prediction scores.

    This function loads prediction scores from the model and merges them with the provided dataset.

    :param data: pandas.DataFrame
        The dataset that needs the predictions.
    :param out_dir: Path
        The directory where the RankLib predictions are stored.
    :param ranker: str
        The name of the ranking model used.
    :param score_col: str
        The column name of the score in the dataset.
    :param query_col: str
        The column representing queries.
    :param id_col: str
        The unique identifier for each data point.
    :return: pandas.DataFrame
        The dataset with added prediction scores.
    """
    experiments = [f for f in os.listdir(out_dir) if "__" in f]
    for experiment in experiments:
        train_set, test_set = experiment.split("__")
        train_set = f"_{train_set}" if train_set == "fair" else ""
        test_set = f"_{test_set}" if test_set == "fair" else ""
        pred_col = f"{score_col}{train_set}__{score_col}{test_set}"

        score_pred = get_prediction_scores(out_dir / experiment / ranker)
        # Keep only rows that received a prediction, then attach the scores.
        data = data[data[id_col].astype(str).isin(score_pred)].copy()
        data[pred_col] = data[id_col].apply(lambda x: score_pred.get(str(x), 0))
    return data

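# Column-naming illustration (hypothetical, with score_col="score"): the experiment
# directory "fair__orig" yields pred_col "score_fair__score", while "orig__fair"
# yields "score__score_fair"; only the "fair" side picks up the "_fair" suffix.
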
def get_prediction_scores(pred_path):
    """
    Retrieves prediction scores from the latest RankLib experiment.

    This function reads the predictions generated by the latest experiment and returns them.

    :param pred_path: Path
        The directory containing the prediction files.
    :return: dict
        A dictionary mapping document IDs to predicted scores.
    """
    sub_experiments = [x for x in os.listdir(pred_path) if "experiments_" in x]
    if not sub_experiments:
        raise ValueError(f"No predictions found in {pred_path}!")

    # Pick the most recently modified sub-experiment.
    latest_exp = max(sub_experiments, key=lambda x: os.path.getmtime(os.path.join(pred_path, x)))
    pred_file = Path(pred_path) / latest_exp / "predictions" / "prediction.txt"

    if pred_file.exists():
        print(f"**** Reading predictions from {pred_file}")
        with pred_file.open("r") as file:
            lines = file.read().splitlines()
        # The third whitespace-separated token carries the doc id
        # ("...docid=<id>;..."); the fourth is the integer score.
        return {li.split(" ")[2].split(";")[0].replace("docid=", ""): int(li.split(" ")[3])
                for li in lines}
    raise ValueError(f"Prediction file not found in {pred_path}!")
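
# Expected shape of each prediction.txt line, inferred from the parsing above (the
# exact layout is produced by the RankLib wrapper script, so treat this as an
# assumption): four whitespace-separated tokens, where the third starts with
# "docid=<id>;..." and the fourth is an integer score, e.g.
#
#   <qid> <rank> docid=451;rel=3; 4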