Commit 78a11b5

Refactor: Enforce ExecutionRequest contract and implement safe sandbox execution (ENH-001/Phase 9)
1 parent 21b1f22 commit 78a11b5

10 files changed

Lines changed: 542 additions & 226 deletions


audit/remediation_plan.md

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ This document serves as the master backlog for addressing findings from the Arch

 ## 🚀 Enhancements (Architecture Upgrade)

-- [ ] **ENH-001: Sandboxed Execution Service** (P0 - Critical)
+- [x] **ENH-001: Sandboxed Execution Service** (P0 - Critical)
   - **Value**: Prevents driver crashes from taking down the Agent and mitigates RCE risks.
   - **Action**: Move `ExecutorNode` logic to a separate gRPC service/sidecar with limited privileges.

Lines changed: 126 additions & 0 deletions
@@ -0,0 +1,126 @@
# ADR-002: Circuit Breaker Pattern (ENH-002)

## 1. Problem Statement

The NL2SQL Agent relies on multiple unreliable downstream dependencies:

1. **LLM Providers** (OpenAI/Anthropic): Subject to rate limits, latency spikes, and 503 outages.
2. **Vector Stores** (Chroma/Weaviate): Network I/O, potential lock contention.
3. **SQL Databases**: Network partitions, connection pool exhaustion, "Hard Down" instances.

### The Risks

* **Cascading Failure**: If the LLM is down, 100 concurrent users will launch 100 requests. If we just retry, we amplify the load (Retry Storms).
* **Resource Exhaustion**: Threads hanging on `socket.recv` block the entire web server, even for health checks.
* **Poor UX**: Users wait 60s+ for a timeout instead of getting an immediate "Service Unavailable" message.
* **Degraded Functionality**: Without isolation, a DB failure kills the entire agent, even if the user only wanted to chat or check the schema.
## 2. Options Analysis

### Option A: Retries Only (Current State)

We currently implement Exponential Backoff.

* **Pros**: Simple. Handles transient blips.
* **Cons**: Catastrophic during full outages. If OpenAI is down for 1 hour, every request still tries 3-5 times, wasting CPU and I/O.

### Option B: In-Process Circuit Breaker (`pybreaker`) - **Recommended**

Use a Python library to track failure rates in memory.

* **Mechanism**:
  * **Closed**: Normal operation.
  * **Open**: After `fail_max` errors, fail immediately (raise `CircuitBreakerError`) without calling downstream.
  * **Half-Open**: After `reset_timeout`, let one request through to test the waters.
* **Pros**: Lightweight, easy to integrate via decorators, no extra infrastructure.
* **Cons**: State is per-process (not distributed). Each pod must rediscover the outage independently.
### Option C: Service Mesh (Istio / Linkerd)

Offload resilience to the network layer (Envoy sidecars).

* **Pros**: Distributed, language-agnostic, central control.
* **Cons**: Massive operational overkill for the current scale. Depends on a k8s deployment.
## 3. Decision

We will implement **Option B** using the `pybreaker` library.

### Rationale

* **Complexity**: Low. It requires only a decorator on service methods.
* **Granularity**: We can define different breakers for different failure domains (e.g., `LLM_BREAKER`, `DB_BREAKER`).
* **Dependencies**: Adds one small pure-Python dependency.
## 4. Implementation Details

### Thresholds & Classification

**LLM Breaker (`LLM_BREAKER`)**:

* `fail_max = 5`: Open after 5 consecutive failures.
* `reset_timeout = 60s`: Wait 1 minute before probing.
* **Failure Classification**:
  * **Failures**: Network errors, timeouts, 5xx (Server Error).
  * **Ignored**: 429 (Rate Limit), 400 (Bad Request), 401 (Auth). *Rationale: Rate limits are soft failures and should trigger backoff, not circuit breaking.*

**DB/Vector Breaker (`DB_BREAKER`, `VECTOR_BREAKER`)**:

* `fail_max = 5`
* `reset_timeout = 30s` (faster recovery for infra blips).
* **Failure Classification**: Any unhandled exception or connection error.
### State Transitions (Critical)

* **Closed -> Open**: Triggered by `fail_max` consecutive errors.
* **Open -> Half-Open**: After `reset_timeout` passes.
* **Half-Open -> Closed**: If the *single probe request* succeeds.
* **Half-Open -> Open**: If the *single probe request* fails.
* **Invariant**: The transition to Half-Open MUST be protected by a lock to prevent a Thundering Herd (only ONE request acts as the probe).
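These transitions can be sketched in a few dozen lines of plain Python. This is an illustration of the state machine and the single-probe lock, not pybreaker's actual implementation:

```python
import threading
import time

# Minimal sketch of the Closed/Open/Half-Open state machine described above,
# including the lock that enforces the single-probe invariant. NOT pybreaker.

class MiniBreaker:
    def __init__(self, fail_max: int = 5, reset_timeout: float = 60.0):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0
        self._probe_lock = threading.Lock()  # only ONE request may probe

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise RuntimeError("CircuitBreakerError: circuit is open")
            # reset_timeout elapsed: try to become the single probe request
            if not self._probe_lock.acquire(blocking=False):
                raise RuntimeError("CircuitBreakerError: probe already in flight")
            self.state = "half-open"
            try:
                result = func(*args, **kwargs)
            except Exception:
                self.state = "open"              # Half-Open -> Open
                self.opened_at = time.monotonic()
                raise
            else:
                self.state = "closed"            # Half-Open -> Closed
                self.failures = 0
                return result
            finally:
                self._probe_lock.release()

        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.state = "open"              # Closed -> Open
                self.opened_at = time.monotonic()
            raise
        else:
            self.failures = 0
            return result
```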
### Tiered Resilience (Degradation Strategy)

We will implement "Graceful Degradation" rather than a "Hard Stop" when a breaker opens:

| Breaker Open | Impact | Fallback Behavior |
| :--- | :--- | :--- |
| **DB** | Cannot execute SQL | **Accept Query**: Validate intent via LLM. <br> **Response**: "I understood your query, but the database is currently unreachable. Here is the SQL I would have run: ..." |
| **Vector** | Cannot route/search | **Static Route**: Fall back to direct text search (if available) or warn the user. <br> **LLM**: Proceed with "Best Effort" routing without examples. |
| **LLM** | Cannot understand | **Critical Failure**: Return "Service Unavailable. Please try again later." |
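The DB row of the table can be sketched as follows; `CircuitOpenError`, `generate_sql`, and `run_sql` are hypothetical names standing in for the real error type and call paths:

```python
# Illustrative sketch of the DB-breaker fallback: accept the query,
# surface the generated SQL when the database path is open.

class CircuitOpenError(Exception):
    """Raised when a breaker is open and the call is rejected fast."""


def answer(question: str, generate_sql, run_sql) -> dict:
    sql = generate_sql(question)          # LLM path: intent is still validated
    try:
        rows = run_sql(sql)               # guarded by the DB breaker
    except CircuitOpenError:
        # Graceful degradation instead of a hard stop.
        return {
            "ok": False,
            "message": "I understood your query, but the database is "
                       "currently unreachable. Here is the SQL I would "
                       "have run: " + sql,
        }
    return {"ok": True, "rows": rows}
```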
### Observability

Circuit breaker state changes must be exported as metrics for alerting (SLO dashboards):

* `breaker_open_total{type="llm|db|vector"}`
* `breaker_half_open_total{type="llm|db|vector"}`
* `breaker_closed_total{type="llm|db|vector"}`
* `breaker_failure_total{type="...", error="timeout|503"}`
* `breaker_ignored_failure_total{type="...", error="429|400"}`
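pybreaker exposes a listener hook (`CircuitBreakerListener.state_change`) that can drive these metrics. The sketch below uses a plain counter dict as a stand-in for a real metrics client such as `prometheus_client`:

```python
from collections import defaultdict

# COUNTERS stands in for a real metrics client; in production a
# pybreaker listener's state_change callback would drive this.
COUNTERS: dict = defaultdict(int)


def on_state_change(breaker_type: str, old_state: str, new_state: str) -> None:
    """Record a transition, e.g. breaker_open_total{type="llm"}."""
    metric = f"breaker_{new_state.replace('-', '_')}_total"
    COUNTERS[(metric, breaker_type)] += 1
```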
## 5. Integration with Sandbox

The Circuit Breaker sits **upstream** of the Sandbox:

`Agent -> Circuit Breaker -> Sandbox -> SQL Driver`

This ensures that:

1. **Crash Isolation**: The Sandbox handles segfaults.
2. **Failure Isolation**: The Breaker handles timeouts/network partitions.
### Scope

We will wrap the following critical paths:

1. `LLMService.invoke()`: Protects against AI provider outages.
2. `VectorStore.search()`: Protects against retrieval outages.
3. `Adapter.connect()`: Protects against SQL DB outages.
## 6. Consequences

* **Positive**:
  * **Fail Fast**: Requests during outages will return <10ms errors instead of 30s timeouts.
  * **Reduced Load**: Downstream services get breathing room to recover.
* **Negative**:
  * **Debugging**: Developers must distinguish between "Service Down" and "Circuit Open" errors.
  * **Local State**: Process restarts reset the breaker state.
Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
"""
Contract definitions for the Execution Service Interface.

This module defines the Pydantic models used to communicate between the
Control Plane (Nodes) and the Data Plane (Execution Sandbox).
"""
from typing import Dict, Any, Optional, Literal
from pydantic import BaseModel, Field, ConfigDict


class ExecutionRequest(BaseModel):
    """Payload for requesting an operation from the execution sandbox."""

    mode: Literal["execute", "dry_run", "cost_estimate", "fetch_schema"] = Field(
        ..., description="The type of operation to perform."
    )
    datasource_id: str = Field(..., description="The unique ID of the target datasource.")
    engine_type: str = Field(..., description="The SQL engine type (e.g. postgres, sqlite).")
    connection_args: Dict[str, Any] = Field(
        ..., description="Connection arguments (host, user, password, etc.)."
    )
    sql: Optional[str] = Field(None, description="The SQL query to execute (required for execute/dry_run).")
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="Query parameters for parameterized execution."
    )
    limits: Dict[str, int] = Field(
        default_factory=dict,
        description="Execution limits (e.g., {'row_limit': 1000, 'timeout_ms': 5000})."
    )

    model_config = ConfigDict(extra="ignore")


class ExecutionResult(BaseModel):
    """Standardized response from the execution sandbox."""

    success: bool = Field(..., description="Whether the operation completed successfully.")
    data: Optional[Any] = Field(
        None, description="The result payload (Rows, CostEstimate, Schema, etc.)."
    )
    error: Optional[str] = Field(None, description="Error message if failed.")
    metrics: Dict[str, float] = Field(
        default_factory=dict, description="Performance metrics (e.g., execution_time_ms)."
    )

    model_config = ConfigDict(extra="ignore")
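For illustration, a dry-run payload the Control Plane might send (all values are made up). Because the models set `extra="ignore"`, unknown keys would be dropped rather than rejected:

```python
# Hypothetical payload matching the ExecutionRequest contract above.
# Passing this as ExecutionRequest(**sample_request) would validate it;
# omitting a required field like "mode" raises a ValidationError.
sample_request = {
    "mode": "dry_run",
    "datasource_id": "ds-analytics-01",   # illustrative ID
    "engine_type": "postgres",
    "connection_args": {"host": "db.internal", "user": "readonly"},
    "sql": "SELECT count(*) FROM orders",
    "parameters": {},
    "limits": {"row_limit": 1000, "timeout_ms": 5000},
}
```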

packages/core/src/nl2sql/common/sandbox.py

Lines changed: 64 additions & 0 deletions
@@ -90,3 +90,67 @@ def get_indexing_pool() -> ProcessPoolExecutor:
        ProcessPoolExecutor: The shared indexing pool instance.
    """
    return SandboxManager.get_indexing_pool()


from typing import Callable
from concurrent.futures import TimeoutError as FuturesTimeoutError
from concurrent.futures.process import BrokenProcessPool

from nl2sql.common.contracts import ExecutionRequest, ExecutionResult


def execute_in_sandbox(
    pool: ProcessPoolExecutor,
    func: Callable[[ExecutionRequest], ExecutionResult],
    request: ExecutionRequest,
    timeout_sec: int = 30
) -> ExecutionResult:
    """Executes a function in the sandbox with centralized error handling.

    This helper centralizes the logic for handling:
    1. BrokenProcessPool (Worker Crash/Segfault/OOM)
    2. TimeoutError (Job hung)
    3. Generic Exception (Serialization error or other infra failure)

    Args:
        pool (ProcessPoolExecutor): The execution pool to use.
        func (Callable): The function to execute (must accept ExecutionRequest and return ExecutionResult).
        request (ExecutionRequest): The contract payload.
        timeout_sec (int): Maximum time to wait for the result.

    Returns:
        ExecutionResult: The result, safely wrapping any infrastructure errors.
    """
    try:
        future = pool.submit(func, request)
        return future.result(timeout=timeout_sec)

    except (TimeoutError, FuturesTimeoutError):
        # concurrent.futures.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; catch both to cover older versions.
        logger.error(f"Sandbox Timeout ({timeout_sec}s) for {func.__name__}")
        return ExecutionResult(
            success=False,
            error=f"Operation timed out after {timeout_sec} seconds. The worker process may be hung.",
            metrics={"execution_time_ms": timeout_sec * 1000}
        )

    except Exception as e:
        msg = str(e)
        logger.error(f"Sandbox Exception: {msg}")

        # A BrokenProcessPool (or a message mentioning it) means the worker
        # died abruptly: segfault, OOM kill, or hard termination.
        is_crash = isinstance(e, BrokenProcessPool)
        if "BrokenProcessPool" in msg or "Terminated" in msg:
            is_crash = True

        if is_crash:
            return ExecutionResult(
                success=False,
                error=f"SANDBOX CRASH: The worker process terminated abruptly (Segfault/OOM). ({msg})",
                metrics={"is_crash": 1.0}
            )

        return ExecutionResult(
            success=False,
            error=f"Sandbox Execution Failed: {msg}"
        )
