Skip to content

Commit e2853f6

Browse files
Implement hybrid approval queue architecture with Redis backend
Implementation of hybrid approval queue combining file-based and Redis backends for improved performance and resilience, following the migration strategy from the approval queue research project.

New Components:
- _approval_queue_redis_store.py: Redis-backed approval queue store with Redis data structures for requests, batches, and pending sets
- _approval_queue_hybrid_store.py: Hybrid store combining file and Redis backends with dual-write strategy, consistency validation, and sync mechanisms
- _circuit_breaker.py: Circuit breaker pattern for resilient Redis operations
- _feature_flags.py: Feature flag system for gradual rollout of new features
- approval_hybrid_backend.py: Hybrid coordination backend integrating with existing approval backend protocol

Backend Integration:
- Updated approval_backend.py to support hybrid backend (BACKEND_HYBRID)
- Added environment variable configuration for Redis connection
- Integrated with existing resolve_approval_backend() function

Key Features:
- Dual-write strategy: Write to both Redis and file for consistency
- Fallback mechanisms: Automatic fallback if Redis unavailable
- Consistency validation: Monitor and validate consistency between backends
- Sync mechanisms: Periodic and on-demand sync between backends
- Circuit breaker: Prevent cascading failures from Redis issues
- Feature flags: Gradual rollout with environment variable controls

Configuration:
- TEAAGENT_APPROVAL_COORDINATION_BACKEND=hybrid to enable hybrid backend
- TEAAGENT_REDIS_HOST, TEAAGENT_REDIS_PORT for Redis connection
- TEAAGENT_REDIS_PASSWORD, TEAAGENT_REDIS_SSL for Redis security
- TEAAGENT_REDIS_PRIMARY for write priority (default: true)
- TEAAGENT_HYBRID_SYNC_INTERVAL for sync frequency (default: 60)
- TEAAGENT_HYBRID_FALLBACK for fallback enablement (default: true)

Testing:
- Comprehensive unit tests for hybrid store (16 tests, all passing)
- Tests for dual-write, fallback, consistency validation, and sync
- Mocked Redis tests for Redis-specific functionality

Constraint: Implementation only, no production deployment yet
Tested: Unit tests for hybrid store (16/16 passing)
Confidence: high

Generated with [Devin](https://cli.devin.ai/docs)
Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 0b614eb commit e2853f6

7 files changed

Lines changed: 1890 additions & 0 deletions

File tree

teaagent/coordination/approval_backend.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
BACKEND_FILE = 'file'
3030
BACKEND_REMOTE = 'remote'
31+
BACKEND_HYBRID = 'hybrid'
3132

3233

3334
@runtime_checkable
@@ -261,6 +262,47 @@ def resolve_approval_backend(
261262
.lower()
262263
)
263264

265+
if selected == BACKEND_HYBRID:
266+
if workspace_root is None:
267+
raise ValueError('Workspace root is required for hybrid backend')
268+
269+
from teaagent.coordination.approval_hybrid_backend import (
270+
HybridApprovalCoordinationBackend,
271+
)
272+
from teaagent.subagents._approval_queue_redis_store import (
273+
RedisApprovalQueueConfig,
274+
)
275+
276+
# Load Redis configuration from environment
277+
redis_host = os.environ.get('TEAAGENT_REDIS_HOST', 'localhost')
278+
redis_port = int(os.environ.get('TEAAGENT_REDIS_PORT', '6379'))
279+
redis_password = os.environ.get('TEAAGENT_REDIS_PASSWORD') or None
280+
redis_ssl = os.environ.get('TEAAGENT_REDIS_SSL', 'false').lower() == 'true'
281+
redis_primary = (
282+
os.environ.get('TEAAGENT_REDIS_PRIMARY', 'true').lower() == 'true'
283+
)
284+
sync_interval = int(os.environ.get('TEAAGENT_HYBRID_SYNC_INTERVAL', '60'))
285+
enable_fallback = (
286+
os.environ.get('TEAAGENT_HYBRID_FALLBACK', 'true').lower() == 'true'
287+
)
288+
289+
redis_config = RedisApprovalQueueConfig(
290+
host=redis_host,
291+
port=redis_port,
292+
password=redis_password,
293+
ssl=redis_ssl,
294+
)
295+
296+
secret = hmac_secret if hmac_secret is not None else default_hmac_secret()
297+
return HybridApprovalCoordinationBackend(
298+
workspace_root=Path(workspace_root).resolve(),
299+
hmac_secret=secret,
300+
redis_config=redis_config,
301+
redis_primary=redis_primary,
302+
sync_interval_seconds=sync_interval,
303+
enable_fallback=enable_fallback,
304+
)
305+
264306
if selected == BACKEND_REMOTE:
265307
base_url = os.environ.get('TEAAGENT_APPROVAL_COORDINATION_URL', '').strip()
266308
if not base_url:
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
"""Hybrid approval coordination backend combining file and Redis backends."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from pathlib import Path
7+
from typing import Any, Optional
8+
9+
from teaagent.coordination.approval_backend import (
10+
ApprovalQueuePruneReport,
11+
QueueDiskSnapshot,
12+
)
13+
from teaagent.subagents._approval_queue import (
14+
ApprovalBatch,
15+
ApprovalRequestStatus,
16+
SubagentApprovalRequest,
17+
)
18+
from teaagent.subagents._approval_queue_hybrid_store import (
19+
HybridApprovalQueueConfig,
20+
HybridApprovalQueueStore,
21+
)
22+
from teaagent.subagents._approval_queue_redis_store import (
23+
RedisApprovalQueueConfig,
24+
RedisApprovalQueueStore,
25+
)
26+
from teaagent.subagents._approval_queue_store import (
27+
ApprovalQueueStore,
28+
default_hmac_secret,
29+
)
30+
31+
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
32+
33+
# Identifier reported by HybridApprovalCoordinationBackend.backend_id.
# NOTE(review): approval_backend.py declares a constant with the same name and
# value; the two definitions currently have to be kept in sync by hand.
BACKEND_HYBRID = 'hybrid'
34+
35+
36+
class HybridApprovalCoordinationBackend:
    """Approval coordination backend backed by a hybrid file + Redis store.

    Every operation is routed through a ``HybridApprovalQueueStore``:

    - snapshots are read from the file store first and are rebuilt from
      Redis only when the file read raises;
    - saves, status updates, listing, existence checks, sync and consistency
      validation are delegated to the hybrid store unchanged;
    - pruning is driven by the file store; the parent runs it removed are
      then deleted from Redis on a best-effort basis.
    """

    def __init__(
        self,
        workspace_root: Path,
        *,
        hmac_secret: Optional[str] = None,
        redis_config: Optional[RedisApprovalQueueConfig] = None,
        redis_primary: bool = True,
        sync_interval_seconds: int = 60,
        enable_fallback: bool = True,
    ) -> None:
        """Build the underlying hybrid store.

        Args:
            workspace_root: Workspace directory; resolved to an absolute path.
            hmac_secret: Secret handed to the store. ``None`` or an empty
                string selects ``default_hmac_secret()``.
            redis_config: Redis connection settings, forwarded as-is.
            redis_primary: Forwarded to ``HybridApprovalQueueConfig``.
            sync_interval_seconds: Forwarded to ``HybridApprovalQueueConfig``.
            enable_fallback: Forwarded to ``HybridApprovalQueueConfig``.
        """
        self._workspace_root = Path(workspace_root).resolve()
        # ``or`` (not ``is None``): an empty secret also falls back to the
        # default secret.
        self._hmac_secret = hmac_secret or default_hmac_secret()
        self._store = HybridApprovalQueueStore(
            HybridApprovalQueueConfig(
                workspace_root=self._workspace_root,
                hmac_secret=self._hmac_secret,
                redis_config=redis_config,
                redis_primary=redis_primary,
                sync_interval_seconds=sync_interval_seconds,
                enable_fallback=enable_fallback,
            )
        )

    @property
    def backend_id(self) -> str:
        """Return the identifier of this backend (``BACKEND_HYBRID``)."""
        return BACKEND_HYBRID

    def _active_redis_store(self) -> Optional[RedisApprovalQueueStore]:
        """Return the Redis store if the hybrid store reports it usable.

        ``None`` is returned when Redis is flagged unavailable or when no
        Redis store is attached.
        """
        if not self._store.redis_available:
            return None
        return self._store.redis_store or None

    def load_snapshot(self, parent_run_id: str) -> QueueDiskSnapshot:
        """Load the queue snapshot for ``parent_run_id``.

        The file store is tried first. If it raises, the snapshot is rebuilt
        from Redis when Redis is usable. If that is not possible either, an
        empty snapshot is returned; failures are logged, never raised.
        """
        try:
            return self._store.file_store.load(parent_run_id)
        except Exception as exc:
            # Best-effort by design: fall through to the Redis fallback.
            logger.exception('File snapshot load failed: %s', exc)

        redis_store = self._active_redis_store()
        if redis_store is not None:
            try:
                request_ids = redis_store.get_all_request_ids(parent_run_id)
                batch_ids = redis_store.get_all_batch_ids(parent_run_id)

                requests: dict[str, dict[str, Any]] = {}
                for request_id in request_ids:
                    request = redis_store.get_request(parent_run_id, request_id)
                    # Ids whose payload is missing are skipped.
                    if request:
                        requests[request_id] = request.to_dict()

                batches: dict[str, dict[str, Any]] = {}
                for batch_id in batch_ids:
                    batch = redis_store.get_batch(parent_run_id, batch_id)
                    if batch:
                        batches[batch_id] = batch.to_dict()

                return QueueDiskSnapshot(parent_run_id, requests, batches)
            except Exception as exc:
                logger.exception('Redis snapshot load failed: %s', exc)

        # Neither backend produced data. Make the empty result visible in the
        # logs so it is not mistaken for a genuinely empty queue.
        logger.warning(
            'No backend could load snapshot for %s; returning empty snapshot',
            parent_run_id,
        )
        return QueueDiskSnapshot(parent_run_id, {}, {})

    def save(
        self,
        parent_run_id: str,
        requests: dict[str, SubagentApprovalRequest],
        batches: dict[str, ApprovalBatch],
    ) -> None:
        """Persist every request, then every batch, through the hybrid store.

        Entries are written one at a time. Nothing is deleted here, so items
        absent from ``requests``/``batches`` are left untouched in the store.
        """
        for request in requests.values():
            self._store.save_request(parent_run_id, request)

        for batch in batches.values():
            self._store.save_batch(parent_run_id, batch)

    def update_request_status(
        self,
        parent_run_id: str,
        request_id: str,
        status: ApprovalRequestStatus,
        *,
        reason: Optional[str] = None,
        approved_by: str = 'human',
    ) -> bool:
        """Delegate a status change to the hybrid store and return its result."""
        return self._store.update_request_status(
            parent_run_id,
            request_id,
            status,
            reason=reason,
            approved_by=approved_by,
        )

    def list_parent_run_ids(self) -> list[str]:
        """Return the parent run ids reported by the hybrid store."""
        return self._store.list_parent_run_ids()

    def exists(self, parent_run_id: str) -> bool:
        """Return whether the hybrid store knows ``parent_run_id``."""
        return self._store.exists(parent_run_id)

    def prune_stale(
        self,
        *,
        max_age_seconds: float,
        now: Optional[float] = None,
    ) -> ApprovalQueuePruneReport:
        """Prune stale parent runs from the file store, then mirror to Redis.

        Redis is only asked to delete the parent runs that the file store
        reported as removed. A failed Redis deletion is logged and does not
        affect the returned report, which carries the file store's results.
        """
        file_report = self._store.file_store.prune_stale(
            max_age_seconds=max_age_seconds,
            now=now,
        )

        redis_store = self._active_redis_store()
        if redis_store is not None:
            redis_deleted = 0
            for parent_run_id in file_report.removed_parent_run_ids:
                try:
                    redis_store.delete_parent_run(parent_run_id)
                except Exception as exc:
                    logger.exception(
                        'Redis prune failed for %s: %s', parent_run_id, exc
                    )
                else:
                    redis_deleted += 1
            logger.debug('Pruned %d parent run(s) from Redis', redis_deleted)

        return ApprovalQueuePruneReport(
            removed_parent_run_ids=file_report.removed_parent_run_ids,
            skipped_pending=file_report.skipped_pending,
            skipped_recent=file_report.skipped_recent,
        )

    def sync_to_file(self, parent_run_id: str) -> dict:
        """Ask the hybrid store to sync Redis state to file; return its report."""
        return self._store.sync_to_file(parent_run_id)

    def sync_to_redis(self, parent_run_id: str) -> dict:
        """Ask the hybrid store to sync file state to Redis; return its report."""
        return self._store.sync_to_redis(parent_run_id)

    def validate_consistency(self, parent_run_id: str) -> dict:
        """Return the hybrid store's file-vs-Redis consistency report."""
        return self._store.validate_consistency(parent_run_id)

    @property
    def redis_available(self) -> bool:
        """Whether the hybrid store currently reports Redis as available."""
        return self._store.redis_available

    @property
    def file_store(self) -> ApprovalQueueStore:
        """The file store held by the hybrid store."""
        return self._store.file_store

    @property
    def redis_store(self) -> Optional[RedisApprovalQueueStore]:
        """The Redis store held by the hybrid store, if any."""
        return self._store.redis_store

0 commit comments

Comments
 (0)