- Name: Singularity.Workflow (singularity_workflow)
- Version: 0.1.5
- Type: Elixir Library Package (published on Hex.pm)
- Purpose: PostgreSQL-based workflow orchestration library for Elixir
- License: MIT
Singularity.Workflow is a production-ready Elixir library that provides complete workflow orchestration capabilities using:
- PostgreSQL + pgmq extension for persistent task coordination
- PostgreSQL NOTIFY for real-time event messaging (NATS replacement)
- Hierarchical Task DAG (HTDAG) support for goal-driven workflow decomposition
- Parallel execution with automatic dependency resolution
- Dynamic workflows for AI/LLM-generated task graphs
singularity-workflows/
├── lib/ # Main library code (7,280 lines)
│ └── singularity_workflow/
│ ├── executor.ex # Workflow execution engine (781 lines)
│ ├── flow_builder.ex # Dynamic workflow API (381 lines)
│ ├── notifications.ex # PostgreSQL NOTIFY messaging (727 lines)
│ ├── orchestrator.ex # HTDAG orchestration (527 lines)
│ ├── orchestrator_optimizer.ex # Optimization engine (1,040 lines)
│ ├── workflow_composer.ex # High-level composition API (479 lines)
│ ├── dag/ # DAG parsing & execution
│ │ ├── workflow_definition.ex
│ │ ├── run_initializer.ex
│ │ ├── task_executor.ex
│ │ └── dynamic_workflow_loader.ex
│ ├── execution/ # Execution strategy pattern
│ │ ├── strategy.ex
│ │ └── backends/
│ │ ├── direct_backend.ex # Local execution
│ │ ├── oban_backend.ex # Distributed execution
│ │ └── distributed_backend.ex
│ ├── jobs/ # Oban job definitions
│ │ ├── step_job.ex
│ │ └── gpu_step_job.ex
│ ├── orchestrator/ # HTDAG components
│ │ ├── config.ex
│ │ ├── executor.ex
│ │ ├── repository.ex
│ │ ├── schemas.ex
│ │ └── example_decomposer.ex
│ └── [Schema modules] # Ecto schemas
│ ├── workflow_run.ex
│ ├── step_state.ex
│ ├── step_task.ex
│ └── step_dependency.ex
├── test/ # Test suite (10,566 lines)
│ ├── singularity_workflow/ # Unit & integration tests
│ ├── integration/ # End-to-end tests
│ └── support/ # Test helpers & fixtures
├── config/ # Configuration files
│ ├── config.exs # Main configuration
│ ├── test.exs
│ └── dev.exs
├── priv/repo/migrations/ # Database migrations
├── docs/ # Comprehensive documentation
│ ├── ARCHITECTURE.md
│ ├── API_REFERENCE.md
│ ├── HTDAG_ORCHESTRATOR_GUIDE.md
│ ├── DYNAMIC_WORKFLOWS_GUIDE.md
│ ├── TESTING_GUIDE.md
│ └── [6 more guides]
├── scripts/ # Development scripts
├── mix.exs # Mix project configuration
└── .credo.exs # Code quality configuration
- Responsibility: High-level workflow execution orchestration
- Key Functions:
  - `execute/4` - Execute static workflows from Elixir modules
  - `execute_dynamic/5` - Execute database-stored dynamic workflows
  - `get_run_status/2` - Query workflow execution status
  - `cancel_workflow_run/3`, `pause_workflow_run/2`, `resume_workflow_run/2` - Lifecycle management
  - `retry_failed_workflow/3` - Retry failed workflows
  - `list_workflow_runs/2` - Query historical runs with filtering
- Patterns: Delegates to DAG modules for parsing/execution, uses Ecto transactions
- Critical: Entry point for all workflow executions
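A hypothetical usage sketch of the executor entry point. The arities match the function list above, but the argument order, option keys, and return shapes are assumptions, not the library's documented signatures:

```elixir
# Illustrative only — MyApp.Workflows.EtlPipeline and the option keys
# below are made-up names for this sketch.
{:ok, run} =
  Singularity.Workflow.Executor.execute(
    MyApp.Workflows.EtlPipeline,   # module defining workflow steps
    %{source: "s3_bucket_path"},   # workflow input
    MyApp.Repo,                    # Ecto repo used for coordination
    timeout: :timer.minutes(5)
  )

# Poll for status; shape of the status map is assumed.
case Singularity.Workflow.Executor.get_run_status(run.id, MyApp.Repo) do
  {:ok, %{status: "completed", output: output}} -> output
  {:ok, %{status: status}} -> {:pending, status}
  {:error, reason} -> {:error, reason}
end
```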
- Responsibility: Parse and validate workflow step definitions
- Key Features:
- Supports sequential syntax (legacy, auto-converts to DAG)
- Supports explicit DAG syntax with `depends_on: [step_names]`
- Cycle detection and dependency validation
- Identifies root steps (steps with no dependencies)
- Metadata handling (timeouts, max_attempts, execution mode)
- Data Structure: `%WorkflowDefinition{steps, dependencies, root_steps, slug, step_metadata}`
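An illustrative step definition using the explicit DAG syntax described above. The callback name `__workflow_steps__/0` is taken from the test fixtures shown later in this document; the exact tuple/keyword shape of each step entry is an assumption:

```elixir
# Sketch of a static workflow module with explicit dependencies.
defmodule MyApp.Workflows.Report do
  def __workflow_steps__ do
    [
      {:fetch, []},                                # root step: no dependencies
      {:clean, depends_on: [:fetch]},
      {:aggregate, depends_on: [:fetch]},          # runs in parallel with :clean
      {:render, depends_on: [:clean, :aggregate]}  # joins both branches
    ]
  end
end
```

Here `:clean` and `:aggregate` form independent branches that the executor can run in parallel, while `:render` waits for both.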
- Responsibility: Initialize workflow runs in the database
- Operations:
  1. Creates `workflow_runs` record with initial state
  2. Creates `step_states` entries for each step
  3. Creates `step_dependencies` relationships
  4. Creates initial `step_tasks` for root steps
  5. Calls PostgreSQL `start_ready_steps()` to mark root steps as "started"
- Key: Sets up `remaining_steps` counter for atomic completion tracking
- Responsibility: Execute tasks in a workflow run
- Execution Loop:
  1. Poll pgmq for queued tasks
  2. Claim task with `FOR UPDATE SKIP LOCKED` (row-level locking)
  3. Execute step function
  4. Call PostgreSQL `complete_task()` function
  5. Repeat until completion or timeout
- Features:
- Multi-worker support (PostgreSQL coordinates via locking)
- Automatic retry with configurable max_attempts
- Timeout handling (task-level and workflow-level)
- Batch task polling for efficiency
- Critical: Core polling/execution loop
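The claim step above can be sketched as a single atomic query. This is not the library's actual SQL — the table and column names are taken from the schema descriptions in this document, and the query shape is a common Postgres pattern for competing workers:

```elixir
# Illustrative claim query: each worker atomically grabs one queued task;
# FOR UPDATE SKIP LOCKED makes competing workers skip rows another
# worker already holds, so no two workers claim the same task.
worker_id = "worker-1"
run_id = 42

claim_sql = """
UPDATE step_tasks
SET status = 'started', claimed_by = $1
WHERE id = (
  SELECT id FROM step_tasks
  WHERE run_id = $2 AND status = 'queued'
  ORDER BY id
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
RETURNING id, step_slug, task_index
"""

{:ok, _result} = Ecto.Adapters.SQL.query(MyApp.Repo, claim_sql, [worker_id, run_id])
```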
- Responsibility: Load workflow definitions from database for dynamic workflows
- Process: Queries `workflows`, `workflow_steps`, `workflow_step_dependencies_def` tables
- Output: Returns a `WorkflowDefinition` struct compatible with static workflows
- Responsibility: API for creating/managing workflows at runtime (AI/LLM integration)
- Public API:
  - `create_flow/3` - Create new workflow definition
  - `add_step/5` - Add step with dependencies
  - `get_flow/2` - Retrieve workflow with steps
  - `list_flows/1` - List all workflows
  - `delete_flow/2` - Delete workflow
- Features:
- Comprehensive input validation (slug format, types, constraints)
- Support for map steps (parallel task counts)
- Per-step configuration (timeouts, max_attempts, resources)
- Implementation: Delegates to `FlowOperations` for actual DB operations
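A hedged sketch of building a dynamic workflow at runtime (the AI/LLM use case). The arities match the API list above, but argument names, option keys, and the step-config map are assumptions:

```elixir
# Illustrative only — "llm_plan_42", the tenant option, and the config
# keys are placeholder values for this sketch.
alias Singularity.Workflow.FlowBuilder

{:ok, flow} =
  FlowBuilder.create_flow(
    "llm_plan_42",
    %{description: "LLM-generated plan"},
    tenant_id: "tenant_a"
  )

# Add steps with dependencies and per-step configuration.
:ok = FlowBuilder.add_step(flow, :gather, [], %{timeout: 30_000}, tenant_id: "tenant_a")
:ok = FlowBuilder.add_step(flow, :summarize, [:gather], %{max_attempts: 3}, tenant_id: "tenant_a")
```

The resulting flow would then be runnable via `execute_dynamic/5`, since the loader returns the same `WorkflowDefinition` shape as static workflows.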
- Responsibility: Low-level workflow creation/manipulation
- Implementation: Uses Elixir instead of PostgreSQL functions (bypasses PG17 parser bug)
- Uses: Direct SQL queries to manipulate workflow tables
- Responsibility: PostgreSQL NOTIFY messaging (NATS replacement)
- Key Functions:
  - `send_with_notify/4` - Send message via pgmq + NOTIFY
  - `listen/2` - Subscribe to NOTIFY channel
  - `unlisten/2` - Unsubscribe from channel
  - `notify_only/3` - Send NOTIFY without persistence
  - `receive_message/3` - Poll pgmq queue
  - `acknowledge/3` - Mark message as processed
- Architecture:
- Uses pgmq for message persistence
- Uses PostgreSQL NOTIFY for real-time delivery
- Supports request-reply pattern with reply queues
- Structured logging for all operations
- Use Cases: Workflow events, system notifications, inter-service communication
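A sketch of the send/listen round trip using the function names listed above. The payload shape, option keys, and the notification-message tuple (borrowed from Postgrex's LISTEN delivery format) are assumptions:

```elixir
# Illustrative only — channel name and payload are placeholders.
alias Singularity.Workflow.Notifications

# Subscribe this process to the channel.
{:ok, _ref} = Notifications.listen("workflow_events", MyApp.Repo)

# Persist via pgmq and fire NOTIFY for real-time delivery.
:ok =
  Notifications.send_with_notify(
    "workflow_events",
    %{event: "step_completed", step: "clean"},
    MyApp.Repo,
    []
  )

# Postgrex-style notification delivery (tuple shape assumed).
receive do
  {:notification, _pid, _ref, "workflow_events", payload} ->
    Jason.decode!(payload)
after
  1_000 -> :timeout
end
```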
- Responsibility: HTDAG-specific event broadcasting
- Events: Decomposition events, task events, workflow completion, performance metrics
- Integration: Sends to pgmq + NOTIFY with HTDAG-specific payloads
- Responsibility: Transform goals into hierarchical task DAGs
- Key Functions:
  - `decompose_goal/3` - Convert goal to task graph
  - `create_workflow/3` - Build workflow from task graph
  - `execute_goal/5` - One-shot: decompose + create + execute
- Integration Points: Uses FlowBuilder for workflow creation, Executor for execution
- AI Navigation Notes: Generic HTDAG engine, not specific to any decomposer
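A hedged sketch of goal-driven execution with a custom decomposer. The decomposer contract (goal in, task list out) is inferred from the "pluggable decomposer" pattern described in this document; the arity matches `execute_goal/5`, but argument order and task-map keys are assumptions:

```elixir
# Illustrative decomposer: turns a goal string into a small task DAG.
decomposer = fn _goal, _context ->
  {:ok,
   [
     %{slug: :research, depends_on: []},
     %{slug: :draft, depends_on: [:research]},
     %{slug: :review, depends_on: [:draft]}
   ]}
end

{:ok, _run} =
  Singularity.Workflow.Orchestrator.execute_goal(
    "write quarterly report",   # goal
    decomposer,                 # pluggable domain logic
    MyApp.Repo,
    %{quarter: "Q3"},           # input
    timeout: :timer.minutes(10)
  )
```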
- Responsibility: Learn from execution patterns and optimize workflows
- Features:
- Performance analysis (execution times, success rates)
- Dependency optimization for parallelization
- Resource allocation optimization
- Adaptive strategies based on workload
- Pattern learning for future improvements
- Optimization Levels: `:basic`, `:advanced`, `:aggressive`
- Configuration: Preserve structure, max parallelism, timeout thresholds
- Responsibility: High-level convenience API for goal-driven workflows
- Main Function: `compose_from_goal/5` - Execute goal with all features (optimization, monitoring)
- Features: Enables monitoring, optimization, and learning in a single call
- Note: Wraps Orchestrator + OrchestratorOptimizer for ease of use
- Responsibility: Strategy pattern for execution mode selection
- Modes:
  - `:local` - Execute in current process (DirectBackend)
  - `:distributed` - Execute via Oban for distributed processing (DistributedBackend)
- Purpose: Allows per-step execution mode selection
- Synchronous step execution in current process
- Uses Task.async with timeout
- Default mode for most use cases
- Asynchronous job queuing via Oban
- Supports distributed execution across nodes
- Handles job scheduling and result awaiting
- Internal implementation detail (not exposed to users)
- Wrapper around ObanBackend
- Provides distributed execution capabilities
- GPU job support via `GpuStepJob`
- Fields: id, tenant_id, workflow_slug, status, input, output, remaining_steps, error_message, timestamps
- States: started → completed/failed
- Functions: Mark completed, mark failed, changeset validation
- Purpose: Track workflow execution instances
- Fields: run_id, step_slug, status, output, error_message, task_count, completed_tasks
- States: pending → started → completed/failed
- Purpose: Track individual step execution within a run
- Fields: run_id, step_slug, task_index, status, output, message_id, claimed_by
- States: queued → started → completed/failed
- Purpose: Individual task execution records (one per map step instance or step task)
- Fields: run_id, dependent_slug, dependency_slug, waiting_for_count
- Purpose: Track step dependency relationships and completion ordering
- Tracks task execution lineage/ancestry
- Maps parent-child relationships for distributed execution
- Basic worker registration mechanism
- Public API surface, delegates to implementation modules
- Re-exports key functions for convenient access
- Version management
- Low-level messaging utilities
- `TestClock` - Mock time for deterministic testing
- `TestWorkflowPrefix` - Unique test workflow naming
- `MoxHelper` - Mock setup utilities
- `SqlCase` - SQL-based testing utilities
- Where: Core workflow execution model
- How: Steps define dependencies via `depends_on: [step_names]`
- Benefits:
- Enables parallel execution of independent branches
- Automatic dependency resolution
- Cycle detection prevents infinite loops
- Implementation: WorkflowDefinition parses into dependency map, TaskExecutor respects ordering
- Where: Multi-worker task claiming and completion
- How: PostgreSQL tables and functions coordinate execution
- Key Mechanisms:
  - `step_tasks` table with row-level locking (`FOR UPDATE SKIP LOCKED`)
  - `start_ready_steps()` PostgreSQL function marks ready steps
  - `complete_task()` function cascades completion to dependents
  - `remaining_steps` counter for atomic workflow completion detection
- Benefits: No inter-process communication needed, horizontal scaling
- Trade-off: Requires PostgreSQL, polling latency
- Where: `Execution.Strategy` module
- Modes:
- Local (DirectBackend) - synchronous
- Distributed (ObanBackend) - async via background jobs
- Usage: Per-step selection via metadata
- Benefits: Flexible execution model without workflow changes
- Where: `Notifications.Behaviour` module
- Purpose: Enable mocking for test isolation
- Tools: Uses Mox library for mock implementation
- Where: Main `Singularity.Workflow` module
- How: `defdelegate` to implementation modules
- Purpose: Clean public API surface
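The facade pattern above typically looks like the following. The delegation targets mirror modules named earlier in this document, but the exact function heads are assumptions:

```elixir
# Sketch of a defdelegate-based facade — not the library's actual source.
defmodule Singularity.Workflow do
  # Re-export execution and flow-building entry points from their
  # implementation modules so callers use one public module.
  defdelegate execute(module, input, repo, opts), to: Singularity.Workflow.Executor
  defdelegate create_flow(slug, attrs, opts), to: Singularity.Workflow.FlowBuilder
end
```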
- Where: Notifications + OrchestratorNotifications
- Pattern:
- Send message to pgmq queue
- Trigger PostgreSQL NOTIFY
- Listeners receive notification
- Process message from queue
- Benefits: Real-time, decoupled communication, persistent
- Where: HTDAG orchestration
- How: Pass decomposer function to `execute_goal`
- Benefits: Custom domain logic without code changes
- Example: ExampleDecomposer shows reference implementation
- Where: OrchestratorOptimizer
- Pattern: Analyze metrics → Apply optimizations → Store patterns → Feedback learning
- Levels: Basic (safe) → Advanced (smart) → Aggressive (risky)
- Benefits: Adaptive performance improvement
- Where: Executor, RunInitializer, TaskExecutor
- How: Ecto.Repo.transaction/1 for multi-step operations
- Purpose: Ensure atomicity in database operations
- Where: StepTask + Lineage
- How: Handle both map steps (multiple tasks per step) and single tasks
- Benefits: Flexible bulk processing
| Dependency | Version | Purpose | Usage |
|---|---|---|---|
| jason | ~> 1.4 | JSON encoding/decoding | Message serialization, workflow I/O |
| telemetry | ~> 1.0 | Observability/metrics | Performance tracking (structured logging) |
| pgmq | ~> 0.4 | PostgreSQL message queue | Task coordination, message persistence |
| oban | ~> 2.17 | Background job processing | Distributed task execution (internal) |
| Dependency | Version | Purpose |
|---|---|---|
| mox | ~> 1.2 (test only) | Mock library for testing |
| credo | ~> 1.7 (dev/test) | Code linting & style |
| dialyxir | ~> 1.4 (dev/test) | Static type checking |
| sobelow | ~> 0.13 (dev/test) | Security vulnerability scanning |
| excoveralls | ~> 0.18 (test only) | Code coverage reporting |
| ex_doc | ~> 0.34 (dev only) | Documentation generation |
- Ecto (database abstraction, assumed in applications using this library)
- Postgrex (PostgreSQL driver, required by Ecto)
- Logger (built-in Elixir logging)
Application using Singularity.Workflow
│
├─ Singularity.Workflow
│ ├─ Ecto (for schema & repo operations)
│ ├─ Postgrex (PostgreSQL driver)
│ ├─ Jason (JSON serialization)
│ ├─ Telemetry (metrics/logging)
│ └─ PGMQ (message coordination)
│ └─ PostgreSQL + pgmq extension
│
└─ Optional: Oban (for distributed execution)
└─ PostgreSQL Oban tables
- Minimal Core: Only essential libraries included
- PostgreSQL-centric: Leverages PostgreSQL capabilities instead of external services
- Test Isolation: Mox enables mock-based testing without external dependencies
- Quality Tools: Credo, Dialyzer, Sobelow ensure production-ready code
- No Framework Lock-in: Works with any Elixir application
- Total Test Lines: 10,566 lines
- Test Files: 26+ test files
- Coverage Tool: ExCoveralls (configured in mix.exs)
- Coverage Command: `mix test.coverage` (generates HTML report)
test/
├── singularity_workflow/ # Unit/integration tests
│ ├── executor_test.exs # Core executor tests (sequential/parallel)
│ ├── flow_builder_test.exs # Dynamic workflow creation tests
│ ├── notifications_test.exs # NOTIFY messaging tests
│ ├── orchestrator_test.exs # HTDAG decomposition tests
│ ├── workflow_composer_test.exs # High-level API tests
│ ├── orchestrator_optimizer_test.exs # Optimization tests
│ ├── complete_task_test.exs # PostgreSQL function tests
│ ├── idempotency_test.exs # Idempotency verification
│ │
│ ├── dag/ # DAG module tests
│ │ ├── workflow_definition_test.exs
│ │ ├── run_initializer_test.exs
│ │ ├── task_executor_test.exs
│ │ └── dynamic_workflow_loader_test.exs
│ │
│ ├── orchestrator/ # HTDAG component tests
│ │ ├── config_test.exs
│ │ ├── executor_test.exs
│ │ ├── schemas_test.exs
│ │ └── example_decomposer_test.exs
│ │
│ ├── [Schema tests]
│ │ ├── step_state_test.exs
│ │ ├── step_task_test.exs
│ │ ├── workflow_run_test.exs
│ │ └── step_dependency_test.exs
│ │
│ └── [Utility tests]
│ ├── clock_test.exs
│ ├── test_workflow_prefix_test.exs
│ └── messaging_test.exs
│
├── integration/ # End-to-end tests
│ └── notifications_integration_test.exs
│
└── support/ # Test utilities
├── mox_helper.ex # Mox setup
├── sql_case.ex # SQL testing utilities
└── snapshot.ex # Snapshot testing
test_helper.exs # Test configuration
Pattern: State-based testing instead of interaction-based
- Create workflow/run in database
- Execute operations
- Query database to verify final state
- Assert on database state
Rationale:
- Workflow execution is inherently stateful (database-driven)
- Integration testing validates real PostgreSQL behavior
- Avoids brittle mock-based tests
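The state-based pattern above can be sketched as follows. The helper and schema names mirror ones listed elsewhere in this document, but the exact test setup and status values are assumptions:

```elixir
# Chicago-style test sketch: execute, then assert on database state
# rather than on mock interactions.
test "workflow completes and records final state" do
  {:ok, run} =
    Singularity.Workflow.Executor.execute(TestExecSimpleFlow, %{}, Repo, [])

  # Reload from the database and assert on persisted state.
  run = Repo.get!(Singularity.Workflow.WorkflowRun, run.id)
  assert run.status == "completed"
  assert run.remaining_steps == 0
end
```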
```elixir
defmodule TestExecSimpleFlow do
  def __workflow_steps__ do
    [{:step1, ...}, {:step2, ...}]
  end
end
```

- Short names (queue name limit: 47 chars)
- Defined outside test module for reuse
- Multiple fixtures for different scenarios
```elixir
setup do
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(Singularity.Workflow.Repo)
  Ecto.Adapters.SQL.Sandbox.mode(Singularity.Workflow.Repo, {:shared, self()})
end
```

- Isolates each test in a database transaction
- Tests run with `async: false` due to database contention
- Cleans up automatically
`Singularity.Workflow.TestClock.reset()`

- Deterministic timestamps for testing
- Ensures repeatable test results
`Singularity.Workflow.TestWorkflowPrefix.cleanup_by_prefix("test_", Repo)`

- Removes test data from previous runs
- Prevents test pollution
```shell
mix test.coverage  # Generates HTML report in cover/ directory
```

- `workflow_definition_test.exs` - Parse/validate workflow definitions
- `step_state_test.exs` - Schema changeset tests
- `orchestrator_notifications_test.exs` - Event formatting
- `executor_test.exs` - Full workflow execution
- `flow_builder_test.exs` - Dynamic workflow creation
- `notifications_test.exs` - NOTIFY messaging
- `complete_task_test.exs` - PostgreSQL function behavior
- `notifications_integration_test.exs` - Full real-time pipeline
- `orchestrator_test.exs` - Goal decomposition
- `orchestrator_optimizer_test.exs` - Learning & optimization
Target: Maximum coverage while maintaining test clarity
- Core modules (Executor, TaskExecutor): >85%
- DAG modules: >90%
- Schemas: >80%
- Utilities: >70%
Coverage Commands:

```shell
mix test            # Run all tests
mix test.coverage   # Generate HTML coverage report
mix test --cover    # Show terminal coverage summary
mix test --trace    # Show detailed output for debugging
```

- Config: `.credo.exs` (strict mode enabled)
- Checks Enabled:
- Consistency checks (naming, spacing, tabs/spaces)
- Design checks (FIXME/TODO detection)
- Readability checks (function names, module docs, max line length: 120)
- Refactoring opportunities (cyclomatic complexity: max 12, nesting: max 6, arity: max 10)
- Warning checks (deprecated functions, application config in attributes)
- Command: `mix credo --strict`
- Config: `priv/plts/dialyzer.plt`
- Purpose: Static type analysis for type safety
- Command: `mix dialyzer`
- Note: Checks for consistency with function specs
- Purpose: Identify security vulnerabilities
- Command: `mix sobelow --exit-on-warning`
- Tool: `excoveralls`
- Configuration: `test_coverage: [tool: ExCoveralls]`
- Commands:
  - `mix coveralls` - Terminal report
  - `mix coveralls.html` - HTML report
  - `mix coveralls.detail` - Detailed report
  - `mix coveralls.post` - Post to external service
- Tool: Built-in `mix format`
- Config: `.formatter.exs`
- Commands: `mix format` or `mix quality.fix`
```elixir
# In mix.exs aliases:
quality: [
  "format --check-formatted",
  "credo --strict",
  "dialyzer",
  "sobelow --exit-on-warning",
  "deps.audit"
],
"quality.fix": [
  "format",
  "credo --strict --fix"
]
```

- Comprehensive Module Documentation
  - Every module has `@moduledoc` with examples
  - Function-level `@doc` with parameter descriptions
  - Decision trees and architectural diagrams in comments
  - AI navigation metadata for code understanding
- Strong Type Safety
  - Extensive use of `@spec` for function signatures
  - Proper use of the `{:ok, value} | {:error, reason}` pattern
  - Type annotations in schema definitions
  - Dialyzer-checked code
- Structured Logging
  - Consistent use of Logger with metadata
  - Contextual information in every log
  - Appropriate log levels (info, warn, error, debug)
  - Performance metrics logged
- Error Handling
  - Proper with/else pattern for chaining operations
  - Clear error messages and types
  - Validation before operations
  - Transaction-based atomicity
- Test-Driven Development
  - Comprehensive test suite (10K+ lines)
  - Tests mirror production code structure
  - Clear test names describing behavior
  - Setup/teardown for isolation
- Complexity Management
  - Largest module (OrchestratorOptimizer): 1,040 lines
  - Multiple tiers of abstraction (good separation of concerns)
  - Some complex algorithms (optimization, learning)
  - Within Credo limits (cyclomatic complexity ≤ 12)
- Database-Driven Coordination
  - Heavy reliance on PostgreSQL for coordination
  - Polling-based execution (not event-driven at the Elixir level)
  - Row-level locking for multi-worker safety
  - PostgreSQL functions for atomic operations
- Documentation Quality
  - Comprehensive API reference in docs/
  - Architecture documentation
  - Deployment guides
  - Test structure documentation
  - Code examples in module docs
- Performance Considerations
  - Batch polling for efficiency (configurable batch_size)
  - Configurable poll intervals
  - Connection pooling (pool_size: 10)
  - Timeout configuration at multiple levels
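The with/else error-handling style listed above can be sketched generically. The function names here are placeholders, not library API:

```elixir
# Generic sketch of chaining fallible operations with with/else,
# so the first {:error, reason} short-circuits the pipeline.
with {:ok, definition} <- parse_definition(module),
     {:ok, run} <- initialize_run(definition, input),
     {:ok, result} <- execute_tasks(run) do
  {:ok, result}
else
  {:error, reason} -> {:error, reason}
end
```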
| Metric | Value | Assessment |
|---|---|---|
| Total Lines of Code | 7,280 | Well-sized library |
| Total Test Lines | 10,566 | Strong test coverage |
| Number of Modules | 55 | Well-organized |
| Largest Module | 1,040 lines | Acceptable for optimizer |
| Average Module | ~130 lines | Focused modules |
| Test/Code Ratio | 1.45:1 | Good coverage |
| Max Cyclomatic Complexity | 12 | Acceptable (configured) |
| Max Function Arity | 10 | Within limits (configured) |
| Max Nesting Depth | 6 | Reasonable (configured) |
- ✅ Strict Credo enabled
- ✅ Dialyzer type checking
- ✅ Security scanning (Sobelow)
- ✅ Code formatting enforced
- ✅ Comprehensive module documentation
- ✅ Function specs everywhere
- ✅ Structured logging
- ✅ Error handling patterns
- ✅ Chicago-style TDD
```makefile
# Quality checks in Makefile
quality:        # All checks
quality.fix:    # Auto-fix formatting issues
test:           # Full test suite
test.coverage:  # Coverage report
test.watch:     # Watch mode (stdin)
```

- Well-architected: Clear separation of concerns with database-driven coordination
- Production-ready: Comprehensive error handling, testing, and documentation
- Extensible: Plugin patterns (decomposers) and multiple execution modes
- Observable: Structured logging, metrics, and notification system
- Type-safe: Extensive use of specs and Dialyzer
- Documented: Every module documented with examples and diagrams
- Tested: 10K+ lines of tests with Chicago-style TDD
- PostgreSQL-centric: Leverage database capabilities instead of external services
- Database-driven DAG: Coordinates distributed execution through PostgreSQL
- Event-driven messaging: NOTIFY + pgmq replaces external message brokers
- Polling-based execution: Simple but effective multi-worker coordination
- Strategy pattern: Flexible execution modes without coupling
- Language: Elixir 1.14+
- Database: PostgreSQL 12+ with pgmq extension
- Job Queue: Oban (internal, for distributed execution)
- Testing: ExUnit with Mox for mocking
- Code Quality: Credo, Dialyzer, Sobelow, ExCoveralls
- Workflow Orchestration: Multi-step task coordination
- Data Pipelines: ETL workflows with parallel branches
- AI/LLM Integration: Dynamic workflow generation
- Microservices Orchestration: Cross-service task coordination
- Batch Processing: Bulk task execution with map steps