Source code for fairdiverse.search.utils.div_type

import os
import pickle
import torch
import pandas as pd
import numpy as np
from tqdm import tqdm

pd.set_option('display.max_rows', 2000)


class subtopic:
    def __init__(self, subtopic_id, subtopic):
        """
        Represents a subtopic of a query.

        :param subtopic_id: Unique identifier for the subtopic.
        :param subtopic: Text representation of the subtopic.
        """
        self.subtopic_id = subtopic_id
        self.subtopic = subtopic

class div_query:
    def __init__(self, qid, query, subtopic_id_list, subtopic_list):
        """
        Represents a diversity query for re-ranking search results.

        :param qid: Unique query identifier.
        :param query: Text of the query.
        :param subtopic_id_list: List of subtopic IDs associated with the query.
        :param subtopic_list: List of subtopic texts corresponding to the subtopic IDs.
        """
        self.qid = qid
        self.query = query
        self.subtopic_id_list = subtopic_id_list
        self.subtopic_list = []
        self.doc_list = []
        self.doc_score_list = []
        self.best_metric = 0
        self.stand_alpha_DCG = 0
        for index in range(len(subtopic_id_list)):
            t = subtopic(subtopic_id_list[index], subtopic_list[index])
            self.subtopic_list.append(t)

    def set_std_metric(self, m):
        """
        Sets the standard alpha-DCG metric for normalization.

        :param m: Standard alpha-DCG metric value.
        """
        self.stand_alpha_DCG = m

    def add_docs(self, doc_list):
        """
        Adds a list of documents to the query and initializes subtopic relevance tracking.

        :param doc_list: List of document identifiers.
        """
        self.doc_list = doc_list
        self.DOC_NUM = len(self.doc_list)
        init_data = np.zeros((len(doc_list), len(self.subtopic_list)), dtype=int)
        self.subtopic_df = pd.DataFrame(init_data, columns=self.subtopic_id_list, index=doc_list)

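    # Note: subtopic_df is a binary document-by-subtopic relevance matrix
    # (rows: document IDs, columns: subtopic IDs). add_docs() only initializes
    # it to zeros; the 0/1 relevance judgments are presumably filled in by the
    # surrounding data-loading code before get_best_rank() is called.
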
    def add_query_suggestion(self, query_suggestion):
        """
        Adds query suggestions related to the main query.

        :param query_suggestion: Suggested query string.
        """
        self.query_suggestion = query_suggestion

    def add_docs_rel_score(self, doc_score_list):
        """
        Adds relevance scores for the documents associated with the query.

        :param doc_score_list: List of relevance scores for documents.
        """
        self.doc_score_list = doc_score_list

    def get_test_alpha_nDCG(self, docs_rank):
        """
        Gets the alpha-nDCG@20 for the input document list (for testing).

        :param docs_rank: Ordered list of document identifiers.
        :return: Alpha-nDCG score for the given ranking.
        """
        temp_data = np.zeros((len(docs_rank), len(self.subtopic_list)), dtype=int)
        temp_array = np.array(self.best_subtopic_df)
        metrics = []
        p = 0.5
        real_num = min(20, len(docs_rank))
        best_docs_index = []
        for index in range(real_num):
            result_index = self.best_docs_rank.index(docs_rank[index])
            best_docs_index.append(result_index)
            temp_data[index, :] = temp_array[result_index, :]
            if index == 0:
                score = np.sum(temp_data[index, :])
                metrics.append(score)
            else:
                r_ik = np.array([np.sum(temp_data[:index, s]) for s in range(temp_data.shape[1])], dtype=np.int64)
                t = np.power(p, r_ik)
                score = np.dot(temp_data[index, :], t) / np.log2(2 + index)
                metrics.append(score)
        # normalize by the standard (ideal) alpha-DCG; fall back to 0 so the
        # function never returns an unbound value when normalization is impossible
        alpha_nDCG = 0
        if hasattr(self, 'stand_alpha_DCG') and self.stand_alpha_DCG > 0:
            alpha_nDCG = np.sum(metrics) / self.stand_alpha_DCG
        else:
            print('error! qid =', self.qid)
        return alpha_nDCG

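    # Note: this implements alpha-nDCG (Clarke et al., SIGIR 2008). With a
    # 0-based rank index i, a document's gain for subtopic k is discounted by
    # p^r_k, where r_k counts higher-ranked documents already covering k and
    # p = 1 - alpha (p = 0.5 here, i.e. alpha = 0.5); the positional discount
    # is 1 / log2(i + 2). The summed discounted gain is then divided by the
    # ideal alpha-DCG (stand_alpha_DCG) to yield alpha-nDCG@20.
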
    def get_alpha_DCG(self, docs_rank, print_flag=False):
        """
        Computes the alpha-DCG for the input document list (for generating training samples).

        :param docs_rank: A list of document IDs representing the ranking order.
        :param print_flag: A boolean flag indicating whether to print intermediate computation results.
        :return: The computed alpha-DCG score for the given document ranking.
        """
        temp_data = np.zeros((len(docs_rank), len(self.subtopic_list)), dtype=int)
        temp_array = np.array(self.best_subtopic_df)
        metrics = []
        p = 0.5
        for index in range(len(docs_rank)):
            result_index = self.best_docs_rank.index(docs_rank[index])
            temp_data[index, :] = temp_array[result_index, :]
            if index == 0:
                score = np.sum(temp_data[index, :])
                metrics.append(score)
            else:
                r_ik = np.array([np.sum(temp_data[:index, s]) for s in range(temp_data.shape[1])], dtype=np.int64)
                t = np.power(p, r_ik)
                score = np.dot(temp_data[index, :], t) / np.log2(2 + index)
                metrics.append(score)
        if print_flag:
            print('self.best_gain = ', self.best_gain, 'sum(best_gain) = ', np.sum(self.best_gain),
                  'best_metric = ', self.best_metric)
            print('test metrics = ', metrics, 'sum(metrics) = ', np.sum(metrics))
        # total (unnormalized) gain for the input document list
        alpha_DCG = np.sum(metrics)
        return alpha_DCG

    def get_best_rank(self, top_n=None, alpha=0.5):
        """
        Generates the best document ranking using a greedy selection strategy.

        :param top_n: The number of top documents to select (default: all available documents).
        :param alpha: A parameter controlling redundancy penalization (default: 0.5).
        :return: None. Updates class attributes with the best document ranking and associated gains.
        """
        p = 1.0 - alpha
        if top_n is None:
            top_n = self.DOC_NUM
        real_num = int(min(top_n, self.DOC_NUM))
        temp_data = np.zeros((real_num, len(self.subtopic_list)), dtype=int)
        temp_array = np.array(self.subtopic_df)
        best_docs_rank = []
        best_docs_rank_rel_score = []
        best_gain = []
        # greedy document selection
        for step in range(real_num):
            scores = []
            if step == 0:
                for index in range(real_num):
                    temp_score = np.sum(temp_array[index, :])
                    scores.append(temp_score)
                result_index = np.argsort(scores)[-1]
                docid = self.doc_list[result_index]
                doc_rel_score = self.doc_score_list[result_index]
                best_docs_rank.append(docid)
                best_docs_rank_rel_score.append(doc_rel_score)
                best_gain.append(scores[result_index])
                temp_data[0, :] = temp_array[result_index, :]
            else:
                # r_ik depends only on the documents selected so far,
                # so it is computed once per step rather than per candidate
                r_ik = np.array([np.sum(temp_data[:step, s]) for s in range(temp_array.shape[1])], dtype=np.int64)
                t = np.power(p, r_ik)
                for index in range(real_num):
                    if self.doc_list[index] not in best_docs_rank:
                        temp_score = np.dot(temp_array[index, :], t)
                        scores.append(temp_score)
                    else:
                        scores.append(-1.0)
                result_index = np.argsort(scores)[-1]
                docid = self.doc_list[result_index]
                doc_rel_score = self.doc_score_list[result_index]
                if docid not in best_docs_rank:
                    best_docs_rank.append(docid)
                    best_docs_rank_rel_score.append(doc_rel_score)
                else:
                    print('document already added!')
                best_gain.append(scores[result_index] / np.log2(2 + step))
                temp_data[step, :] = temp_array[result_index, :]
        self.best_docs_rank = best_docs_rank
        self.best_docs_rank_rel_score = best_docs_rank_rel_score
        self.best_gain = best_gain
        self.best_subtopic_df = pd.DataFrame(temp_data, columns=self.subtopic_id_list, index=self.best_docs_rank)
        self.best_metric = np.sum(self.best_gain)

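# A minimal usage sketch (illustrative only; the qid, subtopic IDs, document
# IDs, and judgments below are hypothetical). After the document-by-subtopic
# matrix is filled, get_best_rank() builds the greedy ideal ranking and
# get_test_alpha_nDCG() scores an arbitrary ranking against it:
#
#     q = div_query('101', 'apple', ['101.1', '101.2'], ['apple fruit', 'apple inc'])
#     q.add_docs(['d1', 'd2', 'd3'])
#     q.add_docs_rel_score([0.9, 0.8, 0.7])
#     q.subtopic_df.loc['d1', '101.1'] = 1
#     q.subtopic_df.loc['d2', '101.2'] = 1
#     q.subtopic_df.loc['d3', '101.1'] = 1
#     q.get_best_rank(alpha=0.5)
#     q.set_std_metric(np.sum(q.best_gain))  # ideal alpha-DCG for normalization
#     score = q.get_test_alpha_nDCG(['d3', 'd2', 'd1'])
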
class div_dataset:
    def __init__(self, config):
        """
        Initializes the dataset object with file paths and configuration.

        :param config: A dictionary containing configuration settings.
        """
        self.Best_File = os.path.join(config['data_dir'], 'div_query.data')
        self.Train_File = os.path.join(config['data_dir'], config['model'], 'listpair_train.data')
        if not os.path.exists(os.path.join(config['data_dir'], config['model'])):
            os.makedirs(os.path.join(config['data_dir'], config['model']))
        self.config = config

    def get_listpairs(self, div_query, context, top_n):
        """
        Generates list-pair samples.

        :param div_query: The query object that contains the list of ranked documents.
        :param context: A list of previously considered documents in the context.
        :param top_n: The number of top-ranked documents to consider.
        :return: A list of generated samples, each containing metrics, positive/negative masks, and weights.
        """
        best_rank = div_query.best_docs_rank
        metrics = []
        samples = []
        for index in range(len(best_rank)):
            if best_rank[index] not in context:
                metric = div_query.get_alpha_DCG(context + [best_rank[index]])
            else:
                metric = -1.0
            metrics.append(metric)
        # pad the metrics list to length top_n
        if len(metrics) < top_n:
            metrics.extend([0] * (top_n - len(metrics)))
        total_count = 0
        for i in range(len(best_rank)):
            # limit the total number of samples generated for this context
            if total_count > 20:
                break
            count = 0
            for j in range(i + 1, len(best_rank)):
                # limit the number of pairs anchored at the same document i
                if count > 5:
                    break
                if metrics[i] < 0 or metrics[j] < 0 or metrics[i] == metrics[j]:
                    pass
                elif metrics[i] > metrics[j]:
                    count += 1
                    total_count += 1
                    positive_mask = torch.zeros(top_n)
                    negative_mask = torch.zeros(top_n)
                    weight = metrics[i] - metrics[j]
                    positive_mask[i] = 1
                    negative_mask[j] = 1
                    samples.append((metrics, positive_mask, negative_mask, weight))
                elif metrics[i] < metrics[j]:
                    count += 1
                    total_count += 1
                    positive_mask = torch.zeros(top_n)
                    negative_mask = torch.zeros(top_n)
                    weight = metrics[j] - metrics[i]
                    positive_mask[j] = 1
                    negative_mask[i] = 1
                    samples.append((metrics, positive_mask, negative_mask, weight))
        return samples

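    # Each sample is a tuple (metrics, positive_mask, negative_mask, weight):
    # metrics holds the alpha-DCG obtained by appending each candidate to the
    # context (padded to length top_n), the one-hot masks mark the preferred
    # and dispreferred positions, and weight is the alpha-DCG margin between
    # them, presumably used to weight the pairwise training loss.
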
    def get_listpair_train_data(self, top_n=50):
        """
        Generates list-pair training samples using the top-N relevant documents.

        This function processes the best document ranking for each query, generates
        list-pair samples, and saves them to the file listpair_train.data as::

            data_dict[qid] = [(metrics, positive_mask, negative_mask, weight), ...]

        where metrics is a list padded to length top_n, and positive_mask and
        negative_mask are tensors of length top_n.

        :param top_n: The number of top-ranked documents to use for generating the list-pairs.
        :return: None. Saves the generated list-pair training data to a file.
        """
        qd = pickle.load(open(self.Best_File, 'rb'))
        train_dict = {}
        for qid in tqdm(qd, desc="Gen Train Data"):
            temp_q = qd[qid]
            result_list = []
            real_num = int(min(top_n, temp_q.DOC_NUM))
            for i in range(real_num):
                listpair_data = self.get_listpairs(temp_q, temp_q.best_docs_rank[:i], top_n)
                if len(listpair_data) > 0:
                    result_list.extend(listpair_data)
            train_dict[str(qid)] = result_list
        pickle.dump(train_dict, open(self.Train_File, 'wb'), True)

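# A minimal usage sketch (illustrative only; the config values are
# hypothetical, and div_query.data is expected to be a pickled
# {qid: div_query} dict whose entries already have get_best_rank() applied):
#
#     config = {'data_dir': 'data/processed', 'model': 'my_model'}
#     dataset = div_dataset(config)
#     dataset.get_listpair_train_data(top_n=50)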