Skip to content
Open
1 change: 1 addition & 0 deletions .local-home/.nvm
Submodule .nvm added at 62387b
33 changes: 33 additions & 0 deletions api/core/app/workflow/layers/iteration_variable_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from __future__ import annotations

import logging
from typing import final, override

from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import GraphEngineEvent
from graphon.graph_events.node import NodeRunVariableUpdatedEvent
from graphon.runtime.variable_pool import VariablePool

logger = logging.getLogger(__name__)


@final
class IterationVariableSyncLayer(GraphEngineLayer):
"""Eagerly propagate variable mutations from a child engine to the parent pool."""

def __init__(self, parent_variable_pool: VariablePool) -> None:
super().__init__()
self._parent_variable_pool = parent_variable_pool

@override
def on_event(self, event: GraphEngineEvent) -> None:
if isinstance(event, NodeRunVariableUpdatedEvent):
self._parent_variable_pool.add(event.variable.selector, event.variable)

@override
def on_graph_start(self) -> None:
pass

@override
def on_graph_end(self, error: Exception | None) -> None:
pass
6 changes: 5 additions & 1 deletion api/core/workflow/workflow_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from core.app.apps.exc import GenerateTaskStoppedError
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom, build_dify_run_context
from core.app.file_access import DatabaseFileAccessController
from core.app.workflow.layers.iteration_variable_sync import IterationVariableSyncLayer
from core.app.workflow.layers.llm_quota import LLMQuotaLayer
from core.app.workflow.layers.observability import ObservabilityLayer
from core.workflow.node_factory import (
Expand Down Expand Up @@ -76,8 +77,9 @@ def build_child_engine(
variable_pool: VariablePool | None = None,
) -> GraphEngine:
"""Build a child engine with a fresh runtime state and only child-safe layers."""
uses_isolated_pool = variable_pool is not None
child_graph_runtime_state = GraphRuntimeState(
variable_pool=variable_pool if variable_pool is not None else parent_graph_runtime_state.variable_pool,
variable_pool=variable_pool if uses_isolated_pool else parent_graph_runtime_state.variable_pool,
start_at=time.perf_counter(),
execution_context=parent_graph_runtime_state.execution_context,
)
Expand Down Expand Up @@ -108,6 +110,8 @@ def build_child_engine(
child_engine_builder=self,
)
child_engine.layer(LLMQuotaLayer())
if uses_isolated_pool:
child_engine.layer(IterationVariableSyncLayer(parent_graph_runtime_state.variable_pool))
return child_engine


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from __future__ import annotations

from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionStatus
from graphon.graph_events.node import NodeRunVariableUpdatedEvent
from graphon.node_events.base import NodeRunResult
from graphon.runtime import VariablePool
from graphon.variables.types import SegmentType
from graphon.variables.variables import ArrayStringVariable

from core.app.workflow.layers.iteration_variable_sync import IterationVariableSyncLayer


def _make_array_variable(name: str, values: list[str]) -> ArrayStringVariable:
return ArrayStringVariable(
id="conv-var-1",
name=name,
value=values,
value_type=SegmentType.ARRAY_STRING,
selector=["conversation", name],
)


def _make_var_update_event(variable: ArrayStringVariable) -> NodeRunVariableUpdatedEvent:
return NodeRunVariableUpdatedEvent(
id="exec-1",
node_id="var-assigner",
node_type=BuiltinNodeTypes.VARIABLE_ASSIGNER,
node_run_result=NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED),
variable=variable,
)


class TestIterationVariableSyncLayer:
def test_variable_update_applied_to_parent_pool(self):
parent_pool = VariablePool()
parent_pool.add(["conversation", "list"], _make_array_variable("list", []))

layer = IterationVariableSyncLayer(parent_pool)
updated_var = _make_array_variable("list", ["file1.txt"])
layer.on_event(_make_var_update_event(updated_var))

result = parent_pool.get(["conversation", "list"])
assert result is not None
assert result.value == ["file1.txt"]

def test_sequential_appends_all_visible(self):
"""Simulate 4 sequential iterations each appending one filename."""
parent_pool = VariablePool()
parent_pool.add(["conversation", "list"], _make_array_variable("list", []))

layer = IterationVariableSyncLayer(parent_pool)
filenames = ["a.txt", "b.txt", "c.txt", "d.txt"]

for name in filenames:
current = parent_pool.get(["conversation", "list"])
assert current is not None
new_values = list(current.value) + [name]

layer.on_event(_make_var_update_event(_make_array_variable("list", new_values)))

snapshot = parent_pool.model_copy(deep=True)
snapshot_var = snapshot.get(["conversation", "list"])
assert snapshot_var is not None
assert snapshot_var.value == new_values

final = parent_pool.get(["conversation", "list"])
assert final is not None
assert final.value == ["a.txt", "b.txt", "c.txt", "d.txt"]

def test_non_variable_events_are_ignored(self):
"""Events that are not variable-updates must be silently ignored."""
from graphon.graph_events.graph import GraphRunStartedEvent

parent_pool = VariablePool()
parent_pool.add(["conversation", "list"], _make_array_variable("list", ["x"]))

layer = IterationVariableSyncLayer(parent_pool)
layer.on_event(GraphRunStartedEvent())

result = parent_pool.get(["conversation", "list"])
assert result is not None
assert result.value == ["x"]
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ def test_build_child_engine_constructs_graph_engine_with_quota_layer_only(self):
patch.object(workflow_entry, "GraphEngineConfig", return_value=sentinel.graph_engine_config),
patch.object(workflow_entry, "InMemoryChannel", return_value=sentinel.command_channel),
patch.object(workflow_entry, "LLMQuotaLayer", return_value=sentinel.llm_quota_layer),
patch.object(
workflow_entry, "IterationVariableSyncLayer", return_value=sentinel.iter_sync_layer
),
):
result = builder.build_child_engine(
workflow_id="workflow-id",
Expand Down Expand Up @@ -148,7 +151,10 @@ def test_build_child_engine_constructs_graph_engine_with_quota_layer_only(self):
config=sentinel.graph_engine_config,
child_engine_builder=builder,
)
assert child_engine.layer.call_args_list == [((sentinel.llm_quota_layer,), {})]
assert child_engine.layer.call_args_list == [
((sentinel.llm_quota_layer,), {}),
((sentinel.iter_sync_layer,), {}),
]

@pytest.mark.parametrize("node_cls", [_FakeLLMNode, _FakeQuestionClassifierNode])
def test_build_child_engine_runs_llm_quota_layer_for_child_model_nodes(self, node_cls):
Expand Down