Skip to content

Commit d577d20

Browse files
authored
Add history propagation (dapr#1025)
* Add history propagation Signed-off-by: Albert Callarisa <albert@diagrid.io> * Addressed comments Signed-off-by: Albert Callarisa <albert@diagrid.io> --------- Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent f5a2fe8 commit d577d20

30 files changed

Lines changed: 2662 additions & 244 deletions

examples/workflow/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,3 +536,26 @@ timeout_seconds: 60
536536
dapr run --app-id wf-pydantic-example -- python3 pydantic_models.py
537537
```
538538
<!--END_STEP-->
539+
540+
### History Propagation
541+
542+
This example demonstrates how a parent workflow can propagate its execution
543+
history to a child workflow and to an activity, and how the receivers query
544+
that history through `ctx.get_propagated_history()`.
545+
546+
It shows:
547+
- `propagation=PropagationScope.OWN_HISTORY` on a child workflow call —
548+
forwards the caller's events only.
549+
- `propagation=PropagationScope.LINEAGE` on an activity call — forwards the
550+
caller's events *plus* anything the caller itself received from its parent.
551+
- `PropagatedHistory.get_workflow_by_name(...)` and `WorkflowResult.get_activity_by_name(...)`
552+
on the receiving side.
553+
554+
> **Requires** a Dapr sidecar with workflow history propagation support
555+
> (durabletask-go PR #85 / runtime 1.18+ ). With an older sidecar the
556+
> propagation field is silently dropped and `get_propagated_history()`
557+
> returns `None`.
558+
559+
```sh
560+
dapr run --app-id workflow-history-propagation -- python3 history_propagation.py
561+
```
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
"""History propagation example.
14+
15+
The parent workflow runs a couple of activities, then calls a child workflow
16+
with ``propagation=PropagationScope.OWN_HISTORY`` and an activity with
17+
``propagation=PropagationScope.LINEAGE``. The child workflow and the
18+
downstream activity read the parent's recorded history via
19+
``ctx.get_propagated_history()`` and inspect specific events by name.
20+
21+
This requires a Dapr sidecar built with history propagation enabled
22+
(durabletask-go PR #85 and later). With an older sidecar, the propagation
23+
field is silently dropped and ``get_propagated_history()`` returns ``None``.
24+
"""
25+
26+
from __future__ import annotations
27+
28+
import json
29+
30+
import dapr.ext.workflow as wf
31+
32+
wfr = wf.WorkflowRuntime()
33+
34+
35+
@wfr.activity(name='validate_merchant')
36+
def validate_merchant(ctx: wf.WorkflowActivityContext, merchant_id: str) -> dict:
37+
print(f'*** validating merchant {merchant_id}', flush=True)
38+
return {'merchant_id': merchant_id, 'valid': True}
39+
40+
41+
@wfr.activity(name='log_summary')
42+
def log_summary(ctx: wf.WorkflowActivityContext, _: None) -> str:
43+
"""Activity that reads the parent workflow's propagated history."""
44+
history = ctx.get_propagated_history()
45+
if history is None:
46+
print('*** log_summary: no propagated history (sidecar may not support it)', flush=True)
47+
return 'no-history'
48+
49+
workflows = history.get_workflows()
50+
if not workflows:
51+
print('*** log_summary: propagated history has no workflows', flush=True)
52+
return 'empty-history'
53+
54+
parent = workflows[-1]
55+
try:
56+
validate = parent.get_activity_by_name('validate_merchant')
57+
except wf.PropagationNotFoundError:
58+
print('*** log_summary: parent did not run validate_merchant', flush=True)
59+
return 'parent-missing-validate'
60+
61+
print(
62+
f'*** log_summary saw parent on app {parent.app_id} '
63+
f'with validate_merchant -> completed={validate.completed} output={validate.output}',
64+
flush=True,
65+
)
66+
return 'logged'
67+
68+
69+
@wfr.workflow(name='process_payment')
70+
def process_payment(ctx: wf.DaprWorkflowContext, _: None):
71+
"""Child workflow: introspect the parent's history before deciding."""
72+
history = ctx.get_propagated_history()
73+
if history is None:
74+
print('*** process_payment: no propagated history', flush=True)
75+
return 'no-history'
76+
77+
workflows = history.get_workflows()
78+
if not workflows:
79+
print('*** process_payment: propagated history has no workflows', flush=True)
80+
return 'empty-history'
81+
82+
parent = workflows[-1]
83+
try:
84+
validate = parent.get_activity_by_name('validate_merchant')
85+
except wf.PropagationNotFoundError:
86+
print('*** process_payment: parent did not run validate_merchant', flush=True)
87+
return 'parent-missing-validate'
88+
89+
if not validate.completed:
90+
print('*** process_payment: parent validate_merchant is not complete yet', flush=True)
91+
return 'parent-incomplete'
92+
93+
merchant = json.loads(validate.output or '{}')
94+
print(
95+
f'*** process_payment received parent context for merchant {merchant.get("merchant_id")!r}',
96+
flush=True,
97+
)
98+
return 'paid'
99+
100+
101+
@wfr.workflow(name='merchant_checkout')
102+
def merchant_checkout(ctx: wf.DaprWorkflowContext, merchant_id: str):
103+
"""Parent workflow: runs an activity, then propagates its history."""
104+
yield ctx.call_activity(validate_merchant, input=merchant_id)
105+
106+
child_result = yield ctx.call_child_workflow(
107+
process_payment,
108+
input=None,
109+
propagation=wf.PropagationScope.OWN_HISTORY,
110+
)
111+
print(f'*** child workflow result: {child_result}', flush=True)
112+
113+
audit = yield ctx.call_activity(
114+
log_summary,
115+
input=None,
116+
propagation=wf.PropagationScope.LINEAGE,
117+
)
118+
print(f'*** audit activity result: {audit}', flush=True)
119+
return {'child': child_result, 'audit': audit}
120+
121+
122+
if __name__ == '__main__':
123+
wfr.start()
124+
125+
wf_client = wf.DaprWorkflowClient()
126+
instance_id = wf_client.schedule_new_workflow(workflow=merchant_checkout, input='merchant-42')
127+
128+
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
129+
print(
130+
f'*** workflow completed: status={state.runtime_status.name} output={state.serialized_output}',
131+
flush=True,
132+
)
133+
134+
wfr.shutdown()

ext/dapr-ext-workflow/dapr/ext/workflow/__init__.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@
1717
from dapr.ext.workflow._durabletask.task import TaskFailedError
1818
from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient
1919
from dapr.ext.workflow.dapr_workflow_context import DaprWorkflowContext, when_all, when_any
20+
from dapr.ext.workflow.propagation import (
21+
ActivityResult,
22+
ChildWorkflowResult,
23+
PropagatedHistory,
24+
PropagationNotFoundError,
25+
PropagationScope,
26+
WorkflowResult,
27+
)
2028
from dapr.ext.workflow.retry_policy import RetryPolicy
2129
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
2230
from dapr.ext.workflow.workflow_runtime import WorkflowRuntime, alternate_name
@@ -34,4 +42,10 @@
3442
'alternate_name',
3543
'RetryPolicy',
3644
'TaskFailedError',
45+
'PropagationScope',
46+
'PropagatedHistory',
47+
'PropagationNotFoundError',
48+
'WorkflowResult',
49+
'ActivityResult',
50+
'ChildWorkflowResult',
3751
]

ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/attestation_pb2.py

Lines changed: 49 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)