Skip to content

Commit 8046900

Browse files
committed
graph: Add log drains for writing to backends
Implements slog drains for capturing and writing logs:
- FileDrain: Writes logs to JSON Lines files (one file per subgraph)
- LokiDrain: Writes logs to Grafana Loki via HTTP push API

Both drains:
- Capture structured log entries with metadata (module, line, column)
- Format logs with timestamp, level, text, and arguments
- Use efficient serialization with custom KVSerializers
1 parent 6c56156 commit 8046900

3 files changed

Lines changed: 773 additions & 0 deletions

File tree

graph/src/log/file.rs

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
use std::fmt;
2+
use std::fmt::Write as FmtWrite;
3+
use std::fs::{File, OpenOptions};
4+
use std::io::{BufWriter, Write};
5+
use std::path::PathBuf;
6+
use std::sync::{Arc, Mutex};
7+
8+
use chrono::prelude::{SecondsFormat, Utc};
9+
use serde::Serialize;
10+
use slog::*;
11+
12+
/// Settings that tell a `FileDrain` where its log file lives.
#[derive(Clone, Debug)]
pub struct FileDrainConfig {
    /// Directory holding the log files; `FileDrain::new` creates it when missing.
    pub directory: PathBuf,
    /// Subgraph ID. The log file is named `<subgraph_id>.jsonl` and the ID is
    /// also stamped on every log entry.
    pub subgraph_id: String,
    /// Maximum size of the log file, in bytes.
    ///
    /// NOTE(review): the drain code visible in this file never reads this
    /// value; presumably it is enforced elsewhere — confirm.
    pub max_file_size: u64,
    /// How long log data should be kept, in days.
    ///
    /// NOTE(review): the drain code visible in this file never reads this
    /// value; presumably it is enforced elsewhere — confirm.
    pub retention_days: u32,
}
24+
25+
/// Log document structure for JSON Lines format
26+
#[derive(Clone, Debug, Serialize)]
27+
#[serde(rename_all = "camelCase")]
28+
struct FileLogDocument {
29+
id: String,
30+
subgraph_id: String,
31+
timestamp: String,
32+
level: String,
33+
text: String,
34+
arguments: Vec<(String, String)>,
35+
meta: FileLogMeta,
36+
}
37+
38+
#[derive(Clone, Debug, Serialize)]
39+
#[serde(rename_all = "camelCase")]
40+
struct FileLogMeta {
41+
module: String,
42+
line: i64,
43+
column: i64,
44+
}
45+
46+
/// Gathers slog key-value pairs, in the order they are emitted, as owned
/// `(key, value)` string tuples.
struct VecKVSerializer {
    kvs: Vec<(String, String)>,
}

impl VecKVSerializer {
    /// Builds a serializer that has not collected anything yet.
    fn new() -> Self {
        VecKVSerializer { kvs: Vec::new() }
    }

    /// Consumes the serializer and yields every pair it collected.
    fn finish(self) -> Vec<(String, String)> {
        let VecKVSerializer { kvs } = self;
        kvs
    }
}
60+
61+
impl Serializer for VecKVSerializer {
62+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
63+
self.kvs.push((key.into(), val.to_string()));
64+
Ok(())
65+
}
66+
}
67+
68+
/// Gathers slog key-value pairs so they can be rendered as one
/// human-readable `key: value, key: value` string.
struct SimpleKVSerializer {
    kvs: Vec<(String, String)>,
}

impl SimpleKVSerializer {
    /// Builds a serializer that has not collected anything yet.
    fn new() -> Self {
        SimpleKVSerializer { kvs: Vec::new() }
    }

    /// Consumes the serializer and returns the number of collected pairs
    /// together with their `key: value` renderings joined by `", "`.
    ///
    /// With no pairs the result is `(0, "")`.
    fn finish(self) -> (usize, String) {
        let count = self.kvs.len();

        let mut joined = String::new();
        for (position, (key, value)) in self.kvs.iter().enumerate() {
            // Separator goes between entries only, never before the first.
            if position > 0 {
                joined.push_str(", ");
            }
            joined.push_str(key);
            joined.push_str(": ");
            joined.push_str(value);
        }

        (count, joined)
    }
}
89+
90+
impl Serializer for SimpleKVSerializer {
91+
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
92+
self.kvs.push((key.into(), val.to_string()));
93+
Ok(())
94+
}
95+
}
96+
97+
/// An slog `Drain` for logging to local files in JSON Lines format.
98+
///
99+
/// Each subgraph gets its own .jsonl file with log entries.
100+
/// Format: One JSON object per line
101+
/// ```jsonl
102+
/// {"id":"QmXxx-2024-01-15T10:30:00Z","subgraphId":"QmXxx","timestamp":"2024-01-15T10:30:00Z","level":"error","text":"Error message","arguments":[],"meta":{"module":"test.rs","line":42,"column":10}}
103+
/// ```
104+
pub struct FileDrain {
105+
config: FileDrainConfig,
106+
error_logger: Logger,
107+
writer: Arc<Mutex<BufWriter<File>>>,
108+
}
109+
110+
impl FileDrain {
111+
/// Creates a new `FileDrain`.
112+
pub fn new(config: FileDrainConfig, error_logger: Logger) -> std::io::Result<Self> {
113+
std::fs::create_dir_all(&config.directory)?;
114+
115+
let path = config
116+
.directory
117+
.join(format!("{}.jsonl", config.subgraph_id));
118+
let file = OpenOptions::new().create(true).append(true).open(path)?;
119+
120+
Ok(FileDrain {
121+
config,
122+
error_logger,
123+
writer: Arc::new(Mutex::new(BufWriter::new(file))),
124+
})
125+
}
126+
}
127+
128+
impl Drain for FileDrain {
129+
type Ok = ();
130+
type Err = Never;
131+
132+
fn log(&self, record: &Record, values: &OwnedKVList) -> std::result::Result<(), Never> {
133+
// Don't write `trace` logs to file
134+
if record.level() == Level::Trace {
135+
return Ok(());
136+
}
137+
138+
let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Nanos, true);
139+
let id = format!("{}-{}", self.config.subgraph_id, timestamp);
140+
141+
let level = match record.level() {
142+
Level::Critical => "critical",
143+
Level::Error => "error",
144+
Level::Warning => "warning",
145+
Level::Info => "info",
146+
Level::Debug => "debug",
147+
Level::Trace => "trace",
148+
};
149+
150+
// Serialize logger arguments
151+
let mut serializer = SimpleKVSerializer::new();
152+
record
153+
.kv()
154+
.serialize(record, &mut serializer)
155+
.expect("failed to serialize logger arguments");
156+
let (n_logger_kvs, logger_kvs) = serializer.finish();
157+
158+
// Serialize log message arguments
159+
let mut serializer = SimpleKVSerializer::new();
160+
values
161+
.serialize(record, &mut serializer)
162+
.expect("failed to serialize log message arguments");
163+
let (n_value_kvs, value_kvs) = serializer.finish();
164+
165+
// Serialize arguments into vec for storage
166+
let mut serializer = VecKVSerializer::new();
167+
record
168+
.kv()
169+
.serialize(record, &mut serializer)
170+
.expect("failed to serialize log message arguments into vec");
171+
let arguments = serializer.finish();
172+
173+
// Build text with all key-value pairs
174+
let mut text = format!("{}", record.msg());
175+
if n_logger_kvs > 0 {
176+
write!(text, ", {}", logger_kvs).unwrap();
177+
}
178+
if n_value_kvs > 0 {
179+
write!(text, ", {}", value_kvs).unwrap();
180+
}
181+
182+
// Build log document
183+
let log_doc = FileLogDocument {
184+
id,
185+
subgraph_id: self.config.subgraph_id.clone(),
186+
timestamp,
187+
level: level.to_string(),
188+
text,
189+
arguments,
190+
meta: FileLogMeta {
191+
module: record.module().into(),
192+
line: record.line() as i64,
193+
column: record.column() as i64,
194+
},
195+
};
196+
197+
// Write JSON line (synchronous, buffered)
198+
let mut writer = self.writer.lock().unwrap();
199+
if let Err(e) = serde_json::to_writer(&mut *writer, &log_doc) {
200+
error!(self.error_logger, "Failed to serialize log to JSON: {}", e);
201+
return Ok(());
202+
}
203+
204+
if let Err(e) = writeln!(&mut *writer) {
205+
error!(self.error_logger, "Failed to write newline: {}", e);
206+
return Ok(());
207+
}
208+
209+
// Flush to ensure durability
210+
if let Err(e) = writer.flush() {
211+
error!(self.error_logger, "Failed to flush log file: {}", e);
212+
}
213+
214+
Ok(())
215+
}
216+
}
217+
218+
/// Creates a new asynchronous file logger.
219+
///
220+
/// Uses `error_logger` to print any file logging errors,
221+
/// so they don't go unnoticed.
222+
pub fn file_logger(config: FileDrainConfig, error_logger: Logger) -> Logger {
223+
let file_drain = match FileDrain::new(config, error_logger.clone()) {
224+
Ok(drain) => drain,
225+
Err(e) => {
226+
error!(error_logger, "Failed to create FileDrain: {}", e);
227+
// Return a logger that discards all logs
228+
return Logger::root(slog::Discard, o!());
229+
}
230+
};
231+
232+
let async_drain = slog_async::Async::new(file_drain.fuse())
233+
.chan_size(20000)
234+
.overflow_strategy(slog_async::OverflowStrategy::Block)
235+
.build()
236+
.fuse();
237+
Logger::root(async_drain, o!())
238+
}
239+
240+
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;
    use tempfile::TempDir;

    /// Builds the configuration shared by the tests, rooted at `dir`.
    fn test_config(dir: &Path) -> FileDrainConfig {
        FileDrainConfig {
            directory: dir.to_path_buf(),
            subgraph_id: "QmTest".to_string(),
            max_file_size: 1024 * 1024,
            retention_days: 30,
        }
    }

    /// Creating a drain must succeed and create the `.jsonl` file.
    #[test]
    fn test_file_drain_creation() {
        let temp_dir = TempDir::new().unwrap();
        let error_logger = Logger::root(slog::Discard, o!());

        let drain = FileDrain::new(test_config(temp_dir.path()), error_logger);
        assert!(drain.is_ok());

        // Verify file was created
        let file_path = temp_dir.path().join("QmTest.jsonl");
        assert!(file_path.exists());
    }

    /// The document must serialize with camelCase keys and the given values.
    #[test]
    fn test_log_entry_format() {
        let arguments = vec![
            ("key1".to_string(), "value1".to_string()),
            ("key2".to_string(), "value2".to_string()),
        ];

        let doc = FileLogDocument {
            id: "test-id".to_string(),
            subgraph_id: "QmTest".to_string(),
            timestamp: "2024-01-15T10:30:00Z".to_string(),
            level: "error".to_string(),
            text: "Test error message".to_string(),
            arguments,
            meta: FileLogMeta {
                module: "test.rs".to_string(),
                line: 42,
                column: 10,
            },
        };

        let json = serde_json::to_string(&doc).unwrap();
        assert!(json.contains("\"id\":\"test-id\""));
        assert!(json.contains("\"subgraphId\":\"QmTest\""));
        assert!(json.contains("\"level\":\"error\""));
        assert!(json.contains("\"text\":\"Test error message\""));
        assert!(json.contains("\"arguments\""));
    }

    /// A record logged through the drain must show up as one JSON line.
    #[test]
    fn test_file_drain_writes_jsonl() {
        use std::io::{BufRead, BufReader};

        let temp_dir = TempDir::new().unwrap();
        let error_logger = Logger::root(slog::Discard, o!());

        let drain = FileDrain::new(test_config(temp_dir.path()), error_logger).unwrap();

        // The drain is used directly (no async wrapper) and flushes after
        // every record, so the line is on disk once `info!` returns.
        let logger = Logger::root(drain, o!());
        info!(logger, "Test message"; "key" => "value");

        // Read the file, failing loudly on any read error
        let file_path = temp_dir.path().join("QmTest.jsonl");
        let reader = BufReader::new(File::open(file_path).unwrap());
        let lines: Vec<String> = reader
            .lines()
            .collect::<std::io::Result<Vec<_>>>()
            .unwrap();

        // Exactly one record was logged
        assert_eq!(lines.len(), 1);

        // The line must be valid JSON carrying the logged data
        let parsed: serde_json::Value = serde_json::from_str(&lines[0]).unwrap();
        assert!(parsed["id"].as_str().unwrap().starts_with("QmTest-"));
        assert_eq!(parsed["subgraphId"], "QmTest");
        assert!(parsed.get("timestamp").is_some());
        assert_eq!(parsed["level"], "info");
        assert_eq!(parsed["text"], "Test message, key: value");
        assert_eq!(parsed["arguments"], serde_json::json!([["key", "value"]]));
        assert!(parsed.get("meta").is_some());
    }
}

0 commit comments

Comments
 (0)