diff --git a/extropy/core/models/simulation.py b/extropy/core/models/simulation.py index 2a3ceaa..3ac2d79 100644 --- a/extropy/core/models/simulation.py +++ b/extropy/core/models/simulation.py @@ -357,6 +357,10 @@ class ReasoningResponse(BaseModel): position: str | None = Field( default=None, description="Classified position (filled by Pass 2)" ) + public_position: str | None = Field( + default=None, + description="Public-facing position when THINK/SAY diverges (high fidelity)", + ) sentiment: float | None = Field( default=None, description="Sentiment value (-1 to 1)" ) diff --git a/extropy/simulation/engine.py b/extropy/simulation/engine.py index 850efcc..e56b0da 100644 --- a/extropy/simulation/engine.py +++ b/extropy/simulation/engine.py @@ -926,7 +926,8 @@ def _process_reasoning_chunk( public_conviction = max(0.0, min(1.0, public_conviction)) public_will_share = response.will_share - public_position = response.position + candidate_public_position = response.public_position or response.position + public_position = candidate_public_position if ( old_public_conviction is not None @@ -934,8 +935,8 @@ def _process_reasoning_chunk( ): if ( old_public_position is not None - and response.position is not None - and old_public_position != response.position + and candidate_public_position is not None + and old_public_position != candidate_public_position ): new_conviction = ( public_conviction if public_conviction is not None else 0.0 @@ -943,7 +944,7 @@ def _process_reasoning_chunk( if new_conviction < _MODERATE_CONVICTION: logger.info( f"[CONVICTION] Agent {agent_id}: public flip from {old_public_position} " - f"to {response.position} rejected (old conviction={float_to_conviction(old_public_conviction)}, " + f"to {candidate_public_position} rejected (old conviction={float_to_conviction(old_public_conviction)}, " f"new conviction={float_to_conviction(public_conviction)})" ) public_position = old_public_position @@ -2041,6 +2042,7 @@ def _export_results(self) -> None: "population_size": len(self.agents), "strong_model": self.config.strong, "fast_model": self.config.fast, + "fidelity": self.config.fidelity, "seed": self.seed, "multi_touch_threshold": self.config.multi_touch_threshold, "completed_at": datetime.now().isoformat(), diff --git a/extropy/simulation/reasoning.py b/extropy/simulation/reasoning.py index 1cba318..4977078 100644 --- a/extropy/simulation/reasoning.py +++ b/extropy/simulation/reasoning.py @@ -754,6 +754,107 @@ def _sentiment_to_tone(sentiment: float) -> str: return "strongly opposed" +async def _classify_text_async( + text: str, + scenario: ScenarioSpec, + pass2_schema: dict[str, Any], + classify_model: str | None, + config: SimulationRunConfig, + context: ReasoningContext, + rate_limiter: Any = None, +) -> tuple[dict[str, Any], TokenUsage]: + """Classify free text into structured outcomes using Pass 2 schema.""" + if not text.strip(): + return {}, TokenUsage() + + pass2_prompt = build_pass2_prompt(text, scenario) + usage = TokenUsage() + + for attempt in range(config.max_retries): + try: + if rate_limiter: + estimated_input = len(pass2_prompt) // 4 + estimated_output = 80 + await rate_limiter.routine.acquire( + estimated_input_tokens=estimated_input, + estimated_output_tokens=estimated_output, + ) + + call_start = time.time() + pass2_response, usage = await asyncio.wait_for( + simple_call_async( + prompt=pass2_prompt, + response_schema=pass2_schema, + schema_name="classification", + model=classify_model, + ), + timeout=20.0, + ) + call_elapsed = time.time() - call_start + logger.info(f"[PASS2] Agent {context.agent_id} - {call_elapsed:.2f}s") + + if pass2_response: + return dict(pass2_response), usage + except asyncio.TimeoutError: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} timed out after 20s" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" + ) + except Exception as e: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" + ) + + return {}, usage + + +def _classify_text_sync( + text: str, + scenario: ScenarioSpec, + pass2_schema: dict[str, Any], + classify_model: str | None, + config: SimulationRunConfig, + context: ReasoningContext, +) -> dict[str, Any]: + """Synchronous counterpart for classifying text into outcomes.""" + if not text.strip(): + return {} + + pass2_prompt = build_pass2_prompt(text, scenario) + for attempt in range(config.max_retries): + try: + call_start = time.time() + pass2_response = simple_call( + prompt=pass2_prompt, + response_schema=pass2_schema, + schema_name="classification", + model=classify_model, + log=True, + ) + call_elapsed = time.time() - call_start + logger.info(f"[PASS2] Agent {context.agent_id} - API call took {call_elapsed:.2f}s") + + if pass2_response: + return dict(pass2_response) + except Exception as e: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - classification failed, continuing without" + ) + + return {} + + # ============================================================================= # Two-pass reasoning (async) # ============================================================================= @@ -836,6 +937,7 @@ async def _reason_agent_two_pass_async( # Extract Pass 1 fields reasoning = pass1_response.get("reasoning", "") + private_thought = pass1_response.get("private_thought", "") public_statement = pass1_response.get("public_statement", "") reasoning_summary = pass1_response.get("reasoning_summary", "") sentiment = pass1_response.get("sentiment") @@ -851,60 +953,40 @@ async def _reason_agent_two_pass_async( # === Pass 2: Classification (if needed) === pass2_schema = build_pass2_schema(scenario.outcomes) position = None + public_position = None outcomes = {} - pass2_usage = TokenUsage() + pass2_usage_private = TokenUsage() + pass2_usage_public = TokenUsage() if pass2_schema: - pass2_prompt = build_pass2_prompt(reasoning, scenario) - - for attempt in range(config.max_retries): - try: - if rate_limiter: - # Dynamic token estimate from prompt length - estimated_input = len(pass2_prompt) // 4 - estimated_output = 80 # classification is small - await rate_limiter.routine.acquire( - estimated_input_tokens=estimated_input, - estimated_output_tokens=estimated_output, - ) - - call_start = time.time() - pass2_response, pass2_usage = await asyncio.wait_for( - simple_call_async( - prompt=pass2_prompt, - response_schema=pass2_schema, - schema_name="classification", - model=classify_model, - ), - timeout=20.0, - ) - call_elapsed = time.time() - call_start - - logger.info(f"[PASS2] Agent {context.agent_id} - {call_elapsed:.2f}s") + private_text = private_thought or reasoning + outcomes, pass2_usage_private = await _classify_text_async( + text=private_text, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in outcomes: + position = outcomes[position_outcome] + + if config.fidelity == "high": + public_outcomes, pass2_usage_public = await _classify_text_async( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] - if pass2_response: - outcomes = dict(pass2_response) - # Extract primary position from outcomes - if position_outcome and position_outcome in pass2_response: - position = pass2_response[position_outcome] - break - except asyncio.TimeoutError: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} timed out after 20s" - ) - if attempt == config.max_retries - 1: - logger.warning( - f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" - ) - except Exception as e: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" - ) - if attempt == config.max_retries - 1: - # Pass 2 failure is non-fatal — we still have Pass 1 data - logger.warning( - f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" - ) + if public_position is None: + public_position = position # Merge sentiment into outcomes for backwards compat if sentiment is not None: @@ -912,6 +994,7 @@ async def _reason_agent_two_pass_async( return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, @@ -923,8 +1006,12 @@ async def _reason_agent_two_pass_async( actions=actions, pass1_input_tokens=pass1_usage.input_tokens, pass1_output_tokens=pass1_usage.output_tokens, - pass2_input_tokens=pass2_usage.input_tokens, - pass2_output_tokens=pass2_usage.output_tokens, + pass2_input_tokens=( + pass2_usage_private.input_tokens + pass2_usage_public.input_tokens + ), + pass2_output_tokens=( + pass2_usage_private.output_tokens + pass2_usage_public.output_tokens + ), ) @@ -1019,8 +1106,10 @@ async def _reason_agent_merged_async( # Extract position from outcomes position = None + public_position = None if position_outcome and position_outcome in response: position = response[position_outcome] + public_position = position # Build outcomes dict (everything except the Pass 1 fields) pass1_fields = { @@ -1038,8 +1127,27 @@ async def _reason_agent_merged_async( if sentiment is not None: outcomes["sentiment"] = sentiment + pass2_usage_public = TokenUsage() + pass2_schema = build_pass2_schema(scenario.outcomes) + if config.fidelity == "high" and pass2_schema: + public_outcomes, pass2_usage_public = await _classify_text_async( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=config.fast or None, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] + + if public_position is None: + public_position = position + return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, @@ -1051,8 +1159,8 @@ async def _reason_agent_merged_async( actions=actions, pass1_input_tokens=usage.input_tokens, pass1_output_tokens=usage.output_tokens, - pass2_input_tokens=0, - pass2_output_tokens=0, + pass2_input_tokens=pass2_usage_public.input_tokens, + pass2_output_tokens=pass2_usage_public.output_tokens, ) @@ -1133,6 +1241,7 @@ def reason_agent( # Extract Pass 1 fields reasoning = pass1_response.get("reasoning", "") + private_thought = pass1_response.get("private_thought", "") public_statement = pass1_response.get("public_statement", "") reasoning_summary = pass1_response.get("reasoning_summary", "") sentiment = pass1_response.get("sentiment") @@ -1143,41 +1252,37 @@ def reason_agent( # === Pass 2: Classification === pass2_schema = build_pass2_schema(scenario.outcomes) position = None + public_position = None outcomes = {} if pass2_schema: - pass2_prompt = build_pass2_prompt(reasoning, scenario) + private_text = private_thought or reasoning classify_model = config.fast or None + outcomes = _classify_text_sync( + text=private_text, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + ) + if position_outcome and position_outcome in outcomes: + position = outcomes[position_outcome] + + if config.fidelity == "high": + public_outcomes = _classify_text_sync( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] - for attempt in range(config.max_retries): - try: - call_start = time.time() - pass2_response = simple_call( - prompt=pass2_prompt, - response_schema=pass2_schema, - schema_name="classification", - model=classify_model, - log=True, - ) - call_elapsed = time.time() - call_start - - logger.info( - f"[PASS2] Agent {context.agent_id} - API call took {call_elapsed:.2f}s" - ) - - if pass2_response: - outcomes = dict(pass2_response) - if position_outcome and position_outcome in pass2_response: - position = pass2_response[position_outcome] - break - except Exception as e: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" - ) - if attempt == config.max_retries - 1: - logger.warning( - f"[PASS2] Agent {context.agent_id} - classification failed, continuing without" - ) + if public_position is None: + public_position = position if sentiment is not None: outcomes["sentiment"] = sentiment @@ -1189,6 +1294,7 @@ def reason_agent( return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, diff --git a/tests/test_engine.py b/tests/test_engine.py index 7068bb3..796571c 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -298,6 +298,45 @@ def test_firm_agent_accepts_moderate_flip( assert final_state.public_position == "reject" assert final_state.position == "adopt" + def test_public_position_prefers_explicit_public_field( + self, + minimal_scenario, + simple_agents, + simple_network, + minimal_pop_spec, + tmp_path, + ): + """Public state should use response.public_position when provided.""" + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir=str(tmp_path / "output"), + ) + engine = SimulationEngine( + scenario=minimal_scenario, + population_spec=minimal_pop_spec, + agents=simple_agents, + network=simple_network, + config=config, + ) + + old_state = AgentState( + agent_id="a0", + position="undecided", + conviction=CONVICTION_MAP[ConvictionLevel.VERY_UNCERTAIN], + ) + response = _make_reasoning_response( + position="reject", + public_position="adopt", + conviction=CONVICTION_MAP[ConvictionLevel.MODERATE], + ) + + engine._process_reasoning_chunk( + timestep=1, results=[("a0", response)], old_states={"a0": old_state} + ) + + final_state = engine.state_manager.get_agent_state("a0") + assert final_state.public_position == "adopt" + class TestConvictionGatedSharing: """Test that very_uncertain agents don't share.""" @@ -1547,6 +1586,7 @@ def test_cost_in_meta_json( meta = json.load(f) assert "cost" in meta + assert meta["fidelity"] == config.fidelity cost = meta["cost"] assert cost["pivotal_input_tokens"] == 1_000_000 assert cost["pivotal_output_tokens"] == 500_000 diff --git a/tests/test_reasoning_execution.py b/tests/test_reasoning_execution.py new file mode 100644 index 0000000..496ec03 --- /dev/null +++ b/tests/test_reasoning_execution.py @@ -0,0 +1,197 @@ +"""Execution-path tests for two-pass reasoning.""" + +import asyncio +from datetime import datetime +from unittest.mock import AsyncMock, patch + +from extropy.core.llm import TokenUsage +from extropy.core.models import ExposureRecord, ReasoningContext, SimulationRunConfig +from extropy.core.models.scenario import ( + Event, + EventType, + ExposureChannel, + ExposureRule, + InteractionConfig, + InteractionType, + OutcomeConfig, + OutcomeDefinition, + OutcomeType, + ScenarioMeta, + ScenarioSpec, + SeedExposure, + SimulationConfig, + SpreadConfig, +) +from extropy.simulation.reasoning import _reason_agent_two_pass_async + + +def _make_scenario() -> ScenarioSpec: + return ScenarioSpec( + meta=ScenarioMeta( + name="test", + description="Test scenario", + population_spec="population.v1.yaml", + study_db="study.db", + population_id="default", + network_id="default", + created_at=datetime(2024, 1, 1), + ), + event=Event( + type=EventType.PRODUCT_LAUNCH, + content="A new policy is announced.", + source="City Hall", + credibility=0.9, + ambiguity=0.3, + emotional_valence=-0.1, + ), + seed_exposure=SeedExposure( + channels=[ + ExposureChannel( + name="broadcast", + description="Broadcast", + reach="broadcast", + credibility_modifier=1.0, + ) + ], + rules=[ + ExposureRule( + channel="broadcast", + timestep=0, + when="true", + probability=1.0, + ) + ], + ), + interaction=InteractionConfig( + primary_model=InteractionType.PASSIVE_OBSERVATION, + description="Observe", + ), + spread=SpreadConfig(share_probability=0.3), + outcomes=OutcomeConfig( + suggested_outcomes=[ + OutcomeDefinition( + name="adoption", + description="Primary stance", + type=OutcomeType.CATEGORICAL, + required=True, + options=["adopt", "reject", "wait"], + ) + ] + ), + simulation=SimulationConfig(max_timesteps=3), + ) + + +def _make_context() -> ReasoningContext: + return ReasoningContext( + agent_id="a0", + persona="I'm Alex, a resident in the city.", + event_content="A new policy is announced.", + exposure_history=[ + ExposureRecord( + timestep=0, + channel="broadcast", + content="A new policy is announced.", + credibility=0.9, + ) + ], + peer_opinions=[], + agent_name="Alex", + timestep=0, + timestep_unit="day", + ) + + +def test_two_pass_high_fidelity_classifies_private_and_public_positions_separately(): + scenario = _make_scenario() + context = _make_context() + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir="results", + max_retries=1, + fidelity="high", + ) + + pass1_response = { + "reasoning": "I can see both sides, but I'm conflicted.", + "private_thought": "I privately reject this policy.", + "public_statement": "Let's stay open-minded and give it a chance.", + "reasoning_summary": "I'm conflicted.", + "sentiment": -0.2, + "conviction": 62, + "will_share": True, + "actions": [], + } + + mocked_call = AsyncMock( + side_effect=[ + (pass1_response, TokenUsage(input_tokens=10, output_tokens=7)), + ({"adoption": "reject"}, TokenUsage(input_tokens=4, output_tokens=2)), + ({"adoption": "adopt"}, TokenUsage(input_tokens=5, output_tokens=3)), + ] + ) + + with patch("extropy.simulation.reasoning.simple_call_async", mocked_call): + response = asyncio.run( + _reason_agent_two_pass_async( + context=context, + scenario=scenario, + config=config, + rate_limiter=None, + ) + ) + + assert response is not None + assert response.position == "reject" + assert response.public_position == "adopt" + assert response.pass2_input_tokens == 9 + assert response.pass2_output_tokens == 5 + assert mocked_call.await_count == 3 + assert "I privately reject this policy" in mocked_call.await_args_list[1].kwargs[ + "prompt" + ] + assert "give it a chance" in mocked_call.await_args_list[2].kwargs["prompt"] + + +def test_two_pass_medium_fidelity_uses_private_classification_for_public_position(): + scenario = _make_scenario() + context = _make_context() + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir="results", + max_retries=1, + fidelity="medium", + ) + + pass1_response = { + "reasoning": "I am leaning toward waiting.", + "private_thought": "I should wait and watch.", + "public_statement": "I'm not sure yet.", + "reasoning_summary": "Leaning to wait.", + "sentiment": 0.0, + "conviction": 45, + "will_share": False, + "actions": [], + } + + mocked_call = AsyncMock( + side_effect=[ + (pass1_response, TokenUsage(input_tokens=8, output_tokens=6)), + ({"adoption": "wait"}, TokenUsage(input_tokens=3, output_tokens=2)), + ] + ) + + with patch("extropy.simulation.reasoning.simple_call_async", mocked_call): + response = asyncio.run( + _reason_agent_two_pass_async( + context=context, + scenario=scenario, + config=config, + rate_limiter=None, + ) + ) + + assert response is not None + assert response.position == "wait" + assert response.public_position == "wait" + assert mocked_call.await_count == 2