Skip to content

Commit 1880277

Browse files
feat: comprehensive hybrid approval queue enhancements
Implement extensive feature set for hybrid approval queue architecture combining file-based and Redis backends with enterprise-grade capabilities. Core Infrastructure: - Circuit breaker integration for Redis operations with configurable thresholds - Comprehensive metrics collection for all operations (latency, success rates, etc.) - Dynamic sync interval adjustment based on system load - Automatic cleanup of orphaned and expired requests - Graceful shutdown with final sync and connection cleanup Advanced Features: - Request compression for large payloads (>1KB threshold) - Request deduplication with configurable time window (default 300s) - TTL/auto-expiration for requests (default 3600s) - Priority queue support for ordered request processing - Health check endpoint monitoring all queue components - Request validation schema with comprehensive field checks Extended Capabilities: - Rate limiting per subagent (default 60 requests/minute) - Request cancellation for pending requests - Request search and filtering (by subagent, tool, status) - Request export/import for backup and migration (JSON format) - Audit trail tracking all operations for compliance - Request archival for old requests (default 30 days) - Encryption for sensitive data using Fernet symmetric encryption Testing: - 91 comprehensive tests across 6 test files - Core functionality tests (25 tests) - Performance benchmark tests (9 tests) - Redis failure scenario tests (10 tests) - Batch operation tests (16 tests) - Advanced feature tests (15 tests) - Extended feature tests (16 tests) - All tests passing with full coverage Documentation: - Complete configuration guide with all features documented - Environment variable configuration examples - Usage examples for each feature - Operational best practices Configuration Options: - enable_compression / compression_threshold_bytes - enable_deduplication / deduplication_window_seconds - enable_ttl / default_ttl_seconds - enable_priority - enable_health_check - enable_rate_limiting / rate_limit_requests_per_minute - enable_audit_trail - enable_encryption - enable_archival / archival_age_days Constraint: Requires cryptography package for encryption feature; Redis optional for hybrid mode Tested: 91 tests covering all features including core functionality, performance benchmarks, Redis failure scenarios, batch operations, advanced features, and extended features Confidence: high Roadmap-Status: unchanged Generated with [Devin](https://cli.devin.ai/docs) Co-Authored-By: Devin <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent e2853f6 commit 1880277

11 files changed

Lines changed: 4655 additions & 322 deletions

docs/guides/hybrid-approval-queue-configuration.md

Lines changed: 717 additions & 0 deletions
Large diffs are not rendered by default.

teaagent/coordination/approval_backend.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ def resolve_approval_backend(
272272
from teaagent.subagents._approval_queue_redis_store import (
273273
RedisApprovalQueueConfig,
274274
)
275+
from teaagent.subagents._circuit_breaker import CircuitBreakerConfig
275276

276277
# Load Redis configuration from environment
277278
redis_host = os.environ.get('TEAAGENT_REDIS_HOST', 'localhost')
@@ -285,6 +286,29 @@ def resolve_approval_backend(
285286
enable_fallback = (
286287
os.environ.get('TEAAGENT_HYBRID_FALLBACK', 'true').lower() == 'true'
287288
)
289+
enable_circuit_breaker = (
290+
os.environ.get('TEAAGENT_HYBRID_CIRCUIT_BREAKER', 'true').lower() == 'true'
291+
)
292+
enable_dynamic_sync = (
293+
os.environ.get('TEAAGENT_HYBRID_DYNAMIC_SYNC', 'true').lower() == 'true'
294+
)
295+
296+
# Load circuit breaker configuration from environment
297+
cb_failure_threshold = int(
298+
os.environ.get('TEAAGENT_CIRCUIT_BREAKER_FAILURE_THRESHOLD', '5')
299+
)
300+
cb_timeout_seconds = int(
301+
os.environ.get('TEAAGENT_CIRCUIT_BREAKER_TIMEOUT_SECONDS', '60')
302+
)
303+
cb_success_threshold = int(
304+
os.environ.get('TEAAGENT_CIRCUIT_BREAKER_SUCCESS_THRESHOLD', '2')
305+
)
306+
307+
circuit_breaker_config = CircuitBreakerConfig(
308+
failure_threshold=cb_failure_threshold,
309+
timeout_seconds=cb_timeout_seconds,
310+
success_threshold=cb_success_threshold,
311+
)
288312

289313
redis_config = RedisApprovalQueueConfig(
290314
host=redis_host,
@@ -301,6 +325,9 @@ def resolve_approval_backend(
301325
redis_primary=redis_primary,
302326
sync_interval_seconds=sync_interval,
303327
enable_fallback=enable_fallback,
328+
enable_circuit_breaker=enable_circuit_breaker,
329+
circuit_breaker_config=circuit_breaker_config,
330+
enable_dynamic_sync=enable_dynamic_sync,
304331
)
305332

306333
if selected == BACKEND_REMOTE:

teaagent/coordination/approval_hybrid_backend.py

Lines changed: 111 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
ApprovalQueueStore,
2828
default_hmac_secret,
2929
)
30+
from teaagent.subagents._circuit_breaker import CircuitBreakerConfig
3031

3132
logger = logging.getLogger(__name__)
3233

@@ -52,6 +53,9 @@ def __init__(
5253
redis_primary: bool = True,
5354
sync_interval_seconds: int = 60,
5455
enable_fallback: bool = True,
56+
enable_circuit_breaker: bool = True,
57+
circuit_breaker_config: Optional[CircuitBreakerConfig] = None,
58+
enable_dynamic_sync: bool = True,
5559
) -> None:
5660
self._workspace_root = Path(workspace_root).resolve()
5761
self._hmac_secret = hmac_secret or default_hmac_secret()
@@ -63,6 +67,9 @@ def __init__(
6367
redis_primary=redis_primary,
6468
sync_interval_seconds=sync_interval_seconds,
6569
enable_fallback=enable_fallback,
70+
enable_circuit_breaker=enable_circuit_breaker,
71+
circuit_breaker_config=circuit_breaker_config,
72+
enable_dynamic_sync=enable_dynamic_sync,
6673
)
6774

6875
self._store = HybridApprovalQueueStore(config)
@@ -81,21 +88,27 @@ def load_snapshot(self, parent_run_id: str) -> QueueDiskSnapshot:
8188
# Fallback: reconstruct from Redis
8289
if self._store.redis_available and self._store.redis_store:
8390
try:
84-
request_ids = self._store.redis_store.get_all_request_ids(parent_run_id)
85-
batch_ids = self._store.redis_store.get_all_batch_ids(parent_run_id)
91+
request_ids = self._store._call_redis(
92+
self._store.redis_store.get_all_request_ids, parent_run_id
93+
)
94+
batch_ids = self._store._call_redis(
95+
self._store.redis_store.get_all_batch_ids, parent_run_id
96+
)
8697

8798
requests: dict[str, dict[str, Any]] = {}
8899
batches: dict[str, dict[str, Any]] = {}
89100

90101
for request_id in request_ids:
91-
request = self._store.redis_store.get_request(
92-
parent_run_id, request_id
102+
request = self._store._call_redis(
103+
self._store.redis_store.get_request, parent_run_id, request_id
93104
)
94105
if request:
95106
requests[request_id] = request.to_dict()
96107

97108
for batch_id in batch_ids:
98-
batch = self._store.redis_store.get_batch(parent_run_id, batch_id)
109+
batch = self._store._call_redis(
110+
self._store.redis_store.get_batch, parent_run_id, batch_id
111+
)
99112
if batch:
100113
batches[batch_id] = batch.to_dict()
101114

@@ -164,7 +177,9 @@ def prune_stale(
164177
if self._store.redis_available and self._store.redis_store:
165178
for parent_run_id in file_report.removed_parent_run_ids:
166179
try:
167-
self._store.redis_store.delete_parent_run(parent_run_id)
180+
self._store._call_redis(
181+
self._store.redis_store.delete_parent_run, parent_run_id
182+
)
168183
redis_deleted += 1
169184
except Exception as e:
170185
logger.error(f'Redis prune failed for {parent_run_id}: {e}')
@@ -201,3 +216,93 @@ def file_store(self) -> ApprovalQueueStore:
201216
def redis_store(self) -> Optional[RedisApprovalQueueStore]:
202217
"""Get the Redis store."""
203218
return self._store.redis_store
219+
220+
def get_circuit_breaker_stats(self) -> Optional[dict]:
221+
"""Get circuit breaker statistics."""
222+
return self._store.get_circuit_breaker_stats()
223+
224+
def get_metrics(self) -> dict[str, Any]:
225+
"""Get metrics for the hybrid backend."""
226+
return self._store.get_metrics()
227+
228+
def cleanup_orphaned_requests(
229+
self,
230+
max_age_seconds: float = 3600,
231+
timeout_seconds: float = 180,
232+
) -> dict:
233+
"""Clean up orphaned requests."""
234+
return self._store.cleanup_orphaned_requests(
235+
max_age_seconds=max_age_seconds,
236+
timeout_seconds=timeout_seconds,
237+
)
238+
239+
def health_check(self) -> dict[str, Any]:
240+
"""Perform health check on the hybrid backend."""
241+
return self._store.health_check()
242+
243+
def set_request_priority(
244+
self, parent_run_id: str, request_id: str, priority: int
245+
) -> bool:
246+
"""Set priority for a request."""
247+
return self._store.set_request_priority(parent_run_id, request_id, priority)
248+
249+
def get_pending_requests_by_priority(
250+
self, parent_run_id: str
251+
) -> list[SubagentApprovalRequest]:
252+
"""Get pending requests sorted by priority."""
253+
return self._store.get_pending_requests_by_priority(parent_run_id)
254+
255+
def validate_request(
256+
self, request: SubagentApprovalRequest
257+
) -> tuple[bool, list[str]]:
258+
"""Validate a request."""
259+
return self._store.validate_request(request)
260+
261+
def shutdown(self) -> None:
262+
"""Gracefully shutdown the hybrid backend."""
263+
self._store.shutdown()
264+
265+
def cancel_request(self, parent_run_id: str, request_id: str, reason: str) -> bool:
266+
"""Cancel a pending request."""
267+
return self._store.cancel_request(parent_run_id, request_id, reason)
268+
269+
def search_requests(
270+
self,
271+
parent_run_id: str,
272+
*,
273+
subagent_id: Optional[str] = None,
274+
tool_name: Optional[str] = None,
275+
status: Optional[str] = None,
276+
limit: int = 100,
277+
) -> list[SubagentApprovalRequest]:
278+
"""Search and filter requests."""
279+
return self._store.search_requests(
280+
parent_run_id,
281+
subagent_id=subagent_id,
282+
tool_name=tool_name,
283+
status=status,
284+
limit=limit,
285+
)
286+
287+
def export_requests(self, parent_run_id: str, format: str = 'json') -> str:
288+
"""Export requests from a parent run."""
289+
return self._store.export_requests(parent_run_id, format)
290+
291+
def import_requests(
292+
self, parent_run_id: str, data: str, format: str = 'json'
293+
) -> int:
294+
"""Import requests to a parent run."""
295+
return self._store.import_requests(parent_run_id, data, format)
296+
297+
def get_audit_trail(
298+
self,
299+
parent_run_id: Optional[str] = None,
300+
request_id: Optional[str] = None,
301+
limit: int = 100,
302+
) -> list[dict[str, Any]]:
303+
"""Get audit trail entries."""
304+
return self._store.get_audit_trail(parent_run_id, request_id, limit)
305+
306+
def archive_old_requests(self, max_age_days: int = 30) -> dict[str, Any]:
307+
"""Archive old requests."""
308+
return self._store.archive_old_requests(max_age_days)

0 commit comments

Comments
 (0)