-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdiversify.py
More file actions
141 lines (114 loc) · 5.23 KB
/
diversify.py
File metadata and controls
141 lines (114 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import math
import json
import copy
from tqdm import tqdm
import multiprocessing as mp
import random
from typing import List, Dict, Any
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from .clustering import kmeans_clustering
from .utils import sorted_highest_score
class DiversifySelector:
    """Diversified context selector based on a UCB-style bandit over clusters.

    Contexts are grouped into clusters; each round the cluster with the
    highest UCB score contributes its first remaining candidate. Only the
    'penalty' reward (relevance to the query minus a redundancy penalty
    against already selected contexts) is supported.

    Each context is indexed as ``context[0]["data"][0]["embedding"]`` for its
    embedding vector and ``context[1]`` for its similarity to the query.
    """

    # Score assigned to a cluster with no remaining candidates so that it
    # sorts behind every cluster that still has something to offer.
    _EXHAUSTED_SCORE = -9999999

    def __init__(self, alpha: float = 1.0, cluster_size: int = 5,
                 penalty_beta: float = 0.2):
        """Create a selector.

        Args:
            alpha: Weight of the UCB exploration bonus.
            cluster_size: Number of clusters requested from the clustering step.
            penalty_beta: Weight of the redundancy penalty (maximum cosine
                similarity to the already selected contexts).
        """
        self.alpha = alpha
        self.cluster_size = cluster_size
        self.penalty_beta = penalty_beta
        self.reward_method = 'penalty'  # fixed to penalty
        self.top_k_context = []
        self.clusters = {}
        self._reset_state(cluster_size)

    def _reset_state(self, n_clusters: int) -> None:
        """Reset all per-run bandit state for ``n_clusters`` clusters."""
        self.cluster_score = {i: 0 for i in range(n_clusters)}
        self.cluster_chose = {i: 0 for i in range(n_clusters)}
        self.cluster_ucb = {i: 0 for i in range(n_clusters)}
        self.cluster_sample = {i: 0 for i in range(n_clusters)}
        self.sum_chose = 0
        self.selected_contexts = []

    def update_cluster_reward(self, selected_cluster: int) -> float:
        """Recompute and store the penalty reward of one cluster.

        The reward is evaluated for the cluster's first remaining label
        (presumably its best candidate -- depends on how the clustering
        helper orders labels).

        Args:
            selected_cluster: Key of the cluster in ``self.clusters``.

        Returns:
            The new reward, or ``_EXHAUSTED_SCORE`` if the cluster is empty.
        """
        labels = self.clusters[selected_cluster]['labels']
        if len(labels) == 0:
            self.cluster_score[selected_cluster] = self._EXHAUSTED_SCORE
            self.cluster_ucb[selected_cluster] = self._EXHAUSTED_SCORE
            return self._EXHAUSTED_SCORE

        # Penalty reward: relevance to query minus redundancy to selected.
        index = labels[0]
        sim_to_query = self.top_k_context[index][1]

        if not self.selected_contexts:
            # Nothing selected yet, so there is nothing to be redundant with.
            reward = sim_to_query
        else:
            doc_embedding = np.array(
                self.top_k_context[index][0]["data"][0]["embedding"]
            ).reshape(1, -1)
            selected_embeddings = np.array([
                self.top_k_context[i][0]["data"][0]["embedding"]
                for i in self.selected_contexts
            ])
            # Penalize by the closest already selected context.
            sims = cosine_similarity(doc_embedding, selected_embeddings)[0]
            reward = sim_to_query - self.penalty_beta * np.max(sims)

        self.cluster_score[selected_cluster] = reward
        return reward

    def initial_reward_and_ucb(self):
        """Initialize rewards and UCB scores for all clusters."""
        # Clusters are addressed by position 0..n-1, matching the state dicts.
        for i in range(len(self.clusters)):
            self.cluster_ucb[i] = self.update_cluster_reward(i)

    def step1(self) -> int:
        """Pick the highest-UCB cluster that still has candidates.

        The cluster's first label is appended to ``self.selected_contexts``
        and the global pick counter is incremented.

        Returns:
            Key of the selected cluster.

        Raises:
            IndexError: If every cluster is exhausted.
        """
        # sorted_highest_score presumably yields keys by descending score.
        for key in sorted_highest_score(self.cluster_ucb):
            if len(self.clusters[key]['labels']) != 0:
                selected_cluster = key
                break
            self.cluster_score[key] = self._EXHAUSTED_SCORE
            self.cluster_ucb[key] = self._EXHAUSTED_SCORE
        else:
            # Previously this fell back to cluster 0 and failed on [][0].
            raise IndexError("no candidates remain in any cluster")

        self.selected_contexts.append(self.clusters[selected_cluster]['labels'][0])
        self.sum_chose += 1
        return selected_cluster

    def step2(self, selected_cluster: int) -> float:
        """Pop the used candidate from the selected cluster and refresh reward."""
        self.clusters[selected_cluster]['labels'].pop(0)
        return self.update_cluster_reward(selected_cluster)

    def step3(self, selected_cluster: int):
        """Update the UCB scores of all clusters after a selection.

        ``ucb = score + alpha * sqrt(2 * ln(total_picks) / (cluster_picks + 1))``
        """
        self.cluster_chose[selected_cluster] += 1
        self.cluster_sample[selected_cluster] += 1
        # Clamp so ln() stays defined (bonus 0) when no pick was recorded yet.
        log_total = math.log(float(max(self.sum_chose, 1)))
        for key in self.cluster_ucb:
            bonus = self.alpha * math.sqrt(
                2 * log_total / float(self.cluster_chose[key] + 1)
            )
            self.cluster_ucb[key] = self.cluster_score[key] + bonus

    def context_selection(self, contexts: List[Dict], cluster_method: str = "kmeans") -> List[Dict]:
        """Run the diversified selection over contexts using K-Means clustering.

        Args:
            contexts: Candidate contexts (see the class docstring for the
                fields that are read).
            cluster_method: Currently ignored; K-Means is always used.

        Returns:
            The selected contexts in reverse selection order (first pick
            last). ``contexts`` is returned unchanged when it is empty or
            when clustering yields no clusters.
        """
        if len(contexts) == 0:
            return contexts

        self.clusters, _sil_score = kmeans_clustering(
            contexts, num_clusters=self.cluster_size
        )
        if self.clusters is None:
            return contexts

        # Size the state by the clusters actually produced, but leave the
        # configured self.cluster_size untouched so later calls keep
        # requesting the same number of clusters.
        n_clusters = len(self.clusters)
        self.top_k_context = contexts
        self._reset_state(n_clusters)

        self.initial_reward_and_ucb()

        # Never iterate more often than there are candidates to hand out.
        available = sum(
            len(self.clusters[i]['labels']) for i in range(n_clusters)
        )
        for _ in range(min(len(contexts), available)):
            selected_cluster = self.step1()
            self.step2(selected_cluster)
            self.step3(selected_cluster)

        # Build reranked list (reverse selection order).
        return [contexts[i] for i in reversed(self.selected_contexts)]