Skip to content
7 changes: 6 additions & 1 deletion gigaevo/database/redis_program_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from gigaevo.programs.program_state import ProgramState, validate_transition
from gigaevo.utils.json import dumps as _dumps
from gigaevo.utils.json import loads as _loads
from gigaevo.utils.text_sanitize import sanitize_for_log
from gigaevo.utils.trackers.base import LogWriter

T = TypeVar("T")
Expand Down Expand Up @@ -109,7 +110,11 @@ def _safe_deserialize(
try:
return Program.from_dict(_loads(raw), exclude=exclude)
except Exception as e:
logger.warning("[RedisProgramStorage] Corrupt data in {}: {}", ctx, e)
logger.warning(
"[RedisProgramStorage] Corrupt data in {}: {}",
ctx,
sanitize_for_log(str(e)),
)
return None

async def _mget_by_keys(
Expand Down
3 changes: 2 additions & 1 deletion gigaevo/database/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from gigaevo.programs.core_types import ProgramStageResult, StageState
from gigaevo.programs.program import Program
from gigaevo.programs.program_state import ProgramState, validate_transition
from gigaevo.utils.text_sanitize import sanitize_for_log

# States after which the DagRunner never accesses the program again.
# Evict per-program locks for these states to prevent unbounded memory growth.
Expand Down Expand Up @@ -100,7 +101,7 @@ async def set_program_state(
logger.error(
"[ProgramStateManager] Invalid state transition for {}: {}",
program.short_id,
e,
sanitize_for_log(str(e)),
)
raise

Expand Down
15 changes: 13 additions & 2 deletions gigaevo/evolution/bus/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

from abc import ABC, abstractmethod
import json
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

from loguru import logger
from pydantic import BaseModel
import redis.asyncio as aioredis

from gigaevo.utils.text_sanitize import deep_sanitize_for_json

if TYPE_CHECKING:
from gigaevo.evolution.bus.topology import Topology

Expand All @@ -28,10 +30,19 @@ class MigrantEnvelope(BaseModel):
generation: int

def to_stream_fields(self) -> dict[str, str]:
    """Flatten this envelope into a string-to-string field mapping.

    ``source_run_id`` and ``program_id`` are passed through unchanged,
    ``published_at`` and ``generation`` are converted with ``str()``, and
    ``program_data`` is JSON-encoded after being passed through
    ``deep_sanitize_for_json``.

    Returns:
        A dict with the keys ``source_run_id``, ``program_id``,
        ``program_data``, ``published_at`` and ``generation``.
    """
    # Belt-and-suspenders: program_data carries LLM-generated code plus
    # stage errors whose origins span Python / Triton / CUDA C++ /
    # CUTLASS / Mojo / Pallas / CuTe. Any one of those toolchains can
    # emit text that contains a lone UTF-16 surrogate, which cannot be
    # encoded as UTF-8. Scrub surrogates at the boundary so the migration
    # write does not abort on them.
    # NOTE(review): json.dumps with its default ensure_ascii=True escapes
    # lone surrogates instead of raising — confirm where the
    # UnicodeEncodeError actually surfaces.
    safe_program_data = cast(
        dict[str, Any], deep_sanitize_for_json(self.program_data)
    )
    # Only the sanitized payload is serialized; a second "program_data"
    # entry built from the raw data would be a duplicate key whose value is
    # computed and then silently discarded.
    return {
        "source_run_id": self.source_run_id,
        "program_id": self.program_id,
        "program_data": json.dumps(safe_program_data),
        "published_at": str(self.published_at),
        "generation": str(self.generation),
    }
Expand Down
8 changes: 6 additions & 2 deletions gigaevo/evolution/mutation/mutation_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from gigaevo.problems.context import ProblemContext
from gigaevo.programs.metrics.formatter import MetricsFormatter
from gigaevo.programs.program import Program
from gigaevo.utils.text_sanitize import sanitize_for_log

if TYPE_CHECKING:
from gigaevo.database.program_storage import ProgramStorage
Expand Down Expand Up @@ -95,7 +96,7 @@ def _canonicalize_code(code: str) -> str:
logger.warning(
"[LLMMutationOperator] Failed to canonicalize code due to syntax error: {}. "
"Returning original code.",
e,
sanitize_for_log(str(e)),
)
return code

Expand Down Expand Up @@ -156,7 +157,10 @@ async def mutate_single(
if structured_output:
mutation_metadata[MutationSpec.META_OUTPUT] = structured_output
archetype = result.get("archetype", "unknown")
logger.debug("[LLMMutationOperator] Mutation archetype: {}", archetype)
logger.debug(
"[LLMMutationOperator] Mutation archetype: {}",
sanitize_for_log(str(archetype)),
)
if result.get("changes"):
logger.debug(
"[LLMMutationOperator] Mutation returned {} tracked change(s)",
Expand Down
8 changes: 7 additions & 1 deletion gigaevo/llm/agents/insights.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator

from gigaevo.llm.agents.base import LangGraphAgent
from gigaevo.llm.models import MultiModelRouter
from gigaevo.programs.metrics.formatter import MetricsFormatter
from gigaevo.programs.program import OPTIMIZATION_STAGES, Program
from gigaevo.utils.text_sanitize import sanitize_for_log


class ProgramInsight(BaseModel):
Expand All @@ -24,6 +25,11 @@ class ProgramInsight(BaseModel):
tag: str = Field(description="Tag for the insight")
severity: str = Field(description="Severity of the insight")

@field_validator("type", "insight", "tag", "severity", mode="after")
@classmethod
def _scrub_text(cls, value: str) -> str:
    """Return the validated field value after ``sanitize_for_log``.

    Registered in ``after`` mode for the ``type``, ``insight``, ``tag``
    and ``severity`` fields.
    """
    cleaned = sanitize_for_log(value)
    return cleaned


class ProgramInsights(BaseModel):
"""Collection of program insights."""
Expand Down
8 changes: 7 additions & 1 deletion gigaevo/llm/agents/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, field_validator

from gigaevo.llm.agents.base import LangGraphAgent
from gigaevo.llm.models import MultiModelRouter
from gigaevo.programs.metrics.formatter import MetricsFormatter
from gigaevo.programs.program import OPTIMIZATION_STAGES, Program
from gigaevo.utils.text_sanitize import sanitize_for_log


class TransitionInsight(BaseModel):
Expand All @@ -27,6 +28,11 @@ class TransitionInsight(BaseModel):
description="Specific explanation with evidence (≤30 words)"
)

@field_validator("strategy", "description", mode="after")
@classmethod
def _scrub_text(cls, value: str) -> str:
    """Return the validated field value after ``sanitize_for_log``.

    Registered in ``after`` mode for the ``strategy`` and ``description``
    fields.
    """
    cleaned = sanitize_for_log(value)
    return cleaned


class TransitionInsights(BaseModel):
"""Collection of transition insights."""
Expand Down
28 changes: 17 additions & 11 deletions gigaevo/llm/agents/memory_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from gigaevo.evolution.mutation.constants import MUTATION_CONTEXT_METADATA_KEY
from gigaevo.programs.program import Program
from gigaevo.utils.text_sanitize import sanitize_for_log

try:
from gigaevo.memory.runtime_config import (
Expand Down Expand Up @@ -71,7 +72,7 @@ def _resolve_memory_backend_class(use_api: bool) -> type[Any]:

def _create_memory_backend(self) -> Any | None:
if _RUNTIME_IMPORT_ERROR is not None:
message = (
message = sanitize_for_log(
"gigaevo.memory is unavailable"
f"{': ' + str(_RUNTIME_IMPORT_ERROR) if _RUNTIME_IMPORT_ERROR else ''}"
)
Expand Down Expand Up @@ -210,17 +211,19 @@ def _create_memory_backend(self) -> Any | None:
logger.info(
"[MemorySelectorAgent] Using memory backend "
"(class={}, use_api={}, namespace={}, channel={}, checkpoint={})",
type(memory).__module__,
sanitize_for_log(type(memory).__module__),
use_api,
namespace,
channel,
memory_dir,
sanitize_for_log(str(namespace)),
sanitize_for_log(str(channel)),
sanitize_for_log(str(memory_dir)),
)
return memory
except Exception as exc:
self._backend_error = str(exc)
safe_exc = sanitize_for_log(str(exc))
self._backend_error = safe_exc
logger.warning(
"[MemorySelectorAgent] Failed to initialize red memory backend: {}", exc
"[MemorySelectorAgent] Failed to initialize red memory backend: {}",
safe_exc,
)
return None

Expand Down Expand Up @@ -259,7 +262,7 @@ async def select(
if self.memory is None:
logger.warning(
"[MemorySelectorAgent] Memory backend unavailable: {}",
self._backend_error or "unknown error",
sanitize_for_log(self._backend_error or "unknown error"),
)
return MemorySelection(cards=[], card_ids=[])

Expand All @@ -280,7 +283,10 @@ async def select(
self._search_with_ids, query
)
except Exception as exc:
logger.warning("[MemorySelectorAgent] Red memory search failed: {}", exc)
logger.warning(
"[MemorySelectorAgent] Red memory search failed: {}",
sanitize_for_log(str(exc)),
)
return MemorySelection(cards=[], card_ids=[])

cards = self._parse_search_result(result_text, max_cards=max_cards)
Expand All @@ -295,7 +301,7 @@ async def select(
logger.debug(
"[MemorySelectorAgent] Selected {} memory idea(s) via red agent (ids={})",
len(cards),
card_ids,
[sanitize_for_log(cid) for cid in card_ids],
)
else:
logger.debug(
Expand Down Expand Up @@ -358,7 +364,7 @@ def _search_with_ids(self, query: str) -> tuple[str, list[str]]:
except Exception as exc:
logger.warning(
"[MemorySelectorAgent] Direct GAM research failed, falling back to plain search: {}",
exc,
sanitize_for_log(str(exc)),
)

assert self.memory is not None # caller checks self.memory before calling
Expand Down
49 changes: 40 additions & 9 deletions gigaevo/llm/agents/mutation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from loguru import logger
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator

from gigaevo.evolution.mutation.base import MutationSpec
from gigaevo.evolution.mutation.constants import (
Expand All @@ -18,6 +18,7 @@
from gigaevo.llm.agents.base import LangGraphAgent
from gigaevo.llm.models import MultiModelRouter, get_selected_model
from gigaevo.programs.program import Program
from gigaevo.utils.text_sanitize import sanitize_for_log

if TYPE_CHECKING:
from gigaevo.programs.metrics.context import MetricsContext
Expand All @@ -42,6 +43,14 @@ class MutationChange(BaseModel):
)
)

@field_validator("description", "explanation", mode="after")
@classmethod
def _scrub_text(cls, value: str) -> str:
    """Return the validated field value after ``sanitize_for_log``.

    Registered in ``after`` mode for the ``description`` and
    ``explanation`` fields.
    """
    # These fields hold free-form LLM text. The intent is that log sinks,
    # JSON encoders and asyncpg TEXT columns downstream never receive ANSI
    # escape sequences, BIDI overrides, lone UTF-16 surrogates or NUL bytes.
    cleaned = sanitize_for_log(value)
    return cleaned


class MutationStructuredOutput(BaseModel):
"""Structured output from the mutation LLM.
Expand Down Expand Up @@ -76,6 +85,20 @@ class MutationStructuredOutput(BaseModel):
)
)

@field_validator("archetype", "justification", "code", mode="after")
@classmethod
def _scrub_text(cls, value: str) -> str:
    """Return the validated field value after ``sanitize_for_log``.

    Registered in ``after`` mode for the ``archetype``, ``justification``
    and ``code`` fields.
    """
    # ``code`` is deliberately included: Python source has no legitimate
    # need for ANSI / BIDI / NUL or C0 controls other than TAB and LF, and
    # an LLM-injected one would otherwise break ast.parse() error
    # formatting, log rendering, and asyncpg storage.
    cleaned = sanitize_for_log(value)
    return cleaned

@field_validator("insights_used", mode="after")
@classmethod
def _scrub_insights(cls, value: list[str]) -> list[str]:
    """Return a new list with ``sanitize_for_log`` applied to each entry.

    Registered in ``after`` mode for the ``insights_used`` field; element
    order is preserved.
    """
    return list(map(sanitize_for_log, value))


# Re-export from canonical location for backward compatibility
MUTATION_OUTPUT_METADATA_KEY = MutationSpec.META_OUTPUT
Expand Down Expand Up @@ -202,7 +225,9 @@ def _dump_prompt_to_file(
f.write(user)
f.write("\n")
except Exception as exc:
logger.debug(f"[MutationAgent] prompt dump failed: {exc}")
logger.debug(
"[MutationAgent] prompt dump failed: {}", sanitize_for_log(str(exc))
)

async def arun(self, input: list[Program], mutation_mode: str) -> dict:
"""Execute mutation agent.
Expand Down Expand Up @@ -258,8 +283,9 @@ async def acall_llm(self, state: MutationState) -> MutationState:
)

except Exception as e:
logger.error(f"[MutationAgent] Structured LLM call failed: {e}")
state["error"] = str(e)
safe_msg = sanitize_for_log(str(e))
logger.error("[MutationAgent] Structured LLM call failed: {}", safe_msg)
state["error"] = safe_msg
state["llm_response"] = None

return state
Expand Down Expand Up @@ -387,8 +413,10 @@ def parse_response(self, state: MutationState) -> MutationState:
model_used = state.get("metadata", {}).get("model_used")

if structured_output is None:
error_msg = state.get("error", "No structured output received")
logger.error(f"[MutationAgent] No structured output: {error_msg}")
error_msg = sanitize_for_log(
state.get("error", "No structured output received")
)
logger.error("[MutationAgent] No structured output: {}", error_msg)
state["parsed_output"] = {
"code": "",
"structured_output": None,
Expand Down Expand Up @@ -450,14 +478,17 @@ def parse_response(self, state: MutationState) -> MutationState:
)

except Exception as e:
logger.error(f"[MutationAgent] Failed to parse structured response: {e}")
state["error"] = str(e)
safe_msg = sanitize_for_log(str(e))
logger.error(
"[MutationAgent] Failed to parse structured response: {}", safe_msg
)
state["error"] = safe_msg
state["parsed_output"] = {
"code": "",
"structured_output": (
structured_output.model_dump() if structured_output else None
),
"error": str(e),
"error": safe_msg,
"model_used": model_used,
}

Expand Down
5 changes: 3 additions & 2 deletions gigaevo/llm/bandit.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import numpy as np

from gigaevo.llm.models import MultiModelRouter, _StructuredOutputRouter
from gigaevo.utils.text_sanitize import sanitize_for_log
from gigaevo.utils.trackers.base import LogWriter

if TYPE_CHECKING:
Expand Down Expand Up @@ -296,7 +297,7 @@ def on_mutation_outcome(
self._bandit.update_reward(model_name, normalized)
logger.debug(
"[BanditModelRouter] Reward for {} ({}): raw=0.0 norm={:.4f}",
model_name,
sanitize_for_log(str(model_name)),
outcome.value,
normalized,
)
Expand Down Expand Up @@ -326,7 +327,7 @@ def on_mutation_outcome(
self._bandit.update_reward(model_name, normalized)
logger.debug(
"[BanditModelRouter] Reward for {} ({}): raw={:.4f} norm={:.4f}",
model_name,
sanitize_for_log(str(model_name)),
outcome.value,
raw,
normalized,
Expand Down
Loading