Skip to content

Commit bb8de98

Browse files
KhrulkovV and claude committed
fix(prompt_coevolution): fix 5 co-evolution coupling bugs (Amendment #4)
1. prompt_id mismatch: write side used sha256(UUID), read side sha256(text) — never matched. Both now use prompt_text_to_id(). 2. Dead archive: default fitness 0.01 for unknown prompts (was 0.0, preventing archive entry). 3. Champion-only selection: stochastic fitness-proportional sampling replaces single-best. 4. Timing mismatch: MainRunSyncHook blocks prompt engine until main run advances 1 gen. 5. pre_step_hook added to EvolutionEngine (optional, null by default). All prior run data discarded (X1/X2 ran 4/3 gens on fallback prompts, no co-evolution occurred). DBs 4-7 flushed. Full restart required. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 09b7eca commit bb8de98

9 files changed

Lines changed: 165 additions & 36 deletions

File tree

config/constants/evolution.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ num_parents: 2
88
mutation_mode: rewrite
99
max_generations: null
1010
strip_comments_and_docstrings: false
11+
pre_step_hook: null

config/evolution/default.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,4 @@ evolution_engine:
4040
config: ${engine_config}
4141
writer: ${writer}
4242
metrics_tracker: ${metrics_tracker}
43+
pre_step_hook: ${pre_step_hook}

config/pipeline/prompt_evolution.yaml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,16 @@
1414
main_redis_db: ???
1515
main_redis_prefix: ???
1616

17+
# Sync hook: blocks prompt run engine until main run advances by 1 gen
18+
pre_step_hook:
19+
_target_: gigaevo.prompts.coevolution.sync.MainRunSyncHook
20+
host: ${redis.host}
21+
port: ${redis.port}
22+
db: ${main_redis_db}
23+
prefix: ${main_redis_prefix}
24+
timeout: 600.0
25+
poll_interval: 5.0
26+
1727
prompt_stats_provider:
1828
_target_: gigaevo.prompts.coevolution.stats.RedisPromptStatsProvider
1929
host: ${redis.host}

gigaevo/evolution/engine/core.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
from collections.abc import Awaitable, Callable
45
import contextlib
56
from typing import TYPE_CHECKING
67

@@ -49,6 +50,7 @@ def __init__(
4950
config: EngineConfig,
5051
writer: LogWriter,
5152
metrics_tracker: MetricsTracker,
53+
pre_step_hook: Callable[[], Awaitable[None]] | None = None,
5254
):
5355
self.storage = storage
5456
self.strategy = strategy
@@ -67,6 +69,7 @@ def __init__(
6769
self.metrics = EngineMetrics()
6870
self.state = ProgramStateManager(self.storage)
6971
self._metrics_tracker = metrics_tracker
72+
self._pre_step_hook = pre_step_hook
7073

7174
logger.info(
7275
"[EvolutionEngine] Init | strategy={}, acceptor={}",
@@ -180,6 +183,9 @@ async def run(self) -> None:
180183

181184
async def step(self) -> None:
182185
"""One generation step (idle → mutate → idle → ingest → refresh → idle)."""
186+
if self._pre_step_hook:
187+
await self._pre_step_hook()
188+
183189
# Phase 1: wait until engine is idle (no QUEUED/RUNNING programs)
184190
await self._await_idle()
185191
logger.debug("[EvolutionEngine] Phase 1: Idle confirmed")

gigaevo/prompts/coevolution/stages.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,12 @@ async def compute(self, program: Program) -> FloatDictContainer:
145145
prompt_id = execution_output.prompt_id
146146
stats = await self._stats_provider.get_stats(prompt_id)
147147

148-
fitness = stats.success_rate
148+
if stats.trials < self._min_trials:
149+
# Optimistic default: a small positive fitness allows archive entry, but
150+
# only outranks measured prompts whose success rate is below 0.01.
151+
fitness = 0.01
152+
else:
153+
fitness = stats.success_rate
149154
prompt_length = float(len(execution_output.prompt_text))
150155

151156
logger.debug(
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
"""Synchronization hook for prompt co-evolution.
2+
3+
MainRunSyncHook blocks the prompt run's engine until the main run advances
4+
by at least one generation. This prevents the lightweight prompt run from
5+
racing far ahead of the expensive main run.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import asyncio
11+
import time
12+
13+
from loguru import logger
14+
15+
16+
class MainRunSyncHook:
    """Pre-step hook that blocks until the main run advances by 1 generation.

    Polls the ``engine:total_generations`` field of the main run's
    ``<prefix>:run_state`` Redis hash and waits until it exceeds the value
    seen on the previous call.

    A missing counter is read as generation 0 and the initial baseline is -1,
    so the very first call returns immediately.

    Args:
        host: Redis host of the main run
        port: Redis port
        db: Redis DB of the main run
        prefix: Key prefix of the main run (e.g. "chains/hotpotqa")
        timeout: Maximum seconds to wait before proceeding anyway
        poll_interval: Seconds between polls
    """

    def __init__(
        self,
        host: str,
        port: int,
        db: int,
        prefix: str,
        timeout: float = 600.0,
        poll_interval: float = 5.0,
    ):
        self._host = host
        self._port = port
        self._db = db
        self._prefix = prefix
        self._timeout = timeout
        self._poll_interval = poll_interval
        # Last main-run generation this hook let through; -1 means "none yet".
        self._last_main_gen: int = -1
        # Created lazily so that constructing the hook performs no I/O.
        self._redis: object | None = None

    def _get_redis(self) -> object:
        """Return the async Redis client, creating it on first use."""
        if self._redis is None:
            from redis import asyncio as aioredis

            self._redis = aioredis.Redis(
                host=self._host,
                port=self._port,
                db=self._db,
                decode_responses=True,
            )
        return self._redis

    @staticmethod
    def _parse_generation(raw: str | bytes | int | None) -> int:
        """Convert the raw counter value into a generation number.

        Missing or empty values count as generation 0.

        Raises:
            ValueError: If the value is present but is not an integer.
        """
        return int(raw) if raw else 0

    async def __call__(self) -> None:
        """Poll until the main run's generation counter advances.

        Returns once the counter exceeds the value recorded by the previous
        call, or once ``timeout`` seconds have elapsed (a warning is logged
        and the recorded value is left unchanged).
        """
        r = self._get_redis()
        key = f"{self._prefix}:run_state"
        field = "engine:total_generations"
        start = time.monotonic()

        while True:
            raw = await r.hget(key, field)  # type: ignore[attr-defined]
            try:
                current_gen = self._parse_generation(raw)
            except (TypeError, ValueError):
                # The hook is best-effort: a corrupt counter must not abort
                # the engine step. Treat it like a missing counter and let
                # the timeout bound the wait.
                logger.warning(
                    "[MainRunSyncHook] Ignoring non-integer {}:{} value {!r}",
                    key,
                    field,
                    raw,
                )
                current_gen = 0

            if current_gen > self._last_main_gen:
                logger.debug(
                    "[MainRunSyncHook] Main run at gen {} (was {})",
                    current_gen,
                    self._last_main_gen,
                )
                self._last_main_gen = current_gen
                return

            elapsed = time.monotonic() - start
            if elapsed > self._timeout:
                logger.warning(
                    "[MainRunSyncHook] Timeout after {:.0f}s waiting for main gen > {} "
                    "(current={}), proceeding",
                    elapsed,
                    self._last_main_gen,
                    current_gen,
                )
                return

            # Never sleep past the deadline, so the total wait stays bounded
            # by ``timeout`` instead of ``timeout + poll_interval``.
            await asyncio.sleep(min(self._poll_interval, self._timeout - elapsed))

gigaevo/prompts/fetcher.py

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@
99

1010
from abc import ABC, abstractmethod
1111
from dataclasses import dataclass
12-
import hashlib
1312
from pathlib import Path
13+
import random
1414
import time
1515
from typing import TYPE_CHECKING, Any
1616

1717
from loguru import logger
1818

1919
from gigaevo.prompts import load_prompt
20+
from gigaevo.prompts.coevolution.stats import prompt_text_to_id
2021

2122
if TYPE_CHECKING:
2223
from gigaevo.database.program_storage import ProgramStorage
@@ -218,10 +219,13 @@ def _is_cache_stale(self) -> bool:
218219
return (time.monotonic() - self._cache_timestamp) >= self._cache_ttl
219220

220221
def _refresh_champion(self) -> "_PromptPack | None":
221-
"""Read the current champion from the prompt run's Redis archive.
222+
"""Select a prompt from the prompt run's archive using fitness-proportional sampling.
223+
224+
Instead of always picking the single best, uses stochastic selection so
225+
that multiple prompts accumulate trial data from the main run.
222226
223227
Returns:
224-
_PromptPack if a champion was found, None if archive is empty
228+
_PromptPack if a prompt was selected, None if archive is empty
225229
"""
226230
try:
227231
r = self._get_sync_redis()
@@ -235,46 +239,47 @@ def _refresh_champion(self) -> "_PromptPack | None":
235239
)
236240
return None
237241

238-
# Fetch all programs and find the champion
239-
best_program_id: str | None = None
240-
best_fitness: float = float("-inf")
241-
best_code: str | None = None
242+
# Collect all candidates with their fitness and code
243+
import json
242244

245+
candidates: list[tuple[str, float, str]] = [] # (pid, fitness, code)
243246
for pid in program_ids:
244247
program_key = f"{self._prompt_prefix}:program:{pid}"
245248
raw = r.get(program_key)
246249
if not raw:
247250
continue
248251
try:
249-
import json
250-
251252
data = json.loads(raw)
252253
metrics = data.get("metrics", {})
253-
fitness = float(metrics.get(self._fitness_key, float("-inf")))
254+
fitness = float(metrics.get(self._fitness_key, 0.0))
254255
code = data.get("code", "")
255-
if fitness > best_fitness and code:
256-
best_fitness = fitness
257-
best_program_id = pid
258-
best_code = code
256+
if code:
257+
candidates.append((pid, fitness, code))
259258
except Exception as exc:
260259
logger.debug(
261260
f"[GigaEvoArchivePromptFetcher] Error parsing program {pid}: {exc}"
262261
)
263262
continue
264263

265-
if best_code is None or best_program_id is None:
264+
if not candidates:
266265
return None
267266

268-
# Execute the champion's entrypoint() to get the prompt pack
269-
prompt_id = hashlib.sha256(best_program_id.encode()).hexdigest()[:16]
270-
pack = self._execute_entrypoint(best_code, prompt_id)
267+
# Fitness-proportional sampling (epsilon floor for zero-fitness prompts)
268+
epsilon = 0.01
269+
weights = [max(f, epsilon) for _, f, _ in candidates]
270+
chosen_pid, chosen_fitness, chosen_code = random.choices(
271+
candidates, weights=weights, k=1
272+
)[0]
273+
274+
pack = self._execute_entrypoint(chosen_code)
271275
if pack is None:
272276
return None
273277

274278
logger.debug(
275-
f"[GigaEvoArchivePromptFetcher] Champion: {best_program_id[:8]} "
276-
f"fitness={best_fitness:.4f} prompt_id={prompt_id} "
277-
f"has_user={pack.user is not None}"
279+
f"[GigaEvoArchivePromptFetcher] Selected: {chosen_pid[:8]} "
280+
f"fitness={chosen_fitness:.4f} prompt_id={pack.prompt_id} "
281+
f"has_user={pack.user is not None} "
282+
f"(from {len(candidates)} candidates)"
278283
)
279284
return pack
280285

@@ -285,17 +290,19 @@ def _refresh_champion(self) -> "_PromptPack | None":
285290
)
286291
return None
287292

288-
def _execute_entrypoint(self, code: str, prompt_id: str) -> "_PromptPack | None":
293+
def _execute_entrypoint(self, code: str) -> "_PromptPack | None":
289294
"""Execute a program's entrypoint() in a clean namespace.
290295
296+
Computes prompt_id from the system prompt TEXT (not the program UUID)
297+
so it matches the ID used by PromptFitnessStage on the read side.
298+
291299
Args:
292300
code: Python source code with entrypoint() function that returns
293301
either a str (system prompt only) or a dict with keys
294302
"system" (required) and "user" (optional).
295-
prompt_id: Pre-computed prompt_id to attach to the resulting pack.
296303
297304
Returns:
298-
_PromptPack with system/user texts, or None on error
305+
_PromptPack with system/user texts and text-derived prompt_id, or None on error
299306
"""
300307
try:
301308
namespace: dict[str, Any] = {}
@@ -313,7 +320,8 @@ def _execute_entrypoint(self, code: str, prompt_id: str) -> "_PromptPack | None"
313320
"[GigaEvoArchivePromptFetcher] entrypoint() returned empty string"
314321
)
315322
return None
316-
return _PromptPack(system=result, user=None, prompt_id=prompt_id)
323+
pid = prompt_text_to_id(result)
324+
return _PromptPack(system=result, user=None, prompt_id=pid)
317325
elif isinstance(result, dict):
318326
system = result.get("system", "")
319327
if not isinstance(system, str) or not system.strip():
@@ -327,7 +335,8 @@ def _execute_entrypoint(self, code: str, prompt_id: str) -> "_PromptPack | None"
327335
"[GigaEvoArchivePromptFetcher] dict entrypoint() has invalid 'user' key — ignoring"
328336
)
329337
user = None
330-
return _PromptPack(system=system, user=user, prompt_id=prompt_id)
338+
pid = prompt_text_to_id(system)
339+
return _PromptPack(system=system, user=user, prompt_id=pid)
331340
else:
332341
logger.warning(
333342
f"[GigaEvoArchivePromptFetcher] entrypoint() returned {type(result)}, "

tests/prompts/test_coevolution_stages.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ async def test_compute_with_stats(self, mock_stats_provider: MagicMock):
272272

273273
@pytest.mark.asyncio
274274
async def test_compute_no_stats(self, mock_stats_provider: MagicMock):
275-
"""compute() returns 0.0 fitness when no stats available."""
275+
"""compute() returns optimistic default fitness when no stats available."""
276276
mock_stats_provider.get_stats = AsyncMock(
277277
return_value=PromptMutationStats(trials=0, successes=0, success_rate=0.0)
278278
)
@@ -285,7 +285,7 @@ async def test_compute_no_stats(self, mock_stats_provider: MagicMock):
285285

286286
result = await stage.compute(program)
287287

288-
assert result.data["fitness"] == 0.0
288+
assert result.data["fitness"] == 0.01 # optimistic default
289289
assert result.data["is_valid"] == 1.0
290290

291291
@pytest.mark.asyncio

tests/prompts/test_fetcher.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,14 @@ def test_execute_entrypoint_str_return(self, tmp_prompts_dir: Path):
220220
fallback_prompts_dir=tmp_prompts_dir,
221221
)
222222
code = 'def entrypoint() -> str:\n return "Hello system."'
223-
pack = fetcher._execute_entrypoint(code, "abc123")
223+
pack = fetcher._execute_entrypoint(code)
224224
assert pack is not None
225225
assert pack.system == "Hello system."
226226
assert pack.user is None
227-
assert pack.prompt_id == "abc123"
227+
# prompt_id is now derived from the text, not passed in
228+
from gigaevo.prompts.coevolution.stats import prompt_text_to_id
229+
230+
assert pack.prompt_id == prompt_text_to_id("Hello system.")
228231

229232
def test_execute_entrypoint_dict_return_with_user(self, tmp_prompts_dir: Path):
230233
"""_execute_entrypoint() handles dict-returning entrypoint with user key."""
@@ -237,11 +240,13 @@ def test_execute_entrypoint_dict_return_with_user(self, tmp_prompts_dir: Path):
237240
"def entrypoint() -> dict:\n"
238241
' return {"system": "System text.", "user": "User text {count}."}'
239242
)
240-
pack = fetcher._execute_entrypoint(code, "xyz789")
243+
pack = fetcher._execute_entrypoint(code)
241244
assert pack is not None
242245
assert pack.system == "System text."
243246
assert pack.user == "User text {count}."
244-
assert pack.prompt_id == "xyz789"
247+
from gigaevo.prompts.coevolution.stats import prompt_text_to_id
248+
249+
assert pack.prompt_id == prompt_text_to_id("System text.")
245250

246251
def test_execute_entrypoint_dict_return_no_user(self, tmp_prompts_dir: Path):
247252
"""_execute_entrypoint() handles dict with system only."""
@@ -251,7 +256,7 @@ def test_execute_entrypoint_dict_return_no_user(self, tmp_prompts_dir: Path):
251256
fallback_prompts_dir=tmp_prompts_dir,
252257
)
253258
code = 'def entrypoint() -> dict:\n return {"system": "System only."}'
254-
pack = fetcher._execute_entrypoint(code, "sys_only")
259+
pack = fetcher._execute_entrypoint(code)
255260
assert pack is not None
256261
assert pack.system == "System only."
257262
assert pack.user is None
@@ -266,7 +271,7 @@ def test_execute_entrypoint_dict_missing_system_returns_none(
266271
fallback_prompts_dir=tmp_prompts_dir,
267272
)
268273
code = 'def entrypoint() -> dict:\n return {"user": "Only user."}'
269-
pack = fetcher._execute_entrypoint(code, "no_sys")
274+
pack = fetcher._execute_entrypoint(code)
270275
assert pack is None
271276

272277
def test_execute_entrypoint_invalid_type_returns_none(self, tmp_prompts_dir: Path):
@@ -277,7 +282,7 @@ def test_execute_entrypoint_invalid_type_returns_none(self, tmp_prompts_dir: Pat
277282
fallback_prompts_dir=tmp_prompts_dir,
278283
)
279284
code = "def entrypoint():\n return 42"
280-
pack = fetcher._execute_entrypoint(code, "bad_type")
285+
pack = fetcher._execute_entrypoint(code)
281286
assert pack is None
282287

283288

0 commit comments

Comments
 (0)