Source code for fairdiverse.recommendation.evaluator

import torch
from tqdm import tqdm, trange
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix
from .metric import NDCG, HR, MRR, AUC_score, MMF, Gini, Entropy
import os
import json
from .utils import Build_Adjecent_Matrix
from .metric import *




class Abstract_Evaluator(object):
    def __init__(self, config):
        self.config = config
        self.M, self.iid2pid = Build_Adjecent_Matrix(config)
    def eval(self, dataloader, model, store_scores=False):
        """
        Evaluates the model on the provided dataloader and calculates performance metrics.

        :param dataloader: The data loader that provides batches of user-item interactions and corresponding labels.
        :param model: The model to evaluate.
        :param store_scores: Whether to return the predicted scores as a sparse matrix. Defaults to `False`.
        :return: A dictionary containing the evaluation metric(s) (e.g., AUC score).
        """
        pass
class CTR_Evaluator(Abstract_Evaluator):
    def __init__(self, config):
        super().__init__(config=config)
    def eval(self, dataloader, model, store_scores=False):
        """
        Evaluates the model on the provided dataloader and calculates performance metrics.

        This function runs the evaluation on a dataset using the provided model. It calculates the AUC score
        based on the predicted scores and ground truth labels. If `store_scores` is set to `True`, it also
        returns the evaluation results as a sparse matrix of predicted scores.

        :param dataloader: The data loader that provides batches of user-item interactions and corresponding labels.
        :param model: The model to evaluate.
        :param store_scores: Whether to return the predicted scores as a sparse matrix. Defaults to `False`.
        :return: A dictionary containing the evaluation metric(s) (e.g., AUC score), and optionally, a sparse matrix of predicted scores.
        """
        model.eval()
        y_scores = []
        y_true = []
        row = []
        col = []
        data = []
        with torch.no_grad():
            for user_ids, item_ids, group_ids, label in tqdm(dataloader):
                row.extend(user_ids.numpy().tolist())
                col.extend(item_ids.numpy().tolist())
                user_ids, item_ids = user_ids.to(self.config['device']), item_ids.to(self.config['device'])
                score = model(user_ids, item_ids).cpu().numpy().tolist()
                data.extend(score)
                label = label.cpu().numpy().tolist()
                y_scores.extend(score)
                y_true.extend(label)

        auc_score = AUC_score(y_scores=y_scores, y_true=y_true)
        result_dict = {}
        result_dict["auc"] = np.round(auc_score, self.config['decimals'])

        if not store_scores:
            return result_dict
        else:
            coo = coo_matrix((data, (row, col)), shape=(self.config['user_num'], self.config['item_num']))
            csr = coo.tocsr()
            # remove the all-zero rows
            csr_eliminated = csr[csr.getnnz(1) > 0]
            coo = csr_eliminated.tocoo()
            return result_dict, coo
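# --- Illustrative usage sketch (not part of the original module) ---
# A minimal example of driving CTR_Evaluator.eval and consuming the optional
# sparse score matrix. The config keys mirror the ones referenced in eval()
# above ('device', 'decimals', 'user_num', 'item_num'); the config, dataloader
# and model passed in are assumed to already follow the interfaces eval()
# expects and are placeholders here.
def _example_ctr_evaluation(config, eval_loader, model):
    """Illustrative sketch: evaluate a CTR model and inspect its score matrix."""
    evaluator = CTR_Evaluator(config)
    result, score_coo = evaluator.eval(eval_loader, model, store_scores=True)
    print("AUC:", result["auc"])
    # One row per user that received at least one score; CSR allows cheap
    # per-user slicing of the predicted scores.
    score_csr = score_coo.tocsr()
    return result, score_csr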
class Ranking_Evaluator(Abstract_Evaluator):
    def __init__(self, config):
        super().__init__(config=config)
    def eval(self, dataloader, model, store_scores=False):
        """
        Evaluates the model on the provided dataloader and calculates performance metrics.

        This function runs the evaluation on a dataset using the provided model. It calculates the ranking metrics
        based on the predicted scores and ground truth labels. If `store_scores` is set to `True`, it also
        returns the evaluation results as a sparse matrix of predicted scores.

        :param dataloader: The data loader that provides batches of user-item interactions and corresponding labels.
        :param model: The model to evaluate.
        :param store_scores: Whether to return the predicted scores as a sparse matrix. Defaults to `False`.
        :return: A dictionary containing the evaluation metric(s), and optionally, a sparse matrix of predicted scores.
        """
        model.eval()
        y_scores = []
        y_true = []

        result_dict = {f"ndcg@{k}": 0 for k in self.config['topk']}
        result_dict.update({f"mrr@{k}": 0 for k in self.config['topk']})
        result_dict.update({f"hr@{k}": 0 for k in self.config['topk']})
        result_dict.update({f"mmf@{k}": 0 for k in self.config['topk']})
        result_dict.update({f"gini@{k}": 0 for k in self.config['topk']})
        result_dict.update({f"entropy@{k}": 0 for k in self.config['topk']})
        result_dict.update({f"maxminratio@{k}": 0 for k in self.config['topk']})
        exposure_dict = {f"top@{k}": np.zeros(self.config['group_num']) for k in self.config['topk']}

        index = 0
        row = []
        col = []
        data = []

        with torch.no_grad():
            for eval_data in tqdm(dataloader):
                user_ids, history_behavior, items, pos_length = eval_data
                batch_size, sample_size = items.shape
                pos_length = pos_length.cpu().numpy()

                for b in range(batch_size):
                    row.extend([index] * sample_size)
                    index = index + 1
                    real_item_ids = items[b].numpy().tolist()
                    col.extend(real_item_ids)

                    if 'retrieval' not in model.IR_type:
                        repeat_user_tensor = user_ids[b].repeat(sample_size).unsqueeze(0).to(self.config['device'])
                        repeat_history_tensor = history_behavior[b].repeat(sample_size, 1).unsqueeze(0).to(self.config['device'])
                        user_dict = {"user_ids": repeat_user_tensor, "history_ids": repeat_history_tensor}
                        i = items[b].to(self.config['device'])
                        score = model(user_dict, i.unsqueeze(0)).cpu().numpy()[0]
                    else:
                        user_dict = {"user_ids": user_ids[b].unsqueeze(0).to(self.config['device']),
                                     "history_ids": history_behavior[b].unsqueeze(0).to(self.config['device'])}
                        i = items[b].to(self.config['device'])
                        score = model.full_predict(user_dict, i.unsqueeze(0)).cpu().numpy()[0]

                    data.extend(score.tolist())

                    label_list = [1] * pos_length[b] + [0] * (sample_size - pos_length[b])
                    label_list = np.array(label_list)
                    ranked_args = np.argsort(score)[::-1]
                    rank_list = label_list[ranked_args]

                    for k in self.config['topk']:
                        result_dict[f"ndcg@{k}"] += NDCG(rank_list, label_list, k)
                        result_dict[f"mrr@{k}"] += MRR(rank_list, k)
                        result_dict[f"hr@{k}"] += HR(rank_list, label_list, k)

                        # count the exposures used to compute the fairness degree
                        ids = ranked_args[:k]
                        rank_items = np.array(real_item_ids)[ids]
                        for i, iid in enumerate(rank_items):
                            group_id = self.iid2pid[iid]
                            if self.config['fairness_type'] == "Exposure":
                                exposure_dict[f"top@{k}"][group_id] += 1
                            else:
                                exposure_dict[f"top@{k}"][group_id] += np.round(score[ids[i]], self.config['decimals'])

        for k in self.config['topk']:
            result_dict[f"mmf@{k}"] = MMF(exposure_dict[f"top@{k}"], ratio=self.config['mmf_eval_ratio']) * index
            result_dict[f"gini@{k}"] = Gini(exposure_dict[f"top@{k}"]) * index
            result_dict[f"entropy@{k}"] = Entropy(exposure_dict[f"top@{k}"]) * index
            result_dict[f"maxminratio@{k}"] = MinMaxRatio(exposure_dict[f"top@{k}"]) * index

        for key in result_dict.keys():
            result_dict[key] = np.round(result_dict[key] / index, self.config['decimals'])

        if not store_scores:
            return result_dict
        else:
            return result_dict, coo_matrix((data, (row, col)), shape=(index, self.config['item_num']))
class LLM_Evaluator(Abstract_Evaluator):
    def __init__(self, config):
        super().__init__(config=config)
        self.topk_list = config['topk']
    def get_data(self, data):
        """
        Processes the input data to extract prediction lists, label lists, and score lists for each user.

        Each user record is expected to contain the following keys (a toy record in this format is shown
        in the illustrative sketch after this class):

        - `predict_list`: a list of predicted items.
        - `positive_items`: a list of items that are considered positive (e.g., liked or preferred by the user).
        - `scores`: a list of scores corresponding to the predicted items, indicating the confidence of the prediction.

        :param data: A list of user records with the keys described above.
        :return:
            - `predict_lists`: a list of prediction lists for all users.
            - `label_lists`: for each user, a list of binary labels indicating whether each predicted item is positive (1) or not (0).
            - `score_lists`: a list of score lists corresponding to the predicted items for all users.
        """
        label_lists = []
        score_lists = []
        predict_lists = []
        for user in data:
            p = user['predict_list']
            predict_lists.append(p)
            label_list = [1 if m in user['positive_items'] else 0 for m in p]
            score = user['scores']
            score_lists.append(score)
            label_lists.append(label_list)
        return predict_lists, label_lists, score_lists
    def get_cates_value(self, iid2pid, predict, topk):
        """
        Get the per-category counts based on the predicted items and their category mapping.

        This method processes the predicted item indices along with their mapping to category IDs and returns
        a list of counts for each category, representing its frequency of occurrence in the top-k predictions.

        :param iid2pid: A dictionary mapping item indices (int) to their respective category IDs (int). If an
            item index is not found in the dictionary, its category defaults to -1.
        :param predict: A 2D list where each sublist contains the predicted indices (top-k predictions) for the
            corresponding input data point.
        :param topk: The number of top predictions considered for each data point. This determines how many
            elements from the beginning of each sublist in `predict` are processed.
        :return: A list of integers where each value is the count of occurrences of a specific category across
            all top-k predictions. The order of the counts matches the order of category IDs returned by
            `get_categories(iid2pid)`.
        """
        cates_name = self.get_categories(iid2pid)
        predict = [i[:topk] for i in predict]

        from collections import defaultdict
        cates_count = defaultdict(int)
        for p in predict:
            for prediction in p:
                c = iid2pid.get(prediction, -1)
                cates_count[c] += 1  # count-based, not score-based
        values = [cates_count[i] for i in cates_name]
        return values
    def cal_acc_score(self, label_lists, score_lists, topk):
        """
        Calculate accuracy scores for recommendation evaluation.

        This method computes the average NDCG (Normalized Discounted Cumulative Gain), HR (Hit Ratio), and MRR
        (Mean Reciprocal Rank) at a specified `topk` cutoff for a list of ground-truth labels and corresponding
        prediction scores.

        :param label_lists: A list of lists containing ground-truth labels. Each sublist represents the relevant
            items for a user or query.
        :param score_lists: A list of lists containing predicted scores. Each sublist holds the relevance scores
            for items in the same order as `label_lists`.
        :param topk: The number of top predictions to consider when calculating the metrics.
        :return: A dictionary containing the average NDCG, HR, and MRR scores at the given `topk`, with keys
            formatted as 'NDCG@{topk}', 'HR@{topk}', and 'MRR@{topk}'. Scores are rounded to 4 decimal places.
        """
        score = {}
        ndcgs = []
        hrs = []
        mrrs = []
        for lab, sco in zip(label_lists, score_lists):
            ndcg = NDCG(lab, lab, topk)
            hr = HR(lab, lab, topk)
            mrr = MRR(lab, topk)
            ndcgs.append(ndcg)
            hrs.append(hr)
            mrrs.append(mrr)
        # aggregate the per-user metrics
        score[f'NDCG@{topk}'] = np.round(np.mean(ndcgs), 4)
        score[f'HR@{topk}'] = np.round(np.mean(hrs), 4)
        score[f'MRR@{topk}'] = np.round(np.mean(mrrs), 4)
        return score
    def get_categories(self, iid2pid):
        return list(set(iid2pid.values()))
    def cal_fair_score(self, iid2pid, predict, topk):
        """
        Calculate fairness scores for recommendation evaluation.

        This method computes several fairness metrics at a specified top-k cutoff to evaluate the diversity and
        inclusiveness of the predicted items. It uses MMF (Max-Min Fairness), the Gini coefficient, the Min-Max
        Ratio, and Entropy to quantify the balance of exposure across categories or groups within the predictions.

        :param iid2pid: A mapping where keys are item IDs and values are their respective group/category IDs.
        :param predict: A list of prediction lists, one per user.
        :param topk: The top-k cutoff used to select the highest-ranked items for fairness evaluation.
        :return: A dictionary with metric names suffixed by the top-k cutoff (e.g., 'MMF@5') as keys and the
            corresponding scores, rounded to 4 decimal places, as values.
        """
        score = {}
        cates_value = self.get_cates_value(iid2pid, predict, topk)
        mmf = MMF(cates_value)
        cate_gini = Gini(cates_value)
        maxmin_ratio = MinMaxRatio(cates_value)
        entropy = Entropy(cates_value)
        score[f'MMF@{topk}'] = np.round(mmf, 4)
        score[f'Gini@{topk}'] = np.round(cate_gini, 4)
        score[f'MMR@{topk}'] = np.round(maxmin_ratio, 4)
        score[f'Entropy@{topk}'] = np.round(entropy, 4)
        return score
    def llm_eval(self, grounding_result, iid2pid):
        """
        Evaluate the performance of a language model based on grounding results and item-to-group mappings.

        This method assesses the accuracy and fairness of the model's predictions at different top-k thresholds.
        It computes both accuracy scores and fairness scores and aggregates them into a comprehensive evaluation result.

        :param grounding_result: The output from the model grounding process, containing the information needed for evaluation.
        :param iid2pid: A mapping from item IDs to group/category IDs, used in calculating fairness metrics.
        :return: A dictionary summarizing the evaluation outcomes, including accuracy and fairness scores for each specified top-k value.
        """
        predict_lists, label_lists, score_lists = self.get_data(grounding_result)
        eval_result = {}
        for topk in self.topk_list:
            acc_score = self.cal_acc_score(label_lists, score_lists, topk)
            fair_score = self.cal_fair_score(iid2pid, predict_lists, topk)
            eval_result.update(acc_score)
            eval_result.update(fair_score)
        print(f'Evaluate_result:{eval_result}')
        return eval_result
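# --- Illustrative usage sketch for LLM_Evaluator (not part of the original
# module) ---
# The toy grounding records and the iid2pid mapping below are invented purely
# to show the expected shapes; in practice they come from the LLM grounding
# step and the dataset preprocessing. The evaluator argument is assumed to be
# an LLM_Evaluator constructed elsewhere from a full fairdiverse config.
def _example_llm_evaluation(evaluator):
    """Illustrative sketch: run llm_eval on two toy user records."""
    grounding_result = [
        {"predict_list": [12, 7, 31], "positive_items": [7], "scores": [0.91, 0.88, 0.45]},
        {"predict_list": [3, 12, 9], "positive_items": [9, 4], "scores": [0.77, 0.60, 0.58]},
    ]
    iid2pid = {12: 0, 7: 1, 31: 0, 3: 2, 9: 1, 4: 2}  # item id -> group/category id
    # Returns accuracy (NDCG/HR/MRR) and fairness (MMF/Gini/MMR/Entropy) scores
    # for each top-k in evaluator.topk_list.
    return evaluator.llm_eval(grounding_result, iid2pid)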