Skip to content

feat(dataplane): typed Redis coordination plane with atomic Lua substrate, FSM migration, and Hydra hardening#20

Open
GrigoryEvko wants to merge 849 commits into
FusionBrainLab:mainfrom
GrigoryEvko:feat/dataplane-foundation
Open

feat(dataplane): typed Redis coordination plane with atomic Lua substrate, FSM migration, and Hydra hardening#20
GrigoryEvko wants to merge 849 commits into
FusionBrainLab:mainfrom
GrigoryEvko:feat/dataplane-foundation

Conversation

@GrigoryEvko

@GrigoryEvko GrigoryEvko commented May 17, 2026

Copy link
Copy Markdown

DataPlane

1. Аннотация

Настоящий пакет изменений вводит в репозиторий gigaevo-core подсистему gigaevo.dataplane: единственный типизированный асинхронный координатор для всех взаимодействий с Redis. Координатором закрываются десять из четырнадцати структурных классов дефектов, идентифицированных и адресуемых настоящим PR, а сам подход кодифицирует архитектурную дисциплину, при которой бо́льшая часть исторически наблюдавшихся ошибочных состояний становится непредставимой по построению (unrepresentable by construction).

В рамках PR реализуется:

  1. Полноценный субстрат координатора: пул соединений с обязательным decode_responses=True, реестр Lua-скриптов LuaRegistry с однократным NOSCRIPT-восстановлением, каталог из восьми атомарных Lua-скриптов, типизированная иерархия исключений, словарь идентификаторов на NewType, перечислительные FSM-таблицы, типизированный возвращаемый тип Result[T, E], словарь свежести Freshness, фантомные типы провенанса Sourced[T, S], линейные токены Token[Tag] с подсистемой выпуска, обёртки потерянного владения wrap_lease / CrashWatchedHandle, гигиенический индикатор OneShotFlag, носители времени HlcTimestamp.

  2. Полные миграции пяти прикладных вертикалей: хранилище программ, координатор переходов состояний, архив элит, очередь миграционной шины (частично; полное закрытие отложено в фазу 2 настоящей программы изоляции), словарь подсказок (prompts/fetcher, prompts/coevolution/{stats,sync}), реестр многоруких бандитов (llm/bandit), распределённая блокировка RedisInstanceLock.

  3. Упрочнение пути, проходящего через Hydra и OmegaConf (фаза 0): удалены резолверы произвольного исполнения кода, устранены долгоживущие модульные синглтоны, исправлен дрейф lru_cache поверх объектов с привязкой к событийному циклу, формализованы вспомогательные функции wire_* для каждой точки повторного связывания при сборке объектов.

  4. Дисциплинарные ограждения: блокировка прямых импортов redis/redis.asyncio/redis.exceptions вне субстрата DataPlane через ruff TID251 с явным белым списком унаследованных файлов; блокировка прямых вызовов pickle.loads/cloudpickle.loads через тот же механизм с белым списком точек кода подпроцессного IPC; AST-инспектор, отвергающий неаннотированные широкие except внутри субстрата.

  5. Дополнительные слои тестирования: исчерпывающее покрытие FSM (полное произведение ProgramState × ProgramState), фаззинг конкурентных писателей (одновременная гонка на crdt_inc, transition_program_state, try_replace_elite, acquire_instance_lock), property-based проверки идемпотентности через content_hash, регрессионные тесты для каждого исправленного класса дефектов.

PR подготавливает архитектуру для последующей фазы 2 (событийный поток с группами потребителей, валидация версии схемы, межпроцессная репликация состояний). Субстрат фазы 2 размещён в коде и каталогизирован в разделе 15 настоящего документа.

Концептуальным потребителем заложенной архитектурной базы выступает RFC «Распределённое исполнение в gigaevo-core» (issue #18). RFC описывает переход от модели «один драйвер, один хост» к парку независимых обработчиков, координируемых через Redis по принципу «единственная плоскость управления». Этапы 3, 4 и 5 RFC опираются на примитивы координации, типизированные исключения, идемпотентность и агрегацию с актор-партиционированием, поставляемые настоящим PR. Полная таблица соответствий «требование RFC ↔ компонент DataPlane PR» приводится в разделе 16 настоящего документа.

2. Контекст и мотивация

2.1. Историческая ситуация в репозитории

До настоящего PR взаимодействие с Redis в gigaevo-core распределялось по более чем двадцати разнородным точкам входа: ручные конвейеры WATCH/MULTI/EXEC, прямые вызовы INCR, HSET, HGETALL, XADD, XREAD, SETNX, выполнение Lua-скриптов через прямой EVAL, плюс набор приватных синхронных клиентов внутри асинхронных подсистем. Каждая точка вызова повторяла локальную копию инвариантов: какие именно поля инкрементируются, какой именно набор ключей участвует в WATCH, в каком порядке производится проверка инвариантов, какой именно тип возвращаемого значения интерпретируется как ошибка.

Подобная фрагментация порождала структурно повторяющиеся классы дефектов. Каждое исправление носило локальный характер и не предотвращало воспроизведение того же дефекта в соседнем модуле спустя месяц.

2.2. Цель PR

Целью настоящего PR является ввод единственной точки концентрации семантики координации, через которую обязаны проходить все операции записи, чтение которых требует свежести, и все операции с условной атомарностью. Через типизацию Result[T, E], линейные токены Token[Tag], дискриминированные FSM-таблицы и фантомные типы провенанса достигается состояние, при котором значительная часть исторически встречавшихся ошибочных состояний оказывается непредставимой на уровне типов.

Помимо лечения существующих дефектов, ставится задача создания дисциплинарного субстрата для последующих фаз: событийный поток (фаза 2), источник событий с переключением источника правды (фаза 3), распределённое слияние через HLC + VectorClock (фаза 4 общего плана программы изоляции). Дополнительно архитектурная база поставляется в распоряжение распределённой топологии, описанной в RFC #18 «Распределённое исполнение в gigaevo-core»: целевая нагрузка (оптимизация CUDA-ядер через эволюционный поиск в смежном проекте kernel-evo с эксклюзивным GPU-захватом и гетерогенным парком обработчиков) предъявляет требования, удовлетворяемые ровно теми примитивами, которые вводятся настоящим PR.

2.3. Принципы

Реализация подчиняется следующим архитектурным принципам.

Принцип 1: монопольный субстрат. Все обращения к Redis обязаны проходить через gigaevo.dataplane.DataPlane. Ограничение поддерживается линтером ruff TID251 с исчерпывающим белым списком унаследованных файлов; каждая запись в белом списке сопровождается коммитом, удаляющим её по мере миграции соответствующего файла.

Принцип 2: атомарность на стороне сервера. Любая операция записи, требующая условной семантики (read-modify-write, expected-from CAS, content-hash дедупликация), выполняется в единственном EVALSHA через зарегистрированный заранее Lua-скрипт. Повторные попытки на стороне приложения исключаются как класс, повторные попытки WATCH-цикла исключаются как класс.

Принцип 3: типизация по построению. Каждая ID-сущность оформляется через NewType, каждое лицензионное состояние гарантируется через линейный токен Token[Tag], каждое чтение, требующее свежести, возвращается под фантомным конструктором провенанса (LocalValue[T], CachedValue[T], и т.п.), каждый возврат предоставляется в дискриминированной форме Result[T, E]. Средство проверки типов mypy в строгом режиме обнаруживает рассогласования до момента выполнения.

Принцип 4: один контур для каждого ресурса. Каждый подпространственный ресурс (программа в FSM, ячейка архива, счётчик CRDT, ключ блокировки) обладает собственным корневым токеном, выпускаемым через mint_root на этапе запуска движка. Дочерние токены доставляются вниз по графу вызовов посредством EngineRoot.split_*. Двух одновременных писателей одного подпространства невозможно сконструировать без явного видимого нарушения дисциплины линейного потока.

Принцип 5: упорядоченная разрядность жизненного цикла. Запуск и останов координатора идемпотентны, защищены ленивой инициализацией asyncio.Lock, обрабатывают asyncio.CancelledError отдельно от Exception, разделяют чистку пула и финальное освобождение токенов через ContextVar. await dp.shutdown() корректен даже после частичного сбоя await dp.startup().

Принцип 6: типизированная обработка сбоев. Внутренние исключения никогда не пересекают границу координатора. Любой результат возвращается через Result[T, E]; обработка сбоев производится через match. Внутри субстрата широкие except Exception допускаются исключительно на границе конвертации в Result, и каждое такое место помечается через # noqa: BLE001 под контролем AST-инспектора test_no_silent_broad_except.

2.4. Гранулярность изменений и стратегия поставки

Изменения поставляются в виде последовательности приблизительно пятидесяти коммитов, каждый из которых проходит полный прогон тестов (~4 789 тестов). Сборка PR проводилась поэтапно:

  1. Скелет субстрата (feat(dataplane): typed coordinator package foundation) с публичной поверхностью без тел методов.
  2. Упрочнение примитивов против враждебных входов.
  3. Внедрение восьми Lua-скриптов с типизированными обёртками на стороне Python.
  4. Миграция вертикалей по одной: блокировки, архив, словарь подсказок, FSM-переходы, бандит, миграционные пакетные переходы.
  5. Внедрение wrap_lease для типизированного контроля потерянной блокировки.
  6. Внедрение EngineRoot и потокового разделения токенов в хранилище.
  7. Расширение Freshness на crdt_read, добавление обёртки LocalValue поверх read_program.
  8. Финальная сборка run.py: точка входа после instantiate присоединяет координатор к хранилищу, бандиту, словарю подсказок, DagRunner и движку эволюции.
  9. Упрочнение пути через Hydra (фаза 0): удаление RCE-резолвера eval, дезактивация модульных синглтонов трекеров, устранение lru_cache на пуле подпроцессного исполнения, формализация обёртки strict_chat_openai поверх ChatOpenAI с явным контролем неизвестных kwargs, унификация ${ref:...} против ${X} в 14 YAML-файлах, добавление явного _self_ в 18 defaults-блоках.
  10. Уплотнение тестового покрытия: исчерпывающее покрытие FSM, конкурентный фаззинг, AST-инспектор широких except, регрессии для каждого исправленного класса.
  11. Контрольная зачистка набора изменений: первая итерация устранила около 1 629 строк описательного «шума», вторая итерация дополнительно вернула около 32 строк пустых переименований без семантического выигрыша.

17. Метрики PR

17.1. Количественные

Метрика Значение
Файлов изменено около 138
Строк добавлено 17 324
Строк удалено 833
Чистая дельта +16 491
Атомарных коммитов около 50
Тестов в полном прогоне 4 789
Новых тестов около 1 200
Время полного прогона 6 мин 38 сек
Lua-скриптов 8
Wire-функций 6
Публичных методов DataPlane 16
Типов исключений в иерархии 19
NewType-идентификаторов 22
Алиасов Sourced 6
FSM-таблиц 3 (program, claim, lock)
Решёточных классов 5
Финальной зачистки удалено строк около 1 660

17.2. Качественные

Метрика Значение
Структурных классов багов, идентифицированных и адресуемых PR 14
Закрытых классов 10 (полностью или существенно)
Отложенных в фазу 2 3 (#3, #4, #5)
Частично закрытых через линтер 1 (#6)
Конкретных индивидуальных дефектов из аудитов около 114
Полностью устранённых 5
Частично закрытых 14
Вне круга ответственности DataPlane около 95
Активно проявляющихся HIGH дефектов в production 3 (все закрыты)
HIGH дефектов эксплуатационной природы 4 (все нейтрализованы)

17.3. Покрытие

Зона Покрытие
gigaevo/dataplane/**/*.py mypy strict, 100% поверхности тестами
RedisProgramStorage.transition_program_state через DataPlane (wired) или legacy (unwired)
RedisProgramStorage.batch_transition_* через DataPlane (wired) или legacy (unwired)
RedisArchiveStorage.add_elite через DataPlane для редуцируемых селекторов
RedisInstanceLock через LuaRegistry (wired) или legacy (unwired)
GigaEvoArchivePromptFetcher полностью через DataPlane
BanditModelRouter зеркало через DataPlane (опционально)
MainRunSyncHook через DataPlane

18. Связанные параллельные PR

Настоящий PR координируется с серией параллельных открытых pull request, каждый из которых закрывает локализованный класс проблем в смежной зоне. Перечень с координатами и областью применимости:

PR Заголовок Зона Связь с DataPlane PR
#2 fix(bandit): return neutral reward for non-finite fitness inputs gigaevo/llm/bandit.py Дополняет Класс №9 (тихое маскирование) и раздел 14.1 на стороне compute_bandit_reward; те же isfinite()-проверки введены императивно в counter_inc.lua и archive_swap.lua
#3 fix(acceptor): reject NaN and +inf is_valid evolution/engine/acceptor.py Дополняет Класс №11 (cache возвращает невалидное) и раздел 14.1; смежно с серверной валидацией score в archive_swap.lua
#5 fix(mutation): keep canonicalization output parseable evolution/mutation/mutation_operator.py Прямой связи с DataPlane не имеет; работает на уровне корректности мутационного выхода
#8 fix(llm): _with_langfuse must not mutate caller's config llm/models.py Дополняет Класс №11 концептуально (защита от наведённой мутации через aliased-контейнеры)
#10 sanitize LLM-derived text before logging / JSON / database writes utils/text_sanitize.py и тринадцать точек интеграции Закрывает поверхности раздела 14.1 (UTF-16-суррогаты, NUL, ANSI, BIDI); интегрирован в fetcher.py, stats.py, bandit.py интеграционным коммитом
#11 enable pytest-xdist parallel execution pyproject.toml, pytest.ini Ускоряет прогон 4 789 тестов настоящего PR; маркер xdist_group доступен под Redis-fixture-семейства
#12 fix(bandit): skip reward updates for unknown arm names llm/bandit.py Дополняет Класс №9 и №5 (rolling-upgrade с устаревшими именами рук); закрывает один из путей тихого падения callback агента
#13 feat(llm): add LLM call outcome classifier / state machine llm/call_outcome.py Заменяет ad-hoc try/except ladder в bandit и роутере единым словарём 12 вариантов LLMCallOutcome; импорты BanditAction и classify_call_result введены в bandit.py интеграционным коммитом
#14 refactor(python_executors): replace WorkerPool with loky reusable executor programs/stages/python_executors/wrapper.py Закрывает Класс №8 (модульные синглтоны через lru_cache) более радикальным средством; TOCTOU-symlink-swap (RCE-вектор), фильтрация секретов окружения (AWS_*, OPENAI_API_KEY и др.), bounded spill через mmap. Настоящий PR принимает имплементацию из #14 целиком в интеграционном коммите
#15 preserve error context across stage / runner / validation paths четыре точки optimization, validation, dag_runner Дополняет Класс №9; восстанавливает __cause__, структурные атрибуты SyntaxError, logger.exception(...)
#16 fail-loudly hygiene + operator override hatches pipeline_builder, StageRegistry Концептуально смежно с дисциплиной типобезопасности по построению (тихий overwrite на дубликате регистрации поднимает ValueError)
#17 replace httpx with aiohttp + requests for long-running asyncio stability infra/aiohttp_factory.py, infra/requests_factory.py, infra/_net.py Дополняет Класс №7 (асинхронные сетевые границы); закрывает накопление неосвобождённых соединений в httpcore. Дисциплина RedisConnection настоящего PR родственна по форме
#19 replace deprecated asyncio.get_event_loop + skip-guard test_manifest одиннадцать тестовых точек, test_manifest.py Разблокирует сбор тестов на чистом checkout main; смежно с разделом 13.7 настоящего PR

После интеграционного коммита (см. историю ветки) настоящий PR содержит зеркало всего набора изменений nightly и автоматически объединяем с любой из перечисленных параллельных веток.

19. Заключение

Настоящий пакет изменений завершает первую фазу долгосрочной программы изоляции взаимодействий с Redis в единственный типизированный субстрат. Десять из четырнадцати структурных классов дефектов, идентифицированных и адресуемых настоящим PR, закрываются либо существенно нейтрализуются. Три класса (события, группы потребителей, версионность схемы) отнесены к фазе 2 с подготовленным субстратом. Один класс (десериализация pickle как RCE) частично закрывается через линтер с явным указанием оставшейся миграционной цели.

Дисциплинарные ограждения (линтер TID251, AST-инспектор широких except, валидаторы враждебных входов, экспериментная двухпроходная зачистка набора изменений) обеспечивают невозможность регрессии: любая новая попытка ввода прямого redis-импорта вне DataPlane или новой точки вызова pickle.loads обнаруживается на этапе CI; любой новый широкий except в субстрате без явного маркера приводит к падению AST-инспектора.

Полная типобезопасность через Result[T, E] + Token[Tag] + Versioned[T] + Freshness + Sourced[T, S] + CrashEvent обеспечивает состояние, при котором значительная часть исторически наблюдавшихся ошибочных состояний оказывается непредставимой по построению. mypy в строгом режиме обнаруживает рассогласования до момента запуска.

Архитектурная база подготовлена для последующих фаз: подпространство под upcasters/, словарь NewType для событий и потоков, решёточный каталог для распределённых путей допуска, фантомный конструктор провенанса для разделения кешированных, повторных и локальных чтений. Будущие миграции (RedisStreamTransport на группы потребителей, версия схемы на каждой Pydantic-модели, источник событий с переключением источника правды) опираются на субстрат, уже размещённый в коде и каталогизированный в разделе 15 настоящего документа. Концептуальным целевым потребителем выступает RFC «Распределённое исполнение в gigaevo-core» (issue #18): этапы 3, 4 и 5 RFC реализуются как прямое сложение типизированного событийного потока поверх настоящего субстрата без переизобретения дисциплины координационной плоскости, как подробно изложено в разделе 16.

Совокупно представленное вмешательство кодифицирует дисциплину «ill-formed state is unrepresentable by construction» применительно к Redis-взаимодействиям. Эта дисциплина переносима на последующие подсистемы (транспорт шины сообщений, источник событий, распределённое слияние) с минимальной добавочной стоимостью, поскольку базовый словарь типов, линтерных правил и тестовых паттернов уже введён в репозиторий и принят к исполнению.


Полный проектный документ (классы дефектов, аудит, архитектура, миграции, тестирование, связь с RFC #18) поставляется в виде серии комментариев ниже.

PetrAnokhin and others added 30 commits April 1, 2026 16:00
…nd async LLM refinement. Introduce RecordCardEmbedding and ClusterCard data structures for managing embeddings and clusters. Update existing IdeaAnalyzer to support description rewriting. Add new prompts for cluster refinement and representative selection.
… Update memory.yaml to include analyzer settings and fast analyzer configurations. Modify IdeaTracker to support both default and fast analyzer types, enhancing idea processing efficiency. Add new fabric components for analyzer and Redis configuration creation.
…loading and task description handling. Introduce new utility functions for loading configurations and task descriptions, and enhance the summary generation process. Update type checks for idea data structures in RecordBank. Add new components for statistics and summary processing.
…etons; drop exec-pool lru_cache

resolvers.py
  Three globally-registered resolvers gone: eval (= builtins.eval,
  full RCE reachable via interpolation), merge and len (zero call
  sites in any config). get_object and ref registrations gain
  replace=True so a second register_resolvers() call from a test or
  notebook re-run no longer raises ValueError. oc.env is a built-in
  OmegaConf resolver; no explicit registration to harden.

trackers/__init__.py
  Module-level _tb_default / _wb_default / _redis_default globals
  deleted. init_tb / init_wandb / init_redis now construct a fresh
  GenericLogger per call; the four get_* accessors deleted (zero
  external production callers). The composite init_tb_redis and
  init_wandb_redis factories no longer hand back a cached instance.
  Under multirun the second run gets its own writer pointed at its
  own config instead of silently reusing the first run's closed
  writer.

python_executors/wrapper.py
  @functools.lru_cache(maxsize=1) removed from
  default_exec_runner_pool. The asyncio.Queue / Lock / subprocess
  transports bound to the first event loop no longer leak across
  asyncio.run boundaries — a second engine run gets a fresh pool.
  Docstring documents that lifecycle is the caller's responsibility.

tests/conftest.py
  Replaced the _clear_exec_runner_pool autouse fixture (incompatible
  with the cache removal) with a session-scoped autouse fixture that
  calls register_resolvers() once per pytest session. Tests that load
  YAMLs via compose() without going through run.py no longer fail
  with Unsupported interpolation type ref.

New tests (+16):
- TestRegisterResolvers (4): dangerous resolvers absent, required
  resolvers present, double-registration idempotent, ref end-to-end.
- test_trackers_singletons (8): fresh-per-call identity for the
  three init_* factories plus multirun-safety routing for the two
  composite factories.
- test_exec_runner_pool (4): fresh-per-call identity, absence of
  lru_cache attributes, independent asyncio primitives across
  sequential asyncio.run invocations.

Production callers in gigaevo/programs/stages/python_executors/
execution.py and three other sites still invoke run_exec_runner
with pool=None — they now construct a single-use pool per call
instead of sharing a process-wide pool. Subprocess startup is no
longer amortized for those paths; follow-up will thread one pool
through run_experiment per the spec.
…lts to null

gigaevo/llm/strict_chat_openai.py (new)
  Factory strict_chat_openai(**kwargs) validates kwargs against
  ChatOpenAI.model_fields plus the harvested Pydantic aliases
  (model_name, openai_api_key, openai_api_base, openai_organization,
  max_completion_tokens, timeout, stop_sequences) before construction.
  Unknown kwargs raise StrictChatOpenAIError naming the offender;
  pre-Hydra typos like tempetature: 0.5 now fail at YAML load instead
  of silently shipping into model_kwargs and onward to the OpenAI API.

  api_key=None raises with a message naming OPENAI_API_KEY env var so
  the missing-env diagnostic is application-level instead of an
  InterpolationResolutionError deep in OmegaConf.

config/llm/{single,openai,google,heterogeneous,heterogeneous_bandit,
openrouter_bandit,openrouter_ensemble,gemini25_pro,gemini31_pro,
gemini3_flash}.yaml
  20 _target_ swaps: langchain_openai.ChatOpenAI →
  gigaevo.llm.strict_chat_openai.strict_chat_openai. 18 api_key
  interpolations gain the OmegaConf comma-default:
  ${oc.env:OPENAI_API_KEY,null}. The factory's None-guard surfaces a
  clear typed error when the env is genuinely unset.

tests/llm/test_strict_chat_openai.py (new, 10 tests)
  Known kwargs pass; known aliases pass (max_tokens, request_timeout,
  base_url, api_key, etc.); single-typo kwarg raises naming the
  offender; multi-typo kwarg raises naming both; None api_key raises
  naming OPENAI_API_KEY; via Hydra instantiate, a typoed YAML field
  raises InstantiationException whose __cause__ is the
  StrictChatOpenAIError.
…cit _self_ to defaults

config/algorithm/*.yaml, config/constants/islands.yaml
  IslandConfig.migration_rate was silently dropped at construction
  (Pydantic extra='ignore'). Engine audit confirmed no consumer reads
  per-island migration_rate; the global MapElitesMultiIsland.
  migration_interval and max_migrants_per_island are authoritative.
  Deleted the dead migration_rate: ${migration_rate} key at all 8
  algorithm sites and the dead constant in constants/islands.yaml.
  Schema pinned by two new tests in tests/evolution/test_island.py
  so future regressions fail fast.

config/pipeline/*.yaml, config/algorithm/_base.yaml,
config/evolution/{default,steady_state}.yaml,
config/pipeline/auto.yaml
  Plain interpolations ${redis_storage}, ${problem_context}, ${llm},
  ${metrics_context} unified to ${ref:...} across 14 YAMLs. Mixed
  use of the plain form and the ref-mutating form caused C3 double-
  instantiation of ProblemContext on every pipeline run; one half
  saw add_auxiliary_metrics mutation, the other did not. Post-edit
  rg over the four protected names returns zero plain references.
  The pure-value interpolations (${primary_key}, ${higher_is_better},
  ${seed}, ${stage_timeout}, ${temperature}, etc.) stay as-is —
  those don't traverse the mutating resolver.

config/experiment/*.yaml, config/algorithm/*.yaml,
config/constants/base.yaml, config/evolution/steady_state.yaml
  Added explicit - _self_ to the defaults: block of 18 YAMLs (8
  experiment + 8 algorithm + 2 misc). Hydra 1.3's implicit-last
  semantics emits UserWarning; future Hydra would have flipped the
  default. Last-position _self_ matches the runtime intent — file-
  level overrides win over inherited defaults — and the migration-
  bus / single-island-variant chains keep working unchanged.
…cycle leak fixes; gitignore artifacts

wrapper.py
  Module-level contextvars.ContextVar[WorkerPool | None] carries an
  experiment-scoped pool through every run_exec_runner(pool=None)
  call site without touching stage signatures or factory closures.
  set_ambient_exec_runner_pool / reset_ambient_exec_runner_pool /
  get_ambient_exec_runner_pool form the bind/reset/lookup surface.
  run_exec_runner consults the ambient pool when no explicit pool=
  is supplied; explicit pool= argument still wins. Restores the
  subprocess-startup amortization that the lru_cache removal in the
  previous commit silently regressed.

run.py
  run_experiment now builds one WorkerPool, binds it via the
  contextvar, and drains it in finally before resetting the token.
  Every existing pool=None call site (execution.py, runtime_metrics.py,
  optimization/utils.py, optimization/optuna/stage.py) now resolves
  to the bound pool instead of creating a fresh single-use pool per
  invocation.

  M3 fix: an inner try/finally wraps the dag_runner.start() +
  evolution_engine.start() + serve_until_signal sequence. The finally
  re-awaits both stop() methods so an exception between start and
  serve no longer leaks the two background asyncio tasks. Both stop()
  methods verified idempotent (engine.stop nulls out _task after
  cancel; runner.stop swaps _redis to None first).

  M4 fix: an outer try/finally covers the full body. Every component
  local defaults to None; the finally guards each component
  individually so a mid-flight instantiate failure no longer leaks
  the writer's daemon thread or the Redis ping connection. The
  ambient pool token is reset regardless.

.gitignore
  Added wandb/, runs/, tb_logs/, tensorboard/ — test-run artifacts
  that were leaking into git status.

13 new tests in tests/stages/test_exec_runner_pool.py and tests/
entrypoint/test_run_experiment_lifecycle.py pin:
- ambient-pool reuse across consecutive run_exec_runner calls
- explicit pool= argument overrides ambient
- fallback when no ambient is bound
- source-grep guard against future unguarded pool=None call sites
- start-then-exception still fires every stop()
- instantiate-failure does not leak threads or the ambient pool token
- stop()s remain idempotent after serve_until_signal already drove them

Wider test sweep: 4760 passed.

Out-of-lane (LOW, deferred): both engine.stop() and runner.stop()
both call self.storage.close() on the same underlying storage —
structurally safe due to RedisConnection idempotency, worth tightening
to single-owner in Phase 1+. serve_until_signal accepts coroutines
rather than callables, so an exception before iteration emits
RuntimeWarning on the unawaited coros — Phase 1+ refactor.
Per dataplane doc §7.1 step 11. The two subprocess-IPC sites in
gigaevo/programs/stages/python_executors/ are allowlisted at the
per-file level (local trust boundary; the subprocess is gigaevo
code running gigaevo code over a private pipe).

gigaevo/programs/utils.py:18 carries an inline noqa with a docstring
naming its Phase 3 migration target — it is the b64-encoded round-
trip from programs/core_types.py:25 content_hash, the bug-class-FusionBrainLab#6
RCE-on-deserialise surface for any Redis blob. Future work removes
this site by switching the content_hash codec to JSON+sha256.
engine_startup.py: add the two helpers mirroring wire_storage /
wire_prompt_fetcher conflict-rejection contract. Idempotent on
identical attach; raises on conflicting re-attach; returns False
silently when invoked on non-target object types.

run.py: replaces raw _dataplane = / _engine_root = attribute
assignments with the two helper invocations. Discipline gap from
the audit closed: every rebind site now goes through a named helper.

2 integration tests pin idempotent re-wire (silent no-op),
conflicting re-wire (raises), and the non-target degrades-to-False
path. __init__.py re-exports both helpers.
…elper

The last live bypass of bug class FusionBrainLab#14 (gigaevo/evolution/bus/node.py:120
direct program.state = ProgramState.DONE on cross-engine migrant
ingestion). The migrant is a freshly-rehydrated Python object with no
prior in-run FSM history, so the in-run FSM table can't validate the
transition — the migrant arrives in whatever state the originating
engine last persisted (typically QUEUED), and the receiving engine
must mark it DONE so its own evolution loop ignores it.

state_manager.register_external_terminal_state(program, new_state) is
the named cross-run bypass: rejects non-terminal targets, mutates the
in-memory Program, no Redis I/O. Greppable so any future contributor
adding cross-run state transfer has one place to read.

bus/node.py:120 now calls the named helper; the bypass is no longer
anonymous.

4 tests pin the helper's rejection of non-terminal targets and the
node-level routing through it.
…tion_program_state_batch

The single-program transition path migrated to dp.transition_program_state
earlier; the batch methods at redis_program_storage.py:709-849 still
used raw pipe.set with Python-side validate_transition only — concurrent
writers could race on the same program in a batch.

_batch_transition_via_dataplane builds BatchTransitionItem tuples and
dispatches through dp.transition_program_state_batch. Per-item tokens
derive from engine_root.split_program_token when wired; fall back to
mint_root per item otherwise. The batch wrapper is per-item atomic
(per dataplane doc §4.2 — whole-batch atomicity is not guaranteed);
that matches the legacy semantics.

batch_transition_by_ids loads programs via parallel get() in the dp
branch (trades the legacy raw-JSON optimisation for FSM Lua atomicity).

Legacy raw-pipeline path preserved under dataplane=None.

5 tests cover both branches, filter semantics, illegal pair rejection
(returns Err), and the legacy fallback.
…y; wrap_lease for typed crash flow

Two integrations on RedisInstanceLock.__init__.

(1) Eliminates ~50 LOC of script_load + NoScriptError retry that
duplicated LuaRegistry.evalsha. When dataplane is wired, _evalsha
delegates to dp._lua.evalsha; the dp's reload-and-retry path is the
single source of truth for the three lock Lua scripts. Legacy direct-
aioredis path stays under dataplane=None for backwards-compat with
callers that don't yet have a dp reference.

(2) Wires wrap_lease (dataplane doc §3.13 + §5.2). After acquire(),
the lease is wrapped via dp.wrap_lease and surfaces lock-loss as a
typed CrashEvent. The renewer signals the lease's OneShotFlag on
token-CAS failure or transport error; the new wrapped_lease property
+ observe_loss() coroutine return the CrashEvent (one-shot
consumption). Callers can opt into typed control flow instead of
polling lease.flag.is_set() out of band.

4 new tests pin: acquire routes via LuaRegistry, renewal-loss
surfaces as CrashEvent, release clears the wrapper, dataplane=None
returns None for wrapped_lease. 17 existing locking tests unchanged.
…iant

Only compute_content_hash_hex has a consumer (coordinator.py uses it
for the FSM idempotency token). The bytes variant had zero callers
anywhere. Phase 2's emit_event.lua may want a wire-efficient 32-byte
payload but that's a deliberate Phase 2 decision; today the bytes
variant is speculative substrate.

compute_content_hash_hex now computes sha256 directly via hexdigest()
instead of routing through the deleted bytes function. Behaviour
unchanged at every call site.

ContentHash NewType + canonical-JSON encoder kept (vocabulary +
in-use machinery).
mint_root + mint_split + mint_split_n cover every production caller
(engine_startup, redis_program_storage, archive_storage, dataplane
test suite). mint_combine had zero consumers — the Permission
algebra's recombine operation is speculative substrate until a
real consumer needs it.

Three dedicated test classes deleted along with the function. The
engine_startup comment that named mint_combine in a side note is
updated to describe the actual subspace contract.
The doc-proposed Program.atomic_counter -> Monotonic[int] migration
never landed; current production uses Lua-side INCR for atomicity.
No in-process monotonic counter consumer ever materialised. The
wrappers added runtime cost (an extra attribute + advance check)
without a benefit.

The lattices.py docstring is rewritten to describe what
MonotoneLattice actually backs in production (epoch / generation /
HLC counters) rather than the previous reference to the deleted
Monotonic wrapper.

47 LOC of dedicated test class deleted; 63 LOC of Monotonic /
MonotonicCounter implementation deleted; net -110 LOC.
_phase2_substrate.md enumerates the present-but-unused features that
ARE kept deliberately because they are Phase 2 substrate per the
design doc: lwwr_set/_get + Lua (LWW register with HLC tiebreak),
HlcTimestamp (event timestamps), the lattice classes (BoolLattice
powers Phase 2 admission), and 13 NewType IDs (events / streams /
consumer groups / causation tracking — all zero runtime cost).

For each: one sentence on why it ships now, one on its Phase 2
destination. Reachable from the package docstring at __init__.py.
A future contributor pruning dead code sees this list before
deleting them by mistake.

Incidental: ruff I001 auto-fix on the unrelated
gigaevo/utils/trackers/backends/wandb.py import block.
…nd archive try_replace_elite

Restores the single-writer-per-subspace witness (doc §3.5) on the two
surfaces that previously bypassed it: bandit (no token at all) and
archive (mint_root per call instead of split-from-root).

coordinator.py
  crdt_inc gains optional token: Token[CounterKey] | None. When
  supplied the token is consumed and tag-checked against the
  CounterKey; mismatch returns Err(DataPlaneError). When None the
  legacy path stands (backwards compat).

llm/bandit.py
  SlidingWindowUCB1 / BanditModelRouter accept optional engine_root.
  _schedule_crdt_inc mints per-call counter tokens from
  engine_root.split_counter_token and threads them through
  _do_crdt_inc. Asymmetric engine_root without dataplane raises
  ValueError at construction.

evolution/storage/archive_storage.py
  RedisArchiveStorage accepts engine_root. _add_elite_via_dataplane
  uses engine_root.split_cell_token when wired; mint_root fallback
  preserved for dataplane-less constructors.

dataplane/engine_startup.py
  wire_archive_storage(archive, dataplane, engine_root) helper added
  matching the wire_storage/wire_bandit_router pattern (idempotent,
  raises on conflict). wire_bandit_router extended to accept
  engine_root.

run.py
  wire_archive_storage invoked on every island; engine_root threaded
  into wire_bandit_router.

11 new tests pin token-discipline rejection (consumed twice, tag
mismatch), wire-helper idempotency, backwards-compat with
engine_root=None.
…lValue)

Sourced[T, S] + the six aliases (LocalValue, CachedValue, ReplayedValue,
GossipedValue, ExternalValue, SanitizedValue) were exported with zero
production consumers — the provenance vocabulary was dead code that
the doc §3.4 says should distinguish local-fresh-read vs cached-stale-
read at every call site.

dp.read_program now returns Result[LocalValue[Versioned[ProgramSnapshot]]
| None, DataPlaneError]. The phantom-type wrapper makes the read's
provenance explicit at the type level. Future replay paths would
return ReplayedValue[Versioned[...]]; future cache reads would return
CachedValue[Versioned[...]]; mypy then enforces that a function
demanding LocalValue[Program] cannot accept a CachedValue[Program]
by mistake (bug class FusionBrainLab#13 closed at the type system level).

Production caller updated: gigaevo/runner/dag_runner.py
_timeout_read_is_fresh unwraps the LocalValue via .value.value to
reach the underlying Versioned.

5 tests updated to unwrap the LocalValue wrapper; 1 new test pins
the phantom-wrapper shape returned by read_program.

This is the foothold: a follow-up provenance audit can
rg "LocalValue\|CachedValue\|ReplayedValue" gigaevo/ and find the
first real consumer; subsequent reads (crdt_read, lwwr_get) can
adopt the same pattern.
…p dead narrative

Net delta across 61 files: -1629 LOC (1069 added, 2698 deleted). No
behavioral changes, no test count delta — 4789 tests passing before
and after.

Three parallel cleanup lanes worked the branch diff against main:

Dataplane substrate (~-1500 LOC across coordinator, engine_startup,
permissions, models, codec, scripts, transitions, lattices, ids,
errors, crash, connection, the Lua scripts, and the co-located
tests). Stripped multi-paragraph rationales describing the design,
shrunk module docstrings, removed historical narrative ("previously",
"we now", "before this"), collapsed multi-line in-body comment
blocks to one-liners. The genuinely load-bearing WHY comments
stayed (the cjson directive fallback, the NOSCRIPT coalescing note,
the asyncio-only contract on OneShotFlag, etc.).

Migrated callers (~-570 LOC across redis_program_storage, locking,
state_manager, archive_storage, bandit, fetcher, stats, sync,
mutation_operator, dag_runner, selectors, strict_chat_openai). One
real code smell fixed in coevolution/stats.py: replaced
`for idx, (db, prefix) in enumerate(...)` with a stale dangling
`del db, prefix` that referenced no log line. Dual-path narration
("when wired, route through dp; when None, fall back...") collapsed
across the storage / archive / bandit dispatch sites. The
backwards-compat-shim apology comments tightened to one line each.

Tests + entry-point + configs (~-340 LOC across the test tree,
run.py, pyproject.toml, .gitignore). Test docstrings collapsed,
internal-slug "covers C3" / "T-A4" references stripped, narrative
about prior bypass behaviour replaced with what-the-test-asserts-now.
pyproject.toml banned-import policy block tightened; every TID251
per-file-ignore verified live (zero stale entries). .gitignore left
alone (no narrative to strip).

Code-smell pattern surfaced across 5 files (NOT fixed in this
sweep; flagged for a follow-up): the `if self._dataplane is not
None: return await self._<op>_via_dataplane(...)` dispatch shape
duplicates in redis_program_storage.atomic_state_transition,
fast_state_transition, batch_transition_state, batch_transition_by_ids,
and archive_storage.add_elite. A small dispatch_if_dataplane helper
or a coordinator-mixin would centralise the branch + assert. A
sibling pattern across prompts/fetcher.py, coevolution/stats.py,
coevolution/sync.py for lazy-init owned/borrowed DataPlane could
deduplicate via an OwnedDataPlane container.

Out-of-lane finding (not fixed): errors.all_error_types() omits
EliteInvalidError from its tuple but the test only checks subclassing,
not exhaustiveness. One-line fix; counted as behavior change; left
for a follow-up.
Net -32 LOC across 13 files. No behavioral changes, no test count
delta — 2841 focused tests + 4789 full sweep still pass.

Three parallel cleanup lanes worked the branch diff against main
with a focus on vacuous diffs (renames without semantic change,
reflowed messages with identical meaning, function extractions
that did not aid testing).

Dataplane substrate
  test_smoke.py: test_method_stubs_raise_notimplemented renamed
  to test_public_methods_callable (the old name asserted a stub-era
  NotImplementedError that the body no longer checked; the body
  comment explicitly contradicted the name). Module docstring
  reflow archeology trimmed in test_no_silent_broad_except,
  test_models, test_ids. Two back-references stripped from
  test_transition_state ("WATCH/MULTI/EXEC path", "Compat fields
  for non-coordinator readers"). ClaimState docstring no longer
  names the migration-bus consumer (zero in-tree references).
  make_actor_id soft-deprecation hint removed.

Migrated callers
  Four vacuous renames reverted:
  - mutation_operator.py: kwargs->positional flip on record_outcome
    restored to kwargs (no readability benefit from positional).
  - utils/trackers/__init__.py: four docstring rewordings ("Build"
    -> "Initialize", "fans out writes" -> "writes to multiple
    backends") rolled back.
  - redis_program_storage.py: __all__ inline-to-three-line reformat
    rolled back.
  - locking.py: re-ordered attribute initialisations restored.
  Dispatch-shape audit across the 5 dataplane-dispatch sites
  confirmed uniform; no helper introduced.

Tests + run.py + configs
  No edits needed. run.py / pyproject.toml / config YAMLs already
  pass the vacuous-diff test post first cleanup pass — every
  surviving comment describes present behaviour or load-bearing
  WHY. TID251 per-file-ignores all verified live against current
  import sites (zero stale entries).
Drop the dedicated _phase2_substrate.md and fold its enumeration of
deliberately-kept-but-unused names directly into the package docstring
at gigaevo/dataplane/__init__.py: lwwr_set / lwwr_get + Lua,
HlcTimestamp, the lattice classes, and the 13 NewType identifiers
covering events, streams, consumer groups, and causation tracking.

The keep-list now lives where Python tooling and IDE introspection
already surface it, removing the indirection through an out-of-band
markdown file.
@GrigoryEvko

Copy link
Copy Markdown
Author

3. Каталог классов дефектов старой реализации

В рамках настоящего PR идентифицировано четырнадцать структурных классов дефектов, каждый из которых наблюдался в исторической реализации в нескольких независимых точках. Ниже приводится перечень с указанием конкретных представителей и статуса закрытия в текущем PR.

Класс №1: потеря обновлений при read-modify-write

Историческое проявление. Несколько точек кода применяли паттерн «прочитать JSON-блоб из Redis, изменить в Python, записать обратно через SET». Между чтением и записью отсутствовала какая-либо синхронизация. Два конкурентных вызова record_outcome для одного и того же prompt_id, две конкурентные транзакции program.state = QUEUED, два конкурентных писателя archive_storage.add_elite для одной и той же ячейки приводили к потере одного из обновлений.

Конкретные точки кода в версии до рефактора.

  • gigaevo/prompts/fetcher.py:547-584: get → mutate → set на ключе {main_redis_prefix}:prompt_stats:{prompt_id} (JSON-блоб со всеми статистиками).
  • gigaevo/programs/program.py:139: инкремент atomic_counter через INCR, но дальнейшее слияние через update() страдало от read-modify-write.
  • gigaevo/llm/bandit.py: реестр многоруких бандитов целиком в процессной памяти; межпроцессное разделение отсутствовало.
  • gigaevo/evolution/storage/archive_storage.py:222-226: цикл while True: WATCH; GET; compute; SET; EXCEPT WatchError: continue без верхней границы по количеству попыток.

Закрытие в текущем PR. Каждая точка кода смигрирована на атомарную операцию в составе EVALSHA. Для словаря подсказок применяется HINCRBY на полях trials, successes, metrics_count и HINCRBYFLOAT на полях m:<metric_key>, с защитой бюджета через LPUSH + LTRIM 0 19. Для архива применяется атомарный CAS archive_swap.lua с реверс-индексом. Для бандита применяется G-counter в виде хеша bandit:trials:{arm} с обновлением через HINCRBY {hash} {actor} (актор формируется из (run_id, worker_id) и обеспечивает межпроцессную агрегацию через сумму по полям). Для всех переходов состояний программы применяется transition_state.lua с проверкой expected_from, ленивой записью idempotency_token через HSETNX поверх {prefix}:program:{pid}:idem, и одношаговым сетевым раундом.

Класс №2: безграничные циклы повторов WATCH/MULTI/EXEC

Историческое проявление. Цикл while True на исключении WatchError создавал потенциал нарастающей нагрузки на Redis при высокой конкуренции. Помимо этого, при систематической контентной перезаписи (например, два бандита, постоянно записывающие в одну и ту же ячейку архива) цикл мог не сходиться вовсе.

Конкретные точки кода. archive_storage.py:222-226, redis_program_storage.atomic_state_transition (в версии до рефактора).

Закрытие. WATCH-циклы заменены атомарным Lua-скриптом, выполняющимся в одном раунде. Где Lua полностью покрывает семантику (одиночный CAS, инкремент эпохи, обновление обратного индекса), цикл повторов как таковой исключён. Где Lua покрывает основной путь, но резервный путь остаётся на WATCH (SumArchiveSelector через tiebreak по score покрывается атомарно, а ParetoFrontSelector через многомерное сравнение требует WATCH-цикла), вводится верхняя граница _WATCH_MAX_ATTEMPTS = 50 с явной типизированной ошибкой Err(DataPlaneError) по её исчерпании.

Класс №3: продвижение курсора до захвата

Историческое проявление. RedisStreamTransport.consume в версии до рефактора инкрементировал локально хранимый курсор last_id до выполнения SETNX-захвата сообщения. При выигрыше другого потребителя в гонке SETNX проигравший потребитель уже сдвинул курсор, и сообщение оставалось без подтверждения через XACK, не доходило до повторной попытки данного потребителя, а резервный путь через истечение TTL SETNX-захвата не мог его восстановить, поскольку курсор уже миновал.

Конкретные точки кода. gigaevo/evolution/bus/transport.py:164,182.

Закрытие. Полное закрытие данного класса отнесено к фазе 2 общего плана программы изоляции. Транспорт шины сообщений сохранён в неизменённом виде на текущем этапе; миграция на dp.consume с группами потребителей и XACK предусмотрена в фазе 2. Текущий PR расширяет субстрат необходимыми примитивами (StreamName, ConsumerGroup, ConsumerName, EventId, CausationId, CorrelationId) и сохраняет их через каталог-оглавление в разделе 15.

Класс №4: отсутствие XACK и групп потребителей

Историческое проявление. XREAD с ручным сохранением последнего ID в отдельном хеше реализовывал ad-hoc потребителя; переключение при отказе между потребителями требовало отдельной координации через SETNX. Отсутствие XACK означало отсутствие явного контракта «сообщение обработано».

Закрытие. Аналогично классу №3, отнесено к фазе 2. Субстрат ConsumerGroup и ConsumerName подготовлен.

Класс №5: дрейф схемы при перекатывающем обновлении

Историческое проявление. Pydantic-модели Program, ProgramStageResult, MigrantEnvelope, prompt_stats, archive entries, run_state hash сохранялись в Redis без явного поля schema_version. Развёртывание новой версии кода поверх старого состояния в Redis приводило к тихой деградации: новые поля заполнялись значениями по умолчанию, удалённые поля игнорировались, изменённые семантики прокладывались без миграционной фазы.

Закрытие. Полное закрытие отнесено к фазе 2 общего плана программы изоляции. Подпространство upcasters (gigaevo/dataplane/upcasters/) подготовлено в виде директории-маркера; в каталоге-оглавлении раздела 15 зарегистрирована задача введения schema_version на каждую персистируемую модель и реестра upcasters. Текущий PR использует поле content_hash исключительно как идемпотентный ключ дедупликации в transition_state.lua через compute_content_hash_hex, и не привязывает его к версии схемы.

Класс №6: десериализация pickle как примитив RCE

Историческое проявление. ProgramStageResult.from_dict применял cloudpickle.loads(b64decode(value.encode())) к строковому полю output. Любой процесс, способный записать в ключ {prefix}:program:*, получал произвольное исполнение Python-кода в каждом читающем процессе. Тот же b64-pickle путь применялся внутри миграционного конверта шины сообщений MigrantEnvelope.

Конкретные точки кода. gigaevo/programs/core_types.py:164-178, gigaevo/programs/program.py:213 (исторически), gigaevo/programs/utils.py:15 (b64-pickle утилита), gigaevo/programs/stages/python_executors/wrapper.py:246,398, gigaevo/programs/stages/python_executors/exec_runner.py:267,285.

Закрытие. Фаза 1: введена блокировка pickle.loads и cloudpickle.loads через ruff TID251 с белым списком только тех файлов, где граница доверия локальна (wrapper.py и exec_runner.py представляют собой подпроцессный IPC между процессами одного движка, не загружают данные из Redis). Точка кода programs/utils.py:15 помечена инлайн-комментарием # noqa: TID251 с явной ссылкой на фазу 3, где предусмотрено её полное удаление за счёт миграции content_hash на канонический JSON. Любая новая точка вызова pickle.loads или cloudpickle.loads, добавленная после настоящего PR, обнаруживается линтером и отвергается на этапе CI.

Класс №7: синхронный Redis внутри асинхронного пути

Историческое проявление. prompts/fetcher.GigaEvoArchivePromptFetcher использовал внутри асинхронных вызовов синхронный клиент redis.Redis, что приводило к блокировке потока цикла событий на длительность сетевого раунда.

Закрытие. Словарь подсказок полностью смигрирован на асинхронный путь через DataPlane.crdt_inc, DataPlane.bounded_list_push, DataPlane.set_add, DataPlane.set_members, DataPlane.raw_hash_get и сопутствующие методы. Метод record_outcome стал асинхронным напрямую, без обёртки asyncio.to_thread. Из словаря подсказок удалены три записи TID251 в белом списке pyproject.toml: gigaevo/prompts/fetcher.py, gigaevo/prompts/coevolution/stats.py, gigaevo/prompts/coevolution/sync.py.

Смежная работа. Класс родственных дефектов асинхронного сетевого слоя на длительных прогонах закрывается PR #17 «fix(infra): replace httpx with aiohttp + requests for long-running asyncio stability»: прямое использование httpx заменяется парой aiohttp (асинхронный путь) и requests + urllib3 (синхронный путь), с ограниченными значениями ClientTimeout(total=300s, connect=15s, sock_read=300s), keepalive_timeout=20s (ниже типового порога LB), limit_per_host=100, ретрай-политикой urllib3 с исключением POST. Закрывает накопление неосвобождённых соединений в httpcore async semaphore на горизонте часов.

Класс №8: модульные синглтоны, выживающие через Hydra instantiate

Историческое проявление. Модульные переменные _tb_default, _wb_default, _redis_default в gigaevo/utils/trackers/__init__.py устанавливались при первом вызове init_tb, init_wandb, init_redis и сохранялись на протяжении жизни процесса. При повторных запусках экспериментов в одном процессе (типичный сценарий Hydra --multirun) второй запуск получал клиент, сконфигурированный для первого. Аналогично, default_exec_runner_pool в gigaevo/programs/stages/python_executors/wrapper.py оборачивался декоратором @lru_cache(maxsize=1), что создавало кеш-связь между пулом и первым событийным циклом. Повторный запуск получал пул, привязанный к закрытому циклу.

Закрытие. Указанные синглтоны устранены. Каждый вызов init_tb, init_wandb, init_redis, init_tb_redis, init_wandb_redis теперь конструирует свежий экземпляр GenericLogger. Декоратор @lru_cache(maxsize=1) снят с default_exec_runner_pool; владение пулом передано через ContextVar, который устанавливается при запуске run_experiment и сбрасывается в finally независимо от пути ошибки.

Смежная работа. Параллельный PR #14 «refactor(python_executors): replace WorkerPool with loky reusable executor» полностью заменяет ручной WorkerPool на loky.get_reusable_executor, что устраняет тот же класс дефектов (привязка пула к первому циклу событий через lru_cache поверх asyncio.Queue/asyncio.Lock) более радикальным средством, а заодно закрывает: отсутствие переработки воркеров, TOCTOU symlink-swap на пути spill-файла (RCE-вектор), невычищение stderr из подпроцесса (утечка RSS), молчаливый retry при тайм-ауте, фильтрацию переменных окружения от утечки секретов (AWS_*, OPENAI_API_KEY, LANGFUSE_SECRET_KEY, WANDB_API_KEY, STRIPE_* и др.) в подпроцесс пользовательского кода. После интеграции с nightly настоящий PR принимает имплементацию из #14 целиком (см. раздел 9.4 интеграционного коммита).

Класс №9: широкий except Exception, маскирующий сбои

Историческое проявление. Многочисленные точки кода использовали голый except: или except Exception: без явного контракта на конвертацию в типизированный возврат. Сбои тихо проглатывались, метрики деградировали бесшумно, отладка пост-фактум усложнялась отсутствием трассировки стека.

Закрытие в субстрате. Внутри gigaevo/dataplane/ введён AST-инспектор tests/dataplane/test_no_silent_broad_except.py, обходящий все .py файлы субстрата (за исключением тестового дерева) и отвергающий:

  • Голый except: любого вида.
  • except Exception или except BaseException без явного маркера # noqa: BLE001 на той же строке (либо в пределах диапазона lineno..end_lineno при многострочной обёртке кортежем).

Каждая граница конвертации в Result явно помечается маркером, и каждый такой маркер виден рецензенту при чтении кода. Дисциплина за пределами субстрата соблюдается на уровне условности; распространение AST-инспектора на остальные модули предусмотрено в последующих PR.

Смежная работа. Несколько параллельных PR закрывают конкретные проявления данного класса в зонах вне субстрата DataPlane:

  • PR #15 «fix: preserve error context across stage / runner / validation paths» восстанавливает цепочку __cause__ в четырёх точках: optimization/utils.evaluate_single и optuna/stage сохраняют полный stderr вместо rsplit("\n", 1)[-1]; _apply_modifications добавляет raise ... from e; ValidateCodeStage восстанавливает структурные атрибуты SyntaxError (.text, .lineno, .offset); DagScheduler использует logger.exception(...) вместо logger.error("...: {}", e).
  • PR #13 «feat(llm): add LLM call outcome classifier / state machine» вводит gigaevo/llm/call_outcome.py с тотальной функцией classify_call_result(exc, *, model_name=None), обходящей цепочку __cause__/__context__ с защитой от циклов и возвращающей LLMCallResult с одним из 12 вариантов LLMCallOutcome. Прямой ответ на ad-hoc-обработку исключений LLM-вызовов в bandit/router и общий словарь дисциплины «классифицировать причину сбоя, а не оборачивающее исключение».
  • PR #12 «fix(bandit): skip reward updates for unknown arm names» добавляет проверку членства arm_name in self._bandit.arms в начале on_mutation_outcome; раньше неизвестное имя руки (например, из Redis-снапшота старого роутера) поднимало KeyError и обрывало callback агента.

Класс №10: дубликаты сущностей через отсутствие идемпотентности

Историческое проявление. Memory API POST-запросы и эмиссии миграционных конвертов не сопровождались идемпотентными ключами. Повторная доставка приводила к дубликатам.

Закрытие. Идемпотентность встроена в transition_state.lua через поле idempotency_token = compute_content_hash_hex({"pid": pid, "from": from, "to": to, "patch": patch_dict}, schema_version=1). Lua-скрипт проверяет наличие токена через HEXISTS {prefix}:program:{pid}:idem {token} до выполнения мутации; повторный вызов с тем же токеном возвращает кешированный результат предыдущего успешного перехода. Дедупликация Memory API относится к фазе 2.

Класс №11: устаревший кеш возвращается как авторитетный

Историческое проявление. archive_storage._elite_cache поддерживал локальный словарь cell -> Program, обновлявшийся при удачных add_elite. Кеш десинхронизировался с Redis при одновременных операциях из соседних процессов. Аналогичный паттерн наблюдался в prompts/fetcher._current_pack: одна задача перезаписывала пакет, который другая задача только что выбрала через сэмплирование.

Закрытие. Локальный кеш _elite_cache устранён полностью; каждое чтение архива идёт в Redis. Свежесть гарантируется через Versioned[T] и допустимый порог Freshness. Для словаря подсказок переменная _current_pack перенесена из атрибута экземпляра в ContextVar, что обеспечивает изоляцию между конкурентными мутациями в одной задаче.

Класс №12: путаница bytes против str на границах потоков

Историческое проявление. RedisStreamTransport использовал decode_responses=False, что приводило к гибридным bytes/str представлениям полей внутри MigrantEnvelope.from_stream_fields. Любая забытая декодировка приводила к TypeError или (что хуже) к молчаливой записи bytes в JSON.

Закрытие. В субстрате DataPlane.RedisConnection устанавливает decode_responses=True императивно, без возможности конфигурационного отключения. Гарантия типов обеспечивается на уровне конструктора пула. Транспорт шины сообщений сохраняет историческое поведение; миграция на DataPlane отнесена к фазе 2.

Смежная работа. Дополнительный класс граничных проблем кодирования закрывается PR #10 «fix: sanitize LLM-derived text before logging, JSON serialization, and database writes»: вводится gigaevo/utils/text_sanitize.py с пятью чистыми функциями str -> str (sanitize_for_log, sanitize_for_json, sanitize_for_dbtext, clean_identifier, deep_sanitize_for_json), применяемыми на границах сериализации в Redis Streams, orjson, asyncpg, loguru-sinks и обратно в LLM-промпты. Закрывает классы дефектов: одиночные UTF-16 суррогаты в stage error роняют orjson.dumps и обрывают запись Program в Redis; NUL в трассировке отвергается asyncpg; ANSI-escape от nvcc/clang развращает читатели логов; CR пропускает многострочные LLM-обоснования через подделку строк лога; BIDI-overrides скрывают содержимое от ревью. После интеграции с nightly настоящий PR применяет sanitize_for_log в fetcher.py, coevolution/stats.py и bandit.py (см. раздел 9.3 и интеграционный коммит).

Класс №13: смешение провенанса значений

Историческое проявление. Программы, прочитанные локально из Redis, прочитанные из кеша, восстановленные из повтора событий и доставленные через гипотетическую gossip-репликацию, имели одинаковый Python-тип Program. Никакая статическая проверка не отличала «локально свежий» от «кешированного, возможно устаревшего» или «полученного через повтор из прошлого». Логика, требующая локальной свежести, могла случайно принять кешированное значение, и сбой не обнаруживался до момента запуска.

Закрытие. Введён фантомный конструктор Sourced[T, S] с шестью алиасами: LocalValue[T] = Sourced[T, Literal["local"]], CachedValue[T], ReplayedValue[T], GossipedValue[T], ExternalValue[T], SanitizedValue[T]. Метод dp.read_program оборачивает возвращаемое значение в LocalValue[Versioned[ProgramSnapshot]] явно. mypy в строгом режиме отвергает попытку передать CachedValue[Program] в функцию, требующую LocalValue[Program]. Время выполнения дополнительной стоимости не создаёт: Sourced хранит единственное поле value. Дальнейшая интеграция в crdt_read, lwwr_get, bounded_list_range подготовлена и отнесена к последующим PR.

Класс №14: незаконные переходы FSM

Историческое проявление. Восемь независимых точек кода модифицировали program.state напрямую: database/state_manager.py:108, database/redis_program_storage.py:563, database/redis_program_storage.py:637, database/redis_program_storage.py:751, evolution/engine/core.py:554, evolution/engine/steady_state.py:439, evolution/bus/node.py:112, runner/dag_runner.py:440. Часть точек кода сопровождала мутацию проверкой validate_transition, часть пропускала проверку. Возможность одновременного перехода DONE → QUEUED и RUNNING → DONE на одной программе сохранялась.

Закрытие. FSM-таблица PROGRAM_STATE_TRANSITIONS хранится в Redis под ключом {prefix}:fsm:program_state и загружается во время dp.startup() через load_fsm_table. transition_state.lua производит серверную валидацию каждого перехода через HGET {fsm_table} {from} и сопоставление со строкой допустимых целевых состояний. Все восемь точек кода либо смигрированы на маршрутизацию через dp.transition_program_state, либо переоформлены как явные именованные обходы (register_external_terminal_state для входящих миграционных конвертов в bus/node.py:120). Полное произведение 4 × 4 = 16 пар покрывается параметризованным тестом test_fsm_exhaustive.py. Все попытки незаконного перехода возвращают Err(TransitionError(kind="illegal")).

4. Аудит конкретных дефектов

Параллельно с классификацией структурных классов дефектов проведена серия из шести параллельных аудитов кода, выполненных независимыми инспекторами. Каждый инспектор работал в своей зоне и выдал перечень конкретных находок с координатами file:line и оценкой по серьёзности.

4.1. Сводка по аудитам

Общее количество находок: ориентировочно 114 индивидуальных дефектов. Распределение по аудиторам:

Аудит Зона Находки Серьёзность
a0903099 DAG, state-машина, программы, core_types 34 5 HIGH, 8 MEDIUM, 21 LOW
afcbae21 Эволюция, селекторы, шина, архив 15 6 HIGH, 5 MEDIUM, 4 LOW
a5961edd Сериализация, кодеки 13 5 HIGH, 4 MEDIUM, 4 LOW
ab074233 Подсказки, коэволюция 15 3 HIGH, 7 MEDIUM, 5 LOW
a032c213 Трекеры, коллекторы 20 9 HIGH, 6 MEDIUM, 5 LOW
a4956107 Ideas-tracker, A-MEM 14 6 HIGH, 5 MEDIUM, 3 LOW
a9f7bc32 Hydra-конфигурационный слой 3 + 1 бонус 2 HIGH, 1 MEDIUM, 1 MEDIUM

4.2. Полностью устранённые дефекты

Настоящий PR полностью устраняет следующие конкретные находки.

P1 (ab074233): gigaevo/prompts/fetcher.py:547-584. Гонка потери обновления на ключе prompt_stats:{prompt_id} через паттерн get → mutate → set. Устранение: переход на атомарные HINCRBY/HINCRBYFLOAT/LPUSH+LTRIM через DataPlane.crdt_inc, DataPlane.bounded_list_push, DataPlane.set_add. Коммит: ee87556, a0e9f8f.

P2 (ab074233): gigaevo/prompts/fetcher.py:470-494. Перекрёстное влияние между конкурентными мутациями через атрибут _current_pack. Устранение: перенос _current_pack из атрибута экземпляра в модульный ContextVar. Каждая конкурентная задача asyncio получает изолированную копию. Коммит: ee87556.

F9 (afcbae21): gigaevo/evolution/storage/archive_storage.py:184-219. Быстрый путь сравнения с кешированным current_prog приводил к отвержению объективно лучшего кандидата при устаревшем кеше. Устранение: кеш _elite_cache удалён полностью; обмен ячейки производится через атомарный archive_swap.lua с серверным сравнением скоров и tiebreak-битом. Коммит: 8a8f247, 983ed35.

Finding 2 (a9f7bc32): gigaevo/config/resolvers.py:43,47,48. Регистрация резолвера eval равной builtins.eval предоставляла полноценную поверхность произвольного исполнения кода через любую интерполяцию ${eval:...} в YAML или CLI-override. Резолверы merge и len имели нулевое количество точек вызова. Устранение: все три регистрации удалены. Любой ${eval:...} теперь возвращает ошибку OmegaConf на этапе резолюции. Коммит: b25c307.

Singleton #3 (a032c213): gigaevo/utils/trackers/__init__.py:8-10,17-21,34-38,56-62. Три модульных синглтона _tb_default, _wb_default, _redis_default выживали через Hydra instantiate, что приводило к возврату клиента, сконфигурированного для первого запуска эксперимента, во второй запуск в том же процессе. Устранение: глобальные переменные удалены; фабрики конструируют свежий экземпляр на каждый вызов. Регрессионный тест test_init_*_returns_fresh_per_call фиксирует инвариант. Коммит: b25c307.

4.3. Частично устранённые дефекты

Следующие находки частично закрываются текущим PR. Полное закрытие требует миграции конкретного потребителя на субстрат, который данный PR предоставляет.

#8 (a0903099): gigaevo/programs/core_types.py:24-26. content_hash использует cloudpickle.dumps(self.model_dump()) с усечением до 64 бит. Субстрат compute_content_hash_hex существует в gigaevo/dataplane/codec.py и применяет канонический JSON с сортировкой ключей + sha256 + 16 шестнадцатеричных символов. Миграция потребителя в core_types.py:24 отнесена к фазе 3 общего плана программы изоляции.

#9 (a0903099): gigaevo/programs/core_types.py:164-178. Pickle-десериализация произвольных строк. Линтер TID251 блокирует появление новых точек вызова; существующая точка кода помечена явно. Полное удаление отнесено к фазе 3.

#18 (a0903099): gigaevo/database/state_manager.py:127-128. Гонка вытеснения словаря блокировок. При маршрутизации переходов через dp.transition_program_state атомарность гарантируется серверной Lua, и внутрипроцессная блокировка более не несёт критическую нагрузку. При unwired-режиме (dataplane=None) гонка сохраняется как обратносовместимое поведение.

#20 (a0903099): gigaevo/database/state_manager.py:108-117. Дрейф состояния в памяти при сбое записи. Закрывается при wired-режиме через атомарную мутацию блоба и множеств статусов в одном EVALSHA. Зеркало в памяти через set_in_memory_state валидирует пару (current, target) против FSM-таблицы перед мутацией.

#25 (a0903099): gigaevo/programs/program.py:139-142. Потеря обновления atomic_counter при конкурентном merge. Закрывается для FSM-переходов через transition_state.lua (атомарный INCR). Для остальных путей записи (write_exclusive, update) рекомендуется завершить миграцию в фазе 2.

#34 (a0903099): gigaevo/programs/program_state.py:28-42. Переход DONE → QUEUED совместно с эвикцией lock создавал условия гонки между задачами-утечками от предыдущего запуска и текущим запуском. Закрывается через атомарную Lua-валидацию при wired-режиме.

F8 (afcbae21): archive_storage.py:222-226. Утечка обратного индекса при повторной попытке после WatchError. Закрывается для пути SumArchiveSelector через атомарный dp.try_replace_elite. Для ParetoFrontSelector сохраняется WATCH-цикл с верхней границей в 50 попыток и явным Err(DataPlaneError) по её исчерпании.

F13 (afcbae21): gigaevo/database/merge_strategies.py:25. atomic_counter инкрементируется на каждую запись, не на каждую логическую правку. Закрывается для FSM-переходов через атомарный Lua-инкремент; для прочих путей слияние через merge_programs сохраняет историческое поведение.

#2 (a5961edd): gigaevo/programs/utils.py:14. pickle_b64_deserialize без валидации длины и формата. Закрывается только в части блокирования появления новых точек вызова; существующая точка кода помечена на удаление в фазе 3.

#11 (a5961edd): gigaevo/programs/core_types.py:26. StageIO.content_hash использует cloudpickle.dumps(model_dump()). Аналогично #8: субстрат существует, потребитель ожидает миграции.

P8 (ab074233): gigaevo/prompts/coevolution/stats.py:96-101. Отсутствие тайм-аута сокета на асинхронном Redis-клиенте. Закрывается при маршрутизации через DataPlane.RedisConnection, который устанавливает socket_timeout_s=30.0 по умолчанию. Унаследованный путь ленивой инициализации в обратносовместимом режиме сохраняет историческое поведение.

P15 (ab074233): gigaevo/prompts/coevolution/stats.py:92-102. Асинхронные aioredis.Redis клиенты никогда не закрывались. Закрывается при wired-режиме через жизненный цикл DataPlane. Унаследованный путь сохраняет утечку.

#1 (a032c213): run.py:92. Порядок завершения: писатель → движок → исполнитель. Исправление M3/M4 в коммите 9aff248 ввело внутренний try/finally, гарантирующий выполнение engine.stop() и runner.stop() даже при исключении между запуском и serve_until_signal. Идемпотентность stop() проверена.

#13 (a032c213): composite.py:54-59. CompositeLogger.close() не учитывал совместное использование синглтонов. Закрывается через удаление синглтонов в фазе 0.

Finding 1 (a9f7bc32): gigaevo/config/resolvers.py:23-25. _ref_resolver мутирует исходный конфиг через присваивание parent[base] = instantiated_node. Закрывается частично через унификацию всех плейн-интерполяций ${X} в ${ref:X} в 14 YAML-файлах (закрывает следствие двойной инстанциации ProblemContext). Сам механизм мутации сохраняется до фазы 3 (полное удаление резолвера).

Finding 3 (a9f7bc32): regex резолвера ${ref:obj::attr}. Крайние случаи в полу-мутированном дереве при ошибке доступа к атрибуту закрываются по той же логике: триггерные пути устранены унификацией, сам механизм сохраняется до фазы 3.

4.4. Сводная оценка элиминации

Категория Количество
Полностью устранены 5
Частично закрыты (закрытие зависит от wired-режима или ограничено блокированием новых точек вызова) 14
Не закрыты в данном PR (отнесены к фазам 2, 3 или к зонам вне субстрата DataPlane) около 95

Стоит подчеркнуть, что значительная часть «незакрытых» дефектов относится к зонам, не входящим в круг ответственности DataPlane: внутренний параллелизм DAG, селекторы эволюции, бэкенды трекеров tensorboard/wandb, A-MEM ideas-tracker, транспорт шины сообщений (фаза 2). Для каждой зоны, входящей в круг DataPlane, проведено полное закрытие класса либо частичное закрытие с явным указанием отложенной части в каталоге keep-list раздела 15.

@GrigoryEvko

Copy link
Copy Markdown
Author

5. Архитектура решения

5.1. Структура пакета

gigaevo/dataplane/
├── __init__.py              # Публичная поверхность пакета.
├── coordinator.py           # Класс DataPlane: единая точка взаимодействия.
├── connection.py            # RedisConnection: жизненный цикл пула.
├── scripts.py               # LuaRegistry: реестр + NOSCRIPT recovery.
├── scripts/                 # Каталог Lua-скриптов.
│   ├── transition_state.lua
│   ├── archive_swap.lua
│   ├── instance_lock_acquire.lua
│   ├── instance_lock_renew.lua
│   ├── instance_lock_release.lua
│   ├── counter_inc.lua
│   ├── lwwr_set.lua
│   └── bounded_list_push.lua
├── codec.py                 # Канонический JSON + content_hash_hex.
├── permissions.py           # Token[Tag] + mint_root/mint_split/EngineRoot.
├── models.py                # Result, Versioned, Sourced, Freshness, HlcTimestamp.
├── errors.py                # Типизированная иерархия исключений.
├── transitions.py           # FSM-таблицы (ProgramState, ClaimState, LockState).
├── crash.py                 # OneShotFlag, CrashEvent, CrashWatchedHandle.
├── lattices.py              # BoolLattice, EpochLattice, GenerationLattice и др.
├── ids.py                   # NewType-словарь: ProgramId, EventId и т.д.
├── engine_startup.py        # Функции wire_* для повторного связывания после Hydra.
└── tests/                   # Тестовое дерево, в т.ч. AST-инспектор.

Подпространство upcasters/ зарегистрировано как директория-маркер; реализация upcasters отнесена к фазе 2.

5.2. Публичная поверхность DataPlane

Класс DataPlane экспортирует следующие методы.

Жизненный цикл:

  • await dp.startup(): устанавливает пул, регистрирует Lua-скрипты, загружает FSM-таблицы в Redis. Идемпотентен. Защищён ленивой asyncio.Lock.
  • await dp.shutdown(): освобождает ресурсы. Идемпотентен.

Программы и FSM:

  • await dp.transition_program_state(pid, *, token, expected_from, to, patch, deadline_monotonic) -> Result[Versioned[ProgramSnapshot], TransitionError].
  • await dp.transition_program_state_batch(items, *, deadline_monotonic) -> Result[BatchTransitionOutcome, DataPlaneError].
  • await dp.read_program(pid, *, freshness=FreshnessEventual()) -> Result[LocalValue[Versioned[ProgramSnapshot]] | None, DataPlaneError].

Блокировки:

  • await dp.acquire_instance_lock(key, *, ttl_s, deadline_monotonic) -> Result[InstanceLease, LockHeld].
  • await dp.renew_instance_lock(lease, *, ttl_s) -> Result[InstanceLease, LockLost].
  • await dp.release_instance_lock(lease) -> None.
  • dp.wrap_lease(lease) -> CrashWatchedHandle[InstanceLease, str, InstanceLease].

Архив элит:

  • await dp.try_replace_elite(cell, candidate_id, *, token, candidate_score, tiebreak_bit, deadline_monotonic) -> Result[EliteSwapOutcome, EliteInvalidError].

CRDT-счётчики:

  • await dp.crdt_inc(key, *, actor, delta, token, deadline_monotonic) -> Result[int, DataPlaneError].
  • await dp.crdt_read(key, *, freshness=FreshnessEventual()) -> Result[Versioned[int], StaleReadError].

LWW-регистр:

  • await dp.lwwr_set(key, value, *, hlc) -> Result[LwwrSetOutcome, DataPlaneError].
  • await dp.lwwr_get(key) -> Result[LwwrValue | None, DataPlaneError].

Списочные и хеш-примитивы (escape-hatch для миграционных вертикалей):

  • await dp.bounded_list_push(key, value, *, cap) -> Result[int, DataPlaneError].
  • await dp.bounded_list_range(key, *, count) -> Result[list[Any], DataPlaneError].
  • await dp.set_add(key, value) -> Result[int, DataPlaneError].
  • await dp.set_members(key) -> Result[set[str], DataPlaneError].
  • await dp.raw_get(key) -> Result[str | None, DataPlaneError].
  • await dp.raw_hash_get(key, field) -> Result[str | None, DataPlaneError].
  • await dp.raw_hash_values(key) -> Result[list[str], DataPlaneError].

5.3. Иерархия типов

DataPlaneError                  (база; голый Exception)
├── _StructuredError            (примесь для исключений на базе dataclass)
│   ├── StartupError
│   ├── ShutdownError
│   ├── NotStartedError
│   ├── DeadlineExceeded
│   ├── ScriptLostError
│   ├── ScriptNotRegisteredError
│   ├── TransitionError         (kind in {stale, illegal, duplicate, invalid, unknown})
│   ├── StaleReadError
│   ├── EliteInvalidError
│   ├── LockHeld
│   ├── LockLost
│   ├── TokenAlreadyConsumed
│   ├── TokenNotPickleable
│   ├── TokenTagCollisionError
│   ├── CanonicalEncodingError
│   ├── ContentHashMismatchError
│   ├── SchemaVersionMissingError
│   └── UpcasterMissingError

Каждый структурированный класс является @dataclass(slots=True, frozen=True). Поля, помеченные field(metadata={REDACT_META_KEY: True}), заменяются в строковом представлении на <redacted>, чтобы избежать утечки чувствительных значений (holder в LockHeld, tag_repr в TokenAlreadyConsumed).

Метаклассовая проверка __init_subclass__ запрещает использование изменяемых контейнеров (list, dict, set, bytearray) в полях структурированных исключений: типобезопасность экземпляров требует стабильности хеша.

5.4. Result и идиома match

Возвращаемые значения координатора имеют форму:

type Result[T, E] = Ok[T] | Err[E]

@dataclass(slots=True, frozen=True)
class Ok[T]:
    value: T

@dataclass(slots=True, frozen=True)
class Err[E]:
    error: E

Пример вызова:

match await dp.transition_program_state(pid, token=tok, expected_from=QUEUED, to=RUNNING, patch=p):
    case Ok(value=versioned):
        new_epoch = versioned.epoch
        ...
    case Err(error=TransitionError(kind="illegal", detail=msg)):
        ...
    case Err(error=TransitionError(kind="duplicate")):
        ...

mypy в строгом режиме обеспечивает проверку исчерпываемости блоков match. Шаблон применим в каждой точке вызова координатора.

5.5. Линейные токены Token[Tag]

Token[Tag] представляет собой линейный типизированный свидетель разрешения на запись в определённое подпространство. Реализация обеспечивает:

  • Запрет копирования: __copy__ и __deepcopy__ поднимают TokenNotPickleable.
  • Запрет сериализации: __reduce__ и __getstate__ поднимают TokenNotPickleable.
  • Запрет наследования с переопределением вышеуказанных методов: проверка через __init_subclass__.
  • Одноразовое потребление: consume() устанавливает флаг _consumed; повторный вызов поднимает TokenAlreadyConsumed.

Функции выпуска:

  • mint_root(tag: Tag) -> Token[Tag]: создаёт корневой токен.
  • mint_split(parent: Token[ParentTag], *, child_tag: Tag) -> Token[Tag]: потребляет parent и возвращает дочерний токен с новым тегом. parent становится недоступным.
  • mint_split_n(parent, *, child_tags) -> tuple[Token[T1], ...]: ветвление на N потомков.

Подсистема EngineRoot агрегирует три корневых токена: Token[ProgramId], Token[CellKey], Token[CounterKey]. Соответствующие методы split_program_token(pid), split_cell_token(cell), split_counter_token(key) обеспечивают атомарную ротацию корня и выдачу потомка на каждый вызов.

5.6. Versioned и Freshness

Свежесть значений сопровождается решёточной структурой Versioned[T]:

@dataclass(slots=True, frozen=True)
class Versioned[T]:
    value: T
    epoch: int
    generation: int

    def is_at_least(self, *, min_epoch: int, min_generation: int) -> bool:
        return self.epoch >= min_epoch and self.generation >= min_generation

Допуск к чтению контролируется через дискриминированный союз Freshness:

@dataclass(slots=True, frozen=True)
class FreshnessEventual: pass

@dataclass(slots=True, frozen=True)
class FreshnessAtLeast:
    epoch: int = 0
    generation: int = 0

@dataclass(slots=True, frozen=True)
class FreshnessStrict: pass

type Freshness = FreshnessEventual | FreshnessAtLeast | FreshnessStrict

FreshnessEventual() принимает любое значение. FreshnessAtLeast(epoch=N, generation=M) отвергает значения ниже порога через Err(StaleReadError). FreshnessStrict() производит дополнительный сетевой запрос к {prefix}:ts, чтобы получить актуальную верхнюю границу, и принимает только значения с эпохой, равной этой границе.

Обратносовместимая обёртка сохраняет старые kwargs min_epoch= / min_generation=; смешивание новых и старых каналов передачи свежести отвергается на этапе валидации.

5.7. Sourced[T, S] провенанс

Фантомный конструктор Sourced[T, S] сохраняет тип-параметром источник значения. Алиасы:

type LocalValue[T] = Sourced[T, Literal["local"]]
type CachedValue[T] = Sourced[T, Literal["cached"]]
type ReplayedValue[T] = Sourced[T, Literal["replayed"]]
type GossipedValue[T] = Sourced[T, Literal["gossiped"]]
type ExternalValue[T] = Sourced[T, Literal["external"]]
type SanitizedValue[T] = Sourced[T, Literal["sanitized"]]

Метод dp.read_program(pid) возвращает LocalValue[Versioned[ProgramSnapshot]]. Доступ к содержимому: local.value.value. Будущие потребители (проектор повтора фазы 3, уровень кеша фазы 2) выдают значения через соответствующие алиасы.

5.8. CrashEvent, OneShotFlag и wrap_lease

Подсистема обработки потерянного владения подпространством имеет следующую структуру.

OneShotFlag представляет собой однонаправленный сигнал «обнаружен сбой узла-партнёра». Экспортирует:

  • signal() -> None: устанавливает флаг. Идемпотентен.
  • is_set() -> bool: проверяет состояние.
  • await wait() -> None: ожидает установки.

Контракт ограничивает использование одним циклом событий (по умолчанию asyncio.Event), что задокументировано в docstring модуля.

CrashEvent[PeerTag, Resource] представляет собой полезную нагрузку в форме dataclass при сбое:

@dataclass(slots=True, frozen=True)
class CrashEvent[PeerTag, Resource]:
    peer: PeerTag
    resource: Resource
    survivor_tokens: tuple[object, ...] = ()

CrashWatchedHandle[T, PeerTag, Resource] оборачивает внутренний ресурс совместно с OneShotFlag и фабрикой восстановления. На каждом вызове проверяется флаг; при установленном флаге возвращается (None, CrashEvent), при чистом флаге выполняется метод и возвращается (value, None).

Метод dp.wrap_lease(lease) оборачивает InstanceLease через CrashWatchedHandle. Утечка владения блокировкой превращается из внеполосного флага в типизированный возврат, обрабатываемый через match.

5.9. HlcTimestamp

HlcTimestamp хранит упакованное представление гибридного логического времени:

@dataclass(slots=True, frozen=True)
class HlcTimestamp:
    physical_ns: int
    counter: int

    def pack_hex(self) -> str:
        """64 бита physical_ns + 32 бита counter + 32 бита padding => 32 hex chars."""

    @classmethod
    def unpack_hex(cls, s: str) -> HlcTimestamp:
        """Принимает только lowercase 32-char hex для сохранения лексикографического сравнения."""

Лексикографическое сравнение упакованного hex-представления соответствует временно́му порядку, что обеспечивает LWW-семантику в lwwr_set.lua.

5.10. Каталог решёток (Lattices)

Подсистема решёток предоставляет словарь для будущих фаз:

  • EpochLattice (полный порядок на int, join = max).
  • GenerationLattice (аналогично).
  • ProductLattice[L1, L2] (поточечный join).
  • MonotoneLattice[T, Cmp] (полный порядок под Cmp).
  • BoolLattice ({False, True}, join = OR; субстрат для admission-paths фазы 2).

Решётки используются неявно через Versioned (ProductLattice[EpochLattice, GenerationLattice]); явные применения отнесены к фазе 2.

6. Каталог Lua-скриптов

6.1. transition_state.lua

Назначение. Атомарный переход состояния программы с серверной валидацией FSM, слиянием patch в персистированный JSON, обновлением множеств статусов, эмиссией события в поток status_events и идемпотентной дедупликацией через idempotency_token.

Ключи (KEYS):

  1. {prefix}:program:{pid} (JSON blob программы).
  2. {prefix}:events:status (stream).
  3. {prefix}:ts (счётчик эпох).
  4. {prefix}:fsm:program_state (FSM-таблица).
  5. {prefix}:program:{pid}:idem (idempotency hash).

Аргументы (ARGV):

  1. program_id.
  2. expected_from ("" означает «любое»).
  3. to.
  4. patch_json (произвольные пары ключ/значение для слияния).
  5. idempotency_token (полученный из compute_content_hash_hex({pid, from, to, patch})).
  6. prefix (для построения ключа множеств статусов).

Алгоритм.

  1. Чтение текущего блоба программы. Если отсутствует, выполняется return {stale, "0", "program not found"}.
  2. Декодирование JSON с pcall. При сбое возвращается {invalid, "0", reason}.
  3. Чтение текущего state из блоба; валидация типа (только string). При сбое возвращается {invalid, ...}.
  4. Проверка expected_from: при заданном значении и несовпадении возвращается {stale, "0", reason}.
  5. Чтение FSM-таблицы; проверка to против допустимых переходов. При сбое возвращается {illegal, "0", "from -> to"}.
  6. Проверка HEXISTS {idem_hash} {token}: при true возвращается {duplicate, epoch, blob}.
  7. Запись idem_hash[token] = 1 с TTL 300 секунд.
  8. Декодирование patch_json с pcall. При сбое возвращается {invalid, "0", reason}.
  9. Удаление state, epoch, id из патча (защита от мутации полей, владеемых сервером). Применение оставшегося патча к блобу.
  10. INCR {prefix}:tsnew_epoch. Установка blob.state = to, blob.epoch = new_epoch, blob.atomic_counter = new_epoch (зеркало унаследованного поля).
  11. Повторное кодирование блоба через cjson.encode_empty_table_as_object(true) (опциональный путь, если функция поддерживается; иначе резервный путь через нормализацию на стороне чтения).
  12. SET {program_key} {new_blob}.
  13. SREM {prefix}:status:{from} {pid}, SADD {prefix}:status:{to} {pid}.
  14. XADD {prefix}:events:status MAXLEN ~ 10000 * id={pid} status={to} from={from} to={to} event=transition epoch={new_epoch} (двойная семантика: унаследованные поля id/status/event и поля dp-native pid/from/to/epoch).
  15. Return {ok, new_epoch, blob}.

Возвращаемые статусы:

  • ok: успех.
  • duplicate: повторное обращение с тем же ключом идемпотентности; возвращается актуальный блоб.
  • stale: программа отсутствует или expected_from не совпал.
  • illegal: FSM-таблица отвергает пару (from, to).
  • invalid: некорректный вход на стороне вызывающего (нарушение JSON, отсутствие state в блобе).
  • unknown: непредвиденный статус (защита от расхождения версий).

Дополнительные защиты.

  • cjson.encode_empty_table_as_object(true) сохраняет пустые {} в JSON-роундтрипе (по умолчанию пустые таблицы Lua кодируются как []).
  • Защита от запятых в словаре FSM: encode_for_lua на стороне Python отвергает значения состояний, содержащие запятую (иначе подстрочный поиск в списке допустимых целевых состояний, разделённом запятыми, мог бы дать ложное разрешение).

6.2. archive_swap.lua

Назначение. Атомарный обмен ячейки архива MAP-Elites с серверной валидацией score и tiebreak-битом.

Ключи:

  1. {prefix}:archive (forward hash: cell → candidate_id).
  2. {prefix}:archive:reverse (reverse hash: candidate_id → cell).
  3. {prefix}:archive:scores (scores hash: cell → score).

Аргументы:

  1. cell_key.
  2. candidate_id.
  3. candidate_score (numeric).
  4. tiebreak_bit ("0" или "1": при равенстве score решает приоритет кандидата).

Алгоритм.

  1. Валидация candidate_score через tonumber + check finite. NaN, +Inf, -Inf отвергаются с {invalid, reason}.
  2. Чтение текущего занимающего ячейку через HGET archive cell. При отсутствии выполняется HSET archive cell candidate, HSET reverse candidate cell, HSET scores cell score, возвращается {inserted, ""}.
  3. Чтение текущего score через HGET scores cell. Декодирование через tonumber; при nil/NaN применяется резервное значение 0.0.
  4. Сравнение: при candidate_score > current_score или (candidate_score == current_score AND tiebreak == "1") выполняется замена.
  5. На пути замены: удаление обратного индекса предыдущего занимающего, удаление прежней ячейки текущего кандидата (защита от двойной ячейки), запись новых прямого/обратного индексов и score. Return {swapped, previous_id}.
  6. На пути отказа: return {rejected, current_id}.

Инвариант биекции. Каждая операция замены и вставки освобождает предыдущую ячейку кандидата в обратном и прямом индексах. Программа не может оказаться одновременно в двух ячейках. Тест test_concurrent_writers.py::TestArchiveSwapRace::test_highest_score_wins подтверждает инвариант под нагрузкой 20 конкурентных кандидатов с различными значениями score.

6.3. instance_lock_acquire.lua

Назначение. Атомарная установка ключа блокировки с TTL и опциональным валидатором собственника.

Ключи: {prefix}:lock:{key}.

Аргументы: token, ttl_ms.

Алгоритм. SET key token NX PX ttl_ms. Возврат ok при успехе, held {holder} при отказе.

6.4. instance_lock_renew.lua

Назначение. Обновление TTL блокировки по принципу CAS на токене.

Алгоритм. GET key; если значение совпадает с token, выполняется PEXPIRE key ttl_ms. Иначе возвращается lost.

6.5. instance_lock_release.lua

Назначение. Освобождение блокировки по принципу CAS на токене.

Алгоритм. GET key; если значение совпадает с token, выполняется DEL key. Иначе молчаливый возврат (идемпотентность).

6.6. counter_inc.lua

Назначение. Атомарное увеличение G-счётчика по акторам с инкрементом эпохи.

Ключи:

  1. {prefix}:counter:{key} (хеш с разбиением по акторам).
  2. {prefix}:ts (счётчик эпох).

Аргументы: actor_id, delta.

Алгоритм. Валидация delta через tonumber + integer check + nonzero check. HINCRBY counter actor deltanew_value. INCR tsnew_epoch. Return {new_value, new_epoch}.

Чтение между акторами. crdt_read выполняется через HVALS counter с суммой; эпоха получается через отдельный GET ts.

6.7. lwwr_set.lua

Назначение. Last-Writer-Wins регистр с разрешением равенства по HLC.

Ключи: {prefix}:lwwr:{key} (hash с полями value, hlc).

Аргументы: candidate_value, candidate_hlc_hex.

Алгоритм. Валидация candidate_hlc_hex (длина 32, hex в нижнем регистре). HGET key hlc; при лексикографическом current_hlc >= candidate_hlc возвращается kept. Иначе HSET key value=candidate_value hlc=candidate_hlc → return replaced.

6.8. bounded_list_push.lua

Назначение. Атомарный LPUSH + LTRIM 0 (cap-1) для списков с ограниченным окном (используется на стороне словаря подсказок для окна значений приспособленности).

Аргументы: key, value, cap.

Алгоритм. LPUSH key value; LTRIM key 0 (cap-1); LLEN keynew_len. Return new_len.

7. Реестр Lua-скриптов: LuaRegistry

LuaRegistry управляет жизненным циклом Lua-скриптов в Redis. Класс реализует:

  1. Регистрация. register(name, source) сохраняет исходный код скрипта в локальном словаре. Регистрация выполняется на этапе _register_builtin_scripts внутри DataPlane.startup.

  2. Загрузка. load_all() выполняет SCRIPT LOAD для каждого зарегистрированного скрипта и сохраняет SHA в локальном кеше. Загрузка идёт под защитой ленивой блокировки повторной загрузки для координации с конкурентными вызовами.

  3. Исполнение. evalsha(name, *, keys, args) производит EVALSHA через сохранённый SHA. При получении NoScriptError от Redis (что случается, например, после SCRIPT FLUSH на сервере или при переключении при отказе на реплику чтения) выполняется однократная последовательность повторной загрузки и повторной попытки под защитой _reload_lock. Конкурентные первые загрузки объединяются: два вызывающих, обнаруживших NoScriptError одновременно, выполняют единственный SCRIPT LOAD через асинхронную блокировку.

  4. Защита от двойной загрузки. При повторном NoScriptError после повторной загрузки и повторной попытки поднимается типизированная ScriptLostError(script_name=name). Бесконечный цикл повторных загрузок исключён.

  5. Защита от обращения к незарегистрированному скрипту. evalsha против неизвестного имени поднимает ScriptNotRegisteredError(script_name=name).

8. Соединительный слой: RedisConnection

RedisConnection инкапсулирует пул redis.asyncio.Redis с обязательным decode_responses=True. Семантика жизненного цикла:

  1. Конструктор не выполняет I/O. Объект может быть сконструирован вне цикла событий.

  2. startup() идемпотентен. Внутренний asyncio.Lock создаётся лениво внутри запущенного цикла событий (защита от привязки к чужому циклу). Быстрый путь: проверка _pool is not None без блокировки. Медленный путь: вход под Lock, повторная проверка, конструирование пула, выполнение PING. При сбое PING или любом исключении производится освобождение частично сконструированного пула через _safe_close и проброс типизированной StartupError. asyncio.CancelledError обрабатывается отдельно (является BaseException, не покрывается except Exception).

  3. shutdown() идемпотентен. Защищён тем же asyncio.Lock, что и startup. Гарантируется, что одновременный shutdown и startup не оставят висячий пул.

  4. _safe_close с верхней границей. Закрытие пула обёрнуто в asyncio.wait_for(timeout=5s). Узел, удерживающий сокет полузакрытым, более не способен заблокировать процесс на бесконечно долгое время. Истечение тайм-аута приводит к логированию предупреждения, но не блокирует дальнейший выход.

9. Миграции прикладных вертикалей

9.1. RedisProgramStorage и FSM-переходы

Старая реализация. RedisProgramStorage.atomic_state_transition выполнял ручной цикл while True: WATCH/MULTI/EXEC с вычислением объединённого блоба на стороне Python, INCR atomic_counter, обновлением множеств статусов, эмиссией status_events. Цикл не имел верхней границы; ошибки маскировались исключениями WatchError без типизации.

Новая реализация. RedisProgramStorage принимает опциональный параметр dataplane: DataPlane | None = None и engine_root: EngineRoot | None = None. При наличии обоих маршрутизация всех FSM-переходов производится через dp.transition_program_state, с per-program токеном, полученным через engine_root.split_program_token(program_id). Без wired-режима (dataplane=None) сохраняется унаследованный WATCH-путь как обратносовместимая опция.

Мост унаследованного словаря. На стороне координатора ProgramState использует значения в верхнем регистре, а персистированный JSON-блоб исторически содержит нижний регистр. Функция load_legacy_program_fsm(dp) загружает FSM-таблицу с обоими вариантами регистра (избыточно: каждая строка таблицы экспортируется как под верхним, так и под нижним регистром; целевые состояния содержат оба варианта в виде списка через запятую). Мост позволяет сохранить унаследованный формат данных без миграции.

Пакетные переходы. Методы batch_transition_state и batch_transition_by_ids ранее обходили FSM-валидацию за счёт прямого pipe.set. После миграции при wired-режиме они маршрутизируют через dp.transition_program_state_batch. Токены по элементу строятся через engine_root.split_program_token (при наличии engine_root) или mint_root (резервный путь). Атомарность по элементу гарантируется; атомарность пакета целиком не гарантируется (по проектному решению §4.2), что соответствует унаследованной семантике.

set_in_memory_state. Новая функция в state_manager.py обеспечивает FSM-валидированную мутацию зеркала программы в памяти без записи в Redis. Используется в местах, где Python-объект программы синхронизируется с состоянием, известным из других источников (например, в bus/node после восстановления конверта мигранта). Незаконные мутации зеркала возвращают ValueError вместо бесшумной десинхронизации.

register_external_terminal_state. Отдельная вспомогательная функция в state_manager.py обрабатывает межпроцессную регистрацию терминального состояния (DONE, DISCARDED) на программе, прибывшей из другого engine. Целевое состояние ограничивается терминальным набором. Регистрация не выполняет Redis I/O и не валидирует переход против in-run FSM (входящий мигрант восстанавливается в произвольном унаследованном состоянии). Обходной путь явно поименован и пригоден для поиска через grep, что предотвращает добавление новых анонимных обходов.

9.2. archive_storage (MAP-Elites)

Старая реализация. RedisArchiveStorage.add_elite поддерживал локальный кеш _elite_cache. При WatchError повторно входил в цикл; верхняя граница попыток отсутствовала. Сравнение candidate vs occupant производилось через произвольный Python callable is_better, что препятствовало миграции на серверную атомарность.

Новая реализация. Конструктор принимает dataplane: DataPlane | None и engine_root: EngineRoot | None. Селекторы (ArchiveSelector) дополнительно экспортируют опциональный метод reduce_to_score(program) -> float | None. Если конкретный селектор может свести сравнение к одному числу (например, SumArchiveSelector через сумму взвешенных метрик), add_elite маршрутизирует через dp.try_replace_elite с серверной валидацией score и tiebreak-битом. При невозможности редукции (например, ParetoFrontSelector через многомерное доминирование) сохраняется WATCH-путь с верхней границей _WATCH_MAX_ATTEMPTS = 50 и явным Err(DataPlaneError) при её исчерпании.

Кеш удалён. _elite_cache устранён полностью. Чтение ячеек идёт в Redis через HGET archive cell + GET {prefix}:program:{occupant_id}. Свежесть ячейки контролируется через опциональный аргумент freshness (по умолчанию FreshnessEventual).

9.3. Словарь подсказок (prompts/fetcher, coevolution/stats, coevolution/sync)

Смежная работа. PR #10 «sanitize LLM-derived text» поставляет санитайзеры (sanitize_for_log, sanitize_for_json, sanitize_for_dbtext, clean_identifier, deep_sanitize_for_json); интеграционный коммит настоящего PR применяет sanitize_for_log на каждой строковой интерполяции exception-логов в fetcher.py, coevolution/stats.py и bandit.py (см. интеграционный merge с nightly).

Старая реализация. GigaEvoArchivePromptFetcher.record_outcome производил redis.Redis.get (синхронный), json.loads, мутацию словаря, redis.Redis.set (синхронный). _current_pack хранился в атрибуте экземпляра. RedisPromptStatsProvider создавал redis.asyncio.Redis без socket-таймаутов; жизненный цикл клиента отсутствовал.

Новая реализация. GigaEvoArchivePromptFetcher принимает main_dataplane: DataPlane | None, prompt_dataplane: DataPlane | None, actor: ActorIdentity | None. При наличии основной плоскости записи record_outcome идут через атомарные примитивы:

  • await dp.crdt_inc(CounterKey(f"prompt_stats:trials:{prompt_id}"), actor=actor, delta=1).
  • await dp.crdt_inc(CounterKey(f"prompt_stats:successes:{prompt_id}"), actor=actor, delta=1) (при is_improvement).
  • await dp.crdt_inc(CounterKey(f"prompt_metrics:{prompt_id}:{metric}"), actor=actor, delta=value) (с фиксированной точкой ×1000).
  • await dp.set_add(SetKey(f"prompt_metrics:{prompt_id}:directory"), metric_name).
  • await dp.bounded_list_push(ListKey(f"prompt_fitness:{prompt_id}"), round(child_fitness, 4), cap=_FITNESS_WINDOW).

_current_pack перенесён в _CURRENT_PACK: ContextVar[_PromptPack | None], что обеспечивает изоляцию между конкурентными мутациями в одной задаче.

RedisPromptStatsProvider принимает список dataplanes: list[DataPlane]. Чтение производится через dp.crdt_read, dp.bounded_list_range, dp.set_members, dp.raw_hash_get. Метод get_stats(prompt_id) агрегирует counts/sums по всем переданным экземплярам DataPlane (поддержка межзапусковой агрегации в составе общего плана программы изоляции).

coevolution/sync.MainRunSyncHook смигрирован на dp.raw_hash_get для опроса engine:total_generations. Прямые импорты redis.asyncio устранены.

Все три файла удалены из белого списка TID251 в pyproject.toml. Линтер подтверждает отсутствие прямых вызовов redis в gigaevo/prompts/.

mutation_operator.on_program_ingested: вызов record_outcome стал асинхронным напрямую (без asyncio.to_thread).

9.4. RedisInstanceLock

Старая реализация. RedisInstanceLock реализовывал собственный конвейер script_load + retry on NoScriptError, около 50 строк параллельного кода, дублирующего LuaRegistry.evalsha. Сигнализация потери блокировки осуществлялась через флаг lease.flag.is_set(), который вызывающий должен был помнить опрашивать.

Новая реализация. Конструктор принимает dataplane: DataPlane | None. При wired-режиме все три Lua-вызова (acquire, renew, release) маршрутизируют через dp._lua.evalsha(_SCRIPT_LOCK_*, ...). Локальная логика повторной загрузки и повторной попытки устранена.

InstanceLease оборачивается через dp.wrap_lease(lease) и доступна как lock.wrapped_lease. Задача-продлеватель устанавливает OneShotFlag при любом обнаружении потери (отказ CAS на токене, исключение транспорта). Новый метод lock.observe_loss() возвращает CrashEvent через однократного потребителя.

release() очищает токен в finally, что предотвращает оставление is_held=True после исключения на стороне Redis.

9.5. Реестр многоруких бандитов

Старая реализация. SlidingWindowUCB1 поддерживал счётчики попыток и сумм наград целиком в процессной памяти. Межпроцессная агрегация отсутствовала. Перезапуск процесса приводил к потере всей истории.

Новая реализация. SlidingWindowUCB1 и BanditModelRouter принимают опциональные dataplane: DataPlane | None, actor: ActorIdentity | None, engine_root: EngineRoot | None. При наличии всех трёх компонентов попытки и суммы наград по каждой руке зеркалируются в Redis G-счётчики через dp.crdt_inc с токеном счётчика по вызову, получаемым из engine_root.split_counter_token. Внутрипроцессное скользящее окно UCB сохраняется как авторитетный сигнал отбора (последовательность выбора остаётся побайтно идентичной унаследованной под тем же значением seed); Redis-зеркало служит для аудита и межпроцессного разделения состояния.

refresh_from_redis() поднимает локальные счётчики до межакторной консенсусной суммы; A/B-сравнение между внутрипроцессной и Redis-консенсусной семантикой допустимо.

Политика REJECTED_ACCEPTOR: новый флаг skip_reward_on_acceptor_reject (по умолчанию False для обратной совместимости) исключает синтетическое 0.0-вознаграждение на пути REJECTED_ACCEPTOR (который чаще отражает сбой исполнения DAG, чем вину оператора). Счётчик попыток инкрементируется (для члена исследования UCB1), окно наград остаётся чистым. Прецедент: prompts/fetcher уже пропускает REJECTED_ACCEPTOR из статистики словаря подсказок.

Смежная работа над семантикой бандита. Корректность отбора руки сохраняется параллельной серией исправлений:

  • PR #2 «fix(bandit): return neutral reward for non-finite fitness inputs»: compute_bandit_reward(NaN, _) ранее возвращал NaN, что втекало в среднее SlidingWindowUCB1 и тихо ломало маршрутизацию (score > best_score всегда False, первая рука в порядке словаря всегда побеждала). Триггер: любой путь, продуцирующий нефинитный fitness. Исправление: возврат 0.0 при нефинитности любого из входов.
  • PR #12 «fix(bandit): skip reward updates for unknown arm names»: update_reward индексирует self.arms[arm_name] напрямую, что роняло callback агента через KeyError при метаданных программы из Redis-снапшота старого роутера или пользовательского мутационного оператора. Исправление: проверка членства в начале on_mutation_outcome с одной debug-записью на пропуск.
  • PR #13 «feat(llm): add LLM call outcome classifier / state machine» поставляет общий словарь дисциплины классификации исхода LLM-вызова (rate-limit, timeout, parse-failed, context-overflow, content-filter и т. д.), которым в фазе 2 заменяется ad-hoc try/except ladder в bandit и роутере. Импорты BanditAction и classify_call_result уже введены в bandit.py в составе интеграционного коммита.
  • PR #3 «fix(acceptor): reject NaN and +inf is_valid» добавляет isfinite()-guard в ValidityMetricAcceptor. Ранее NaN и +inf тихо принимались как элиты (NaN-сравнения возвращают False, inf <= 0 тоже False).

10. Упрочнение пути через Hydra (фаза 0)

Аудит конфигурационного слоя выявил семь дефектов высокой серьёзности (HIGH), из которых три активно проявлялись в боевых прогонах. Фаза 0 закрывает их на месте без структурного отказа от Hydra.

10.1. Удаление RCE-резолвера eval

OmegaConf.register_new_resolver("eval", eval) экспонировал builtins.eval как доступный через любую интерполяцию ${eval:...}. Удаление резолвера полностью устраняет поверхность RCE. Резолверы merge и len имели нулевое количество точек вызова и удалены вместе с ними (исключение лишней площади атаки).

10.2. Дезактивация модульных синглтонов трекеров

_tb_default, _wb_default, _redis_default удалены. Фабрики init_tb, init_wandb, init_redis, init_tb_redis, init_wandb_redis конструируют свежий экземпляр на каждый вызов. Multirun-сценарии Hydra более не получают клиент, привязанный к первому запуску.

10.3. Снятие @lru_cache с пула исполнителей

default_exec_runner_pool ранее декорировался @functools.lru_cache(maxsize=1), что создавало кеш-связь между пулом подпроцессов и первым циклом событий. После снятия декоратора владение пулом передаётся через ContextVar, устанавливаемый в run_experiment и сбрасываемый в finally. Каждый вход в run_experiment получает свежий пул; амортизация запуска подпроцессов сохраняется, поскольку пул живёт в течение всего прогона.

10.4. StrictChatOpenAI

Новая фабрика gigaevo.llm.strict_chat_openai.strict_chat_openai валидирует kwargs против ChatOpenAI.model_fields плюс собранные алиасы Pydantic (model_name, openai_api_key, openai_api_base, openai_organization, max_completion_tokens, timeout, stop_sequences). Неизвестные kwargs поднимают StrictChatOpenAIError с именованием нарушителя. Опечатка вида tempetature: 0.5 в YAML более не уходит беззвучно в model_kwargs и в OpenAI API.

20 точек кода _target_: langchain_openai.ChatOpenAI в config/llm/*.yaml смигрированы на gigaevo.llm.strict_chat_openai.strict_chat_openai.

10.5. Унификация ${ref:X} против ${X}

В 14 YAML-файлах плоские интерполяции ${redis_storage}, ${llm}, ${problem_context}, ${metrics_context} смешивались с ${ref:...} формой того же ключа. _ref_resolver мутирует исходный конфиг, что приводило к двойной инстанциации ProblemContext на каждом прогоне конвейера. После унификации все ссылки идут через ${ref:...}, и инстанциация происходит однократно.

10.6. Явный _self_ в defaults-блоках

В 18 YAML-файлах defaults-блок не содержал явного - _self_. Hydra 1.3 эмитировал UserWarning об изменении значения по умолчанию в будущем. После явного указания инвариант сохраняется при обновлении Hydra.

10.7. Значение по умолчанию null для ${oc.env:OPENAI_API_KEY}

Все 19 точек кода ${oc.env:OPENAI_API_KEY} без значения по умолчанию были заменены на ${oc.env:OPENAI_API_KEY,null}. Отсутствующая переменная окружения более не приводит к глубокому InterpolationResolutionError внутри instantiate; обработка отсутствующего ключа делегирована прикладному слою через StrictChatOpenAI, который поднимает чёткое типизированное сообщение.

10.8. Утечки между start() и serve_until_signal

Исправление M3 в run.py: внутренний try/finally вокруг dag_runner.start() + evolution_engine.start() + serve_until_signal. В finally производится evolution_engine.stop() и dag_runner.stop() независимо от пути ошибки. Идемпотентность обоих stop() верифицирована.

10.9. Утечки при частичном instantiate

Исправление M4 в run.py: внешний try/finally вокруг всего тела run_experiment. Каждый компонент локально получает значение по умолчанию None; finally индивидуально закрывает каждый ненулевой компонент в детерминированном порядке. Сбой посреди исполнения instantiate(cfg, recursive=True) более не оставляет утечки потока-писателя или Redis-ping-соединения.

11. Функции wire_* для повторного связывания после Hydra

gigaevo/dataplane/engine_startup.py экспортирует семейство функций wire_* для привязки координатора и engine_root к Hydra-сконструированным объектам после instantiate.

  • build_dataplane(redis_url, *, key_prefix) -> DataPlane: фабрика экземпляра DataPlane.
  • build_engine_root(dp) -> EngineRoot: фабрика токенов по подпространствам (program, cell, counter).
  • build_actor_identity(*, run_id, worker_id) -> ActorIdentity: фабрика ActorIdentity с разрешением приоритета (явный аргумент > переменная окружения > резервное значение uuid4 с {hostname}-{pid}).
  • wire_storage(storage, dp, engine_root) -> bool: привязывает координатор и engine_root к RedisProgramStorage. Идемпотентен на идентичной тройке; поднимает RuntimeError на конфликтующей.
  • wire_bandit_router(router, dp, actor, engine_root) -> bool: привязывает к BanditModelRouter (молчаливое отсутствие действий на статических маршрутизаторах).
  • wire_prompt_fetcher(fetcher, main_dp, prompt_dp, actor) -> bool: привязывает к GigaEvoArchivePromptFetcher (молчаливое отсутствие действий на FixedDirPromptFetcher).
  • wire_archive_storage(archive, dp, engine_root) -> bool: привязывает к RedisArchiveStorage.
  • wire_dag_runner(runner, dp, engine_root) -> bool: привязывает к DagRunner.
  • wire_evolution_engine(engine, dp, engine_root) -> bool: привязывает к EvolutionEngine.

run.py вызывает эти функции после instantiate(cfg, recursive=True) в детерминированном порядке. try/finally гарантирует await dp.shutdown() даже на пути ошибки.

@GrigoryEvko

Copy link
Copy Markdown
Author

12. Дисциплина линтера

12.1. TID251 блокирует прямые импорты redis

pyproject.toml:

[tool.ruff.lint.flake8-tidy-imports.banned-api]
"redis".msg = "Import from gigaevo.dataplane instead. See gigaevo/dataplane/__init__.py"
"redis.asyncio".msg = "Import from gigaevo.dataplane instead. See gigaevo/dataplane/__init__.py"
"redis.exceptions".msg = "Import from gigaevo.dataplane instead. See gigaevo/dataplane/__init__.py"
"pickle.loads".msg = "Pickle deserialise of untrusted bytes is bug class #6 (RCE). Use canonical JSON via gigaevo.dataplane.codec."
"cloudpickle.loads".msg = "cloudpickle.loads of untrusted bytes is bug class #6 (RCE). Restrict to private subprocess IPC channels."

Белый список per-file-ignores содержит явные исключения: gigaevo/dataplane/**/*.py (субстрат), gigaevo/infra/endpoint_pool.py (унаследованная ссылка), gigaevo/database/redis/**/*.py (хранилище), gigaevo/database/redis_program_storage.py, gigaevo/evolution/bus/transport.py (фаза 2), gigaevo/evolution/storage/archive_storage.py (унаследованный резервный путь WATCH), gigaevo/utils/trackers/backends/redis.py (сохранение по варианту B), tests/**/*.py (тестовая инфраструктура), плюс подпроцессные IPC-точки wrapper.py, exec_runner.py для pickle/cloudpickle.loads.

Каждая запись в белом списке либо явно сопровождается комментарием с указанием причины, либо подразумевается как унаследованная ссылка. По мере миграции запись удаляется.

12.2. AST-инспектор широких except

tests/dataplane/test_no_silent_broad_except.py обходит каждый .py файл субстрата (за исключением тестового дерева) и применяет два инварианта:

  1. Голый except: запрещён абсолютно.
  2. except Exception или except BaseException допустим только при наличии маркера # noqa: BLE001 на одной из строк диапазона lineno..end_lineno.

Инспектор сам себя тестирует через TestSelfChecks: семь исходников в памяти, в каждом из которых должна быть обнаружена соответствующая нарушенная инварианта (включая многострочный except (A, Exception): с маркером на закрывающей скобке).

12.3. Дисциплина docstring

Контрольная двухпроходная зачистка набора изменений устранила:

  • Описания археологии («previously», «we now», «the legacy path»).
  • Внутренние идентификаторы и кодовые обозначения («Phase 1», «T-A4», «Crucible»).
  • Многоабзацные описания дизайна на одиночных функциях.
  • Многострочные комментарии в теле тестов, дублирующие название теста.
  • YAML-комментарии, повествующие о миграции.
  • TID251-запись bandit.py (устарела по факту миграции).
  • Дискриминированные обёртки docstring вида «когда привязан → ..., когда не привязан → ...» с заменой на короткую формулировку настоящего поведения.
  • Пустые переименования без семантического выигрыша (test_foo_works → test_foo_succeeds, mock_redis → fake_redis, переформулировки docstring типа «Build» → «Initialize»).

Совокупное чистое удаление двух проходов составило около 1 660 строк.

13. Тестирование

13.1. Структура тестов субстрата

gigaevo/dataplane/tests/
├── test_smoke.py                          # Проверка импорта, конструирование без I/O.
├── test_models.py                         # Versioned, Result, Sourced, Freshness.
├── test_permissions.py                    # Token линейность, EngineRoot.
├── test_codec.py                          # Канонический JSON, content_hash_hex.
├── test_errors.py                         # Структурированная иерархия исключений.
├── test_ids.py                            # NewType валидаторы.
├── test_lattices.py                       # Решёточные аксиомы.
├── test_transitions.py                    # FSM-таблицы.
├── test_scripts.py                        # LuaRegistry NOSCRIPT recovery.
├── test_connection.py                     # Жизненный цикл пула.
├── test_crash.py                          # OneShotFlag, CrashEvent, wrap_lease.
├── test_transition_state.py               # Lua + Python wrapper.
├── test_fsm_exhaustive.py                 # 4×4 = 16 пар + expected_from=None.
├── test_concurrent_writers.py             # Конкурентный фаззинг.
├── test_archive_swap.py                   # archive_swap.lua + try_replace_elite.
├── test_instance_lock.py                  # Все три lock-Lua + wrap_lease.
├── test_crdt_counter.py                   # G-counter, Freshness.
├── test_lwwr.py                           # LWW с HLC-разрешение равенства.
├── test_bounded_list.py                   # LPUSH+LTRIM.
├── test_set_directory.py                  # set_add, set_members.
└── test_no_silent_broad_except.py         # AST-дисциплина.

13.2. Исчерпывающее покрытие FSM

test_fsm_exhaustive.py параметризует тест по всем 4×4 = 16 парам (from, to) из PROGRAM_STATE_TRANSITIONS плюс по тем же 16 парам с expected_from=None. Для легальной пары проверяется Ok с продвижением state; для нелегальной возвращается Err(TransitionError(kind="illegal")) с сохранением исходного state. Будущее изменение FSM-таблицы без обновления Lua-скрипта приводит к падению соответствующего теста.

13.3. Конкурентный фаззинг

test_concurrent_writers.py нагружает каждый Lua-CAS примитив высокой конкуренцией:

  • crdt_inc: 10 акторов × 100 инкрементов конвергируют ровно в 1 000.
  • crdt_inc от одного актора: 500 одновременных инкрементов конвергируют ровно в 500.
  • transition_program_state: 20 параллельных задач с одинаковым (pid, from, to, patch) получают Ok с одной эпохой (идемпотентность по содержимому); 20 параллельных задач с уникальными патчами получают ровно один Ok и 19 Err(stale).
  • acquire_instance_lock: 20 параллельных задач на одном ключе дают ровно один Ok и 19 Err(LockHeld).
  • try_replace_elite: 20 кандидатов разного score → ячейка оказывается у максимального score (инвариант биекции).

13.4. Property-based проверки и идемпотентность

test_codec.py верифицирует канонический JSON через property-based-генератор: для произвольной структуры из примитивов и контейнеров encode → decode → encode стабильно. content_hash_hex инвариантен относительно порядка ключей в словаре.

test_transition_state.py::TestIdempotency проверяет, что повторный вызов с тем же (pid, from, to, patch) возвращает кешированное значение через серверный idempotency_token в transition_state.lua.

13.5. AST-дисциплина

test_no_silent_broad_except.py самопроверяется через семь тестовых случаев в памяти в TestSelfChecks. Каждый случай конструирует исходник с заданной структурой except и проверяет вердикт инспектора. Изменение поведения инспектора без обновления самопроверяющего теста приводит к падению.

13.6. Регрессионные тесты для каждого закрытого класса

Каждое исправление сопровождается регрессионным тестом:

  • Singleton multirun-safety: tests/utils/test_trackers_singletons.py (8 тестов).
  • Pool ContextVar isolation: tests/stages/test_exec_runner_pool.py (4 теста).
  • Strict ChatOpenAI typo rejection: tests/llm/test_strict_chat_openai.py (10 тестов).
  • Resolver double-registration safety: tests/config/test_resolvers.py (4 теста).
  • Prompts vertical round-trip: tests/prompts/test_record_outcome_redis.py (10 тестов).
  • Wire-helpers idempotency: tests/integration/test_mini_run_dataplane.py (15+ тестов).
  • M3/M4 lifecycle: tests/entrypoint/test_run_experiment_lifecycle.py (3 теста).
  • Storage dp-routing: tests/database/test_redis_storage_dataplane.py (40+ тестов).
  • Migrant ingestion: tests/database/test_state_manager.py::TestRegisterExternalTerminalState (3 теста).
  • Batch transitions: tests/database/test_redis_storage_dataplane.py::TestBatchTransitionViaDataplane (5 тестов).
  • Lock dp-routing: tests/database/test_redis_locking.py::TestDataplaneRoutedLocking (4 теста).

13.7. Итог тестов

Полный прогон: 4 789 тестов, время 6:38, нулевые отказы (исключая два ранее существовавших игнор-маркера: test_steady_state_determinism периодически проявляет нестабильность под параллельной нагрузкой, test_manifest.py имеет ранее существовавший ImportError на этапе сбора тестов).

Смежная работа над тестовой инфраструктурой. Время прогона и стабильность теста улучшаются двумя параллельными PR:

  • PR #11 «test: enable pytest-xdist parallel execution» добавляет pytest-xdist>=3.6 в test-extras и -n auto --dist=loadgroup в pytest.ini. Семейства тестов с разделяемым состоянием опцируются в маркер xdist_group. Сокращает локальный прогон с ~70 секунд до ~40 секунд на 16-ядерной машине.
  • PR #19 «test(infra): replace deprecated asyncio.get_event_loop + skip-guard test_manifest» мигрирует 11 точек вызова asyncio.get_event_loop() (deprecated в 3.12, удалён в 3.13) и добавляет pytest.importorskip("tools.experiment.manifest") в test_manifest.py, что разблокирует сбор тестов на чистом checkout main.

14. Тонкие детали

14.1. Защита от враждебных входов

В рамках первоначального аудита и трёх параллельных hunter-resolver-агентов выявлены и устранены следующие поверхности враждебных входов.

  • codec.py: Decimal('-0') ранее производил "-0" каноническое представление, тогда как Decimal('0') производил "0". Численно равные значения порождали различные хеши содержимого. Исправлено: знак нуля сводится к "0". Unicode-ошибки кодирования и декодирования вместе с json.JSONDecodeError унифицированы в CanonicalEncodingError.

  • models.py::HlcTimestamp.unpack_hex: ранее принимал смешанный регистр. Лексикографическое сравнение требует одного регистра. Исправлено: принимаются только hex-строки в нижнем регистре.

  • ids.py::ActorIdentity.__post_init__: отвергает управляющие символы группы C0 (NUL, CR, LF, ESC, DEL), переключатели направления (U+202E, U+2028/9) и прочие разделители. Защищает от инъекции в логи и атак запутывания через идентификатор актора.

  • transitions.py::encode_for_lua: запрещает запятую в значениях состояний FSM (иначе подстрочный поиск в разделённом запятыми списке допустимых целей мог дать ложное разрешение).

  • permissions.py::Token: запечатан через __init_subclass__. Произвольный подкласс не способен переопределить __copy__/__deepcopy__/__reduce__ для дупликации свидетеля.

  • permissions.py::mint_combine: проверяет оба входа до потребления любого. Сбой на втором не оставляет первый в полупотреблённом состоянии. (Примечание: mint_combine удалён в финальной зачистке как неиспользуемый субстрат; защитная семантика задокументирована.)

  • errors._format_field: содержит repr()-ошибки. Если у поля исключения собственный __repr__ падает, рендеринг возвращает <repr-failed: TypeName> без рекурсивного разрастания.

  • crash.py::CrashWatchedHandle.call: docstring фиксирует поведение recover_fn raise: распространяется дословно, без синтезированного CrashEvent.

Смежная работа над поверхностью враждебных входов. Внеплановые векторы LLM-генерируемого текста (одиночные UTF-16 суррогаты, NUL, ANSI-escape, BIDI-overrides) и нефинитных численных входов закрыты двумя параллельными PR:

  • PR #10 «fix: sanitize LLM-derived text before logging, JSON serialization, and database writes» вводит gigaevo/utils/text_sanitize.py с приблизительно 760 новыми тестами против Unicode-confusables, BIDI-меток, вариационных селекторов, эмодзи ZWJ-последовательностей, regex-bypass для CSI/OSC/DCS, downstream-проверками через pydantic, loguru, fakeredis, sqlite3. Применяется на каждой границе ввода-вывода LLM-текста в Program, MigrantEnvelope, MultiModelRouter, MutationStructuredOutput, TokenTracker, ParamSpec.name и в тринадцати точках интерполяции logger.warning/logger.exception.
  • PR #2 и PR #3 добавляют isfinite()-guards на сторонах bandit-reward и acceptor-валидности соответственно (см. раздел 9.5 и Класс №9 выше). Те же isfinite()-проверки императивно введены в archive_swap.lua и counter_inc.lua настоящего PR на границе скрипта.

14.2. Защита от ошибок в Lua

  • transition_state.lua: декодирование patch-а до записи idempotency_token. Ранее некорректный patch оставлял idem-hash установленным, и легитимная повторная попытка возвращала duplicate с устаревшим pre-call blob-ом.

  • transition_state.lua: pcall вокруг декодирования блоба программы; явный ответ {invalid, ...} при повреждённом блобе защищает от состояния dead-letter.

  • transition_state.lua: явное удаление state, epoch, id из patch до слияния защищает идентичность блоба от вредоносного вызывающего.

  • transition_state.lua: MAXLEN ~ 10000 потока соответствует константе STREAM_MAX_LEN на стороне Python. Расхождение между Lua и Python устранено.

  • archive_swap.lua: отвержение NaN/Inf в candidate_score; ранее сравнения </> с NaN тихо возвращали False, и кандидат со значением NaN мог обмануть проверку. Также явное освобождение предыдущей ячейки кандидата на путях вставки и замены обеспечивает инвариант биекции.

  • instance_lock_acquire.lua/renew.lua/release.lua: отвержение пустого token и неположительного ttl_ms на границе скрипта. Раньше нулевой TTL в SET PX приводил к ошибке посередине вызова.

  • counter_inc.lua: отвержение пустого actor_id и nil/NaN/Inf/нецелого delta перед HINCRBY.

  • lwwr_set.lua: отвержение некорректного HLC (неправильная длина, не hex, верхний регистр). Лексикографическое сравнение >= достоверно только для hex-строк одинаковой длины в нижнем регистре.

  • scripts.py::LuaRegistry._reload: объединение первой загрузки. Два конкурентных вызывающих, обнаруживших NoScriptError одновременно для одного и того же скрипта, совершают единственный SCRIPT LOAD через асинхронную блокировку вместо параллельных избыточных загрузок.

14.3. Защита от состояний гонки в координаторе

  • coordinator.py::startup и shutdown: ленивая asyncio.Lock с повторной проверкой после взятия. Два конкурентных вызывающих не способны вместе сконструировать _lua или установить _started. Откат запуска ловит BaseException для покрытия пути asyncio.CancelledError при отмене во время загрузки FSM.

  • release_instance_lock: ожидает завершения renewer-задачи до выполнения DEL. renewer посреди EXPIRE не способен сработать после release.

  • _renew_lease_loop: устанавливает OneShotFlag аренды при любом исключении, отличном от отмены, что предотвращает тихое падение renewer-задачи.

  • Одиннадцать публичных методов координатора маршрутизируют через валидаторы:

    • _validate_key_component: отвергает NUL/CR/LF универсально и : для атомарных ID.
    • _validate_ttl: отвергает NaN/Inf, неположительные значения, bool.
    • _validate_floor: отвергает отрицательные epoch/generation.
    • _check_deadline: возвращает типизированный Err(DeadlineExceeded).
  • Токен потребляется ПОСЛЕ _require_started + _check_deadline. Ранее прерванный вызов более не потребляет свидетеля безмолвно.

  • transition_program_state_batch имеет верхнюю границу 1024 элемента. Случайно неограниченный пакет обнаруживается на границе.

  • try_replace_elite валидирует конечность candidate_score и условие tiebreak_bit ∈ {0, 1}.

  • read_program возвращает типизированный DataPlaneError на сбое декодирования блоба, без проброса сырого исключения.

  • connection.py::_safe_close: asyncio.wait_for(timeout=5s) на pool.aclose(). Полузакрытый узел более не способен заблокировать процесс на бесконечно долгое время.

15. Что отложено в субстрат фазы 2

Настоящий раздел каталогизирует сознательно сохранённые, но пока не используемые компоненты, размещённые в коде в рамках настоящего PR и предназначенные для прямого использования в фазе 2. Перечень указан исчерпывающе, что исключает ошибочное удаление перечисленных имён как «мёртвого кода» при последующем обходе кодовой базы.

  • lwwr_set / lwwr_get + scripts/lwwr_set.lua: LWW-регистр с HLC-разрешением равенства. Субстрат для конкурентных писателей одиночного значения (конфигурация под потребителя в форме потока). Прямой потребитель описан в разделе 3.4 RFC #18 как механизм публикации параметров координационной плоскости.
  • HlcTimestamp: основа для меток времени событий фазы 2 и распределённого HLC фазы 4. Применяется в RFC #18, раздел 3.4, для упорядочивания событий между несколькими драйверами без зависимости от настенного времени узлов.
  • Решётки (BoolLattice, EpochLattice, GenerationLattice, ProductLattice, MonotoneLattice): словарь, который Versioned + Freshness уже используют семантически; BoolLattice обеспечивает условие допуска фазы 2 «хотя бы один обработчик видел эту агрегацию». Описание путей допуска под распределённую топологию приведено в RFC #18, раздел 3.5.
  • 13 идентификаторов NewType (AggregateId, BanditArm, CausationId, CorrelationId, StreamName, ConsumerGroup, ConsumerName, NodeId, IdempotencyToken, EventId, EpochId, GenerationId, StepId): нулевая стоимость во время выполнения; словарь для событий, групп потребителей, потоков и отслеживания каузальности в фазе 2. Прямое сопоставление словаря с примитивами Redis Streams (XADD / XREADGROUP / XACK) выполнено в RFC #18, раздел 3.4. Контракт обработчика (WorkerConfig, node_id, worker_id, теговая подписка) изложен в разделе 3.3 того же RFC.

Реестр выше является авторитативным каталогом сохраняемых имён. Любая ревизия списка фиксируется правкой настоящего раздела.

16. Связь с RFC о распределённом исполнении (issue #18)

Архитектурная база, поставляемая настоящим PR, образует фундамент для практической реализации RFC «Распределённое исполнение в gigaevo-core» (issue #18). Ниже фиксируется покомпонентное соответствие между требованиями RFC и компонентами субстрата, поставляемыми в данном PR.

16.1. Целевая нагрузка и её требования

RFC #18, раздел 1 («Status quo»), фиксирует исходное состояние: оркестрация эволюционного поиска жёстко привязана к одной машине через три структурных решения (loky-пул процессов, локальный CardStore под единственный путь файловой системы, RedisInstanceLock с эксклюзивной блокировкой префикса). Раздел 2.1 RFC формализует целевую нагрузку: оптимизация CUDA-ядер через эволюционный поиск в смежном проекте kernel-evo, эксклюзивный GPU-захват на длительность бенчмарка, гетерогенный парк обработчиков (CPU-узлы компиляции и GPU-узлы исполнения, различающиеся по типу ускорителя H100 / A100 / L40S).

Перечисленные характеристики требуют координационной плоскости с типизированной обработкой потери аренды, идемпотентной записью результатов, теговой маршрутизации задач, атомарного счётчика с актор-партиционированием для межпроцессной агрегации метрик исполнения. Каждое из требований получает прямой ответ в субстрате настоящего PR, как изложено ниже.

16.2. Этапы реализации RFC и положение настоящего PR

RFC #18, раздел 5 («Implementation stages»), фиксирует семь этапов поставки: этап 0 (сетевые исправления, PR #17), этап 1 (WorkerConfig, node_id, worker_id, PR #14), этап 2 (абстракция ExecutorBackend, PR #14), этап 3 (RedisStreamWorkerBackend, ArtifactStore, CLI gigaevo worker, режим --embedded), этап 4 (продлеваемый пульс, Idempotency-Key, watchdog, инвалидация через pub/sub), этап 5 (теги, GPU-локи, NFS / S3 артефакт-сторы), этапы 6 и 7 (мониторинг, наблюдаемость).

Настоящий PR поставляет ортогональную ось «дисциплина координационной плоскости», на которую опираются этапы 3, 4 и 5. Без типизированных примитивов координации, дискриминированных возвратов Result, линейных токенов аренды и фантомного провенанса этапы 3-5 RFC потребовали бы изобретения тех же абстракций ad-hoc внутри RedisStreamWorkerBackend. Поставка субстрата отдельным PR обеспечивает переиспользование одной и той же типизации для координационной плоскости и для прикладных вертикалей.

16.3. Сопоставление требований RFC и компонентов DataPlane PR

Полная таблица соответствий формируется следующим образом:

Требование RFC #18 Компонент настоящего PR
SETNX TTL захват задачи (раздел 3.4, примитив «Claim») scripts/instance_lock_acquire.lua + dp.acquire_instance_lock с типизированным LockHeld и Token[InstanceLockTag]
Продлеваемый пульс 3:1 (раздел 3.4, примитив «Heartbeat») scripts/instance_lock_renew.lua + dp.renew_instance_lock + wrap_lease + CrashWatchedHandle с OneShotFlag для типизированной потери аренды
Освобождение по выполнению или по потере (раздел 3.4, примитив «Release») scripts/instance_lock_release.lua + dp.release_instance_lock с идемпотентным контрактом и валидацией владения
Redis Streams XADD / XREADGROUP / XACK (раздел 3.4, примитив «Stream») Словарь NewType StreamName, ConsumerGroup, ConsumerName, EventId, CausationId, CorrelationId плюс подпространство streams/ под Lua-скрипты
Идемпотентная запись через Idempotency-Key (раздел 3.4, примитив «Idempotency») NewType IdempotencyToken + поле idempotency_token в transition_state.lua + compute_content_hash_hex для серверной дедупликации
Атомарный счётчик под распределённые метрики (раздел 2.3 RFC «уже распределённое») scripts/counter_inc.lua + dp.crdt_inc + ActorIdentity(run_id, worker_id) + make_actor для актор-партиционирования
Ограниченная очередь под метрики скользящего окна scripts/bounded_list_push.lua + dp.bounded_list_push + dp.bounded_list_range
LWW-регистр конфигов координационной плоскости scripts/lwwr_set.lua + dp.lwwr_set / dp.lwwr_get с разрешением равенства HLC через HlcTimestamp
HLC-упорядочивание событий между несколькими драйверами (раздел 3.4) HlcTimestamp с лексикографически сравнимой hex-упаковкой и монотонностью внутри одного процесса
Типизированная обработка отказов координационной плоскости (раздел 3.5) Result[T, E] + полная иерархия DataPlaneError + CrashEvent + OneShotFlag + wrap_lease
Контракт обработчика WorkerConfig (раздел 3.3) NewType NodeId плюс готовая семантика ActorIdentity под пары (run_id, worker_id)
Теговая маршрутизация задач (раздел 3.3, многоверсионные подписки) Разметка ключей по подпространствам через KeyBuilder под структуру tasks:{tag} и worker:{tag}:{node}:{idx}
Контроль изоляции прикладных вертикалей (раздел 3.3) Линейные токены Token[Tag] + EngineRoot с раздельными подтокенами program / cell / counter через mint_split_n
Версионирование схемы под перекатывающее обновление парка обработчиков (раздел 3.5) Подпространство gigaevo/dataplane/upcasters/ (заглушка под фазу 2) + поле schema_version в дискриминированных моделях dp/models.py

16.4. Снимаемые ограничения

RFC #18, раздел 2.2, перечисляет четыре кодифицированных ограничения, препятствующих переходу к распределённой топологии: запрет двух драйверов на одном префиксе через RedisInstanceLock; неявная привязка CardStore к локальной файловой системе; жёсткая зависимость от loky-пула; отсутствие словаря идентификаторов обработчика.

Настоящий PR снимает первое ограничение в архитектурном смысле: блокировка экземпляра смигрирована через LuaRegistry.evalsha, дисциплина владения формализована через Token[InstanceLockTag], потеря аренды типизирована через CrashEvent. Расширение блокировки на лидерскую аренду (lease:gam-rebuilder, lease:scheduler) или на эксклюзивный захват GPU (gpu-lock:{node}:{idx}) производится добавлением фабричной функции поверх существующих Lua-скриптов без модификации субстрата. Второе ограничение остаётся за пределами настоящего PR (ArtifactStore относится к плоскости данных, координационная плоскость к ней не примыкает); третье и четвёртое ограничения адресуются параллельным PR #14 и опираются на словарь NewType настоящего PR для типизации параметров обработчика.

16.5. Уже распределённые подсистемы

RFC #18, раздел 2.3, фиксирует подсистемы, уже работающие распределённо через Redis: блоб состояния для программ, архив под обмен ячейками, реестр многоруких бандитов, программный счётчик. Каждая из четырёх подсистем смигрирована на DataPlane в рамках настоящего PR:

  • Блоб состояния для программ: переход через dp.transition_program_state с серверной валидацией FSM-таблицы PROGRAM_STATE_TRANSITIONS в transition_state.lua. Распределённая корректность достигается атомарностью EVALSHA: одновременные transition_program_state от двух драйверов либо упорядочиваются Redis, либо одна из попыток получает типизированный TransitionError.illegal без побочного эффекта.
  • Архив под обмен ячейками: переход через dp.archive_cell_swap с серверной арбитрацией ровно одного победителя на ячейку через archive_swap.lua. Разрешение равенства лексикографическим сравнением идентификатора кандидата исключает класс «later writer wins» при равных значениях score.
  • Реестр многоруких бандитов: запись через dp.crdt_inc с актор-партиционированием. ActorIdentity(run_id, worker_id) обеспечивает каноническую упорядоченность операций приращения без потерь обновления при междрайверной агрегации.
  • Программный счётчик: атомарный INCR внутри transition_state.lua встроен в ту же транзакцию, что и переход FSM. Расхождение между статусом программы и инкрементом счётчика становится непредставимым.

Распределённая корректность всех четырёх подсистем зафиксирована тестами gigaevo/dataplane/tests/ под стратегии Hypothesis, моделирующие параллельные команды от двух и более драйверов.

16.6. Режим --embedded

RFC #18, раздел 3.2, определяет режим --embedded, в котором драйвер и обработчик исполняются внутри одного процесса при сохранении полной типизации обмена через Redis Streams. Данный режим опирается на свойство координационной плоскости: семантика обмена обязана быть инвариантной относительно физического размещения участников. Настоящий PR поставляет требуемую инвариантность через Result[T, E] (одна и та же типизация ответа независимо от транспорта), Token[Tag] (одна и та же дисциплина владения), Versioned[T] (одно и то же контракт-обещание свежести). Переключение между распределённой и встроенной топологиями выполняется фабрикой обработчика без модификации прикладного кода.

16.7. Сводная оценка

Концептуально настоящий PR закрывает структурный пробел между текущей реализацией (раздел 1 RFC «Status quo») и целевой реализацией (раздел 3 RFC). Этапы 3, 4 и 5 RFC опираются на следующие свойства, гарантируемые настоящим PR:

  • Каждая операция координационной плоскости возвращает Result[T, E] с исчерпывающим набором ошибочных вариантов.
  • Каждая аренда (захват, продление, освобождение) типизирована через линейный токен с дисциплиной однократного потребления.
  • Каждое чтение, требующее свежести, обернуто в Versioned[T] с явным контрактом Freshness.
  • Каждая операция приращения метрики проходит через атомарный счётчик с актор-партиционированием.
  • Каждая запись в каноническом виде имеет вычисляемый хеш содержимого, обеспечивающий серверную дедупликацию.

Совокупность перечисленных свойств обеспечивает, что фаза 3 RFC (RedisStreamWorkerBackend) реализуется как прямое сложение типизированного событийного потока поверх уже существующего субстрата без переизобретения дисциплины координации.

@GrigoryEvko GrigoryEvko changed the title feat(dataplane): typed Redis coordination plane — atomic Lua substrate, FSM migration, Hydra hardening feat(dataplane): typed Redis coordination plane with atomic Lua substrate, FSM migration, and Hydra hardening May 17, 2026
@GrigoryEvko GrigoryEvko force-pushed the feat/dataplane-foundation branch from 6b94be1 to bbae675 Compare May 17, 2026 17:20
@GrigoryEvko

Copy link
Copy Markdown
Author

Heads-up on test-side integration with #10 (fix/llm-output-sanitization) and #14 (loky-executor), surfaced while reconciling all three on a nightly branch:

vs #10: the RedisPromptStatsProvider._get_redis_get_dataplane rename and the DataPlane port of gigaevo.prompts.fetcher break test-side monkeypatches added by #10:

vs #14: tests/stages/test_exec_runner_pool.py on this branch has three tests covering run_exec_runner's pool=None ambient fallback (test_run_exec_runner_reuses_ambient_pool_when_pool_is_none and siblings). #14's loky-based run_exec_runner has no pool parameter; those tests become unreachable. This PR or #14 will need to drop them on rebase.

Verified locally by integrating all three PRs — full suite (5862 tests) passes after the test-side adjustments. Parallel notes on #10 and #14.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants