|
| 1 | +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +//! Local test helper: inserts sample log entries and flushes them via the log agent pipeline. |
| 5 | +//! |
| 6 | +//! # Usage |
| 7 | +//! |
| 8 | +//! ## Flush to a local capture server (recommended for local dev) |
| 9 | +//! |
| 10 | +//! The easiest way — runs capture server and example together: |
| 11 | +//! ./scripts/test-log-intake.sh |
| 12 | +//! |
| 13 | +//! Or manually in two terminals: |
| 14 | +//! |
| 15 | +//! In terminal 1 — start the capture server (handles POST, prints JSON): |
| 16 | +//! python3 scripts/test-log-intake.sh # not available standalone |
| 17 | +//! # Use the script above, or run: python3 -c "$(sed -n '/PYEOF/,/PYEOF/p' scripts/test-log-intake.sh)" |
| 18 | +//! |
| 19 | +//! In terminal 2 — run this example: |
| 20 | +//! DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED=true \ |
| 21 | +//! DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL=http://localhost:9999/logs \ |
| 22 | +//! DD_API_KEY=local-test-key \ |
| 23 | +//! cargo run -p datadog-logs-agent --example send_logs |
| 24 | +//! |
| 25 | +//! NOTE: `python3 -m http.server` does NOT work — it rejects POST requests. |
| 26 | +//! |
| 27 | +//! ## Flush to a real Datadog endpoint |
| 28 | +//! |
| 29 | +//! DD_API_KEY=<your-key> \ |
| 30 | +//! DD_SITE=datadoghq.com \ |
| 31 | +//! cargo run -p datadog-logs-agent --example send_logs |
| 32 | +//! |
| 33 | +//! ## Configuration via env vars |
| 34 | +//! |
| 35 | +//! | Variable | Default | |
| 36 | +//! |--------------------------------------------------|--------------------| |
| 37 | +//! | DD_API_KEY | (empty) | |
| 38 | +//! | DD_SITE | datadoghq.com | |
| 39 | +//! | DD_LOGS_CONFIG_USE_COMPRESSION | true | |
| 40 | +//! | DD_LOGS_CONFIG_COMPRESSION_LEVEL | 3 | |
| 41 | +//! | DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_ENABLED | false | |
| 42 | +//! | DD_OBSERVABILITY_PIPELINES_WORKER_LOGS_URL | (empty) | |
| 43 | +//! | LOG_ENTRY_COUNT | 5 | |
| 44 | +
|
| 45 | +use datadog_logs_agent::{ |
| 46 | + AggregatorService, Destination, IntakeEntry, LogFlusher, LogFlusherConfig, |
| 47 | +}; |
| 48 | + |
| 49 | +#[allow(clippy::disallowed_methods)] // plain reqwest::Client for local testing |
| 50 | +#[tokio::main] |
| 51 | +async fn main() { |
| 52 | + let entry_count: usize = std::env::var("LOG_ENTRY_COUNT") |
| 53 | + .ok() |
| 54 | + .and_then(|v| v.parse().ok()) |
| 55 | + .unwrap_or(5); |
| 56 | + |
| 57 | + let config = LogFlusherConfig::from_env(); |
| 58 | + |
| 59 | + // Print effective configuration |
| 60 | + let (endpoint, compressed) = describe_config(&config); |
| 61 | + println!("──────────────────────────────────────────"); |
| 62 | + println!(" datadog-logs-agent local test"); |
| 63 | + println!("──────────────────────────────────────────"); |
| 64 | + println!(" endpoint : {endpoint}"); |
| 65 | + println!(" api_key : {}", mask(&config.api_key)); |
| 66 | + println!(" compressed : {compressed}"); |
| 67 | + println!(" entries : {entry_count}"); |
| 68 | + println!("──────────────────────────────────────────"); |
| 69 | + |
| 70 | + // Start aggregator service |
| 71 | + let (service, handle) = AggregatorService::new(); |
| 72 | + tokio::spawn(service.run()); |
| 73 | + |
| 74 | + // Insert sample entries representing different runtimes |
| 75 | + let mut entries = Vec::with_capacity(entry_count); |
| 76 | + |
| 77 | + for i in 0..entry_count { |
| 78 | + let entry = match i % 3 { |
| 79 | + 0 => lambda_entry(i), |
| 80 | + 1 => azure_entry(i), |
| 81 | + _ => plain_entry(i), |
| 82 | + }; |
| 83 | + entries.push(entry); |
| 84 | + } |
| 85 | + |
| 86 | + println!("\nInserting {entry_count} log entries..."); |
| 87 | + handle.insert_batch(entries).expect("insert_batch failed"); |
| 88 | + |
| 89 | + // Build HTTP client |
| 90 | + let client = reqwest::Client::builder() |
| 91 | + .timeout(config.flush_timeout) |
| 92 | + .build() |
| 93 | + .expect("failed to build HTTP client"); |
| 94 | + |
| 95 | + // Flush |
| 96 | + println!("Flushing to {endpoint}..."); |
| 97 | + let flusher = LogFlusher::new(config, client, handle); |
| 98 | + let failed = flusher.flush(vec![]).await; |
| 99 | + |
| 100 | + if failed.is_empty() { |
| 101 | + println!("\n✓ Flush succeeded"); |
| 102 | + } else { |
| 103 | + eprintln!("\n✗ Flush failed — check endpoint and API key"); |
| 104 | + std::process::exit(1); |
| 105 | + } |
| 106 | +} |
| 107 | + |
| 108 | +// ── Sample log entry builders ───────────────────────────────────────────────── |
| 109 | + |
| 110 | +fn lambda_entry(i: usize) -> IntakeEntry { |
| 111 | + let mut attrs = serde_json::Map::new(); |
| 112 | + attrs.insert( |
| 113 | + "lambda".to_string(), |
| 114 | + serde_json::json!({ |
| 115 | + "arn": "arn:aws:lambda:us-east-1:123456789012:function:my-fn", |
| 116 | + "request_id": format!("req-{i:04}") |
| 117 | + }), |
| 118 | + ); |
| 119 | + IntakeEntry { |
| 120 | + message: format!("[lambda] invocation #{i} completed"), |
| 121 | + timestamp: now_ms(), |
| 122 | + hostname: Some("arn:aws:lambda:us-east-1:123456789012:function:my-fn".to_string()), |
| 123 | + service: Some("my-fn".to_string()), |
| 124 | + ddsource: Some("lambda".to_string()), |
| 125 | + ddtags: Some("env:local,runtime:lambda".to_string()), |
| 126 | + status: Some("info".to_string()), |
| 127 | + attributes: attrs, |
| 128 | + } |
| 129 | +} |
| 130 | + |
| 131 | +fn azure_entry(i: usize) -> IntakeEntry { |
| 132 | + let mut attrs = serde_json::Map::new(); |
| 133 | + attrs.insert( |
| 134 | + "azure".to_string(), |
| 135 | + serde_json::json!({ |
| 136 | + "resource_id": "/subscriptions/sub-123/resourceGroups/rg/providers/Microsoft.Web/sites/my-fn", |
| 137 | + "operation_name": "Microsoft.Web/sites/functions/run/action" |
| 138 | + }), |
| 139 | + ); |
| 140 | + IntakeEntry { |
| 141 | + message: format!("[azure] function triggered #{i}"), |
| 142 | + timestamp: now_ms(), |
| 143 | + hostname: Some("my-azure-fn".to_string()), |
| 144 | + service: Some("payments".to_string()), |
| 145 | + ddsource: Some("azure-functions".to_string()), |
| 146 | + ddtags: Some("env:local,runtime:azure".to_string()), |
| 147 | + status: Some("info".to_string()), |
| 148 | + attributes: attrs, |
| 149 | + } |
| 150 | +} |
| 151 | + |
| 152 | +fn plain_entry(i: usize) -> IntakeEntry { |
| 153 | + IntakeEntry { |
| 154 | + message: format!("[generic] log message #{i}"), |
| 155 | + timestamp: now_ms(), |
| 156 | + hostname: Some("localhost".to_string()), |
| 157 | + service: Some("test-service".to_string()), |
| 158 | + ddsource: Some("rust".to_string()), |
| 159 | + ddtags: Some("env:local".to_string()), |
| 160 | + status: if i.is_multiple_of(5) { |
| 161 | + Some("error".to_string()) |
| 162 | + } else { |
| 163 | + Some("info".to_string()) |
| 164 | + }, |
| 165 | + attributes: serde_json::Map::new(), |
| 166 | + } |
| 167 | +} |
| 168 | + |
| 169 | +// ── Helpers ─────────────────────────────────────────────────────────────────── |
| 170 | + |
| 171 | +fn now_ms() -> i64 { |
| 172 | + std::time::SystemTime::now() |
| 173 | + .duration_since(std::time::UNIX_EPOCH) |
| 174 | + .map(|d| d.as_millis() as i64) |
| 175 | + .unwrap_or(0) |
| 176 | +} |
| 177 | + |
| 178 | +fn describe_config(config: &LogFlusherConfig) -> (String, bool) { |
| 179 | + match &config.mode { |
| 180 | + Destination::Datadog => ( |
| 181 | + format!("https://http-intake.logs.{}/api/v2/logs", config.site), |
| 182 | + config.use_compression, |
| 183 | + ), |
| 184 | + Destination::ObservabilityPipelinesWorker { url } => (url.clone(), false), |
| 185 | + } |
| 186 | +} |
| 187 | + |
| 188 | +fn mask(s: &str) -> String { |
| 189 | + if s.is_empty() { |
| 190 | + return "(not set)".to_string(); |
| 191 | + } |
| 192 | + if s.len() <= 8 { |
| 193 | + return "*".repeat(s.len()); |
| 194 | + } |
| 195 | + format!("{}…{}", &s[..4], &s[s.len() - 4..]) |
| 196 | +} |
0 commit comments