Skip to content

Commit 739bec3

Browse files
authored
Merge pull request #5 from wavezync/feat/dx-cli-tools
feat: add Mix tasks for CLI workflow management
2 parents 5cb6afe + 75e680f commit 739bec3

14 files changed

+1300
-0
lines changed

README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,35 @@ Durable.provide_input(id, "input_name", data)
485485
Durable.list_children(parent_id)
486486
```
487487

488+
## Mix Tasks
489+
490+
Durable includes mix tasks for managing workflows from the command line.
491+
492+
```bash
493+
# Show queue status and workflow summary
494+
mix durable.status
495+
496+
# List workflow executions (with filters)
497+
mix durable.list # all executions
498+
mix durable.list --status running # filter by status
499+
mix durable.list --workflow MyApp.OrderWorkflow # filter by workflow
500+
mix durable.list --limit 20 --format json # limit results, JSON output
501+
502+
# Start a workflow
503+
mix durable.run MyApp.OrderWorkflow # no input
504+
mix durable.run MyApp.OrderWorkflow --input '{"id": 123}' # with JSON input
505+
mix durable.run MyApp.OrderWorkflow --queue high_priority # specific queue
506+
507+
# Cancel a workflow
508+
mix durable.cancel <execution_id>
509+
mix durable.cancel <execution_id> --reason "no longer needed"
510+
511+
# Clean up old executions
512+
mix durable.cleanup --older-than 30d # completed/failed older than 30 days
513+
mix durable.cleanup --older-than 7d --status completed # only completed, older than 7 days
514+
mix durable.cleanup --older-than 24h --dry-run # preview what would be deleted
515+
```
516+
488517
## Guides
489518

490519
- [Branching](guides/branching.md) - Conditional flow control

lib/mix/helpers.ex

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
defmodule Durable.Mix.Helpers do
2+
@moduledoc false
3+
4+
# Shared utilities for Durable mix tasks.
5+
6+
@doc """
7+
Ensures the application is started.
8+
"""
9+
def ensure_started do
10+
Mix.Task.run("app.start")
11+
end
12+
13+
@doc """
14+
Parses --name option, returns Durable instance name atom.
15+
"""
16+
def get_durable_name(opts) do
17+
case Keyword.get(opts, :name) do
18+
nil -> Durable
19+
name -> Module.concat([name])
20+
end
21+
end
22+
23+
@doc """
24+
Formats rows into an aligned table with headers.
25+
"""
26+
def format_table(rows, headers) do
27+
all = [headers | rows]
28+
29+
widths =
30+
Enum.reduce(all, List.duplicate(0, length(headers)), fn row, widths ->
31+
row
32+
|> Enum.map(&String.length(to_string(&1)))
33+
|> Enum.zip(widths)
34+
|> Enum.map(fn {a, b} -> max(a, b) end)
35+
end)
36+
37+
format_row = fn row ->
38+
row
39+
|> Enum.zip(widths)
40+
|> Enum.map_join(" ", fn {val, width} ->
41+
String.pad_trailing(to_string(val), width)
42+
end)
43+
end
44+
45+
header_line = format_row.(headers)
46+
data_lines = Enum.map(rows, format_row)
47+
[header_line | data_lines]
48+
end
49+
50+
@doc """
51+
Truncates a UUID to the first 8 characters.
52+
"""
53+
def truncate_id(nil), do: "—"
54+
55+
def truncate_id(id) when is_binary(id) do
56+
String.slice(id, 0, 8)
57+
end
58+
59+
@doc """
60+
Formats a duration between two datetimes as a human-readable string.
61+
"""
62+
def format_duration(nil, _), do: "—"
63+
def format_duration(_, nil), do: "—"
64+
65+
def format_duration(started_at, completed_at) do
66+
diff = DateTime.diff(completed_at, started_at, :second)
67+
format_seconds(diff)
68+
end
69+
70+
@doc """
71+
Formats a number of seconds into a human-readable duration string.
72+
"""
73+
def format_seconds(seconds) when seconds < 60, do: "#{seconds}s"
74+
75+
def format_seconds(seconds) when seconds < 3600 do
76+
m = div(seconds, 60)
77+
s = rem(seconds, 60)
78+
if s == 0, do: "#{m}m", else: "#{m}m #{s}s"
79+
end
80+
81+
def format_seconds(seconds) do
82+
h = div(seconds, 3600)
83+
m = div(rem(seconds, 3600), 60)
84+
if m == 0, do: "#{h}h", else: "#{h}h #{m}m"
85+
end
86+
87+
@doc """
88+
Formats a DateTime as "YYYY-MM-DD HH:MM:SS" or "—" for nil.
89+
"""
90+
def format_datetime(nil), do: "—"
91+
92+
def format_datetime(%DateTime{} = dt) do
93+
Calendar.strftime(dt, "%Y-%m-%d %H:%M:%S")
94+
end
95+
96+
@doc """
97+
Formats an integer with comma separators.
98+
"""
99+
def format_number(n) when is_integer(n) and n < 0 do
100+
"-" <> format_number(-n)
101+
end
102+
103+
def format_number(n) when is_integer(n) do
104+
n
105+
|> Integer.to_string()
106+
|> String.graphemes()
107+
|> Enum.reverse()
108+
|> Enum.chunk_every(3)
109+
|> Enum.map_join(",", &Enum.reverse/1)
110+
|> String.reverse()
111+
|> String.reverse()
112+
end
113+
114+
@doc """
115+
Strips the "Elixir." prefix from a module name string.
116+
"""
117+
def strip_elixir_prefix(module_str) when is_binary(module_str) do
118+
String.replace_prefix(module_str, "Elixir.", "")
119+
end
120+
end

lib/mix/tasks/durable.cancel.ex

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
defmodule Mix.Tasks.Durable.Cancel do
2+
@shortdoc "Cancels a workflow execution"
3+
4+
@moduledoc """
5+
Cancels a running, pending, or waiting workflow execution.
6+
7+
## Usage
8+
9+
mix durable.cancel WORKFLOW_ID [options]
10+
11+
## Options
12+
13+
* `--reason REASON` - Cancellation reason
14+
* `--name NAME` - The Durable instance name (default: Durable)
15+
16+
## Examples
17+
18+
mix durable.cancel abc12345-...
19+
mix durable.cancel abc12345-... --reason "User requested cancellation"
20+
"""
21+
22+
use Mix.Task
23+
24+
alias Durable.Executor
25+
alias Durable.Mix.Helpers
26+
27+
@impl Mix.Task
28+
def run(args) do
29+
Helpers.ensure_started()
30+
31+
{opts, positional, _} =
32+
OptionParser.parse(args, strict: [reason: :string, name: :string])
33+
34+
case positional do
35+
[workflow_id | _] ->
36+
cancel_workflow(workflow_id, opts)
37+
38+
[] ->
39+
Mix.shell().error("Usage: mix durable.cancel WORKFLOW_ID [--reason REASON]")
40+
end
41+
end
42+
43+
defp cancel_workflow(workflow_id, opts) do
44+
durable_name = Helpers.get_durable_name(opts)
45+
reason = Keyword.get(opts, :reason)
46+
47+
case Executor.cancel_workflow(workflow_id, reason, durable: durable_name) do
48+
:ok ->
49+
Mix.shell().info("Workflow #{Helpers.truncate_id(workflow_id)} cancelled.")
50+
51+
{:error, :not_found} ->
52+
Mix.shell().error("Workflow #{workflow_id} not found.")
53+
54+
{:error, :already_completed} ->
55+
Mix.shell().error(
56+
"Workflow #{workflow_id} has already completed and cannot be cancelled."
57+
)
58+
end
59+
end
60+
end

lib/mix/tasks/durable.cleanup.ex

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
defmodule Mix.Tasks.Durable.Cleanup do
2+
@shortdoc "Deletes old workflow executions"
3+
4+
@moduledoc """
5+
Deletes old workflow executions from the database.
6+
7+
Cascade deletes handle associated step executions, pending inputs, and events.
8+
9+
## Usage
10+
11+
mix durable.cleanup --older-than DURATION [options]
12+
13+
## Options
14+
15+
* `--older-than DURATION` - Required. Delete executions older than this duration.
16+
Supports: `30d` (days), `24h` (hours), `60m` (minutes)
17+
* `--status STATUS` - Only delete executions with this status (default: completed, failed).
18+
Can be specified multiple times.
19+
* `--dry-run` - Show how many records would be deleted without deleting
20+
* `--batch-size N` - Number of records to delete per batch (default: 1000)
21+
* `--name NAME` - The Durable instance name (default: Durable)
22+
23+
## Examples
24+
25+
mix durable.cleanup --older-than 30d
26+
mix durable.cleanup --older-than 24h --status completed --dry-run
27+
mix durable.cleanup --older-than 7d --batch-size 500
28+
"""
29+
30+
use Mix.Task
31+
32+
import Ecto.Query
33+
34+
alias Durable.Config
35+
alias Durable.Mix.Helpers
36+
alias Durable.Repo
37+
alias Durable.Storage.Schemas.WorkflowExecution
38+
39+
@default_statuses [:completed, :failed]
40+
@default_batch_size 1000
41+
42+
@impl Mix.Task
43+
def run(args) do
44+
Helpers.ensure_started()
45+
46+
{opts, _, _} =
47+
OptionParser.parse(args,
48+
strict: [
49+
older_than: :string,
50+
status: [:string, :keep],
51+
dry_run: :boolean,
52+
batch_size: :integer,
53+
name: :string
54+
]
55+
)
56+
57+
with {:ok, cutoff} <- parse_older_than(opts),
58+
{:ok, statuses} <- parse_statuses(opts) do
59+
durable_name = Helpers.get_durable_name(opts)
60+
config = Config.get(durable_name)
61+
dry_run = Keyword.get(opts, :dry_run, false)
62+
batch_size = Keyword.get(opts, :batch_size, @default_batch_size)
63+
64+
if dry_run do
65+
run_dry(config, cutoff, statuses)
66+
else
67+
run_cleanup(config, cutoff, statuses, batch_size)
68+
end
69+
end
70+
end
71+
72+
defp parse_older_than(opts) do
73+
case Keyword.get(opts, :older_than) do
74+
nil ->
75+
Mix.shell().error("--older-than is required. Example: --older-than 30d")
76+
:error
77+
78+
duration_str ->
79+
parse_duration(duration_str)
80+
end
81+
end
82+
83+
defp parse_duration(str) do
84+
case Regex.run(~r/^(\d+)([dhm])$/, str) do
85+
[_, num_str, unit] ->
86+
num = String.to_integer(num_str)
87+
seconds = duration_to_seconds(num, unit)
88+
cutoff = DateTime.add(DateTime.utc_now(), -seconds, :second)
89+
{:ok, cutoff}
90+
91+
nil ->
92+
Mix.shell().error(
93+
"Invalid duration: #{str}. Use format like 30d (days), 24h (hours), or 60m (minutes)."
94+
)
95+
96+
:error
97+
end
98+
end
99+
100+
defp duration_to_seconds(num, "d"), do: num * 86_400
101+
defp duration_to_seconds(num, "h"), do: num * 3_600
102+
defp duration_to_seconds(num, "m"), do: num * 60
103+
104+
defp parse_statuses(opts) do
105+
case Keyword.get_values(opts, :status) do
106+
[] ->
107+
{:ok, @default_statuses}
108+
109+
status_strings ->
110+
statuses =
111+
Enum.map(status_strings, fn s ->
112+
String.to_existing_atom(s)
113+
end)
114+
115+
{:ok, statuses}
116+
end
117+
rescue
118+
ArgumentError ->
119+
Mix.shell().error("Invalid status provided.")
120+
:error
121+
end
122+
123+
defp run_dry(config, cutoff, statuses) do
124+
count = count_matching(config, cutoff, statuses)
125+
status_str = Enum.map_join(statuses, ", ", &to_string/1)
126+
127+
Mix.shell().info(
128+
"Dry run: #{Helpers.format_number(count)} executions would be deleted " <>
129+
"(status: #{status_str}, older than #{Helpers.format_datetime(cutoff)})."
130+
)
131+
end
132+
133+
defp run_cleanup(config, cutoff, statuses, batch_size) do
134+
total = do_batch_delete(config, cutoff, statuses, batch_size, 0)
135+
Mix.shell().info("Deleted #{Helpers.format_number(total)} workflow executions.")
136+
end
137+
138+
defp do_batch_delete(config, cutoff, statuses, batch_size, acc) do
139+
ids_query =
140+
from(w in WorkflowExecution,
141+
where: w.status in ^statuses and w.inserted_at < ^cutoff,
142+
select: w.id,
143+
limit: ^batch_size
144+
)
145+
146+
delete_query = from(w in WorkflowExecution, where: w.id in subquery(ids_query))
147+
{deleted, _} = Repo.delete_all(config, delete_query)
148+
149+
if deleted > 0 do
150+
do_batch_delete(config, cutoff, statuses, batch_size, acc + deleted)
151+
else
152+
acc
153+
end
154+
end
155+
156+
defp count_matching(config, cutoff, statuses) do
157+
query =
158+
from(w in WorkflowExecution,
159+
where: w.status in ^statuses and w.inserted_at < ^cutoff,
160+
select: count(w.id)
161+
)
162+
163+
Repo.one(config, query)
164+
end
165+
end

0 commit comments

Comments
 (0)