Skip to content

Commit b877b74

Browse files
committed
feat: add workflow orchestration (call_workflow / start_workflow)
Add parent-child workflow composition with two primitives: - call_workflow/3: synchronous child execution with wait/resume - start_workflow/3: fire-and-forget child execution Includes cascade cancellation, idempotent resume, parent notification on child completion/failure, and nested workflow support (A→B→C).
1 parent 90587e6 commit b877b74

6 files changed

Lines changed: 1109 additions & 12 deletions

File tree

lib/durable.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,24 @@ defmodule Durable do
284284
Durable.Wait.send_event(workflow_id, event_name, payload)
285285
end
286286

287+
@doc """
288+
Lists child workflow executions for a parent workflow.
289+
290+
## Options
291+
292+
- `:status` - Filter by status
293+
294+
## Examples
295+
296+
children = Durable.list_children(parent_workflow_id)
297+
running_children = Durable.list_children(parent_workflow_id, status: :running)
298+
299+
"""
300+
@spec list_children(String.t(), keyword()) :: [map()]
301+
def list_children(parent_workflow_id, opts \\ []) do
302+
Durable.Query.list_child_executions(parent_workflow_id, opts)
303+
end
304+
287305
# Scheduling API
288306

289307
@doc """

lib/durable/executor.ex

Lines changed: 197 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ defmodule Durable.Executor do
8080
})
8181
|> Repo.update(config)
8282

83+
# Cascade cancel to child workflows
84+
cancel_child_workflows(config, workflow_id)
85+
8386
:ok
8487

8588
_execution ->
@@ -308,9 +311,18 @@ defmodule Durable.Executor do
308311

309312
case StepRunner.execute(step, data, exec.id, config) do
310313
{:ok, new_data} ->
311-
# Save data as context and continue to next step with new_data
314+
# Save data as context and continue to next step
315+
# save_data_as_context merges orchestration keys from process dict
312316
{:ok, exec} = save_data_as_context(config, exec, new_data)
313-
execute_steps_recursive(remaining_steps, exec, step_index, workflow_def, config, new_data)
317+
# Pass the DB-persisted context forward (includes orchestration keys)
318+
execute_steps_recursive(
319+
remaining_steps,
320+
exec,
321+
step_index,
322+
workflow_def,
323+
config,
324+
exec.context
325+
)
314326

315327
{:decision, target_step, new_data} ->
316328
handle_decision_result(
@@ -330,6 +342,10 @@ defmodule Durable.Executor do
330342
{:ok, exec} = save_data_as_context(config, exec, data)
331343
handle_wait_result(config, exec, wait_type, opts)
332344

345+
{:call_workflow, opts} ->
346+
{:ok, exec} = save_data_as_context(config, exec, data)
347+
handle_call_workflow(config, exec, opts)
348+
333349
{:error, error} ->
334350
handle_step_failure(exec, error, workflow_def, config)
335351
end
@@ -376,6 +392,41 @@ defmodule Durable.Executor do
376392
defp handle_wait_result(config, exec, :wait_for_all, opts),
377393
do: {:waiting, handle_wait_for_all(config, exec, opts) |> elem(1)}
378394

395+
defp handle_wait_result(config, exec, :call_workflow, opts),
396+
do: handle_call_workflow(config, exec, opts)
397+
398+
# ============================================================================
399+
# Workflow Orchestration (call_workflow)
400+
# ============================================================================
401+
402+
defp handle_call_workflow(config, execution, opts) do
403+
child_id = Keyword.fetch!(opts, :child_id)
404+
event_name = Durable.Orchestration.child_event_name(child_id)
405+
timeout_at = calculate_timeout_at(opts)
406+
407+
# Create pending event to wait for child completion
408+
attrs = %{
409+
workflow_id: execution.id,
410+
event_name: event_name,
411+
step_name: execution.current_step,
412+
timeout_at: timeout_at,
413+
timeout_value: serialize_timeout_value(Keyword.get(opts, :timeout_value, :child_timeout)),
414+
wait_type: :single
415+
}
416+
417+
{:ok, _} =
418+
%PendingEvent{}
419+
|> PendingEvent.changeset(attrs)
420+
|> Repo.insert(config)
421+
422+
{:ok, execution} =
423+
execution
424+
|> Ecto.Changeset.change(status: :waiting)
425+
|> Repo.update(config)
426+
427+
{:waiting, execution}
428+
end
429+
379430
defp execute_branch(
380431
branch_step,
381432
remaining_steps,
@@ -522,7 +573,15 @@ defmodule Durable.Executor do
522573
case StepRunner.execute(step, data, exec.id, config) do
523574
{:ok, new_data} ->
524575
{:ok, exec} = save_data_as_context(config, exec, new_data)
525-
execute_branch_steps_sequential(rest, exec, step_index, workflow_def, config, new_data)
576+
577+
execute_branch_steps_sequential(
578+
rest,
579+
exec,
580+
step_index,
581+
workflow_def,
582+
config,
583+
exec.context
584+
)
526585

527586
{:decision, target_step, new_data} ->
528587
# Decisions within branches - save and return for outer handler
@@ -549,6 +608,10 @@ defmodule Durable.Executor do
549608
{:ok, exec} = save_data_as_context(config, exec, data)
550609
{:waiting, handle_wait_for_all(config, exec, opts) |> elem(1)}
551610

611+
{:call_workflow, opts} ->
612+
{:ok, exec} = save_data_as_context(config, exec, data)
613+
handle_call_workflow(config, exec, opts)
614+
552615
{:error, error} ->
553616
handle_step_failure(exec, error, workflow_def, config)
554617
end
@@ -820,6 +883,15 @@ defmodule Durable.Executor do
820883
}}}
821884
end
822885

886+
defp handle_parallel_step_result({:call_workflow, _opts}, returns_key) do
887+
{:ok, returns_key,
888+
{:error,
889+
%{
890+
type: "parallel_call_workflow_not_supported",
891+
message: "call_workflow not supported in parallel blocks"
892+
}}}
893+
end
894+
823895
defp await_parallel_tasks(tasks, :fail_fast), do: await_tasks_fail_fast(tasks)
824896
defp await_parallel_tasks(tasks, :complete_all), do: await_tasks_complete_all(tasks)
825897
defp await_parallel_tasks(tasks, _), do: await_tasks_complete_all(tasks)
@@ -961,12 +1033,42 @@ defmodule Durable.Executor do
9611033
end
9621034

9631035
# Saves data as the workflow context in DB (for persistence/resume)
1036+
# Also merges orchestration keys from process dict to ensure child workflow
1037+
# references are persisted through DB round-trips
9641038
defp save_data_as_context(config, execution, data) do
1039+
merged = merge_orchestration_context(data)
1040+
9651041
execution
966-
|> Ecto.Changeset.change(context: data)
1042+
|> Ecto.Changeset.change(context: merged)
9671043
|> Repo.update(config)
9681044
end
9691045

1046+
# Merge orchestration keys (__child:*, __fire_forget:*, __child_done:*) from
1047+
# process dict into the data to persist. These keys are set by
1048+
# Durable.Orchestration.call_workflow/start_workflow via put_context.
1049+
defp merge_orchestration_context(data) do
1050+
process_ctx = Process.get(:durable_context, %{})
1051+
1052+
orchestration_keys =
1053+
process_ctx
1054+
|> Enum.filter(fn {key, _} -> orchestration_key?(key) end)
1055+
|> Map.new()
1056+
1057+
Map.merge(data, orchestration_keys)
1058+
end
1059+
1060+
defp orchestration_key?(key) when is_atom(key) do
1061+
orchestration_key?(Atom.to_string(key))
1062+
end
1063+
1064+
defp orchestration_key?(key) when is_binary(key) do
1065+
String.starts_with?(key, "__child:") or
1066+
String.starts_with?(key, "__fire_forget:") or
1067+
String.starts_with?(key, "__child_done:")
1068+
end
1069+
1070+
defp orchestration_key?(_), do: false
1071+
9701072
defp mark_completed(config, execution, final_data) do
9711073
{:ok, execution} =
9721074
execution
@@ -978,21 +1080,105 @@ defmodule Durable.Executor do
9781080
|> Ecto.Changeset.change(locked_by: nil, locked_at: nil)
9791081
|> Repo.update(config)
9801082

1083+
maybe_notify_parent(config, execution, :completed, final_data)
1084+
9811085
{:ok, execution}
9821086
end
9831087

9841088
defp mark_failed(config, execution, error) do
985-
execution
986-
|> WorkflowExecution.status_changeset(:failed, %{
987-
error: error,
988-
completed_at: DateTime.utc_now()
989-
})
990-
|> Ecto.Changeset.change(locked_by: nil, locked_at: nil)
991-
|> Repo.update(config)
1089+
{:ok, execution} =
1090+
execution
1091+
|> WorkflowExecution.status_changeset(:failed, %{
1092+
error: error,
1093+
completed_at: DateTime.utc_now()
1094+
})
1095+
|> Ecto.Changeset.change(locked_by: nil, locked_at: nil)
1096+
|> Repo.update(config)
1097+
1098+
maybe_notify_parent(config, execution, :failed, error)
9921099

9931100
{:error, error}
9941101
end
9951102

1103+
# ============================================================================
1104+
# Parent Notification (Orchestration)
1105+
# ============================================================================
1106+
1107+
defp maybe_notify_parent(_config, %{parent_workflow_id: nil}, _status, _data), do: :ok
1108+
1109+
defp maybe_notify_parent(config, execution, status, data) do
1110+
event_name = Durable.Orchestration.child_event_name(execution.id)
1111+
payload = Durable.Orchestration.build_result_payload(status, data)
1112+
1113+
# Find and fulfill the pending event on the parent workflow
1114+
query =
1115+
from(p in PendingEvent,
1116+
where:
1117+
p.workflow_id == ^execution.parent_workflow_id and
1118+
p.event_name == ^event_name and
1119+
p.status == :pending
1120+
)
1121+
1122+
case Repo.one(config, query) do
1123+
nil ->
1124+
# Parent not waiting (fire-and-forget case, or already timed out)
1125+
:ok
1126+
1127+
pending_event ->
1128+
# Fulfill the pending event
1129+
{:ok, _} =
1130+
pending_event
1131+
|> PendingEvent.receive_changeset(payload)
1132+
|> Repo.update(config)
1133+
1134+
# Find the child ref from parent's context to store result under the right key
1135+
parent = Repo.get(config, WorkflowExecution, execution.parent_workflow_id)
1136+
result_context = build_parent_result_context(parent, execution.id, payload)
1137+
1138+
# Resume the parent workflow
1139+
resume_workflow(execution.parent_workflow_id, result_context)
1140+
end
1141+
end
1142+
1143+
# Build context update for parent with child result stored under the right key
1144+
defp build_parent_result_context(parent, child_id, payload) do
1145+
parent_context = parent.context || %{}
1146+
1147+
# Find which ref this child belongs to by looking for __child:ref = child_id
1148+
ref =
1149+
Enum.find_value(parent_context, fn
1150+
{"__child:" <> ref_str, ^child_id} -> ref_str
1151+
_ -> nil
1152+
end)
1153+
1154+
if ref do
1155+
%{
1156+
"__child_done:#{ref}" => payload,
1157+
Durable.Orchestration.child_event_name(child_id) => payload
1158+
}
1159+
else
1160+
%{Durable.Orchestration.child_event_name(child_id) => payload}
1161+
end
1162+
end
1163+
1164+
# ============================================================================
1165+
# Cascade Cancellation (Orchestration)
1166+
# ============================================================================
1167+
1168+
defp cancel_child_workflows(config, parent_id) do
1169+
query =
1170+
from(w in WorkflowExecution,
1171+
where: w.parent_workflow_id == ^parent_id,
1172+
where: w.status in [:pending, :running, :waiting]
1173+
)
1174+
1175+
children = Repo.all(config, query)
1176+
1177+
Enum.each(children, fn child ->
1178+
cancel_workflow(child.id, "parent_cancelled", durable: config.name)
1179+
end)
1180+
end
1181+
9961182
# ============================================================================
9971183
# Compensation/Saga Support
9981184
# ============================================================================

lib/durable/executor/step_runner.ex

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ defmodule Durable.Executor.StepRunner do
2626
| {:wait_for_input, keyword()}
2727
| {:wait_for_any, keyword()}
2828
| {:wait_for_all, keyword()}
29+
| {:call_workflow, keyword()}
2930

3031
@doc """
3132
Executes a step with retry logic.
@@ -139,7 +140,14 @@ defmodule Durable.Executor.StepRunner do
139140

140141
# Handle wait primitives (throws)
141142
defp handle_result({:throw, {wait_type, opts}}, ctx)
142-
when wait_type in [:sleep, :wait_for_event, :wait_for_input, :wait_for_any, :wait_for_all] do
143+
when wait_type in [
144+
:sleep,
145+
:wait_for_event,
146+
:wait_for_input,
147+
:wait_for_any,
148+
:wait_for_all,
149+
:call_workflow
150+
] do
143151
%{step_exec: step_exec, config: config} = ctx
144152
{:ok, _} = update_step_execution(config, step_exec, :waiting)
145153
{wait_type, opts}

0 commit comments

Comments
 (0)