Skip to content

Commit ffe21e2

Browse files
committed
Add advanced heuristic tiers and bootstrap new ladder bots
1 parent d05f526 commit ffe21e2

17 files changed

Lines changed: 673 additions & 83 deletions

File tree

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import sys
5+
from dataclasses import dataclass
6+
from pathlib import Path
7+
8+
from sqlmodel import col, select
9+
10+
# Allows running the script from the repository root.
11+
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
12+
13+
14+
@dataclass(frozen=True)
class HeuristicBotSpec:
    """Immutable description of one heuristic ladder bot this script provisions."""

    # Username of the bot account; the script also derives the bot's e-mail from it.
    username: str
    # Heuristic tier name written to the bot's profile (checked with
    # is_supported_heuristic_level before any database work happens).
    heuristic_level: str
    # Free-form description of the bot's play style; not read anywhere in this script.
    notes: str
19+
20+
21+
# Bots this script guarantees exist, one per advanced heuristic tier.
# Each row is (username, heuristic_level, notes).
BOT_SPECS: tuple[HeuristicBotSpec, ...] = tuple(
    HeuristicBotSpec(username=name, heuristic_level=tier, notes=blurb)
    for name, tier, blurb in (
        (
            "ub_apexcore_v1",
            "apex",
            "Heuristica top-tier orientada a solidez tactica.",
        ),
        (
            "ub_gambitshade_v1",
            "gambit",
            "Heuristica exotica agresiva con presion lateral.",
        ),
        (
            "ub_sentinelloom_v1",
            "sentinel",
            "Heuristica exotica posicional con control de movilidad.",
        ),
    )
)
38+
39+
40+
async def _ensure_heuristic_bots() -> None:
    """Create or refresh every bot in ``BOT_SPECS`` and print its ladder status.

    For each spec this makes sure a ``User`` row exists and is flagged as an
    active, visible heuristic bot, that a ``BotProfile`` row exists with the
    spec's heuristic level, and that the bot has a rating in the active season.
    It then recomputes the leaderboard and prints one summary line per bot.

    Raises:
        RuntimeError: if a spec names an unsupported heuristic level, or if
            there is no active season.
    """
    # Project imports are local to the function — presumably so that the
    # module-level sys.path tweak has already run when they are resolved.
    from agents.heuristic import DEFAULT_HEURISTIC_LEVEL, is_supported_heuristic_level
    from api.db.enums import AgentType, BotKind
    from api.db.models import BotProfile, User
    from api.db.session import get_engine, get_sessionmaker
    from api.modules.ranking.repository import RankingRepository
    from api.modules.ranking.service import RankingService

    # Fail fast on a bad spec before touching the database.
    invalid = [spec.heuristic_level for spec in BOT_SPECS if not is_supported_heuristic_level(spec.heuristic_level)]
    if invalid:
        raise RuntimeError(f"Unsupported levels in BOT_SPECS: {invalid}")

    sessionmaker = get_sessionmaker()
    try:
        async with sessionmaker() as session:
            ranking_service = RankingService(ranking_repository=RankingRepository(session=session))
            season = await ranking_service.get_active_season()
            if season is None:
                raise RuntimeError("No active season found. Run scripts/bootstrap_active_season.py first.")

            # Keep each spec paired with its user so the report below needs no lookup.
            ready: list[tuple[HeuristicBotSpec, User]] = []
            for spec in BOT_SPECS:
                user_stmt = select(User).where(col(User.username) == spec.username)
                user = (await session.execute(user_stmt)).scalars().first()
                if user is None:
                    user = User(
                        username=spec.username,
                        email=f"{spec.username}@bots.local",
                        is_active=True,
                        is_admin=False,
                        is_bot=True,
                        bot_kind=BotKind.HEURISTIC,
                        is_hidden_bot=False,
                        model_version_id=None,
                    )

                # Force the bot flags on pre-existing accounts as well; for a
                # freshly built user these assignments are no-ops.
                user.is_active = True
                user.is_bot = True
                user.bot_kind = BotKind.HEURISTIC
                user.is_hidden_bot = False
                session.add(user)
                await session.commit()
                # Refresh so user.id is populated for the queries below.
                await session.refresh(user)

                profile_stmt = select(BotProfile).where(col(BotProfile.user_id) == user.id)
                profile = (await session.execute(profile_stmt)).scalars().first()
                if profile is None:
                    profile = BotProfile(
                        user_id=user.id,
                        agent_type=AgentType.HEURISTIC,
                        heuristic_level=DEFAULT_HEURISTIC_LEVEL,
                        model_mode=None,
                        enabled=True,
                    )

                # The spec always wins over whatever the profile held before.
                profile.agent_type = AgentType.HEURISTIC
                profile.heuristic_level = spec.heuristic_level
                profile.model_mode = None
                profile.enabled = True
                session.add(profile)
                await session.commit()
                await session.refresh(profile)

                await ranking_service.get_or_create_rating(user.id, season.id)
                ready.append((spec, user))

            leaderboard = await ranking_service.recompute_leaderboard(season_id=season.id, limit=500)
            rank_by_user_id = {entry.user_id: entry.rank for entry in leaderboard}

            print("Heuristic bots ready:")
            print(f" season_id={season.id}")
            for spec, user in ready:
                rating = await ranking_service.get_or_create_rating(user.id, season.id)
                # rank is None when the bot is not among the returned leaderboard entries.
                rank = rank_by_user_id.get(user.id)
                print(
                    " "
                    f"username={user.username} "
                    f"level={spec.heuristic_level} "
                    f"rating={rating.rating:.1f} "
                    f"rank={rank}"
                )
    finally:
        # Dispose the engine on failure too, so an early error (for example a
        # missing season) does not leave the engine undisposed.
        await get_engine().dispose()
128+
129+
130+
def main() -> None:
    """Synchronous entry point: run the async bootstrap to completion."""
    bootstrap = _ensure_heuristic_bots()
    asyncio.run(bootstrap)


if __name__ == "__main__":
    main()

src/agents/heuristic.py

Lines changed: 192 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,231 @@
55
from game.board import AtaxxBoard
66
from game.types import Move
77

8+
# Every level name heuristic_move accepts, in declaration order.
HEURISTIC_LEVELS: tuple[str, ...] = ("easy", "normal", "hard", "apex", "gambit", "sentinel")
# Same names as a frozenset, used for membership checks.
HEURISTIC_LEVEL_SET = frozenset(HEURISTIC_LEVELS)
# Level used when a caller does not pick one.
DEFAULT_HEURISTIC_LEVEL = "normal"
818

9-
def _score_move(state: AtaxxBoard, move: Move) -> float:
19+
20+
def is_supported_heuristic_level(level: str) -> bool:
    """Return True when *level* is one of the names in ``HEURISTIC_LEVEL_SET``."""
    known_levels = HEURISTIC_LEVEL_SET
    return level in known_levels
22+
23+
24+
def heuristic_mode_from_level(level: str) -> str:
    """Return the mode string ``"heuristic_<level>"`` for a supported level.

    Raises:
        ValueError: if *level* is not a supported heuristic level.
    """
    if is_supported_heuristic_level(level):
        return f"heuristic_{level}"
    raise ValueError(f"Unsupported heuristic level: {level}")
28+
29+
30+
def _chebyshev_distance(move: Move) -> int:
    """Return the larger of the row and column displacement of *move*.

    *move* unpacks as ``(r1, c1, r2, c2)``: source cell, then destination cell.
    """
    src_row, src_col, dst_row, dst_col = move
    row_gap = abs(src_row - dst_row)
    col_gap = abs(src_col - dst_col)
    return max(row_gap, col_gap)
33+
34+
35+
def _count_targets_in_radius(
    board: AtaxxBoard,
    *,
    row: int,
    col: int,
    target: int,
    radius: int,
) -> int:
    """Count cells equal to *target* in the square window around ``(row, col)``.

    The window spans *radius* cells in every direction and is clipped to the
    board. The cell ``(row, col)`` itself is part of the window.
    """
    # The row count is used as the bound for both axes, as in the rest of this module.
    side = board.grid.shape[0]
    # Clamp the lower bounds at 0 so a negative start cannot wrap around.
    row_span = slice(max(0, row - radius), min(side, row + radius + 1))
    col_span = slice(max(0, col - radius), min(side, col + radius + 1))
    matches = board.grid[row_span, col_span] == target
    return int(np.count_nonzero(matches))
50+
51+
52+
def _mobility_advantage(after_move: AtaxxBoard) -> float:
    """Return the move-count difference between the two sides on *after_move*.

    The result is (moves for ``-current_player``) minus (moves for
    ``current_player``). Callers pass a board on which a move was just played,
    so ``current_player`` is presumably the opponent of the side that moved —
    TODO confirm against ``AtaxxBoard.step``.
    """
    replies_for_side_to_move = after_move.get_valid_moves(player=after_move.current_player)
    replies_for_other_side = after_move.get_valid_moves(player=-after_move.current_player)
    return float(len(replies_for_other_side) - len(replies_for_side_to_move))
56+
57+
58+
def _score_move(state: AtaxxBoard, move: Move) -> float:
    """Score *move* for the side to move on *state* with a one-ply evaluation.

    The score is the mover's piece gain plus the opponent's piece loss, with
    two small bonuses: one for distance-1 moves and one for landing near
    cell (3, 3). *state* is not modified; the move is played on a copy.
    """
    _src_row, _src_col, dest_row, dest_col = move
    mover = state.current_player

    mine_before = int(np.count_nonzero(state.grid == mover))
    theirs_before = int(np.count_nonzero(state.grid == -mover))

    # Play the move on a copy so the caller's board stays untouched.
    preview = state.copy()
    preview.step(move)
    mine_after = int(np.count_nonzero(preview.grid == mover))
    theirs_after = int(np.count_nonzero(preview.grid == -mover))

    material_swing = (mine_after - mine_before) + (theirs_before - theirs_after)
    clone_bonus = 0.15 if _chebyshev_distance(move) == 1 else 0.0
    # The centre is hard-coded as (3, 3); the bonus shrinks with distance from it.
    centrality = (3 - abs(dest_row - 3)) + (3 - abs(dest_col - 3))
    center_bonus = 0.05 * centrality
    return float(material_swing) + clone_bonus + center_bonus
2170

2271

72+
def _best_reply_penalty(after_move: AtaxxBoard) -> float:
    """Return the highest ``_score_move`` value among the moves on *after_move*.

    Returns ``-2.0`` when ``get_valid_moves()`` is empty. Callers subtract
    the result, so that case raises their score.
    """
    replies = after_move.get_valid_moves()
    if len(replies) == 0:
        return -2.0

    strongest = None
    for reply in replies:
        reply_score = _score_move(after_move, reply)
        if strongest is None or reply_score > strongest:
            strongest = reply_score
    return float(strongest)
77+
78+
79+
def _softmax_choice(
    rng: np.random.Generator,
    scored_moves: list[tuple[Move, float]],
    *,
    temperature: float,
) -> Move:
    """Sample one move with probability proportional to ``exp(score / temperature)``.

    Args:
        rng: NumPy random generator used for the draw.
        scored_moves: ``(move, score)`` pairs to choose from; must not be empty.
        temperature: Positive softmax temperature. Lower values concentrate
            the probability on the best-scoring moves.

    Returns:
        The move of the sampled pair.

    Raises:
        ValueError: if *scored_moves* is empty or *temperature* is not a
            positive number.
    """
    if len(scored_moves) == 0:
        raise ValueError("scored_moves must contain at least one move")
    # `not temperature > 0` also rejects NaN. A negative temperature would
    # silently favour the worst moves instead of failing.
    if not temperature > 0:
        raise ValueError(f"temperature must be positive, got {temperature}")

    scores = np.asarray([score for _, score in scored_moves], dtype=np.float32)
    # Subtract the maximum before exponentiating so the largest logit is 0
    # and exp() cannot overflow.
    logits = (scores - float(np.max(scores))) / temperature
    probs = np.exp(logits)
    probs = probs / float(np.sum(probs))
    pick_idx = int(rng.choice(len(scored_moves), p=probs))
    return scored_moves[pick_idx][0]
91+
92+
93+
def _score_apex(board: AtaxxBoard, move: Move) -> float:
    """Score *move* for the "apex" level with a selective lookahead.

    Starts from the one-ply ``_score_move`` value and adds a mobility term.
    If the board after the move has no valid moves, a flat bonus is added.
    Otherwise the three best-scoring opponent replies are examined, each
    discounted by the best follow-up available after it, and the most
    damaging of those lines is subtracted.

    *board* is not modified; all moves are played on copies.
    """
    base = _score_move(board, move)
    after = board.copy()
    after.step(move)
    opp_moves = after.get_valid_moves()
    mobility = _mobility_advantage(after)
    if len(opp_moves) == 0:
        return base + 3.0 + 0.2 * mobility

    # Score every reply exactly once: each _score_move call copies and steps
    # a board, and the value is needed for both the ranking and the line
    # value below. The sort is stable, so ties keep their original order.
    scored_replies = [(opp_move, _score_move(after, opp_move)) for opp_move in opp_moves]
    scored_replies.sort(key=lambda pair: pair[1], reverse=True)

    # Two-ply selective lookahead: punish lines where opponent can spike value
    # and we fail to recover with a strong counter on the next turn.
    worst_line = float("-inf")
    for opp_move, opp_gain in scored_replies[:3]:
        reply_board = after.copy()
        reply_board.step(opp_move)
        reply_moves = reply_board.get_valid_moves()
        reply_best = (
            max(_score_move(reply_board, reply_move) for reply_move in reply_moves)
            if len(reply_moves) > 0
            else -2.5
        )
        line_value = opp_gain - 0.55 * float(reply_best)
        # "Worst" from our side: the line with the highest value for the opponent.
        worst_line = max(worst_line, float(line_value))

    return base - 0.92 * worst_line + 0.2 * mobility
123+
124+
125+
def _score_gambit(board: AtaxxBoard, move: Move) -> float:
    """Score *move* for the "gambit" level.

    Combines the one-ply ``_score_move`` value with a best-reply penalty,
    a reward for enemy pieces within two cells of the destination, a bonus
    for distance-2 moves, a bonus for landing on row or column 0 or 6, and a
    penalty for enemy pieces within one cell of the destination.

    *board* is not modified; the move is played on a copy.
    """
    _src_row, _src_col, dest_row, dest_col = move
    base = _score_move(board, move)

    after = board.copy()
    after.step(move)
    # Treats the side to move after the step as the enemy — assumes step()
    # hands the turn to the opponent; TODO confirm for forced passes.
    enemy = after.current_player

    def enemies_within(radius: int) -> int:
        return _count_targets_in_radius(
            after,
            row=dest_row,
            col=dest_col,
            target=enemy,
            radius=radius,
        )

    frontier_risk = enemies_within(1)
    # The radius-2 window also contains the radius-1 cells counted above.
    pressure_ring = enemies_within(2)

    is_jump = _chebyshev_distance(move) == 2
    jump_bonus = 0.55 if is_jump else -0.12
    on_edge = dest_row in (0, 6) or dest_col in (0, 6)
    flank_bonus = 0.35 if on_edge else 0.0
    hard_guard = _best_reply_penalty(after)

    # Accumulate in the original term order so the float result is unchanged.
    total = base
    total = total - 0.58 * hard_guard
    total = total + 0.46 * float(pressure_ring)
    total = total + jump_bonus
    total = total + flank_bonus
    total = total - 0.42 * float(frontier_risk)
    return total
156+
157+
158+
def _score_sentinel(board: AtaxxBoard, move: Move) -> float:
    """Score *move* for the "sentinel" level.

    Combines the one-ply ``_score_move`` value with a best-reply penalty, a
    mobility term, a reward for own pieces next to the destination, a centre
    bonus, a bias toward distance-1 moves, and a penalty for enemy pieces
    next to the destination.

    *board* is not modified; the move is played on a copy.
    """
    _src_row, _src_col, dest_row, dest_col = move
    base = _score_move(board, move)

    after = board.copy()
    after.step(move)
    # Treats the side to move after the step as the enemy — assumes step()
    # hands the turn to the opponent; TODO confirm for forced passes.
    enemy = after.current_player
    own_piece = -enemy

    def neighbours_matching(piece: int) -> int:
        return _count_targets_in_radius(
            after,
            row=dest_row,
            col=dest_col,
            target=piece,
            radius=1,
        )

    frontier_risk = neighbours_matching(enemy)
    # Subtract 1 because the window includes the destination cell itself.
    local_support = neighbours_matching(own_piece) - 1

    mobility = _mobility_advantage(after)
    centrality = (3 - abs(dest_row - 3)) + (3 - abs(dest_col - 3))
    center_bonus = 0.18 * centrality
    is_clone = _chebyshev_distance(move) == 1
    clone_bias = 0.4 if is_clone else -0.06
    hard_guard = _best_reply_penalty(after)

    # Accumulate in the original term order so the float result is unchanged.
    total = base
    total = total - 0.56 * hard_guard
    total = total + 0.34 * mobility
    total = total + 0.36 * float(local_support)
    total = total + center_bonus
    total = total + clone_bias
    total = total - 0.5 * float(frontier_risk)
    return total
195+
196+
23197
def heuristic_move(
24198
board: AtaxxBoard,
25199
rng: np.random.Generator,
26-
level: str = "normal",
200+
level: str = DEFAULT_HEURISTIC_LEVEL,
27201
) -> Move | None:
202+
if not is_supported_heuristic_level(level):
203+
raise ValueError(f"Unsupported heuristic level: {level}")
204+
28205
valid_moves = board.get_valid_moves()
29206
if len(valid_moves) == 0:
30207
return None
31208

32209
if level == "easy":
33-
scores = np.asarray([_score_move(board, move) for move in valid_moves], dtype=np.float32)
34-
scores = scores - float(np.min(scores)) + 0.2
35-
probs = scores / float(np.sum(scores))
36-
return valid_moves[int(rng.choice(len(valid_moves), p=probs))]
210+
scored_moves = [(move, _score_move(board, move)) for move in valid_moves]
211+
# Easy should still punish obvious blunders while keeping variety.
212+
return _softmax_choice(rng, scored_moves, temperature=0.85)
37213

38214
scored_moves: list[tuple[Move, float]] = []
39215
for move in valid_moves:
40216
score = _score_move(board, move)
41217
if level == "hard":
42218
scratch = board.copy()
43219
scratch.step(move)
44-
opp_moves = scratch.get_valid_moves()
45-
if len(opp_moves) > 0:
46-
opp_best = max(_score_move(scratch, opp_move) for opp_move in opp_moves)
47-
score -= 0.65 * opp_best
220+
score -= 0.65 * _best_reply_penalty(scratch)
221+
score += 0.12 * _mobility_advantage(scratch)
222+
elif level == "apex":
223+
score = _score_apex(board, move)
224+
elif level == "gambit":
225+
score = _score_gambit(board, move)
226+
elif level == "sentinel":
227+
score = _score_sentinel(board, move)
48228
scored_moves.append((move, score))
49229

50230
if level == "normal":
51231
# Normal is deliberately non-greedy to avoid repetitive games.
52-
scores = np.asarray([score for _, score in scored_moves], dtype=np.float32)
53-
temperature = 0.35
54-
logits = (scores - float(np.max(scores))) / temperature
55-
probs = np.exp(logits)
56-
probs = probs / float(np.sum(probs))
57-
pick_idx = int(rng.choice(len(scored_moves), p=probs))
58-
return scored_moves[pick_idx][0]
232+
return _softmax_choice(rng, scored_moves, temperature=0.35)
59233

60234
best_score = max(score for _, score in scored_moves)
61235
best_moves = [move for move, score in scored_moves if score == best_score]

0 commit comments

Comments
 (0)