Skip to content

Latest commit

 

History

History
671 lines (508 loc) · 16.8 KB

File metadata and controls

671 lines (508 loc) · 16.8 KB
Error in user YAML: (<unknown>): mapping values are not allowed in this context at line 1 column 52
---
description: Workflow Coordination & Agent Tracking: **Date:** January 27, 2026 **Version:** 4.8.2 **Status:** ✅ Complete and Ready for Use --- ## Overview The Empathy Frame
---

Workflow Coordination & Agent Tracking

Date: January 27, 2026 Version: 4.8.2 Status: ✅ Complete and Ready for Use


Overview

The Empathy Framework now supports agent tracking and coordination in BaseWorkflow, enabling workflows to:

  • Track agent liveness via TTL-based heartbeat updates (Pattern 1)
  • Coordinate between agents via TTL-based ephemeral signals (Pattern 2)

These features integrate seamlessly with the existing workflow infrastructure, requiring only optional flags to enable.


Quick Start

Enable Heartbeat Tracking

from empathy_os.workflows.base import BaseWorkflow, ModelTier

class MyWorkflow(BaseWorkflow):
    name = "my-workflow"
    stages = ["stage1", "stage2"]
    tier_map = {
        "stage1": ModelTier.CHEAP,
        "stage2": ModelTier.CAPABLE,
    }

# Enable heartbeat tracking
workflow = MyWorkflow(
    enable_heartbeat_tracking=True,
    agent_id="my-workflow-001",  # Optional: auto-generated if None
)

# Execute workflow - heartbeats published automatically
result = await workflow.execute(input_data)

What happens automatically:

  1. Heartbeat started at workflow launch
  2. Progress updates published before/after each stage
  3. Final heartbeat on completion/failure
  4. Automatic cleanup via TTL (30s default)

Enable Coordination Signals

# Enable coordination between workflows
producer = MyWorkflow(
    enable_coordination=True,
    agent_id="producer",
)

consumer = MyWorkflow(
    enable_coordination=True,
    agent_id="consumer",
)

# Producer signals completion to consumer
await producer.execute()

# Consumer waits for producer signal
signal = consumer.wait_for_signal(
    signal_type="task_complete",
    source_agent="producer",
    timeout=30.0
)

Workflow API

Initialization Parameters

BaseWorkflow(
    # ... existing parameters ...
    enable_heartbeat_tracking: bool = False,
    enable_coordination: bool = False,
    agent_id: str | None = None,
)
Parameter Type Default Description
enable_heartbeat_tracking bool False Enable TTL-based heartbeat tracking
enable_coordination bool False Enable coordination signals
agent_id str | None Auto-generated Agent identifier (format: {workflow_name}-{run_id})

Requirements:

  • Redis backend must be running
  • If Redis is unavailable, features gracefully degrade (warnings logged)

Coordination Methods

send_signal()

Send a coordination signal to another agent.

workflow.send_signal(
    signal_type: str,
    target_agent: str | None = None,
    payload: dict[str, Any] | None = None,
    ttl_seconds: int | None = None,
) -> str

Parameters:

  • signal_type: Type of signal (e.g., "task_complete", "checkpoint", "error")
  • target_agent: Target agent ID (None for broadcast to all agents)
  • payload: Optional signal payload data
  • ttl_seconds: Optional TTL override (default 60 seconds)

Returns:

  • Signal ID if coordination is enabled, empty string otherwise

Example:

# Targeted signal
workflow.send_signal(
    signal_type="task_complete",
    target_agent="orchestrator",
    payload={"result": "success", "data": {...}}
)

# Broadcast to all agents
workflow.send_signal(
    signal_type="abort",
    target_agent=None,  # Broadcast
    payload={"reason": "user_cancelled"}
)

wait_for_signal()

Wait for a coordination signal from another agent (blocking).

workflow.wait_for_signal(
    signal_type: str,
    source_agent: str | None = None,
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> CoordinationSignal | None

Parameters:

  • signal_type: Type of signal to wait for
  • source_agent: Optional source agent filter
  • timeout: Maximum wait time in seconds
  • poll_interval: Poll interval in seconds

Returns:

  • CoordinationSignal if received, None if timeout

Example:

# Wait for orchestrator approval
signal = workflow.wait_for_signal(
    signal_type="approval",
    source_agent="orchestrator",
    timeout=60.0
)

if signal:
    proceed_with_deployment(signal.payload)
else:
    raise TimeoutError("No approval received")

check_signal()

Check for a coordination signal without blocking (non-blocking).

workflow.check_signal(
    signal_type: str,
    source_agent: str | None = None,
    consume: bool = True,
) -> CoordinationSignal | None

Parameters:

  • signal_type: Type of signal to check for
  • source_agent: Optional source agent filter
  • consume: If True, remove signal after reading

Returns:

  • CoordinationSignal if available, None otherwise

Example:

# Non-blocking check for abort signal
signal = workflow.check_signal(signal_type="abort")
if signal:
    raise WorkflowAbortedException(signal.payload["reason"])

Coordination Patterns

Pattern: Producer-Consumer

Producer workflow generates data and signals consumer:

class ProducerWorkflow(BaseWorkflow):
    name = "producer"
    stages = ["generate", "validate", "notify"]

    async def run_stage(self, stage_name: str, tier: ModelTier, input_data: dict):
        if stage_name == "notify":
            # Signal completion to consumer
            self.send_signal(
                signal_type="task_complete",
                target_agent="consumer",
                payload={"data": result_data}
            )
        # ... rest of stage logic

class ConsumerWorkflow(BaseWorkflow):
    name = "consumer"
    stages = ["wait", "process", "report"]

    async def run_stage(self, stage_name: str, tier: ModelTier, input_data: dict):
        if stage_name == "wait":
            # Wait for producer signal
            signal = self.wait_for_signal(
                signal_type="task_complete",
                source_agent="producer",
                timeout=30.0
            )
            if signal is None:
                raise TimeoutError("Producer timeout")

            return signal.payload, 0, 0
        # ... rest of stage logic

# Run concurrently
producer = ProducerWorkflow(enable_coordination=True, agent_id="producer")
consumer = ConsumerWorkflow(enable_coordination=True, agent_id="consumer")

producer_task = asyncio.create_task(producer.execute())
consumer_task = asyncio.create_task(consumer.execute())

await asyncio.gather(producer_task, consumer_task)

Pattern: Checkpoint Synchronization

Multiple agents synchronize at checkpoints:

class WorkerWorkflow(BaseWorkflow):
    async def run_stage(self, stage_name: str, tier: ModelTier, input_data: dict):
        if stage_name == "checkpoint":
            # Signal checkpoint reached
            self.send_signal(
                signal_type="checkpoint",
                target_agent="orchestrator",
                payload={"status": "ready"}
            )

            # Wait for proceed signal
            signal = self.wait_for_signal(
                signal_type="proceed",
                source_agent="orchestrator",
                timeout=60.0
            )
            # ... continue

class OrchestratorWorkflow(BaseWorkflow):
    async def run_stage(self, stage_name: str, tier: ModelTier, input_data: dict):
        if stage_name == "wait_checkpoint":
            # Wait for all workers
            expected_workers = 3
            checkpoints = []

            for _ in range(expected_workers):
                signal = self.wait_for_signal(
                    signal_type="checkpoint",
                    timeout=120.0
                )
                checkpoints.append(signal)

            # Broadcast proceed to all workers
            self.send_signal(
                signal_type="proceed",
                target_agent=None,  # Broadcast
                payload={"timestamp": datetime.now().isoformat()}
            )

Pattern: Abort on Error

One workflow signals abort, all others check periodically:

class WorkerWorkflow(BaseWorkflow):
    async def run_stage(self, stage_name: str, tier: ModelTier, input_data: dict):
        # Check for abort before each stage
        abort_signal = self.check_signal(signal_type="abort")
        if abort_signal:
            reason = abort_signal.payload.get("reason", "unknown")
            raise WorkflowAbortedException(f"Aborted: {reason}")

        # On error, signal abort to others
        try:
            result = process_work()
        except Exception as e:
            self.send_signal(
                signal_type="abort",
                target_agent=None,  # Broadcast
                payload={"reason": str(e), "source": self._agent_id}
            )
            raise

# Run multiple workers
workers = [
    WorkerWorkflow(enable_coordination=True, agent_id=f"worker-{i}")
    for i in range(5)
]

tasks = [asyncio.create_task(w.execute()) for w in workers]
results = await asyncio.gather(*tasks, return_exceptions=True)

Heartbeat Tracking Integration

Automatic Heartbeat Updates

When enable_heartbeat_tracking=True, BaseWorkflow automatically:

  1. Start Heartbeat - On execute() call
{
    "agent_id": "my-workflow-abc12345",
    "status": "starting",
    "progress": 0.0,
    "current_task": "initializing",
    "metadata": {
        "workflow": "my-workflow",
        "run_id": "abc12345",
        "provider": "anthropic",
        "stages": 3
    }
}
  1. Stage Start Update - Before each stage
{
    "status": "running",
    "progress": 0.33,  # stage_index / len(stages)
    "current_task": "Running stage: stage1 (cheap)"
}
  1. Stage Completion Update - After each stage
{
    "status": "running",
    "progress": 0.66,
    "current_task": "Completed stage: stage1"
}
  1. Final Status - On workflow completion
{
    "status": "completed",  # or "failed"
    "progress": 1.0
}

Monitoring Active Workflows

# CLI: View all active agents
empathy telemetry agents

# Output:
# 🤖 Active Agents
# ----------------------------------------------------------------------
#   🟢 producer-abc12345
#       Status:       running
#       Progress:     66.7%
#       Task:         Running stage: validate (capable)
#       Last beat:    2.3s ago
#       Workflow:     producer

Python API:

from empathy_os.telemetry import HeartbeatCoordinator

coordinator = HeartbeatCoordinator()

# Get all active agents
active = coordinator.get_active_agents()
for agent in active:
    print(f"{agent.agent_id}: {agent.status} ({agent.progress*100:.0f}%)")

# Check if specific workflow is alive
if coordinator.is_agent_alive("my-workflow-abc12345"):
    print("Workflow is running!")

# Detect stale workflows (no update in 60s)
stale = coordinator.get_stale_agents(threshold_seconds=60.0)

Configuration

Redis Requirements

Both heartbeat tracking and coordination require Redis:

# Start Redis
redis-server

# Or use Empathy command
empathy memory start

# Verify Redis is running
empathy memory status

TTL Configuration

Heartbeat TTL (default: 30 seconds):

from empathy_os.telemetry import HeartbeatCoordinator

HeartbeatCoordinator.HEARTBEAT_TTL = 60  # Increase for longer-running workflows

Signal TTL (default: 60 seconds):

from empathy_os.telemetry import CoordinationSignals

CoordinationSignals.DEFAULT_TTL = 120  # Increase for slower coordination

# Or per-signal:
workflow.send_signal(
    signal_type="checkpoint",
    target_agent="orchestrator",
    payload={...},
    ttl_seconds=300  # 5 minutes
)

Graceful Degradation

When Redis is unavailable:

  • Warnings logged: "Failed to initialize HeartbeatCoordinator (Redis unavailable?)"
  • Features silently disabled
  • Workflow execution continues normally
  • Methods return empty values ("", None)

No exceptions raised - workflows remain functional without coordination.


Demo Script

Run the complete demonstration:

python examples/coordinated_workflow_demo.py

Includes:

  1. Producer-Consumer pattern with coordination
  2. Orchestrator pattern with broadcasts
  3. Abort signal handling

Output:

======================================================================
PRODUCER-CONSUMER WORKFLOW DEMONSTRATION
======================================================================

Starting producer and consumer workflows...

[producer] Starting workflow...
[consumer] Starting workflow...
[consumer] Waiting for producer signal...
[producer] Completed generate stage
[producer] Completed validate stage
[producer] Sent task_complete signal to consumer
[consumer] Received signal from producer!
[consumer] Processing data...

======================================================================
RESULTS
======================================================================
Producer: ✅ Success
Consumer: ✅ Success

Integration Testing

Test Heartbeat Integration

import pytest
from empathy_os.workflows.base import BaseWorkflow, ModelTier


class TestWorkflow(BaseWorkflow):
    name = "test-workflow"
    stages = ["stage1"]
    tier_map = {"stage1": ModelTier.CHEAP}

    async def run_stage(self, stage_name: str, tier: ModelTier, input_data: dict):
        return {"result": "success"}, 10, 5


@pytest.mark.asyncio
async def test_workflow_with_heartbeat_tracking():
    """Test workflow with heartbeat tracking enabled."""
    from empathy_os.telemetry import HeartbeatCoordinator

    coordinator = HeartbeatCoordinator()

    # Clear any existing heartbeats
    coordinator._heartbeat_coordinator = None

    workflow = TestWorkflow(
        enable_heartbeat_tracking=True,
        agent_id="test-workflow-001",
    )

    # Execute workflow
    result = await workflow.execute()

    assert result.success

    # Note: Heartbeat will have expired after workflow completion
    # as final status removes the heartbeat from Redis

Test Coordination Integration

@pytest.mark.asyncio
async def test_producer_consumer_coordination():
    """Test coordination between producer and consumer workflows."""
    producer = ProducerWorkflow(
        enable_coordination=True,
        agent_id="producer-test",
    )

    consumer = ConsumerWorkflow(
        enable_coordination=True,
        agent_id="consumer-test",
    )

    # Run concurrently
    producer_task = asyncio.create_task(producer.execute())
    consumer_task = asyncio.create_task(consumer.execute())

    producer_result, consumer_result = await asyncio.gather(
        producer_task, consumer_task
    )

    assert producer_result.success
    assert consumer_result.success

Related Documentation


Next Steps

Remaining Patterns

Pattern 4: Real-Time Event Streaming (not yet implemented)

  • Redis Streams + WebSocket for live updates
  • Real-time dashboard integration

Pattern 5: Human Approval Gates (not yet implemented)

  • Pause workflow for human decisions
  • Approval signal integration

Pattern 6: Agent-to-LLM Feedback Loop (not yet implemented)

  • Quality ratings inform routing
  • Automatic tier selection based on feedback

FAQ

Q: Do I need Redis to use workflows?

A: No. Redis is only required for heartbeat tracking and coordination. Workflows work normally without Redis - these features just gracefully degrade.

Q: What happens if Redis crashes during workflow execution?

A: Heartbeat updates and signal operations will fail silently (warnings logged). Workflow execution continues normally. When Redis restarts, new workflows will resume using coordination features.

Q: Can I use heartbeat tracking without coordination?

A: Yes. The features are independent:

# Heartbeat only
workflow = MyWorkflow(enable_heartbeat_tracking=True)

# Coordination only
workflow = MyWorkflow(enable_coordination=True)

# Both
workflow = MyWorkflow(enable_heartbeat_tracking=True, enable_coordination=True)

Q: How do I monitor workflows in production?

A: Use the CLI commands:

# View active workflows
empathy telemetry agents

# View pending signals for a workflow
empathy telemetry signals --agent my-workflow-abc12345

Or build a custom dashboard using the Python API from HeartbeatCoordinator and CoordinationSignals.


Status: ✅ Ready for Production Use Version: 4.8.2 Last Updated: January 27, 2026 Dependencies: Redis 5.0+ (optional)