Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1053476
feat(events): add graph traversal events
laipz8200 May 15, 2026
0364a62
feat(filters): add graph event filter chain
laipz8200 May 15, 2026
63cd51b
feat(engine): emit graph traversal events
laipz8200 May 15, 2026
226c263
refactor(engine): emit raw graph events
laipz8200 May 15, 2026
2ec4ac9
feat(filters): add response stream filter
laipz8200 May 15, 2026
7645a88
refactor(runtime): remove response coordinator state
laipz8200 May 15, 2026
7cd95ab
docs(filters): document explicit event filters
laipz8200 May 15, 2026
fc0b073
docs(examples): add event filter verification example
laipz8200 May 15, 2026
2cb9849
fix(examples): use supported OpenAI slim model
laipz8200 May 15, 2026
d2ef1e9
docs(examples): print event filter chunks live
laipz8200 May 15, 2026
0bebfe3
docs(examples): fold event filter into dsl example
laipz8200 May 15, 2026
2976e42
docs(examples): keep dsl output plain
laipz8200 May 15, 2026
7a22aa0
chore: address review comments
laipz8200 May 15, 2026
90e56d4
chore: ignore local editor config
laipz8200 May 15, 2026
d5de633
docs(layers): remove event filter comparison
laipz8200 May 15, 2026
96327f2
feat(filters): expose public filter exports
laipz8200 May 15, 2026
685cfe3
fix(filters): satisfy protocol contracts after rebase
laipz8200 May 18, 2026
4736715
fix(filters): harden response stream edge cases
WH-2099 May 21, 2026
5bb76e9
fix(filters): defer response stream state restore
WH-2099 May 21, 2026
f7016d4
refactor(events): simplify event filter state
WH-2099 May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dist/

# Plugin Daemon Slim
.slim

# Unit test / coverage reports
htmlcov/
.tox/
Expand All @@ -30,3 +31,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
cover/

# IDE configs
.idea/
.vscode/
2 changes: 1 addition & 1 deletion examples/slim_llm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ start -> llm -> answer
## Files

- `graph.yml`: the DSL graph
- `dsl.py`: imports `graph.yml` with `graphon.dsl.loads()`
- `dsl.py`: imports `graph.yml` and streams response events with `ResponseStreamFilter`
- `code.py`: builds the graph with Python code
- `settings.py`: shared credentials and Slim setup
- `credentials.example.json`: credentials template
Expand Down
47 changes: 42 additions & 5 deletions examples/slim_llm/dsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import argparse
import sys
from collections.abc import Callable
from pathlib import Path
from typing import Any

if __package__ in {None, ""}:
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))
Expand All @@ -15,10 +15,20 @@
use_local_slim_binary,
)
from graphon.dsl import loads
from graphon.filters import (
GraphEventFilterContext,
ResponseStreamFilter,
filter_graph_events,
)
from graphon.graph_events.graph import GraphRunSucceededEvent
from graphon.graph_events.node import NodeRunStreamChunkEvent


def run(query: str) -> str:
def run(
query: str,
*,
on_stream_chunk: Callable[[str], None] | None = None,
) -> str:
use_local_slim_binary()
engine = loads(
GRAPH_FILE.read_text(encoding="utf-8"),
Expand All @@ -27,8 +37,19 @@ def run(query: str) -> str:
start_inputs={"query": query},
)

events = list(engine.run())
final_event: Any = events[-1] if events else None
events = filter_graph_events(
engine.run(),
context=GraphEventFilterContext.from_engine(engine),
filters=[ResponseStreamFilter()],
)
final_event: GraphRunSucceededEvent | None = None
for event in events:
if isinstance(event, NodeRunStreamChunkEvent):
if on_stream_chunk is not None:
on_stream_chunk(event.chunk)
elif isinstance(event, GraphRunSucceededEvent):
final_event = event

if not isinstance(final_event, GraphRunSucceededEvent):
msg = f"Workflow did not succeed: {type(final_event).__name__}"
raise TypeError(msg)
Expand All @@ -45,7 +66,23 @@ def main() -> int:
parser.add_argument("query", nargs="?", default=DEFAULT_QUERY)
args = parser.parse_args()

sys.stdout.write(f"{run(args.query)}\n")
saw_stream = False
last_chunk = ""

def write_stream_chunk(chunk: str) -> None:
nonlocal last_chunk, saw_stream
saw_stream = True
last_chunk = chunk
sys.stdout.write(chunk)
sys.stdout.flush()

answer = run(args.query, on_stream_chunk=write_stream_chunk)
if saw_stream:
if not last_chunk.endswith("\n"):
sys.stdout.write("\n")
else:
sys.stdout.write(f"{answer}\n")

return 0


Expand Down
2 changes: 1 addition & 1 deletion examples/slim_llm/graph.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ graph:
title: OpenAI
model:
provider: langgenius/openai/openai
name: gpt-5.4
name: gpt-4o-mini
mode: chat
completion_params: {}
prompt_template:
Expand Down
2 changes: 1 addition & 1 deletion examples/slim_llm/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"592c8252795b5f75807de2d609a03196ed02596b409f7642b4a07548c7ff57ef"
)
OPENAI_PROVIDER = "openai"
OPENAI_MODEL = "gpt-5.4"
OPENAI_MODEL = "gpt-4o-mini"
DEFAULT_QUERY = "Reply with only the word Graphon."


Expand Down
15 changes: 15 additions & 0 deletions src/graphon/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from graphon.graph_engine.filters import (
GraphEventFilter,
GraphEventFilterContext,
ResponseStreamFilter,
ResumableGraphEventFilter,
filter_graph_events,
)

__all__ = [
"GraphEventFilter",
"GraphEventFilterContext",
"ResponseStreamFilter",
"ResumableGraphEventFilter",
"filter_graph_events",
]
17 changes: 16 additions & 1 deletion src/graphon/graph_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
from .config import GraphEngineConfig
from .filters import (
GraphEventFilter,
GraphEventFilterContext,
ResponseStreamFilter,
ResumableGraphEventFilter,
filter_graph_events,
)
from .graph_engine import GraphEngine

__all__ = ["GraphEngine", "GraphEngineConfig"]
__all__ = [
"GraphEngine",
"GraphEngineConfig",
"GraphEventFilter",
"GraphEventFilterContext",
"ResponseStreamFilter",
"ResumableGraphEventFilter",
"filter_graph_events",
]
49 changes: 14 additions & 35 deletions src/graphon/graph_engine/event_management/event_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
from graphon.runtime.graph_runtime_state import (
GraphExecutionProtocol,
GraphRuntimeState,
ResponseStreamCoordinatorProtocol,
)

from ..error_handler import ErrorHandler
Expand All @@ -60,7 +59,6 @@ def __init__(
graph: Graph,
graph_runtime_state: GraphRuntimeState,
graph_execution: GraphExecutionProtocol,
response_coordinator: ResponseStreamCoordinatorProtocol,
event_collector: EventManager,
edge_processor: EdgeProcessor,
state_manager: GraphStateManager,
Expand All @@ -72,7 +70,6 @@ def __init__(
graph: The workflow graph
graph_runtime_state: Runtime state with variable pool
graph_execution: Graph execution aggregate
response_coordinator: Response stream coordinator
event_collector: Event manager for collecting events
edge_processor: Edge processor for edge traversal
state_manager: Unified state manager
Expand All @@ -82,7 +79,6 @@ def __init__(
self._graph = graph
self._graph_runtime_state = graph_runtime_state
self._graph_execution = graph_execution
self._response_coordinator = response_coordinator
self._event_collector = event_collector
self._edge_processor = edge_processor
self._state_manager = state_manager
Expand Down Expand Up @@ -144,9 +140,6 @@ def _(self, event: NodeRunStartedEvent) -> None:
node_execution.mark_started(event.id)
self._graph_runtime_state.increment_node_run_steps()

# Track in response coordinator for stream ordering
self._response_coordinator.track_node_execution(event.node_id, event.id)

# Collect the event only for the first attempt; retries remain silent
if is_initial_attempt:
self._event_collector.collect(event)
Expand All @@ -159,12 +152,7 @@ def _(self, event: NodeRunStreamChunkEvent) -> None:
event: The stream chunk event

"""
# Process with response coordinator
streaming_events = list(self._response_coordinator.intercept_event(event))

# Collect all events
for stream_event in streaming_events:
self._event_collector.collect(stream_event)
self._event_collector.collect(event)

@_dispatch.register
def _(self, event: NodeRunVariableUpdatedEvent) -> None:
Expand Down Expand Up @@ -201,27 +189,20 @@ def _(self, event: NodeRunSucceededEvent) -> None:
# Store outputs in variable pool
self._store_node_outputs(event.node_id, event.node_run_result.outputs)

# Forward to response coordinator and emit streaming events
streaming_events = self._response_coordinator.intercept_event(event)
for stream_event in streaming_events:
self._event_collector.collect(stream_event)

# Process edges and get ready nodes
node = self._graph.nodes[event.node_id]
if node.execution_type == NodeExecutionType.BRANCH:
ready_nodes, edge_streaming_events = (
self._edge_processor.handle_branch_completion(
event.node_id,
event.node_run_result.edge_source_handle,
)
ready_nodes, edge_events = self._edge_processor.handle_branch_completion(
event.node_id,
event.node_run_result.edge_source_handle,
)
else:
ready_nodes, edge_streaming_events = (
self._edge_processor.process_node_success(event.node_id)
ready_nodes, edge_events = self._edge_processor.process_node_success(
event.node_id
)

# Collect streaming events from edge processing
for edge_event in edge_streaming_events:
# Collect traversal events from edge processing
for edge_event in edge_events:
self._event_collector.collect(edge_event)

# Enqueue ready nodes
Expand Down Expand Up @@ -304,21 +285,19 @@ def _(self, event: NodeRunExceptionEvent) -> None:
node = self._graph.nodes[event.node_id]

if node.error_strategy == ErrorStrategy.DEFAULT_VALUE:
ready_nodes, edge_streaming_events = (
self._edge_processor.process_node_success(event.node_id)
ready_nodes, edge_events = self._edge_processor.process_node_success(
event.node_id
)
elif node.error_strategy == ErrorStrategy.FAIL_BRANCH:
ready_nodes, edge_streaming_events = (
self._edge_processor.handle_branch_completion(
event.node_id,
event.node_run_result.edge_source_handle,
)
ready_nodes, edge_events = self._edge_processor.handle_branch_completion(
event.node_id,
event.node_run_result.edge_source_handle,
)
else:
msg = f"Unsupported error strategy: {node.error_strategy}"
raise NotImplementedError(msg)

for edge_event in edge_streaming_events:
for edge_event in edge_events:
self._event_collector.collect(edge_event)

for node_id in ready_nodes:
Expand Down
15 changes: 15 additions & 0 deletions src/graphon/graph_engine/filters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from graphon.graph_engine.filters.base import (
GraphEventFilter,
GraphEventFilterContext,
ResumableGraphEventFilter,
)
from graphon.graph_engine.filters.chain import filter_graph_events
from graphon.graph_engine.filters.response_stream import ResponseStreamFilter

__all__ = [
"GraphEventFilter",
"GraphEventFilterContext",
"ResponseStreamFilter",
"ResumableGraphEventFilter",
"filter_graph_events",
]
70 changes: 70 additions & 0 deletions src/graphon/graph_engine/filters/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Protocol

from graphon.graph.graph import Graph
from graphon.graph_events.base import GraphEngineEvent
from graphon.runtime.graph_runtime_state_protocol import ReadOnlyGraphRuntimeState
from graphon.runtime.read_only_wrappers import ReadOnlyGraphRuntimeStateWrapper

if TYPE_CHECKING:
from graphon.graph_engine.graph_engine import GraphEngine


@dataclass(frozen=True)
class GraphEventFilterContext:
"""Run-scoped context available to graph event filters."""

graph: Graph
runtime_state: ReadOnlyGraphRuntimeState

@classmethod
def from_engine(cls, engine: GraphEngine) -> GraphEventFilterContext:
return cls(
graph=engine.graph,
runtime_state=ReadOnlyGraphRuntimeStateWrapper(
engine.graph_runtime_state,
),
)


class GraphEventFilter(Protocol):
"""Event-to-event transform used outside GraphEngine execution."""

@property
@abstractmethod
def filter_id(self) -> str:
"""Stable identifier for diagnostics and external state storage."""
raise NotImplementedError

@abstractmethod
def initialize(self, context: GraphEventFilterContext) -> None:
"""Bind run-scoped context before events are processed."""
raise NotImplementedError

@abstractmethod
def on_event(self, event: GraphEngineEvent) -> Iterable[GraphEngineEvent]:
"""Transform one input event into zero or more output events."""
raise NotImplementedError

@abstractmethod
def flush(self) -> Iterable[GraphEngineEvent]:
"""Emit buffered events after the upstream source is exhausted."""
raise NotImplementedError


class ResumableGraphEventFilter(GraphEventFilter, Protocol):
"""Optional filter protocol for output-layer resume state."""

@abstractmethod
def dumps(self) -> str:
"""Serialize this filter's private state."""
raise NotImplementedError

@abstractmethod
def loads(self, data: str) -> None:
"""Restore this filter's private state."""
raise NotImplementedError
Loading
Loading