| 1 | +# datadog-log-agent Adoption Design |
| 2 | + |
| 3 | +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:writing-plans to create the implementation plan from this design. |
| 4 | + |
| 5 | +**Goal:** Replace bottlecap's `src/logs/` aggregator and flusher with the shared `datadog-log-agent` crate, while keeping all Lambda-specific logic (`LambdaProcessor`) in bottlecap. |
| 6 | + |
| 7 | +**Architecture:** `datadog-log-agent` owns batching limits, HTTP flushing, zstd compression, retry logic, and OPW (Observability Pipelines Worker) support. `LambdaProcessor` remains in bottlecap and continues to own Lambda On-Demand lifecycle concerns (orphan logs, request_id tracking, OOM detection, processing rules). `LambdaProcessor` produces `Vec<LogEntry>` instead of `Vec<String>`. |
| 8 | + |
| 9 | +**Tech Stack:** Rust, tokio, reqwest, zstd, mockito (tests), datadog-log-agent (path dep → git dep post-merge) |
| 10 | + |
| 11 | +--- |
| 12 | + |
| 13 | +## Scope |
| 14 | + |
| 15 | +This design covers changes in **two repos**: |
| 16 | + |
| 17 | +1. **`serverless-components`** — add `is_oom_message()` utility to `datadog-log-agent` |
| 18 | +2. **`datadog-lambda-extension`** — adopt `datadog-log-agent` in bottlecap |
| 19 | + |
| 20 | +--- |
| 21 | + |
| 22 | +## Changes in `serverless-components` (`datadog-log-agent`) |
| 23 | + |
| 24 | +### New file: `crates/datadog-log-agent/src/lambda.rs` |
| 25 | + |
| 26 | +Expose a pure OOM detection utility, extracted from bottlecap's `LambdaProcessor`: |
| 27 | + |
| 28 | +```rust |
| 29 | +/// Returns true if the message matches a known Lambda OOM error pattern. |
| 30 | +pub fn is_oom_message(message: &str) -> bool { |
| 31 | + // The 7 Lambda OOM patterns (extracted from bottlecap lambda/processor.rs) |
| 32 | +} |
| 33 | +``` |
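One way the body of this utility could look is sketched below. The pattern list is a placeholder for illustration: the real seven patterns live in bottlecap's `lambda/processor.rs` and are not reproduced here. Plain substring matching is also an assumption, since the original may match differently.

```rust
/// Placeholder patterns for illustration only. The real list is the set of
/// seven patterns in bottlecap's lambda/processor.rs, not these three.
const OOM_PATTERNS: [&str; 3] = [
    "java.lang.OutOfMemoryError",
    "JavaScript heap out of memory",
    "fatal error: runtime: out of memory",
];

/// Returns true if the message contains any placeholder pattern above.
/// Assumption: substring matching is sufficient for OOM detection.
pub fn is_oom_message(message: &str) -> bool {
    OOM_PATTERNS
        .iter()
        .any(|pattern| message.contains(*pattern))
}
```

Keeping the function pure (no state, no I/O) is what makes it easy to share between bottlecap and other consumers of the crate.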
| 34 | + |
| 35 | +Re-export from `lib.rs`: |
| 36 | +```rust |
| 37 | +pub mod lambda; |
| 38 | +pub use lambda::is_oom_message; |
| 39 | +``` |
| 40 | + |
| 41 | +--- |
| 42 | + |
| 43 | +## Changes in `datadog-lambda-extension` (bottlecap) |
| 44 | + |
| 45 | +### Dependency |
| 46 | + |
| 47 | +```toml |
| 48 | +# bottlecap/Cargo.toml |
| 49 | +datadog-log-agent = { path = "../../serverless-components/crates/datadog-log-agent" } |
| 50 | +# Post-merge: git = "https://github.com/DataDog/serverless-components", rev = "<merged-sha>" |
| 51 | +``` |
| 52 | + |
| 53 | +### Files deleted from `src/logs/` |
| 54 | + |
| 55 | +| File | Reason | |
| 56 | +|---|---| |
| 57 | +| `aggregator.rs` | Replaced by `datadog-log-agent`'s internal `Aggregator` | |
| 58 | +| `aggregator_service.rs` | Replaced by `AggregatorService` / `AggregatorHandle` | |
| 59 | +| `flusher.rs` | Replaced by `LogFlusher` / `LogFlusherConfig` | |
| 60 | +| `constants.rs` | Same constants live in `datadog-log-agent` | |
| 61 | +| `lambda/mod.rs` | `IntakeLog` / `Message` / `Lambda` types replaced by `LogEntry` | |
| 62 | + |
| 63 | +### Files modified in `src/logs/` |
| 64 | + |
| 65 | +#### `lambda/processor.rs` — core change |
| 66 | + |
| 67 | +`LambdaProcessor` produces `Vec<LogEntry>` instead of `Vec<String>`: |
| 68 | + |
| 69 | +- `ready_logs: Vec<String>` → `ready_logs: Vec<LogEntry>` |
| 70 | +- `get_intake_log()` returns `LogEntry` instead of serialized `String` |
| 71 | +- Lambda context maps to `attributes["lambda"] = { "arn": "...", "request_id": "..." }` |
| 72 | +- OOM check: `self.is_oom(&msg)` → `datadog_log_agent::is_oom_message(&msg)` |
| 73 | +- All other Lambda-specific logic unchanged (orphan logs, request_id, processing rules, managed instance mode) |
| 74 | + |
| 75 | +**Data shape mapping:** |
| 76 | + |
| 77 | +``` |
| 78 | +IntakeLog (before) LogEntry (after) |
| 79 | +───────────────────────────────── ──────────────────────────────────── |
| 80 | +message.message ──────────────► message |
| 81 | +message.timestamp ──────────────► timestamp |
| 82 | +message.status ──────────────► status: Some(...) |
| 83 | +hostname ──────────────► hostname: Some(...) |
| 84 | +service ──────────────► service: Some(...) |
| 85 | +source ("ddsource") ──────────────► ddsource: Some(...) |
| 86 | +tags ("ddtags") ──────────────► ddtags: Some(...) |
| 87 | +message.lambda.arn ──────────────► attributes["lambda"]["arn"] |
| 88 | +message.lambda.req_id ──────────────► attributes["lambda"]["request_id"] |
| 89 | +``` |
| 90 | + |
| 91 | +The JSON serialized form is wire-compatible: `LogEntry` flattens `attributes` at the top level via `#[serde(flatten)]`. |
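The mapping above can be sketched as a plain conversion function. All struct definitions here are hypothetical stand-ins with assumed field types, not the real bottlecap or `datadog-log-agent` types. In particular, the real `LogEntry` presumably holds JSON values in `attributes`, while this sketch uses nested string maps to stay within the standard library.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for bottlecap's current types (field types assumed).
pub struct Lambda { pub arn: String, pub request_id: Option<String> }
pub struct Message { pub message: String, pub lambda: Lambda, pub timestamp: i64, pub status: String }
pub struct IntakeLog { pub message: Message, pub hostname: String, pub service: String, pub source: String, pub tags: String }

// Hypothetical stand-in for datadog_log_agent::LogEntry.
pub struct LogEntry {
    pub message: String,
    pub timestamp: i64,
    pub status: Option<String>,
    pub hostname: Option<String>,
    pub service: Option<String>,
    pub ddsource: Option<String>,
    pub ddtags: Option<String>,
    // Simplified: nested string maps instead of arbitrary JSON values.
    pub attributes: BTreeMap<String, BTreeMap<String, String>>,
}

/// Applies the field-by-field mapping from the diagram.
pub fn to_log_entry(log: IntakeLog) -> LogEntry {
    // Lambda context moves under attributes["lambda"].
    let mut lambda = BTreeMap::new();
    lambda.insert("arn".to_string(), log.message.lambda.arn);
    if let Some(request_id) = log.message.lambda.request_id {
        lambda.insert("request_id".to_string(), request_id);
    }
    let mut attributes = BTreeMap::new();
    attributes.insert("lambda".to_string(), lambda);

    LogEntry {
        message: log.message.message,
        timestamp: log.message.timestamp,
        status: Some(log.message.status),
        hostname: Some(log.hostname),
        service: Some(log.service),
        ddsource: Some(log.source),
        ddtags: Some(log.tags),
        attributes,
    }
}
```

In this sketch an orphan log without a request ID simply omits the `request_id` key; whether the real processor does the same is not stated in this design.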
| 92 | + |
| 93 | +#### `agent.rs` |
| 94 | + |
| 95 | +Update type imports: |
| 96 | +- `logs::aggregator_service::AggregatorHandle` → `datadog_log_agent::AggregatorHandle` |
| 97 | +- `logs::aggregator_service::AggregatorService` → `datadog_log_agent::AggregatorService` |
| 98 | + |
| 99 | +#### `processor.rs` |
| 100 | + |
| 101 | +Update type imports only (no logic changes). |
| 102 | + |
| 103 | +#### `mod.rs` |
| 104 | + |
| 105 | +Remove module declarations for deleted files. |
| 106 | + |
| 107 | +### Wiring in `main.rs` / `start_logs_agent()` |
| 108 | + |
| 109 | +Replace `LogsFlusher` with `LogFlusher`: |
| 110 | + |
| 111 | +```rust |
| 112 | +use datadog_log_agent::{AggregatorHandle, AggregatorService, LogFlusher, LogFlusherConfig}; |
| 113 | + |
| 114 | +fn start_logs_agent(...) -> (Sender<TelemetryEvent>, LogFlusher, CancellationToken, AggregatorHandle) { |
| 115 | + let (service, handle) = AggregatorService::new(); |
| 116 | + tokio::spawn(service.run()); |
| 117 | + |
| 118 | + let config = LogFlusherConfig { api_key, ..LogFlusherConfig::from_env() }; |
| 119 | + let flusher = LogFlusher::new(config, http_client, handle.clone()); |
| 120 | + |
| 121 | + // LogsAgent and LambdaProcessor wiring unchanged |
| 122 | + (tx, flusher, cancel_token, handle) |
| 123 | +} |
| 124 | +``` |
| 125 | + |
| 126 | +`FlushingService` calls `flusher.flush().await` — method name is identical, no change needed there. |
| 127 | + |
| 128 | +--- |
| 129 | + |
| 130 | +## Error Handling |
| 131 | + |
| 132 | +- If `DD_API_KEY` is absent, `start_logs_agent` returns early (same behavior as `serverless-compat`) |
| 133 | +- Retry logic (up to 3 attempts; a 403 response stops retrying immediately) is now owned by `datadog-log-agent` |
| 134 | +- OPW mode: if `DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED=true`, `LogFlusherConfig::from_env()` picks it up automatically |
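The retry rule in the list above can be illustrated with a small synchronous sketch. It is not the crate's implementation: the real flusher is async and HTTP-based. The names `flush_with_retry` and `FlushOutcome` and the `send` closure are hypothetical, and retrying every non-2xx status other than 403 is an assumption.

```rust
/// Hypothetical outcome type for this sketch.
#[derive(Debug, PartialEq)]
pub enum FlushOutcome {
    Delivered,
    Forbidden,
    Exhausted,
}

const MAX_ATTEMPTS: u32 = 3;

/// Calls `send` (a stand-in for one HTTP request returning a status code)
/// up to MAX_ATTEMPTS times, stopping early on success or on 403.
pub fn flush_with_retry(mut send: impl FnMut() -> u16) -> FlushOutcome {
    for _ in 0..MAX_ATTEMPTS {
        match send() {
            // Any 2xx counts as delivered.
            200..=299 => return FlushOutcome::Delivered,
            // 403 will not succeed on retry, so give up at once.
            403 => return FlushOutcome::Forbidden,
            // Assumption: every other status is treated as retryable.
            _ => continue,
        }
    }
    FlushOutcome::Exhausted
}
```

Stopping on 403 avoids spending the remaining attempts on a request that is rejected for authorization reasons rather than a transient failure.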
| 135 | + |
| 136 | +--- |
| 137 | + |
| 138 | +## Testing |
| 139 | + |
| 140 | +- `datadog-log-agent`'s existing integration tests cover: batch limits, compression, retry, OPW mode, additional endpoints |
| 141 | +- In bottlecap: update existing `src/logs/` unit tests to use `LogEntry` instead of `IntakeLog`/serialized strings |
| 142 | +- Add a smoke test in bottlecap verifying `LambdaProcessor` produces correctly shaped `LogEntry` (lambda attrs present) |
| 143 | +- OOM utility: unit tests in `datadog-log-agent/src/lambda.rs` (moved from bottlecap) |
| 144 | + |
| 145 | +--- |
| 146 | + |
| 147 | +## What Does NOT Change |
| 148 | + |
| 149 | +- `LambdaProcessor` core logic: orphan logs, request_id tracking, managed instance mode, processing rules |
| 150 | +- Event bus wiring (OOM, platform events) |
| 151 | +- `LogsAgent` receive loop |
| 152 | +- `FlushingService` flush orchestration |
| 153 | +- All environment variable names (`DD_API_KEY`, `DD_SITE`, `DD_LOGS_CONFIG_USE_COMPRESSION`, etc.) |
| 154 | +- Wire format to Datadog (same JSON shape) |