Skip to content

Commit dce25a3

Browse files
committed
fix(api): filter Graphon workflow response events
Apply Graphon ResponseStreamFilter to workflow-based app runners so workflow execution consumes filtered graph events in both streaming and blocking paths. Pin Graphon to the PR revision while the change waits for the 0.5.0 release, and update unit coverage for the filtered event stream behavior.
1 parent 730a0be commit dce25a3

10 files changed

Lines changed: 76 additions & 10 deletions

File tree

api/core/app/apps/advanced_chat/app_runner.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,11 @@ def run(self):
246246
for layer in self._graph_engine_layers:
247247
workflow_entry.graph_engine.layer(layer)
248248

249-
generator = workflow_entry.run()
249+
generator = self._iter_workflow_events(
250+
workflow_entry,
251+
workflow_entry.run(),
252+
stream=self.application_generate_entity.stream,
253+
)
250254

251255
for event in generator:
252256
self._handle_event(workflow_entry, event)

api/core/app/apps/workflow/app_runner.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,11 @@ def run(self):
169169
for layer in self._graph_engine_layers:
170170
workflow_entry.graph_engine.layer(layer)
171171

172-
generator = workflow_entry.run()
172+
generator = self._iter_workflow_events(
173+
workflow_entry,
174+
workflow_entry.run(),
175+
stream=self.application_generate_entity.stream,
176+
)
173177

174178
for event in generator:
175179
self._handle_event(workflow_entry, event)

api/core/app/apps/workflow_app_runner.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
import time
3-
from collections.abc import Mapping, Sequence
3+
from collections.abc import Iterable, Mapping, Sequence
44
from typing import Any, cast
55

66
from pydantic import ValidationError
@@ -51,6 +51,7 @@
5151
from core.workflow.workflow_run_outputs import project_node_outputs_for_workflow_run
5252
from graphon.entities.graph_config import NodeConfigDictAdapter
5353
from graphon.entities.pause_reason import HumanInputRequired
54+
from graphon.filters import GraphEventFilterContext, ResponseStreamFilter, filter_graph_events
5455
from graphon.graph import Graph
5556
from graphon.graph_engine.layers import GraphEngineLayer
5657
from graphon.graph_events import (
@@ -381,6 +382,21 @@ def _get_graph_and_variable_pool_for_single_node_run(
381382

382383
return graph, variable_pool
383384

385+
@staticmethod
386+
def _iter_workflow_events(
387+
workflow_entry: WorkflowEntry,
388+
events: Iterable[GraphEngineEvent],
389+
*,
390+
stream: bool,
391+
) -> Iterable[GraphEngineEvent]:
392+
_ = stream
393+
394+
return filter_graph_events(
395+
events,
396+
context=GraphEventFilterContext.from_engine(workflow_entry.graph_engine),
397+
filters=[ResponseStreamFilter()],
398+
)
399+
384400
@staticmethod
385401
def _build_agent_strategy_info(event: NodeRunStartedEvent) -> AgentStrategyInfo | None:
386402
raw_agent_strategy = event.extras.get("agent_strategy")

api/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ dify-trace-mlflow = { workspace = true }
9797
dify-trace-opik = { workspace = true }
9898
dify-trace-tencent = { workspace = true }
9999
dify-trace-weave = { workspace = true }
100+
graphon = { git = "https://github.com/langgenius/graphon.git", rev = "853bd461c42489a3fcf26cf36f2831ba03c444a6" }
100101

101102
[tool.uv]
102103
default-groups = ["storage", "tools", "vdb-all", "trace-all"]

api/tests/unit_tests/core/app/apps/advanced_chat/test_app_runner_conversation_variables.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def test_missing_conversation_variables_are_added(self):
100100
mock_app_generate_entity.single_iteration_run = None
101101
mock_app_generate_entity.single_loop_run = None
102102
mock_app_generate_entity.trace_manager = None
103+
mock_app_generate_entity.stream = False
103104

104105
# Create runner
105106
runner = AdvancedChatAppRunner(
@@ -245,6 +246,7 @@ def test_no_variables_creates_all(self):
245246
mock_app_generate_entity.single_iteration_run = None
246247
mock_app_generate_entity.single_loop_run = None
247248
mock_app_generate_entity.trace_manager = None
249+
mock_app_generate_entity.stream = False
248250

249251
# Create runner
250252
runner = AdvancedChatAppRunner(
@@ -405,6 +407,7 @@ def test_all_variables_exist_no_changes(self):
405407
mock_app_generate_entity.single_iteration_run = None
406408
mock_app_generate_entity.single_loop_run = None
407409
mock_app_generate_entity.trace_manager = None
410+
mock_app_generate_entity.stream = False
408411

409412
# Create runner
410413
runner = AdvancedChatAppRunner(

api/tests/unit_tests/core/app/apps/advanced_chat/test_app_runner_input_moderation.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def build_runner():
6464
gen.single_iteration_run = None
6565
gen.single_loop_run = None
6666
gen.trace_manager = None
67+
gen.stream = False
6768

6869
runner = AdvancedChatAppRunner(
6970
application_generate_entity=gen,

api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,39 @@ def _graph_init(**kwargs):
234234
assert graph is not None
235235
assert variable_pool.get(["sys", "conversation_id"]).value == "conv-1"
236236

237+
@pytest.mark.parametrize("stream", [False, True])
238+
def test_iter_workflow_events_filters_response_stream(self, stream: bool):
239+
runner = WorkflowBasedAppRunner(queue_manager=SimpleNamespace(), app_id="app")
240+
graph_runtime_state = GraphRuntimeState(
241+
variable_pool=VariablePool.from_bootstrap(system_variables=default_system_variables()),
242+
start_at=0.0,
243+
)
244+
workflow_entry = SimpleNamespace(
245+
graph_engine=SimpleNamespace(
246+
graph=SimpleNamespace(nodes={}),
247+
graph_runtime_state=graph_runtime_state,
248+
)
249+
)
250+
251+
events = iter(
252+
[
253+
GraphRunStartedEvent(),
254+
NodeRunStreamChunkEvent(
255+
id="exec",
256+
node_id="llm",
257+
node_type=BuiltinNodeTypes.LLM,
258+
selector=["llm", "text"],
259+
chunk="raw",
260+
is_final=False,
261+
),
262+
GraphRunSucceededEvent(outputs={"answer": "done"}),
263+
]
264+
)
265+
266+
filtered_events = list(runner._iter_workflow_events(workflow_entry, events, stream=stream))
267+
268+
assert [type(event) for event in filtered_events] == [GraphRunStartedEvent, GraphRunSucceededEvent]
269+
237270
def test_handle_graph_run_events_and_pause_notifications(self, monkeypatch: pytest.MonkeyPatch):
238271
published: list[object] = []
239272

api/tests/unit_tests/core/app/apps/test_workflow_app_runner_single_node.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def test_run_uses_single_node_execution_branch(
5353
app_generate_entity.trace_manager = None
5454
app_generate_entity.single_iteration_run = single_iteration_run
5555
app_generate_entity.single_loop_run = single_loop_run
56+
app_generate_entity.stream = False
5657

5758
workflow = MagicMock(spec=Workflow)
5859
workflow.tenant_id = "tenant"

api/tests/unit_tests/core/workflow/graph_engine/test_tool_in_chatflow.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from graphon.filters import GraphEventFilterContext, ResponseStreamFilter, filter_graph_events
12
from graphon.graph_engine import GraphEngine, GraphEngineConfig
23
from graphon.graph_engine.command_channels import InMemoryChannel
34
from graphon.graph_events import (
@@ -31,7 +32,13 @@ def test_tool_in_chatflow():
3132
config=GraphEngineConfig(),
3233
)
3334

34-
events = list(engine.run())
35+
events = list(
36+
filter_graph_events(
37+
engine.run(),
38+
context=GraphEventFilterContext.from_engine(engine),
39+
filters=[ResponseStreamFilter()],
40+
)
41+
)
3542

3643
# Check for successful completion
3744
success_events = [e for e in events if isinstance(e, GraphRunSucceededEvent)]

api/uv.lock

Lines changed: 2 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)