Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions tutorials/workflow/python/history-propagation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
# Dapr Workflow History Propagation — Patient Intake

This example demonstrates how Dapr workflows can propagate their execution
history to child workflows and activities, so downstream consumers can
inspect the full (or partial) execution context of their caller. See the
[Workflow history propagation](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-history-propagation/)
docs for the concept overview.

The scenario is a patient intake / e-prescribing pipeline: a compliance
audit and a pharmacy dispense step refuse to act unless they can see
proof — in the propagated history — that the required upstream checks
(insurance, allergies, drug interactions) actually ran.

## Workflow architecture

```
PatientIntake (workflow)
├── VerifyInsurance (activity, no propagation)
└── PrescribeMedication (child workflow, PropagationScope.LINEAGE)
├── CheckAllergies (activity, no propagation)
├── ScreenDrugInteractions (activity, no propagation)
├── ComplianceAudit (child workflow, PropagationScope.LINEAGE)
│ → sees PatientIntake + PrescribeMedication events
└── DispenseMedication (activity, PropagationScope.OWN_HISTORY)
→ sees PrescribeMedication events only
→ refuses to dispense if the screening lineage is missing
```

### Propagation scope

| Mode | What it sends | Use case |
|------|---------------|----------|
| `PropagationScope.LINEAGE` | Caller's own events + any ancestor events it received | Full chain-of-custody verification (compliance audits) |
| `PropagationScope.OWN_HISTORY` | Caller's own events only (no ancestor chain) | Trust boundary — downstream only sees the immediate caller (pharmacy dispense) |

### Scenarios

`ComplianceAudit` always runs with `PropagationScope.LINEAGE`, so it sees the
full ancestor chain — `VerifyInsurance` from PatientIntake plus `CheckAllergies`
and `ScreenDrugInteractions` from PrescribeMedication — and approves only when
every upstream check completed. The demo then runs `DispenseMedication` twice
to show the `OWN_HISTORY` trust boundary in action:

1. **Lineage forwarded → pharmacy dispenses.** `PrescribeMedication` calls
`DispenseMedication` with `PropagationScope.OWN_HISTORY`. The pharmacy sees
PrescribeMedication's screening events — but not the PatientIntake chain —
and fills the prescription.

2. **Lineage withheld → pharmacy refuses.** `PrescribeMedication` calls
`DispenseMedication` **without** history propagation. With no propagated
history to prove the prescription was screened, the pharmacy refuses to
dispense and returns a `refused` result explaining what was missing.

## Python API surface

```python
# Parent workflow — propagate LINEAGE when calling a child workflow
result = yield ctx.call_child_workflow(
compliance_audit,
input=rec_json,
propagation=wf.PropagationScope.LINEAGE,
)

# Parent workflow — propagate OWN_HISTORY when calling an activity
dispense = yield ctx.call_activity(
dispense_medication,
input=rec_json,
propagation=wf.PropagationScope.OWN_HISTORY,
)

# Child workflow (or activity) — read the propagated history
history = ctx.get_propagated_history() # PropagatedHistory | None

if history is not None:
intake_wf = history.get_last_workflow_by_name('PatientIntake')
insurance = intake_wf.get_last_activity_by_name('VerifyInsurance')
print(insurance.completed) # bool
print(insurance.output) # JSON string
```

Key symbols exported from `dapr.ext.workflow`:

- `PropagationScope` — enum with `LINEAGE` and `OWN_HISTORY`
- `PropagatedHistory` — top-level history object; `.get_workflows()`,
`.get_last_workflow_by_name(name)`, `.events`, `.scope`, `.get_app_ids()`
- `WorkflowResult` — per-workflow slice; `.get_last_activity_by_name(name)`
- `ActivityResult` — `.completed`, `.output`
- `PropagationNotFoundError` — raised when a named workflow/activity is
not present in the history

> **Replay safety:** workflow code runs many times during durable
> execution. Guard side-effecting calls — including `print()` — with
> `if not ctx.is_replaying:` so they only fire on the live execution.

## Running this example

Requires Dapr `1.18+`.

Install the Python dependencies:

<!-- STEP
name: Install dependencies
expected_stdout_lines:
- "patient-intake deps OK"
output_match_mode: substring
background: false
timeout_seconds: 180
-->

```bash
pip3 install -r requirements.txt && echo "patient-intake deps OK"
```

<!-- END_STEP -->

Run the demo:

<!-- STEP
name: Run history-propagation demo
expected_stdout_lines:
- "SCENARIO 1: lineage forwarded"
- "[ComplianceAudit] APPROVED"
- "[DispenseMedication] DISPENSED"
- "SCENARIO 2: lineage withheld"
- "[DispenseMedication] REFUSED"
- "pharmacy refused to dispense"
- "missing lineage: no propagated history received from prescriber"
output_match_mode: substring
background: false
timeout_seconds: 180
sleep: 15
-->

```bash
dapr run -f .
```

<!-- END_STEP -->

In scenario 1 (lineage forwarded) you'll see the pharmacy dispense:

```
[ComplianceAudit] Received propagated history: 15 events (scope: LINEAGE)
[ComplianceAudit] APPROVED (risk=0.10)
[DispenseMedication] Dispense request: amoxicillin 500mg ... (propagated history: 12 events, scope=OWN_HISTORY)
[DispenseMedication] DISPENSED: rx-P-1042-...
```

In scenario 2 (lineage withheld) the pharmacy refuses:

```
[PrescribeMedication] Step 4: CallActivity(DispenseMedication)
-> NO history propagation (negative scenario)
[DispenseMedication] Dispense request: penicillin 500mg ... (propagated history: none)
[DispenseMedication] REFUSED — no propagated history; cannot verify screening for P-2087
[PrescribeMedication] Step 4 BLOCKED: pharmacy refused to dispense (missing lineage: no propagated history received from prescriber)
```

In standalone mode the sidecar logs `propagating unsigned workflow history to ...`
warnings — these are expected and harmless for a local `dapr run` demo.
143 changes: 143 additions & 0 deletions tutorials/workflow/python/history-propagation/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
# Copyright 2026 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Workflow history propagation quickstart — patient intake / e-prescribing.

A root PatientIntake workflow orders a prescription via a child
PrescribeMedication workflow, which in turn runs a ComplianceAudit child
workflow and a DispenseMedication activity. The compliance audit and the
dispensing step inspect the propagated execution history of their callers
to verify that the required upstream checks (insurance, allergies, drug
interactions) actually ran before they make a decision.

Two scenarios are scheduled back-to-back:
1. Lineage forwarded — PrescribeMedication propagates its history to
DispenseMedication; the pharmacy dispenses.
2. Lineage withheld — PrescribeMedication does NOT propagate history;
the pharmacy refuses to dispense.

Both workflows run to completion and have their state purged so the app
exits on its own — no Ctrl+C needed.
"""

from __future__ import annotations

import dapr.ext.workflow as wf

from models import PatientRecord
from workflow import patient_intake, wfr


def _banner(msg: str) -> str:
line = '=' * (len(msg) + 4)
return f'{line}\n= {msg} =\n{line}'


def _run_scenario(client: wf.DaprWorkflowClient, title: str, instance_id: str, rec: PatientRecord) -> None:
"""Schedule one PatientIntake run, wait for it, print the result, purge state."""
print(flush=True)
print(_banner(title), flush=True)

started_id = client.schedule_new_workflow(
workflow=patient_intake,
input=rec.to_json(),
instance_id=instance_id,
)
print(f' [main] Started workflow: {started_id}', flush=True)

state = client.wait_for_workflow_completion(
instance_id=started_id,
timeout_in_seconds=30,
)
if state is None:
print(' [main] Workflow not found!', flush=True)
return
if state.runtime_status.name != 'COMPLETED':
print(
f' [main] Workflow ended with status: {state.runtime_status.name}',
flush=True,
)
else:
print(f' [main] Result: {state.serialized_output}', flush=True)

try:
client.purge_workflow(started_id)
except Exception as exc:
print(f' [main] failed to purge: {exc}', flush=True)


def main() -> None:
wfr.start()

print(_banner('WORKFLOW HISTORY PROPAGATION DEMO — PATIENT INTAKE'), flush=True)
print(flush=True)
print(' Flow: PatientIntake -> VerifyInsurance', flush=True)
print(' -> PrescribeMedication (child wf, lineage)', flush=True)
print(' -> CheckAllergies -> ScreenDrugInteractions', flush=True)
print(
' -> ComplianceAudit (child wf, lineage) '
'<-- sees PatientIntake + PrescribeMedication events',
flush=True,
)
print(
' -> DispenseMedication (activity, own only) '
'<-- sees only PrescribeMedication events',
flush=True,
)

client = wf.DaprWorkflowClient()

# Scenario 1 (happy path): PrescribeMedication forwards its own history to
# the pharmacy, which verifies the upstream screening and dispenses.
_run_scenario(
client,
'SCENARIO 1: lineage forwarded — pharmacy dispenses',
'intake-ok',
PatientRecord(
patient_id='P-1042',
name='Jane Doe',
dob='1985-06-12',
mrn='MRN-77231',
condition='bacterial sinusitis',
medication='amoxicillin',
dosage=500,
forward_lineage=True,
),
)

# Scenario 2 (negative): PrescribeMedication dispenses WITHOUT propagating
# its history, so the pharmacy receives no lineage and refuses to dispense.
_run_scenario(
client,
'SCENARIO 2: lineage withheld — pharmacy refuses',
'intake-missing-lineage',
PatientRecord(
patient_id='P-2087',
name='John Roe',
dob='1979-03-04',
mrn='MRN-55810',
condition='strep throat',
medication='penicillin',
dosage=500,
forward_lineage=False,
),
)

print(flush=True)
print(_banner('COMPLETE'), flush=True)

client.close()
wfr.shutdown()


if __name__ == '__main__':
main()
9 changes: 9 additions & 0 deletions tutorials/workflow/python/history-propagation/dapr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: 1
common:
resourcesPath: ../../resources
apps:
- appID: patient-app
appDirPath: ./
command: ["python3", "app.py"]
appLogDestination: console
daprdLogDestination: console
2 changes: 2 additions & 0 deletions tutorials/workflow/python/history-propagation/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include ../../../../docker.mk
include ../../../../validate.mk
Loading
Loading