Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions gigaevo/config/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gigaevo.entrypoint.default_pipelines import (
ContextPipelineBuilder,
DefaultPipelineBuilder,
_resolve_context_flag,
)
from gigaevo.entrypoint.evolution_context import EvolutionContext
from gigaevo.evolution.strategies.map_elites import IslandConfig
Expand Down Expand Up @@ -161,9 +162,18 @@ def build_dag_from_builder(builder: Any) -> Any:
def select_pipeline_builder(
problem_context: ProblemContext,
evolution_context: EvolutionContext,
*,
force: str | None = None,
) -> ContextPipelineBuilder | DefaultPipelineBuilder:
"""Select appropriate pipeline builder based on problem type."""
if problem_context.is_contextual:
"""Select appropriate pipeline builder based on problem type.

``force`` lets an operator bypass the ``problem_context.is_contextual``
heuristic from the CLI / Hydra config (e.g.
``+pipeline_builder.force=default`` or ``=context``) — useful when the
auto-detection doesn't match what the run actually needs. Accepts
``"context"``, ``"default"``, or ``None`` (auto, the default).
"""
if _resolve_context_flag(force, problem_context.is_contextual):
return ContextPipelineBuilder(evolution_context)
return DefaultPipelineBuilder(evolution_context)

Expand Down
46 changes: 43 additions & 3 deletions gigaevo/entrypoint/default_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,26 @@
StageFactory = Callable[[], Stage]


def _resolve_context_flag(force: str | None, is_contextual: bool) -> bool:
"""Resolve the context-wiring decision for builders that branch on
``problem_ctx.is_contextual``.

Operators can override the filesystem-inferred heuristic from Hydra via
``+pipeline_builder.force=context`` (or ``"default"``); ``None`` (the
default) falls back to the inferred value. Mirrors
:func:`gigaevo.config.helpers.select_pipeline_builder`.
"""
if force == "context":
return True
if force == "default":
return False
if force is not None:
raise ValueError(
f"force= must be 'context', 'default', or None; got {force!r}"
)
return is_contextual


class PipelineBuilder:
"""Mutable builder for pipeline nodes/edges/deps producing a DAGBlueprint."""

Expand All @@ -70,6 +90,18 @@ def __init__(self, ctx: EvolutionContext, *, dag_timeout: float = 3600.0):

# Stage operations - add, replace, remove
def add_stage(self, name: str, factory: StageFactory) -> PipelineBuilder:
# Fail loudly on duplicate add: silently overwriting an existing
# ``self._nodes[name]`` makes copy-paste mistakes in subclass
# ``_contribute_*`` / ``_add_*`` methods invisible — the new factory
# silently shadows the parent's, producing a working but wrong
# pipeline. Callers that intentionally swap a factory must use
# :meth:`replace_stage` (explicit), mirroring the
# ``StageRegistry.clear() + register()`` discipline.
if name in self._nodes:
raise ValueError(
f"Stage '{name}' already registered on this builder. "
f"Use replace_stage('{name}', ...) to swap factories explicitly."
)
self._nodes[name] = factory
return self

Expand Down Expand Up @@ -384,7 +416,11 @@ def _contribute_default_edges(self) -> None:
self.add_data_flow_edge("MemoryContextStage", "MutationContextStage", "memory")

def _contribute_default_deps(self) -> None:
self._deps = {
# Merge into ``self._deps`` rather than reassigning the whole dict —
# subclasses (or callers that constructed the builder and added deps
# via ``add_exec_dep`` before ``_contribute_default_deps`` ran) would
# otherwise have their contributions silently wiped.
defaults: dict[str, list[ExecutionOrderDependency]] = {
"CallProgramFunction": [
ExecutionOrderDependency.on_success("ValidateCodeStage")
],
Expand Down Expand Up @@ -416,6 +452,8 @@ def _contribute_default_deps(self) -> None:
ExecutionOrderDependency.always_after("EnsureMetricsStage"),
],
}
for stage_name, deps in defaults.items():
self._deps.setdefault(stage_name, []).extend(deps)


class ContextPipelineBuilder(DefaultPipelineBuilder):
Expand Down Expand Up @@ -523,14 +561,15 @@ def __init__(
*,
dag_timeout: float = 3600.0,
optimization_time_budget: float | None = None,
force: str | None = None,
):
super().__init__(ctx, dag_timeout=dag_timeout)
self._optimization_time_budget = (
optimization_time_budget
if optimization_time_budget is not None
else dag_timeout * DEFAULT_OPTIMIZATION_TIME_BUDGET_FRACTION
)
has_context = ctx.problem_ctx.is_contextual
has_context = _resolve_context_flag(force, ctx.problem_ctx.is_contextual)
if has_context:
self._add_context_stage_and_edges()
self._add_cma_optimization(has_context=has_context)
Expand Down Expand Up @@ -649,14 +688,15 @@ def __init__(
*,
dag_timeout: float = 7200.0,
optimization_time_budget: float | None = None,
force: str | None = None,
):
super().__init__(ctx, dag_timeout=dag_timeout)
self._optimization_time_budget = (
optimization_time_budget
if optimization_time_budget is not None
else dag_timeout * DEFAULT_OPTIMIZATION_TIME_BUDGET_FRACTION
)
has_context = ctx.problem_ctx.is_contextual
has_context = _resolve_context_flag(force, ctx.problem_ctx.is_contextual)
if has_context:
self._add_context_stage_and_edges()
self._add_optuna_optimization(has_context=has_context)
Expand Down
15 changes: 15 additions & 0 deletions gigaevo/programs/stages/stage_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ def format_type_name(annotation) -> str:
# Auto-detect import path if not provided
final_import_path = import_path or f"{stage_class.__module__}.{class_name}"

# Fail loudly on duplicate registration: silently overwriting
# ``cls._stages[class_name]`` makes copy-paste bugs (two modules
# decorating distinct ``Stage`` subclasses with the same name)
# invisible until something downstream looks up ``import_path``
# and gets the wrong one. Tests that re-register intentionally
# must call ``StageRegistry.clear()`` between registrations.
existing = cls._stages.get(class_name)
if existing is not None:
raise ValueError(
f"Stage '{class_name}' already registered "
f"(existing import_path={existing.import_path!r}, "
f"new import_path={final_import_path!r}). "
f"Call StageRegistry.clear() before re-registering."
)

cls._stages[class_name] = StageInfo(
name=class_name,
description=description,
Expand Down
89 changes: 89 additions & 0 deletions tests/entrypoint/test_default_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,21 @@ def test_replace_stage(self):

assert bp.nodes["S"] is factory2

def test_add_stage_duplicate_raises(self):
"""Adding the same stage name twice via ``add_stage`` silently
shadowed the first factory — exactly the
``StageRegistry.register`` collision pattern. Now it raises so a
subclass copy-pasting a parent ``add_stage`` call fails loudly
instead of producing a working-but-wrong pipeline. Operators that
intentionally swap must use ``replace_stage``."""
import pytest

ctx = _make_ctx()
builder = PipelineBuilder(ctx)
builder.add_stage("S", MagicMock())
with pytest.raises(ValueError, match="already registered"):
builder.add_stage("S", MagicMock())

def test_remove_stage_cleans_edges_and_deps(self):
ctx = _make_ctx()
builder = PipelineBuilder(ctx)
Expand Down Expand Up @@ -407,6 +422,25 @@ def test_inherits_all_default_stages(self):
default_stages = set(default_bp.nodes.keys())
assert default_stages.issubset(set(cma_bp.nodes.keys()))

def test_force_default_skips_context_when_inferred_contextual(self):
"""``force='default'`` wins over auto-detected ``is_contextual=True``."""
ctx = _make_ctx(is_contextual=True)
bp = CMAOptPipelineBuilder(ctx, force="default").build_blueprint()
assert "AddContext" not in bp.nodes

def test_force_context_adds_context_when_inferred_non_contextual(self):
"""``force='context'`` wins over auto-detected ``is_contextual=False``."""
ctx = _make_ctx(is_contextual=False)
bp = CMAOptPipelineBuilder(ctx, force="context").build_blueprint()
assert "AddContext" in bp.nodes

def test_force_invalid_raises(self):
import pytest

ctx = _make_ctx()
with pytest.raises(ValueError, match="must be 'context', 'default'"):
CMAOptPipelineBuilder(ctx, force="bogus")


# ===================================================================
# OptunaOptPipelineBuilder
Expand Down Expand Up @@ -484,6 +518,23 @@ def test_default_dag_timeout_is_7200(self):
bp = builder.build_blueprint()
assert bp.dag_timeout == 7200.0

def test_force_default_skips_context_when_inferred_contextual(self):
ctx = _make_ctx(is_contextual=True)
bp = OptunaOptPipelineBuilder(ctx, force="default").build_blueprint()
assert "AddContext" not in bp.nodes

def test_force_context_adds_context_when_inferred_non_contextual(self):
ctx = _make_ctx(is_contextual=False)
bp = OptunaOptPipelineBuilder(ctx, force="context").build_blueprint()
assert "AddContext" in bp.nodes

def test_force_invalid_raises(self):
import pytest

ctx = _make_ctx()
with pytest.raises(ValueError, match="must be 'context', 'default'"):
OptunaOptPipelineBuilder(ctx, force="bogus")

def test_optuna_stage_factory_is_callable(self):
ctx = _make_ctx()
builder = OptunaOptPipelineBuilder(ctx)
Expand Down Expand Up @@ -618,3 +669,41 @@ def test_nonempty_deps_dict_is_preserved(self):
bp = builder.build_blueprint()
assert bp.exec_order_deps is not None
assert "A" in bp.exec_order_deps


class TestDefaultDepsMergeNotOverwrite:
"""``_contribute_default_deps`` previously assigned the whole ``_deps``
dict literal, silently wiping any deps a subclass had injected at the
top of its override. Verify the merge keeps both."""

def test_subclass_dep_for_unrelated_stage_survives(self):
injected = ExecutionOrderDependency.on_success("Foo")

class _CustomBuilder(DefaultPipelineBuilder):
def _contribute_default_deps(self):
# Add a dep BEFORE calling super — old code's ``self._deps =``
# in the parent would wipe this.
self._deps.setdefault("MyCustomStage", []).append(injected)
super()._contribute_default_deps()

ctx = _make_ctx()
builder = _CustomBuilder(ctx)
assert "MyCustomStage" in builder._deps
assert injected in builder._deps["MyCustomStage"]
assert "CallProgramFunction" in builder._deps # from defaults

def test_subclass_dep_for_default_stage_is_extended(self):
custom_dep = ExecutionOrderDependency.on_success("SomeOtherStage")

class _CustomBuilder(DefaultPipelineBuilder):
def _contribute_default_deps(self):
self._deps.setdefault("CallProgramFunction", []).append(custom_dep)
super()._contribute_default_deps()

ctx = _make_ctx()
builder = _CustomBuilder(ctx)
deps = builder._deps["CallProgramFunction"]
names = [d.stage_name for d in deps]
# Both the subclass's custom dep and the default's own dep coexist.
assert "SomeOtherStage" in names
assert "ValidateCodeStage" in names
19 changes: 19 additions & 0 deletions tests/test_stage_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,25 @@ def test_custom_import_path(self) -> None:
info = StageRegistry.get_stage("SimpleStage")
assert info.import_path == "my.custom.path.SimpleStage"

def test_duplicate_registration_raises(self) -> None:
"""Re-registering a class name silently shadowed the previous entry
with whatever was decorated last, hiding copy-paste bugs across
modules. Now it raises so operators see the conflict at import."""
import pytest

StageRegistry.register(description="first")(SimpleStage)
with pytest.raises(ValueError, match="already registered"):
StageRegistry.register(description="second")(SimpleStage)

def test_clear_then_reregister_works(self) -> None:
"""Tests that intentionally re-register can call clear() between."""
StageRegistry.register(description="first")(SimpleStage)
StageRegistry.clear()
StageRegistry.register(description="second")(SimpleStage)
info = StageRegistry.get_stage("SimpleStage")
assert info is not None
assert info.description == "second"

def test_auto_import_path(self) -> None:
StageRegistry.register()(SimpleStage)
info = StageRegistry.get_stage("SimpleStage")
Expand Down