Date: 2025-10-25 Status: ✅ Production Ready
singularity_workflow now supports dynamic workflow creation - perfect for AI/LLM agents that generate workflows at runtime!
| Approach | Use Case | Example |
|---|---|---|
| Static (Code) | Hand-written workflows | Elixir modules with __workflow_steps__/0 |
| Dynamic (Database) | AI-generated workflows | FlowBuilder API + step functions map |
Both use the same execution engine - identical performance and features!
# 1. AI generates workflow structure
{:ok, _} = Singularity.Workflow.FlowBuilder.create_flow("ai_data_pipeline", repo,
max_attempts: 5,
timeout: 60
)
# 2. AI adds steps with dependencies
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("ai_data_pipeline", "fetch_data", [], repo)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("ai_data_pipeline", "process_batch", ["fetch_data"], repo,
step_type: "map",
initial_tasks: 100 # Process 100 items in parallel
)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("ai_data_pipeline", "save_results", ["process_batch"], repo)
# 3. Provide step implementations
step_functions = %{
  # Produces the list that the downstream map step fans out over.
  fetch_data: fn _input ->
    {:ok, Enum.to_list(1..100)} # Returns array for map step
  end,
  # Each of 100 tasks processes one item
  process_batch: fn input ->
    item = Map.get(input, "item")
    {:ok, %{processed: item * 2}}
  end,
  # Runs after all 100 tasks complete; the input is not used here, hence `_input`
  save_results: fn _input ->
    {:ok, %{saved: true}}
  end
}
# 4. Execute!
{:ok, result} = Singularity.Workflow.Executor.execute_dynamic(
"ai_data_pipeline",
%{"initial" => "data"},
step_functions,
repo
)

Creates a workflow definition.
{:ok, workflow} = Singularity.Workflow.FlowBuilder.create_flow(
"my_workflow", # Unique identifier
repo,
max_attempts: 3, # Default retry count
timeout: 30 # Default timeout in seconds
)

Returns:
- `{:ok, workflow_map}` - Success
- `{:error, :invalid_workflow_slug}` - Invalid name (must match `^[a-zA-Z_][a-zA-Z0-9_]*$`)
Adds a step to a workflow.
{:ok, step} = Singularity.Workflow.FlowBuilder.add_step(
"my_workflow", # Workflow slug
"step_name", # Step identifier
["dep1", "dep2"], # Dependencies (can be empty [])
repo,
step_type: "single", # "single" or "map"
initial_tasks: nil, # For map steps: number of tasks
max_attempts: 5, # Override workflow default
timeout: 120 # Override workflow default (seconds)
)

Step Types:
- `"single"` - One task (default)
- `"map"` - Multiple tasks (requires `initial_tasks`, or determines the task count from the dependency's output)
Map Step Rules:
- Can have 0 or 1 dependency (NOT multiple)
- If 0 deps: Maps over workflow input array
- If 1 dep: Maps over dependency's output array
Executes a dynamic workflow.
{:ok, result} = Singularity.Workflow.Executor.execute_dynamic(
"workflow_slug", # String workflow identifier
%{"input" => "data"}, # Initial input
step_functions, # Map of step_slug atoms => functions
repo,
timeout: 300_000, # Optional: execution timeout (ms)
poll_interval: 200 # Optional: polling interval (ms)
)

Step Functions Map:
step_functions = %{
step_slug: fn input -> {:ok, output} end,
another_step: fn input -> {:ok, output} end
}

Returns:
- `{:ok, result}` - Workflow completed
- `{:error, reason}` - Workflow failed
- `{:error, {:workflow_not_found, slug}}` - Workflow doesn't exist
defmodule AIWorkflowGenerator do
# Builds and runs a workflow from a natural-language prompt.
#
# Asks the model for a workflow spec, persists the flow and its steps through
# FlowBuilder, obtains the step implementations, and returns whatever
# Executor.execute_dynamic/4 returns.
#
# Flow and step creation are asserted with `{:ok, _} =`, so an invalid slug or a
# rejected step raises a MatchError here instead of executing a half-built flow.
def generate_from_prompt(prompt, repo) do
  # 1. Claude analyzes prompt and generates workflow structure
  # NOTE(review): assumes ask_claude/1 returns an already-decoded spec with atom
  # keys (:workflow_slug, :steps; per step :slug, :depends_on, :type) — confirm.
  workflow_spec = ask_claude("""
  Generate a workflow for: #{prompt}
  Return JSON with:
  {
  "workflow_slug": "...",
  "steps": [
  {"slug": "...", "depends_on": [...], "type": "single|map"}
  ]
  }
  """)

  # 2. Create workflow
  {:ok, _} = Singularity.Workflow.FlowBuilder.create_flow(workflow_spec.workflow_slug, repo)

  # 3. Add steps (each result is asserted; previously failures were discarded)
  Enum.each(workflow_spec.steps, fn step ->
    {:ok, _} =
      Singularity.Workflow.FlowBuilder.add_step(
        workflow_spec.workflow_slug,
        step.slug,
        step.depends_on,
        repo,
        step_type: step.type
      )
  end)

  # 4. Claude generates step implementations
  step_functions = generate_step_functions(workflow_spec, prompt)

  # 5. Execute
  Singularity.Workflow.Executor.execute_dynamic(
    workflow_spec.workflow_slug,
    %{"prompt" => prompt},
    step_functions,
    repo
  )
end
end

defmodule MultiAgentWorkflow do
# Chains the given agents into one sequential workflow and executes it.
#
# Each agent becomes a step that depends on the step of the agent before it;
# the first agent has no dependencies. Every agent must expose `slug` (a string)
# and `function` (the step function for that slug).
# Returns the result of Executor.execute_dynamic/4.
def create_agent_workflow(agents, repo) do
  {:ok, _} = Singularity.Workflow.FlowBuilder.create_flow("multi_agent_task", repo)

  # Create steps for each agent. The previous slug is carried in the reduce
  # accumulator: rebinding a variable inside `for` is local to that iteration,
  # which would leave every step without dependencies.
  Enum.reduce(agents, nil, fn agent, prev_step ->
    deps = if prev_step, do: [prev_step], else: []

    {:ok, _} =
      Singularity.Workflow.FlowBuilder.add_step(
        "multi_agent_task",
        agent.slug,
        deps,
        repo
      )

    agent.slug
  end)

  # Map agent functions
  # NOTE(review): String.to_atom/1 creates atoms that are never garbage-collected;
  # make sure agent slugs come from a bounded, validated set.
  step_functions =
    Map.new(agents, fn agent -> {String.to_atom(agent.slug), agent.function} end)

  Singularity.Workflow.Executor.execute_dynamic("multi_agent_task", %{}, step_functions, repo)
end
end

defmodule ABTestWorkflows do
# Creates the workflow "ab_test_<variant_name>" from `config` and returns its slug.
#
# `config.steps` is enumerated as `{step_name, step_config}` pairs, and each
# `step_config.depends_on` is passed to add_step as the dependency list.
# Creation results are asserted, so a rejected flow or step raises a MatchError
# instead of returning the slug of an incomplete workflow.
def create_variant(variant_name, config, repo) do
  workflow_slug = "ab_test_#{variant_name}"
  {:ok, _} = Singularity.Workflow.FlowBuilder.create_flow(workflow_slug, repo)

  # Build workflow based on variant config
  Enum.each(config.steps, fn {step_name, step_config} ->
    {:ok, _} =
      Singularity.Workflow.FlowBuilder.add_step(
        workflow_slug,
        step_name,
        step_config.depends_on,
        repo
      )
  end)

  workflow_slug
end
# Runs variants "a" and "b" concurrently on the same input.
#
# Returns a two-element list with the execute_dynamic result of variant a
# followed by that of variant b.
# NOTE(review): relies on @config_a, @config_b and @step_fns being defined as
# module attributes; they are not shown in this example.
def run_ab_test(input, repo) do
  # Execution budget in ms, passed to execute_dynamic via its :timeout option.
  timeout = 300_000

  # Create variants
  variant_a = create_variant("a", @config_a, repo)
  variant_b = create_variant("b", @config_b, repo)

  # Run both in parallel
  tasks =
    for slug <- [variant_a, variant_b] do
      Task.async(fn ->
        Singularity.Workflow.Executor.execute_dynamic(slug, input, @step_fns, repo,
          timeout: timeout
        )
      end)
    end

  # Task.await_many/1 defaults to 5_000 ms and would exit the caller long before
  # the workflows finish; wait slightly longer than the execution timeout so the
  # executor's own timeout result is what gets returned.
  Task.await_many(tasks, timeout + 5_000)
end
end

Dynamic workflows are stored in these tables:
-- Workflow definitions
workflows (
workflow_slug TEXT PRIMARY KEY,
max_attempts INTEGER DEFAULT 3,
timeout INTEGER DEFAULT 30,
created_at TIMESTAMPTZ
)
-- Steps within workflows
workflow_steps (
workflow_slug TEXT,
step_slug TEXT,
step_type TEXT DEFAULT 'single', -- 'single' or 'map'
step_index INTEGER,
deps_count INTEGER,
initial_tasks INTEGER,
max_attempts INTEGER,
timeout INTEGER,
PRIMARY KEY (workflow_slug, step_slug)
)
-- Step dependencies
workflow_step_dependencies_def (
workflow_slug TEXT,
dep_slug TEXT, -- dependency step
step_slug TEXT, -- dependent step
PRIMARY KEY (workflow_slug, dep_slug, step_slug)
)

{:ok, workflows} = Singularity.Workflow.FlowBuilder.list_flows(repo)
Enum.each(workflows, fn w ->
IO.puts("#{w["workflow_slug"]} - #{w["max_attempts"]} attempts")
end)

{:ok, workflow} = Singularity.Workflow.FlowBuilder.get_flow("my_workflow", repo)
# => %{
# "workflow_slug" => "my_workflow",
# "steps" => [
# %{"step_slug" => "step1", "depends_on" => []},
# %{"step_slug" => "step2", "depends_on" => ["step1"]}
# ]
# }

:ok = Singularity.Workflow.FlowBuilder.delete_flow("old_workflow", repo)

- Runtime generation - Claude/GPT generates workflows on-the-fly
- Natural language input - "Process these files then email results"
- Adaptive workflows - Change structure based on data
- Multi-tenant - Different workflow per user/tenant
- Same execution engine - Identical performance as code workflows
- Admin UIs - Build visual workflow editors
- A/B testing - Test different workflow structures
- Debugging - Inspect workflow in database
- Less type-safe - No compile-time checking
- Runtime overhead - DB lookup to load definition
- Migration complexity - Schema changes need careful handling
- Validate step functions exist - Check all step_slugs have implementations
- Use transactions - Wrap create_flow + add_step in DB transaction
- Cache definitions - Load once, execute many times
- Sanitize slugs - Validate names from AI (use is_valid_slug)
Dynamic workflows have identical execution performance to static workflows:
- ✅ Same pgmq coordination
- ✅ Same PostgreSQL functions
- ✅ Same parallel execution
- ✅ Same error handling
Only difference: 1-2ms DB query overhead to load definition initially.
# Before (static)
defmodule MyWorkflow do
def __workflow_steps__ do
[{:step1, &__MODULE__.step1/1, depends_on: []}]
end
def step1(input), do: {:ok, input}
end
Singularity.Workflow.Executor.execute(MyWorkflow, input, repo)
# After (dynamic)
{:ok, _} = Singularity.Workflow.FlowBuilder.create_flow("my_workflow", repo)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("my_workflow", "step1", [], repo)
step_functions = %{
step1: fn input -> {:ok, input} end
}
Singularity.Workflow.Executor.execute_dynamic("my_workflow", input, step_functions, repo)

Both execute identically!
# User prompt
prompt = """
Extract data from CSV files, validate each row,
transform the valid rows, and load into PostgreSQL.
Process rows in parallel batches of 50.
"""
# Claude generates workflow
{:ok, _} = Singularity.Workflow.FlowBuilder.create_flow("claude_etl", repo)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("claude_etl", "extract_csv", [], repo)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("claude_etl", "validate_rows", ["extract_csv"], repo,
step_type: "map",
initial_tasks: 50
)
{:ok, _} = Singularity.Workflow.FlowBuilder.add_step("claude_etl", "load_db", ["validate_rows"], repo)
# Implementations
step_functions = %{
  # Reads the whole CSV file and returns its rows for the map step to fan out over.
  extract_csv: fn _input ->
    # decode!/1 yields plain rows and raises on a malformed line; decode/1 would
    # wrap every row in an {:ok, row} / {:error, reason} tuple, which valid?/1
    # and transform/1 below are not written to handle.
    # NOTE(review): assumes `CSV` is the `csv` Hex package — confirm.
    rows = "data.csv" |> File.stream!() |> CSV.decode!() |> Enum.to_list()
    {:ok, rows}
  end,
  # Runs once per row; the row arrives under the "item" key.
  validate_rows: fn input ->
    row = Map.get(input, "item")
    if valid?(row), do: {:ok, transform(row)}, else: {:error, :invalid}
  end,
  # Receives the collected output of the map step under its step slug.
  load_db: fn input ->
    Repo.insert_all(DataTable, input["validate_rows"])
    {:ok, %{loaded: true}}
  end
}
# Execute
{:ok, result} = Singularity.Workflow.Executor.execute_dynamic("claude_etl", %{}, step_functions, repo)

Dynamic workflows enable AI/LLM agents to create workflows at runtime while using the same battle-tested execution engine as code-based workflows.
Perfect for:
- 🤖 AI workflow generation from natural language
- 🧪 A/B testing different workflow structures
- 🏗️ Visual workflow builders
- 🎯 User-specific workflow customization
- 🔄 Runtime workflow adaptation
Same performance, same features, more flexibility! ✅