Skip to content

Commit 2b9bb1a

Browse files
authored
Merge pull request #27 from nadeem4/feat/observability-implementation
Feat/observability implementation
2 parents a396587 + f5446eb commit 2b9bb1a

23 files changed

Lines changed: 716 additions & 65 deletions

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ The architecture is composed of three distinct planes, ensuring separation of co
3939
* **Layered Defense**: A combination of **[Retries, Circuit Breakers, and Sandboxing](docs/core/reliability.md)** ensures the system stays up even when LLMs or Databases go down.
4040
* **Fail-Fast**: We stop processing immediately if a dependency is unresponsive, preserving resources.
4141

42+
### 5. The Observability Plane (The Watchtower)
43+
44+
**Responsibility**: Visibility, Forensics, and Compliance.
45+
46+
* **Full-Stack Telemetry**: Native [OpenTelemetry](docs/ops/observability.md) integration provides distributed tracing (Jaeger) and metrics (Prometheus) for every node execution.
47+
* **Forensic Audit Logs**: A tamper-evident, persistent [Audit Log](docs/ops/observability.md#3-persistent-audit-log) records every AI decision (Prompt/Response/Reasoning) for compliance and debugging.
48+
4249
---
4350

4451
## 📐 Architectural Invariants
@@ -92,6 +99,7 @@ nl2sql setup --demo
9299
* **[Security Model](docs/safety/security.md)**: Defense-in-depth strategy against prompt injection and unauthorized access.
93100
* **[Security Model](docs/safety/security.md)**: Defense-in-depth strategy against prompt injection and unauthorized access.
94101
* **[Reliability & Fault Tolerance](docs/core/reliability.md)**: Guide to Circuit Breakers, Sandbox isolation, and Recovery strategies.
102+
* **[Observability & Operations](docs/ops/observability.md)**: Configuring OpenTelemetry, Logging, and Audit Trails.
95103

96104
---
97105

audit/remediation_plan.md

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,22 @@ This document serves as the master backlog for addressing findings from the Arch
7171
- **Status**: Fixed. Implemented in `nl2sql.common.resilience` and verified in `tests/unit/test_resilience.py`.
7272

7373
- [ ] **ENH-003: OpenTelemetry Integration** (P1 - High)
74-
- **Value**: Enable standard APM features (Datadog/Jaeger) for trace visualization.
75-
- **Action**: Replace custom `json` logging with OTelSDK.
74+
- **Value**: Standardize metrics export (Latency, Token Counts, Errors) to OTLP-compatible backends (Datadog, Honeycomb).
75+
- **Action**: Replace in-memory `LATENCY_LOG` with `opentelemetry-sdk` MeterProvider.
76+
- **Dependencies**: `opentelemetry-api`, `opentelemetry-sdk`, `opentelemetry-exporter-otlp`.
7677

7778
- [ ] **ENH-004: Persistent Audit Log** (P1 - High)
78-
- **Value**: Required for Compliance and Regression Testing.
79-
- **Action**: Create a `request_audit` database table to store Query/Plan/SQL tuples.
79+
- **Value**: Enable forensic debugging of AI decisions ("Time Travel Debugging").
80+
- **Action**: Implement `EventLogger` that writes {prompt, response, trace_id, duration} to a persistent store (file/DB) securely.
8081

8182
- [ ] **ENH-005: Tenant-Aware RLS Middleware** (P2 - Medium)
8283
- **Value**: Defense-in-depth enforcement of multi-tenancy.
83-
- **Action**: Implement a SQL transformation layer in `Generator` that automatically injects `WHERE tenant_id = ?` clauses into every generated AST.
84+
- **Action**: Implement a SQL transformation layer in `Generator` that automatically injects `WHERE tenant_id = ?` clauses into every generic AST.
8485

8586
- [ ] **ENH-006: Streaming Response Support** (P2 - Medium)
8687
- **Value**: Improves perceived latency.
8788
- **Action**: Update `AggregatorNode` to stream tokens to the frontend instead of waiting for full generation.
89+
90+
- [ ] **ENH-007: Structured Logging (JSON)** (P1 - High)
91+
- **Value**: Machine-readable logs for Splunk/ELK.
92+
- **Action**: Update `nl2sql.common.logger` to support `JsonFormatter` by default in production.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Remediation Plan: Observability & Reliability
2+
3+
**Source Audit**: `production_readiness_report.md`
4+
**Date**: 2026-01-13
5+
**Focus**: Telemetry, Forensics, and Production Visibility.
6+
7+
---
8+
9+
## 🚀 High Priority Enhancements
10+
11+
### [x] **ENH-OBS-001: OpenTelemetry Integration** (P0 - Critical)
12+
13+
- **Goal**: Enable standard APM features (Datadog, Jaeger, Honeycomb) for trace and metric visualization.
14+
- **Problem**: Current metrics (`LATENCY_LOG`) are in-memory only and lost on restart. No visualization of latency distribution.
15+
- **Implementation**:
16+
- [x] **Wire Up**: Connect `monitor.py` to the existing `opentelemetry-sdk` (dependencies already present).
17+
- [x] **Refactor**: Update `nl2sql.common.metrics` to replace in-memory lists with `MeterProvider`.
18+
- [x] **Instrument**: Update `monitor.py` to record OTeL Histograms for node execution duration.
19+
- [x] **Instrument**: Update `TokenHandler` to record OTeL Counters for token usage.
20+
- [x] **Config**: Wire up `OBSERVABILITY_EXPORTER` setting to initialize the generic exporter in `monitor.py`.
21+
22+
### Backend Strategy (Local & Prod)
23+
>
24+
> **Why OTLP?** It decouples Python code from the backend. Code sends to `OTLP Collector`, which routes data.
25+
26+
- **Traces** (Waterfalls): Sent to **Jaeger** (Local) or Datadog/Honeycomb (Prod).
27+
- **Metrics** (Latency/Errors): Sent to **Prometheus** (Local) or Datadog (Prod).
28+
- **Visualization**: Use **Grafana** to view Prometheus metrics and Jaeger traces in one UI.
29+
30+
### [x] **ENH-OBS-002: Structured Logging (JSON)** (P0 - Critical)
31+
32+
- **Goal**: Machine-readable logs for ingestion by Splunk/ELK/Datadog.
33+
- **Problem**: Logs are text-based and lack easy parsing for fields like `trace_id` or `user_id`.
34+
- **Implementation**:
35+
- [x] **Enable**: Wire `OBSERVABILITY_EXPORTER=otlp` to trigger the existing `JsonFormatter` in `configure_logging()`.
36+
- [x] **Verify**: Ensure `trace_id` injection (already implemented in `TraceContextFilter`) works correctly with the JSON output.
37+
38+
### [x] **ENH-OBS-003: Persistent Audit Log** (P1 - High)
39+
40+
- **Goal**: Forensic "Time Travel" debugging for AI decisions.
41+
- **Problem**: "Reasoning" is transient. We cannot explain past AI decisions to customers.
42+
- **Implementation**:
43+
- [x] Create `EventLogger` class.
44+
- [x] Log `{trace_id, timestamp, node, prompt_text, response_text, model, tokens}` to a persistent store (initially `events.log` rotated file, extensible to DB).
45+
- [x] Ensure PII/Secrets are sanitized before logging prompts.
46+
47+
## 🛠️ Medium Priority
48+
49+
### [x] **ENH-OBS-004: Tenant Context Propagation** (P2 - Medium)
50+
51+
- **Goal**: Multi-tenant observability.
52+
- **Problem**: Logs don't consistently show which tenant/user initiated the request.
53+
- **Implementation**:
54+
- [x] **Schema Validation**: Define strict Pydantic model for `user_context` in `GraphState` (currently untyped Dict).
55+
- [x] **Correlation**: Inject `tenant_id` from `user_context` into `trace_context` for log correlation.
56+
57+
---
58+
59+
## 📉 Success Metrics
60+
61+
- **Latency Visibility**: Can view p95 latency per node in APM.
62+
- **Error Tracking**: Can alert on "Validation Failure Rate > 5%".
63+
- **Cost Tracking**: Can verify "Token Usage per Tenant".
64+
- **Debuggability**: Can retrieve the exact prompt that caused a specific error 24 hours later.

configs/llm.demo.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
version: 1
44
default:
55
provider: openai
6-
model: gpt-4o
6+
model: gpt-5.2
77
temperature: 0.0
88
api_key: ${env:OPENAI_API_KEY}

docs/ops/configuration.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ These settings control the startup behavior and file locations.
2020
| `SECRETS_CONFIG` | `configs/secrets.yaml` | Path to the secrets provider file. |
2121
| `POLICIES_CONFIG` | `configs/policies.json` | Path to the RBAC definitions. |
2222
| `ROUTER_L1_THRESHOLD` | `0.4` | Vector search similarity threshold. |
23+
| `OBSERVABILITY_EXPORTER` | `none` | Telemetry exporter: `otlp` (prod), `console` (dev), `none`. |
24+
| `OTEL_EXPORTER_OTLP_ENDPOINT`| `None` | Endpoint for OTeL Collector (e.g. `http://localhost:4317`). |
25+
| `AUDIT_LOG_PATH` | `logs/audit_events.log` | Path for the persistent forensic audit log. |
2326

2427
## 2. Datasources (`datasources.yaml`)
2528

docs/ops/observability.md

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,75 @@
1-
# Observability
1+
# Observability and Monitoring
22

3-
## Logging
3+
The platform includes a comprehensive observability stack designed for production readiness, leveraging **OpenTelemetry**, **Structured Logging**, and **Forensic Audit Logs**.
44

5-
We use a structured logging approach suitable for production environments (Splunk, Datadog, ELK).
5+
## 1. Metrics & Tracing (OpenTelemetry)
66

7-
* **Format**: JSON (Production) or Human-Readable (Dev).
8-
* **Attributes**: Logs include `request_id`, `user_id`, `node_name`, and `execution_time`.
7+
We use **OpenTelemetry (OTel)** for vendor-neutral instrumentation.
98

10-
### Enabling JSON Logs
9+
### Configuration
1110

12-
Set the environment variable or use the flag:
11+
Set the following environment variables:
1312

14-
```bash
15-
export LOG_FORMAT=json
16-
# or
17-
nl2sql run "query" --json-logs
18-
```
13+
- `OBSERVABILITY_EXPORTER="otlp"`: Enables the OTLP exporter (requires a collector like Jaeger or Datadog Agent).
14+
- `OTEL_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"`: The endpoint for the collector (gRPC).
1915

20-
::: nl2sql.common.logger.JsonFormatter
16+
### Key Metrics
2117

22-
## Tracing
18+
| Metric Name | Type | Unit | Attributes | Description |
19+
| :--- | :--- | :--- | :--- | :--- |
20+
| `nl2sql.token.usage` | Counter | `1` | `model`, `agent`, `datasource_id` | Total LLM tokens consumed. |
21+
| `nl2sql.node.duration` | Histogram | `s` | `node`, `datasource_id` | Execution duration of graph nodes. |
2322

24-
The platform is instrumented with [LangSmith](https://smith.langchain.com/) for deep tracing of the Agentic Graph.
23+
### Visualization
2524

26-
1. Set `LANGCHAIN_TRACING_V2=true`.
27-
2. Set `LANGCHAIN_API_KEY=...`.
25+
- **Local**: Use [Jaeger](https://www.jaegertracing.io/) for traces and [Prometheus](https://prometheus.io/) for metrics.
26+
- **Production**: Compatible with Datadog, Honeycomb, New Relic, etc.
2827

29-
This will stream full traces of the Planner, Validator, and Generator steps to the LangSmith dashboard.
28+
## 2. Structured Logging
3029

31-
## Metrics (Prometheus)
30+
For production, logs are output in **JSON format** to facilitate parsing by aggregators (Splunk, ELK).
3231

33-
The platform exposes a `/metrics` endpoint for Prometheus scraping.
32+
- **Activation**: JSON logging is automatically enabled when `OBSERVABILITY_EXPORTER="otlp"`.
33+
- **Correlation**: Every log entry includes a `trace_id` and `tenant_id` (if authenticated) to correlate logs across the request lifecycle.
3434

35-
### Key Metrics
35+
**Example Log Entry:**
3636

37-
| Metric Name | Type | Description |
38-
| :--- | :--- | :--- |
39-
| `nl2sql_requests_total` | Counter | Total number of requests served. |
40-
| `nl2sql_request_latency_seconds` | Histogram | End-to-end latency distribution. |
41-
| `nl2sql_token_usage_total` | Counter | Total LLM tokens consumed (prompt + completion). |
42-
| `nl2sql_active_connections` | Gauge | Current number of active DB connections. |
37+
```json
38+
{
39+
"timestamp": "2024-01-01T12:00:00",
40+
"level": "INFO",
41+
"message": "Planning phase completed",
42+
"trace_id": "8a3c...",
43+
"tenant_id": "org_123",
44+
"node": "planner"
45+
}
46+
```
47+
48+
## 3. Persistent Audit Log
49+
50+
For forensic analysis and "Time Travel" debugging, the system maintains a separate, persistent audit log.
51+
52+
- **Location**: `logs/audit_events.log` (Rotation enabled: 10MB x 5 backups).
53+
- **Content**: detailed record of AI Decisions (Prompt inputs, Model responses, Token usage).
54+
- **Purpose**: Allows operators to answer "Why did the AI say X?" hours or days later.
55+
56+
**Event Structure:**
57+
58+
```json
59+
{
60+
"timestamp": "...",
61+
"event_type": "llm_interaction",
62+
"trace_id": "...",
63+
"tenant_id": "...",
64+
"data": {
65+
"agent": "planner",
66+
"model": "gpt-4o",
67+
"response_snippet": "SELECT * FROM...",
68+
"token_usage": {"total_tokens": 150}
69+
}
70+
}
71+
```
4372

44-
### Grafana Dashboard
73+
## 4. Legacy Tooling
4574

46-
A standard Grafana dashboard ID `#12345` is available for import to visualize these metrics.
75+
The CLI `Performance Tree` is preserved for local development convenience but piggybacks on the same instrumentation hooks.

packages/core/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ dependencies = [
1717
"pydantic>=1.10",
1818
"opentelemetry-api>=1.20.0",
1919
"opentelemetry-sdk>=1.20.0",
20+
"opentelemetry-exporter-otlp>=1.20.0",
2021
"sqlglot>=23.0.0",
2122
"pydantic-settings>=2.0.0",
2223
"pandas>=1.5.0", # For metrics/evals
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import logging
2+
import json
3+
import os
4+
from logging.handlers import RotatingFileHandler
5+
from typing import Any, Dict, Optional
6+
from datetime import datetime
7+
from nl2sql.common.settings import settings
8+
9+
class EventLogger:
10+
"""Persistent audit logger for high-value AI events.
11+
12+
Writes structured JSON events to a dedicated log file, separate from
13+
application debug logs. Used for forensic analysis and "Time Travel" debugging.
14+
"""
15+
16+
def __init__(self):
17+
self.logger = logging.getLogger("nl2sql.audit")
18+
self.logger.setLevel(logging.INFO)
19+
self.logger.propagate = False # Do not bubble up to root logger (avoid stdout spam)
20+
21+
# Ensure handlers are set up (singleton-ish check)
22+
if not self.logger.handlers:
23+
log_path = getattr(settings, "audit_log_path", "logs/audit_events.log")
24+
25+
# Ensure directory exists
26+
os.makedirs(os.path.dirname(log_path), exist_ok=True)
27+
28+
# 10MB per file, max 5 backup files
29+
handler = RotatingFileHandler(
30+
log_path, maxBytes=10*1024*1024, backupCount=5, encoding="utf-8"
31+
)
32+
33+
# Use specific JSON formatter for audit events
34+
formatter = logging.Formatter("%(message)s")
35+
handler.setFormatter(formatter)
36+
37+
self.logger.addHandler(handler)
38+
39+
def log_event(
40+
self,
41+
event_type: str,
42+
payload: Dict[str, Any],
43+
trace_id: Optional[str] = None,
44+
tenant_id: Optional[str] = None
45+
):
46+
"""Logs a structured event to the audit log.
47+
48+
Args:
49+
event_type: Category of event (e.g., 'llm_interaction', 'security_violation')
50+
payload: The event data dictionary.
51+
trace_id: Correlation ID.
52+
tenant_id: Tenant/Customer ID.
53+
"""
54+
55+
sensitive_keys = {"api_key", "password", "secret", "authorization"}
56+
cleaned_payload = self._redact(payload, sensitive_keys)
57+
58+
event = {
59+
"timestamp": datetime.utcnow().isoformat(),
60+
"event_type": event_type,
61+
"trace_id": trace_id,
62+
"tenant_id": tenant_id,
63+
"data": cleaned_payload
64+
}
65+
66+
self.logger.info(json.dumps(event))
67+
68+
def _redact(self, data: Any, keys_to_redact: set) -> Any:
69+
"""Recursively redact sensitive keys from dictionary.
70+
71+
Args:
72+
data: Input data (dict, list, or primitive).
73+
keys_to_redact: Set of lowercase keys to match and redact.
74+
75+
Returns:
76+
The sanitized data structure with sensitive values replaced by '***REDACTED***'.
77+
"""
78+
if isinstance(data, dict):
79+
return {
80+
k: ("***REDACTED***" if k.lower() in keys_to_redact else self._redact(v, keys_to_redact))
81+
for k, v in data.items()
82+
}
83+
elif isinstance(data, list):
84+
return [self._redact(item, keys_to_redact) for item in data]
85+
else:
86+
return data
87+
88+
# Global instance
89+
event_logger = EventLogger()

packages/core/src/nl2sql/common/logger.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
from typing import Any, Dict, Optional
77

88
_trace_id_ctx = contextvars.ContextVar("trace_id", default=None)
9+
_tenant_id_ctx = contextvars.ContextVar("tenant_id", default=None)
910

1011
class TraceContextFilter(logging.Filter):
11-
"""Injects trace_id from contextvar into the log record."""
12+
"""Injects trace_id and tenant_id from contextvars into the log record."""
1213
def filter(self, record):
1314
record.trace_id = _trace_id_ctx.get()
15+
record.tenant_id = _tenant_id_ctx.get()
1416
return True
1517

1618
@contextmanager
@@ -22,6 +24,15 @@ def trace_context(trace_id: str):
2224
finally:
2325
_trace_id_ctx.reset(token)
2426

27+
@contextmanager
28+
def tenant_context(tenant_id: Optional[str]):
29+
"""Context manager to set the tenant_id for the current context."""
30+
token = _tenant_id_ctx.set(tenant_id)
31+
try:
32+
yield
33+
finally:
34+
_tenant_id_ctx.reset(token)
35+
2536
class JsonFormatter(logging.Formatter):
2637
"""Formatter that outputs JSON strings after parsing the LogRecord."""
2738

@@ -43,14 +54,17 @@ def format(self, record: logging.LogRecord) -> str:
4354

4455
if getattr(record, "trace_id", None):
4556
log_record["trace_id"] = record.trace_id
57+
58+
if getattr(record, "tenant_id", None):
59+
log_record["tenant_id"] = record.tenant_id
4660

4761
# Standard LogRecord attributes to ignore
4862
standard_attrs = {
4963
"args", "asctime", "created", "exc_info", "exc_text", "filename",
5064
"funcName", "levelname", "levelno", "lineno", "module",
5165
"msecs", "message", "msg", "name", "pathname", "process",
5266
"processName", "relativeCreated", "stack_info", "thread", "threadName",
53-
"taskName", "trace_id"
67+
"taskName", "trace_id", "tenant_id"
5468
}
5569

5670
for key, value in record.__dict__.items():
@@ -80,10 +94,7 @@ def configure_logging(level: str = "INFO", json_format: bool = False):
8094
if json_format:
8195
handler.setFormatter(JsonFormatter())
8296
else:
83-
# Include trace_id in standard format if present
84-
# This is a bit tricky with dynamic formatting, usually easier to check record in formatter
85-
# For simplicity, we stick to standard format but maybe prepend trace_id if possible?
86-
# We'll stick to a standard format for text logs for now, trace_id mainly for JSON/Production
97+
# Standard text format
8798
formatter = logging.Formatter(
8899
"%(asctime)s - [%(trace_id)s] - %(name)s - %(levelname)s - %(message)s"
89100
)

0 commit comments

Comments
 (0)