-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathai_solver.py
More file actions
161 lines (129 loc) · 5.87 KB
/
Copy pathai_solver.py
File metadata and controls
161 lines (129 loc) · 5.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import numpy as np
import json
import os
import random
from sklearn.decomposition import PCA
class AISolver:
def __init__(self, vocab_path="vocab.json", embeddings_path="embeddings.bin"):
self.vocab = []
self.embeddings = None
self.word_to_idx = {}
self.rank_to_sim = None
self.pca_3d = None
self.projected_embeddings = None
self.load_model(vocab_path, embeddings_path)
self.load_training_data()
self.reset_session()
def load_model(self, vocab_path, embeddings_path):
print("Loading AI model...")
with open(vocab_path) as f:
self.vocab = json.load(f)
raw = np.fromfile(embeddings_path, dtype=np.float32)
dim = raw.shape[0] // len(self.vocab)
self.embeddings = raw.reshape(len(self.vocab), dim)
# Normalize
norms = np.linalg.norm(self.embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1.0
self.embeddings /= norms
self.word_to_idx = {w: i for i, w in enumerate(self.vocab)}
# Compute PCA for visualization (3 components) once
# This might be slow for 40k words? PCA on 40k x 100 is fast enough (~1s).
print("Computing 3D projection for visualization...")
self.pca_3d = PCA(n_components=3)
self.projected_embeddings = self.pca_3d.fit_transform(self.embeddings)
print("Model loaded.")
def load_training_data(self):
# Load or initialize calibration curve
if os.path.exists("ai_calibration.json"):
with open("ai_calibration.json") as f:
data = json.load(f)
self.rank_to_sim = np.array(data["rank_to_sim"])
print("Loaded existing calibration data.")
else:
print("Calibrating new model...")
self.calibrate()
def save_training_data(self):
with open("ai_calibration.json", "w") as f:
json.dump({
"rank_to_sim": self.rank_to_sim.tolist()
}, f)
print("Calibration data saved.")
def calibrate(self):
sample_size = 100
vocab_size = len(self.vocab)
sample_idxs = np.random.choice(vocab_size, sample_size, replace=False)
curve_accum = np.zeros(vocab_size)
for idx in sample_idxs:
sims = self.embeddings @ self.embeddings[idx]
sims_sorted = np.sort(sims)[::-1]
curve_accum += sims_sorted
self.rank_to_sim = curve_accum / sample_size
self.save_training_data()
def reset_session(self):
self.candidate_scores = np.zeros(len(self.vocab))
self.guessed_indices = set()
self.history = [] # (word, rank, idx)
self.first_guess = True
def update_calibration(self, target_idx):
# After a game, we can use the "real" target interactions to refine our curve
# Simply weighted average slightly towards the new data
real_sims = self.embeddings @ self.embeddings[target_idx]
real_sims_sorted = np.sort(real_sims)[::-1]
# Learning rate
alpha = 0.1
self.rank_to_sim = (1 - alpha) * self.rank_to_sim + alpha * real_sims_sorted
self.save_training_data()
def get_next_guess(self):
if self.first_guess:
self.first_guess = False
guess_idx = random.randint(0, len(self.vocab) - 1)
return guess_idx, "Initial random exploration."
# Select best score
mask = np.ones(len(self.vocab), dtype=bool)
if self.guessed_indices:
mask[list(self.guessed_indices)] = False
valid_indices = np.where(mask)[0]
# Add some randomness to top scores to avoid getting stuck?
# For now argmax is fine as score is deterministic based on history.
best_local_idx = np.argmax(self.candidate_scores[mask])
guess_idx = valid_indices[best_local_idx]
reasoning = self._generate_reasoning(guess_idx)
return guess_idx, reasoning
def _generate_reasoning(self, guess_idx):
if not self.history:
return "First strategic guess."
sorted_history = sorted(self.history, key=lambda x: x[1])
best_support = None
min_diff = 1e9
for word, rank, idx in sorted_history[:3]:
# What similarity did we EXPECT based on rank?
expected_sim = self.rank_to_sim[rank-1]
# What similarity do we HAVE between candidate and this history word?
actual_sim = float(self.embeddings[guess_idx] @ self.embeddings[idx])
diff = abs(expected_sim - actual_sim)
if diff < min_diff:
min_diff = diff
best_support = (word, rank, actual_sim, expected_sim)
if best_support:
ref_word, ref_rank, act, exp = best_support
return f"Fits constraint from '{ref_word}' (#{ref_rank}). (Sim: {act:.2f} vs Exp: {exp:.2f})"
return "Fits the intersection of previous clues."
def process_result(self, guess_idx, rank):
self.guessed_indices.add(guess_idx)
self.candidate_scores[guess_idx] = -1e9 # Eliminate
word = self.vocab[guess_idx]
self.history.append((word, rank, guess_idx))
if rank == 1:
self.update_calibration(guess_idx)
return
# Update scores
target_sim = self.rank_to_sim[rank - 1]
current_sims = self.embeddings @ self.embeddings[guess_idx]
delta = current_sims - target_sim
# Update function: Score -= error^2
self.candidate_scores -= (delta ** 2) * 10.0
def get_3d_coordinates(self, indices):
# Return list of [x, y, z] for given indices
return self.projected_embeddings[indices].tolist()
def get_vocab_word(self, idx):
return self.vocab[idx]