1010custom `ProjectionStrategy` for the parent ↔ subgraph boundary, and the
1111`merge` reducer for dict accumulation.
1212
13- This is the second demo in the series. It exercises three graph features
14- that `01-linear-pipeline/` didn't:
13+ Three graph features that `00-hello-world` only touched lightly:
1514
1615 1. **Conditional edges.** The entry node classifies the question and the
1716 graph routes to one of two branches based on that classification.
2221 from its own schema's defaults. To pass the user's question in (and
2322 shape what comes back out), we write a `ProjectionStrategy` by hand.
2423
25- And for good measure it also demonstrates the **merge reducer** for dict
26- accumulation, alongside the **append reducer** already seen in 01-linear-pipeline.
24+ LLM calls go through ``openarmature.llm.OpenAIProvider`` (same pattern as
25+ ``00-hello-world``) so the example reads as the recommended path rather
26+ than as "openai with some openarmature on top."
27+
28+ **Configuration** (env vars; OpenAI defaults shown):
29+
30+ - ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root
31+ only** — the provider adds the path itself.
32+ - ``LLM_MODEL`` defaults to ``gpt-4o-mini``.
33+ - ``LLM_API_KEY`` required (empty for local servers that don't authenticate).
2734
2835Run with:
29- uv run python main.py "what year did the moon landing happen" # → quick branch
30- uv run python main.py "is espresso actually more caffeinated than drip?" # → research branch
36+
37+ uv sync --group examples
38+ cd examples/01-routing-and-subgraphs
39+ LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen"
40+ LLM_API_KEY=sk-... uv run python main.py "is espresso actually more caffeinated than drip?"
3141"""
3242
3343from __future__ import annotations
3444
3545import asyncio
46+ import os
3647import sys
3748from collections .abc import Mapping
3849from typing import Annotated , Any
3950
40- from openai import AsyncOpenAI
41- from openai .types .chat import (
42- ChatCompletionMessageParam ,
43- ChatCompletionSystemMessageParam ,
44- ChatCompletionUserMessageParam ,
45- )
4651from pydantic import Field
4752
4853from openarmature .graph import (
5459 append ,
5560 merge ,
5661)
62+ from openarmature .llm import OpenAIProvider , SystemMessage , UserMessage
63+
64+ # Lazy-initialized so importing this module (test harnesses, doc builders,
65+ # IDE inspection) doesn't open an httpx.AsyncClient connection pool.
66+ _provider_instance : OpenAIProvider | None = None
5767
58- VLLM_BASE_URL = "http://localhost:8000/v1"
59- MODEL = "dark-side-of-the-code/Mistral-Small-24B-Instruct-2501-AWQ"
6068
61- client = AsyncOpenAI (base_url = VLLM_BASE_URL , api_key = "not-needed" )
69+ def _get_provider () -> OpenAIProvider :
70+ global _provider_instance
71+ if _provider_instance is None :
72+ _provider_instance = OpenAIProvider (
73+ base_url = os .environ .get ("LLM_BASE_URL" , "https://api.openai.com" ),
74+ model = os .environ .get ("LLM_MODEL" , "gpt-4o-mini" ),
75+ api_key = os .environ .get ("LLM_API_KEY" ) or None ,
76+ )
77+ return _provider_instance
6278
6379
6480# ----------------------------------------------------------------------------
7793# - Boundaries are auditable. To find "what does the subgraph see?" you read
7894# one projection class, not a scattered naming convention.
7995#
80- # Both schemas below use the same reducer patterns we introduced in
81- # 01-linear-pipeline: `append` on a ` trace` list, `merge` on a dict. Fields without
82- # an `Annotated[..., reducer]` get `last_write_wins` by default.
96+ # Both schemas below use the standard reducer set: `append` on the
97+ # ` trace` list, `merge` on a dict. Fields without an
98+ # `Annotated[..., reducer]` get `last_write_wins` by default.
8399
84100
85101class AssistantState (State ):
@@ -103,30 +119,27 @@ class ResearchState(State):
103119
104120
105121# ----------------------------------------------------------------------------
106- # LLM helper (not openarmature — plumbing)
122+ # LLM helper
107123# ----------------------------------------------------------------------------
124+ # Thin wrapper over Provider.complete that takes a system + user pair and
125+ # returns the assistant's reply as a string. Keeps the node bodies focused
126+ # on graph logic (state in → state update out) rather than provider
127+ # plumbing. Production code would typically inline the call.
108128
109129
110130async def _chat (system : str , user : str ) -> str :
111- messages : list [ChatCompletionMessageParam ] = [
112- ChatCompletionSystemMessageParam (role = "system" , content = system ),
113- ChatCompletionUserMessageParam (role = "user" , content = user ),
114- ]
115- resp = await client .chat .completions .create (
116- model = MODEL ,
117- messages = messages ,
118- temperature = 0.3 ,
119- stream = False ,
131+ response = await _get_provider ().complete (
132+ [SystemMessage (content = system ), UserMessage (content = user )],
120133 )
121- return (resp . choices [ 0 ] .message .content or "" ).strip ()
134+ return (response .message .content or "" ).strip ()
122135
123136
124137# ----------------------------------------------------------------------------
125138# Outer-graph nodes
126139# ----------------------------------------------------------------------------
127- # Every node is the same shape as in 01-linear-pipeline : `async def(state) -> dict`,
128- # returning ONLY the fields it wants to change. The engine applies per-field
129- # reducers and re-validates.
140+ # Standard node shape: `async def(state) -> dict`, returning ONLY the
141+ # fields it wants to change. The engine applies per-field reducers and
142+ # re-validates.
130143#
131144# Three things worth noticing as you read these:
132145#
@@ -334,9 +347,8 @@ def build_research_subgraph() -> CompiledGraph[ResearchState]:
334347#
335348# - `project_in`: DELIBERATELY LIMITED. It builds a fresh subgraph state
336349# from its schema's defaults — `subgraph_state_cls()`. The parent's
337- # state is ignored. This is the spec's default and it's an explicit
338- # design choice (v0.1.1 §2 Subgraph): subgraphs don't see the outer
339- # world unless the author opts in.
350+ # state is ignored. Subgraphs don't see the outer world unless the
351+ # author opts in — encapsulation is the point.
340352#
341353# For this demo we absolutely need the question in the subgraph. So we write
342354# a projection class that implements the `ProjectionStrategy` Protocol (see
@@ -444,15 +456,19 @@ def build_graph() -> CompiledGraph[AssistantState]:
444456async def main () -> None :
445457 question = " " .join (sys .argv [1 :]) or "is espresso actually more caffeinated than drip coffee?"
446458 graph = build_graph ()
447- final = await graph .invoke (AssistantState (question = question ))
448-
449- print (f"question: { final .question } " )
450- print (f"route: { final .route } " )
451- print ()
452- print (f"answer:\n { final .answer } " )
453- print ()
454- print (f"trace: { final .trace } " )
455- print (f"tallies: { final .tallies } " )
459+ try :
460+ final = await graph .invoke (AssistantState (question = question ))
461+ print (f"question: { final .question } " )
462+ print (f"route: { final .route } " )
463+ print ()
464+ print (f"answer:\n { final .answer } " )
465+ print ()
466+ print (f"trace: { final .trace } " )
467+ print (f"tallies: { final .tallies } " )
468+ finally :
469+ await graph .drain ()
470+ if _provider_instance is not None :
471+ await _provider_instance .aclose ()
456472
457473
458474if __name__ == "__main__" :
0 commit comments