Skip to content

Commit 5d4fb4c

Browse files
committed
Add dynamic node/task summaries to LangGraph plugin
Adds a per-node/per-task summary_fn(args, kwargs) -> str | None (and a plugin-wide default_summary_fn) that computes a Temporal summary at runtime from the node's input. - execute_in="activity" nodes: the result sets the activity summary (user_metadata, shown on each scheduled-activity event). - execute_in="workflow" nodes: the result updates the workflow's current details via workflow.set_current_details (last-writer-wins). A static summary activity option already flowed through to execute_activity; this is now documented. Setting both a static summary and summary_fn on the same node raises ValueError. summary_fn runs in workflow context (must be deterministic and must not raise) and is replay-safe, since summaries ride in user_metadata.
1 parent c37fa7e commit 5d4fb4c

5 files changed

Lines changed: 379 additions & 7 deletions

File tree

temporalio/contrib/langgraph/README.md

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,47 @@ await g.ainvoke({...}, context=Context(user_id="alice"))
143143

144144
Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity boundary.
145145

146+
## Summaries
147+
148+
Summaries are short, human-readable labels that show up in the Temporal UI and CLI, making it easier to see what each step of a run is doing.
149+
150+
### Static summary
151+
152+
`summary` is an ordinary Activity option, so a fixed per-node label works today — pass it like any other option:
153+
154+
```python
155+
g.add_node("plan", plan, metadata={"execute_in": "activity", "summary": "Planning step"})
156+
```
157+
158+
It is attached to the node's scheduled-activity event (`execute_in="activity"` only).
159+
160+
### Dynamic summary (`summary_fn`)
161+
162+
To derive the label from the node's input at runtime, supply a `summary_fn`. It receives the node's `(args, kwargs)` and returns a summary string, or `None`/`""` for no summary. For a `StateGraph` node `args[0]` is the state; for a Functional `@task` it is the task's arguments.
163+
164+
```python
165+
def summarize(args, kwargs) -> str | None:
166+
state = args[0]
167+
return f"stage={state['stage']} doc={state['doc_id']}"
168+
169+
# Graph API: per-node
170+
g.add_node("plan", plan, metadata={"execute_in": "activity", "summary_fn": summarize})
171+
172+
# Functional API: per-task
173+
plugin = LangGraphPlugin(
174+
tasks=[plan],
175+
activity_options={"plan": {"execute_in": "activity", "summary_fn": summarize}},
176+
)
177+
178+
# Plugin-wide default, overridable per-node/per-task
179+
plugin = LangGraphPlugin(graphs={"g": g}, default_summary_fn=summarize)
180+
```
181+
182+
- For `execute_in="activity"` nodes the result sets the activity `summary` (one per scheduled-activity event, visible in history).
183+
- For `execute_in="workflow"` nodes there is no activity, so the result updates the workflow's current details via [`workflow.set_current_details()`](https://python.temporal.io/temporalio.workflow.html#set_current_details). This is a single workflow-level slot (last-writer-wins): it reflects the most recent workflow-bound node and is queryable via `__temporal_workflow_metadata`.
184+
185+
`summary_fn` runs in workflow context on every replay, so it **must be deterministic and must not raise** (an exception fails the workflow task). Setting both a static `summary` and a `summary_fn` on the same node raises `ValueError`; a static `summary` on a node takes precedence over `default_summary_fn`.
186+
146187
## Streaming
147188

148189
When `streaming_topic` is set on `LangGraphPlugin`, calls to `langgraph.config.get_stream_writer()` inside a node publish to the named topic on the workflow's [`WorkflowStream`](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams). Activity-side nodes publish via `WorkflowStreamClient` (a signal carrying batched items, controlled by `streaming_batch_interval`); workflow-side nodes publish synchronously to the in-workflow stream (no signal). External subscribers consume the stream with `WorkflowStreamClient.create(...).topic(...).subscribe(...)`.

temporalio/contrib/langgraph/_activity.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def thread_safe_writer(value: Any) -> None:
109109
def wrap_execute_activity(
110110
afunc: Callable[[ActivityInput], Awaitable[ActivityOutput]],
111111
task_id: str = "",
112+
summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None] | None = None,
112113
**execute_activity_kwargs: Any,
113114
) -> Callable[..., Any]:
114115
"""Wrap an activity function to be called via workflow.execute_activity with caching."""
@@ -156,9 +157,15 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
156157
input = ActivityInput(
157158
args=args, kwargs=kwargs, langgraph_config=langgraph_config
158159
)
159-
output = await workflow.execute_activity(
160-
afunc, input, **execute_activity_kwargs
161-
)
160+
# Compute a dynamic activity summary (if configured) on the schedule
161+
# path only; a cache hit above returns before reaching here, so no
162+
# activity is scheduled and no summary is needed.
163+
call_kwargs = dict(execute_activity_kwargs)
164+
if summary_fn is not None:
165+
summary = summary_fn(args, kwargs)
166+
if summary:
167+
call_kwargs["summary"] = summary
168+
output = await workflow.execute_activity(afunc, input, **call_kwargs)
162169
if output.langgraph_interrupts is not None:
163170
raise GraphInterrupt(output.langgraph_interrupts)
164171

temporalio/contrib/langgraph/_plugin.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@
3535
_ACTIVITY_OPTION_KEYS: frozenset[str] = frozenset(
3636
{"execute_in", *inspect.signature(workflow.execute_activity).parameters}
3737
)
38+
# Node/task option keys beyond the raw execute_activity parameters:
39+
# 'summary_fn' is a callable consumed in the workflow (not a Temporal
40+
# option), so it must be split out of Graph API metadata too.
41+
_LANGGRAPH_OPTION_KEYS: frozenset[str] = _ACTIVITY_OPTION_KEYS | frozenset(
42+
{"summary_fn"}
43+
)
3844

3945

4046
class LangGraphPlugin(SimplePlugin):
@@ -70,6 +76,18 @@ class LangGraphPlugin(SimplePlugin):
7076
default_activity_options: Activity options applied to every
7177
activity-bound node and task, overridable per-node (Graph API
7278
``metadata``) or per-task (``activity_options[name]``).
79+
default_summary_fn: Callable applied to every node and task to
80+
compute a summary, overridable per-node (Graph API
81+
``metadata['summary_fn']``) or per-task
82+
(``activity_options[name]['summary_fn']``). It receives the
83+
node's ``(args, kwargs)`` and returns a summary string (or
84+
``None`` for no summary). For ``execute_in='activity'`` nodes
85+
the result sets the activity ``summary`` (shown on each
86+
scheduled-activity event); for ``execute_in='workflow'`` nodes
87+
it updates the workflow's current details (last-writer-wins).
88+
Must be deterministic and must not raise, as it runs in
89+
workflow context on every replay. Cannot be combined with a
90+
static ``summary`` on the same node.
7391
streaming_topic: When set, ``langgraph.config.get_stream_writer()``
7492
inside a node publishes to this topic on the workflow's
7593
:class:`WorkflowStream`. The workflow must construct
@@ -103,6 +121,8 @@ def __init__(
103121
# TODO: Remove activity_options when we have support for @task(metadata=...)
104122
activity_options: dict[str, dict[str, Any]] | None = None,
105123
default_activity_options: dict[str, Any] | None = None,
124+
default_summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None]
125+
| None = None,
106126
streaming_topic: str | None = None,
107127
streaming_batch_interval: timedelta = timedelta(milliseconds=100),
108128
):
@@ -133,6 +153,7 @@ def __init__(
133153
self.activities: list = []
134154
self._streaming_topic = streaming_topic
135155
self._streaming_batch_interval = streaming_batch_interval
156+
self._default_summary_fn = default_summary_fn
136157

137158
# Graph API: Wrap graph nodes as Temporal Activities.
138159
if graphs:
@@ -168,12 +189,14 @@ def __init__(
168189
# the node function via config["metadata"].
169190
node_meta = node.metadata or {}
170191
node_opts = {
171-
k: v for k, v in node_meta.items() if k in _ACTIVITY_OPTION_KEYS
192+
k: v
193+
for k, v in node_meta.items()
194+
if k in _LANGGRAPH_OPTION_KEYS
172195
}
173196
node.metadata = {
174197
k: v
175198
for k, v in node_meta.items()
176-
if k not in _ACTIVITY_OPTION_KEYS
199+
if k not in _LANGGRAPH_OPTION_KEYS
177200
}
178201
if "execute_in" not in node_opts:
179202
raise ValueError(
@@ -253,6 +276,16 @@ def execute(
253276
"""Prepare a node or task to execute as an activity or inline in the workflow."""
254277
opts = kwargs or {}
255278
execute_in = opts.pop("execute_in")
279+
node_summary_fn = opts.pop("summary_fn", None)
280+
if node_summary_fn is not None and opts.get("summary") is not None:
281+
raise ValueError(
282+
f"{activity_name}: set either 'summary' or 'summary_fn', not both."
283+
)
284+
# Per-node summary_fn wins; a static summary suppresses the plugin
285+
# default; otherwise fall back to the plugin-wide default_summary_fn.
286+
summary_fn = node_summary_fn or (
287+
None if opts.get("summary") is not None else self._default_summary_fn
288+
)
256289

257290
if execute_in == "activity":
258291
wrapped = wrap_activity(
@@ -262,9 +295,13 @@ def execute(
262295
)
263296
a = activity.defn(name=activity_name)(wrapped)
264297
self.activities.append(a)
265-
return wrap_execute_activity(a, task_id=task_id(func), **opts)
298+
return wrap_execute_activity(
299+
a, task_id=task_id(func), summary_fn=summary_fn, **opts
300+
)
266301
elif execute_in == "workflow":
267-
return wrap_workflow(func, streaming_topic=self._streaming_topic)
302+
return wrap_workflow(
303+
func, streaming_topic=self._streaming_topic, summary_fn=summary_fn
304+
)
268305
else:
269306
raise ValueError(f"Invalid execute_in value: {execute_in}")
270307

temporalio/contrib/langgraph/_workflow.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ def wrap_workflow(
2020
func: Callable[..., Any],
2121
*,
2222
streaming_topic: str | None = None,
23+
summary_fn: Callable[[tuple[Any, ...], dict[str, Any]], str | None] | None = None,
2324
) -> Callable[..., Awaitable[Any]]:
2425
"""Wrap a function as a workflow-side LangGraph node.
2526
@@ -28,9 +29,18 @@ def wrap_workflow(
2829
function with the writer installed. Workflow-side nodes publish
2930
synchronously to the in-workflow ``WorkflowStream`` (no signal
3031
round-trip); activity-side nodes go through ``WorkflowStreamClient``.
32+
33+
Workflow-side nodes have no activity to carry a summary, so a
34+
truthy ``summary_fn`` result updates the workflow's current details
35+
via :func:`temporalio.workflow.set_current_details` (last-writer-wins).
3136
"""
3237

3338
async def wrapper(*args: Any, **kwargs: Any) -> Any:
39+
if summary_fn is not None:
40+
summary = summary_fn(args, kwargs)
41+
if summary:
42+
workflow.set_current_details(summary)
43+
3444
async def run(stream_writer: Callable[[Any], None] | None) -> Any:
3545
token = None
3646
if stream_writer is not None:

0 commit comments

Comments
 (0)