Skip to content

Commit 2b0b6d6

Browse files
committed
feat(worker): support shared rolling log
1 parent 8e021f8 commit 2b0b6d6

2 files changed

Lines changed: 221 additions & 42 deletions

File tree

config.example.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ lifetime = "7d"
3333
# groups are not set, default to the user's group
3434
# tags are not set
3535
file_log = false
36+
# shared_log is not set, default to false. When enabled, all workers share a centralized log file with daily rotation (max 3 files)
3637
# log_path is not set. It will use the default rolling log file path if file_log is set to true
38+
# - If shared_log is enabled and log_path is not set, it will use workers.log in cache directory
39+
# - If shared_log is disabled and log_path is not set, it will use {worker_uuid}.log in cache directory
3740
# retain is not set, default to false
3841

3942
[client]

netmito/src/config/worker.rs

Lines changed: 218 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,113 @@ use figment::{
55
Figment,
66
};
77
use serde::{Deserialize, Serialize};
8+
use std::io::Write;
89
use std::ops::Not;
910
use std::{collections::HashSet, time::Duration};
10-
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
11+
use tracing_appender::rolling::{RollingFileAppender, Rotation};
12+
use tracing_subscriber::{fmt::MakeWriter, layer::SubscriberExt, util::SubscriberInitExt, Layer};
1113
use url::Url;
1214

1315
use crate::error::Error;
1416

1517
use super::{coordinator::DEFAULT_COORDINATOR_ADDR, TracingGuard};
1618

19+
/// A writer wrapper that adds worker UUID prefix to each log line
20+
struct WorkerIdWriter<W: Write> {
21+
inner: W,
22+
prefix: Vec<u8>,
23+
at_line_start: bool,
24+
buffer: Vec<u8>,
25+
}
26+
27+
impl<W: Write> Drop for WorkerIdWriter<W> {
28+
fn drop(&mut self) {
29+
let _ = self.flush_buffer();
30+
}
31+
}
32+
33+
impl<W: Write> WorkerIdWriter<W> {
34+
fn new(inner: W, worker_id: String) -> Self {
35+
// Pre-format the prefix once to avoid repeated formatting
36+
let prefix = format!("[worker:{}] ", worker_id).into_bytes();
37+
Self {
38+
inner,
39+
prefix,
40+
at_line_start: true,
41+
buffer: Vec::with_capacity(8192), // 8KB buffer
42+
}
43+
}
44+
45+
fn flush_buffer(&mut self) -> std::io::Result<()> {
46+
if !self.buffer.is_empty() {
47+
self.inner.write_all(&self.buffer)?;
48+
self.buffer.clear();
49+
}
50+
Ok(())
51+
}
52+
}
53+
54+
impl<W: Write> Write for WorkerIdWriter<W> {
55+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
56+
let mut remaining = buf;
57+
let total_len = buf.len();
58+
59+
while !remaining.is_empty() {
60+
// Find the next newline using memchr (fast SIMD search)
61+
if let Some(newline_pos) = remaining.iter().position(|&b| b == b'\n') {
62+
// Write prefix if at line start
63+
if self.at_line_start && newline_pos > 0 {
64+
self.buffer.extend_from_slice(&self.prefix);
65+
self.at_line_start = false;
66+
}
67+
68+
// Write the line including newline
69+
self.buffer.extend_from_slice(&remaining[..=newline_pos]);
70+
self.at_line_start = true;
71+
72+
// Flush buffer if it's getting large (> 4KB)
73+
if self.buffer.len() > 4096 {
74+
self.flush_buffer()?;
75+
}
76+
77+
remaining = &remaining[newline_pos + 1..];
78+
} else {
79+
// No newline in remaining data
80+
if self.at_line_start && !remaining.is_empty() {
81+
self.buffer.extend_from_slice(&self.prefix);
82+
self.at_line_start = false;
83+
}
84+
self.buffer.extend_from_slice(remaining);
85+
break;
86+
}
87+
}
88+
89+
Ok(total_len)
90+
}
91+
92+
fn flush(&mut self) -> std::io::Result<()> {
93+
self.flush_buffer()?;
94+
self.inner.flush()
95+
}
96+
}
97+
98+
/// A MakeWriter wrapper that creates WorkerIdWriter instances
99+
struct WorkerIdMakeWriter<M> {
100+
inner: M,
101+
worker_id: String,
102+
}
103+
104+
impl<'a, M> MakeWriter<'a> for WorkerIdMakeWriter<M>
105+
where
106+
M: MakeWriter<'a>,
107+
{
108+
type Writer = WorkerIdWriter<M::Writer>;
109+
110+
fn make_writer(&'a self) -> Self::Writer {
111+
WorkerIdWriter::new(self.inner.make_writer(), self.worker_id.clone())
112+
}
113+
}
114+
17115
#[derive(Deserialize, Serialize, Debug)]
18116
pub struct WorkerConfig {
19117
pub(crate) coordinator_addr: Url,
@@ -29,6 +127,8 @@ pub struct WorkerConfig {
29127
pub(crate) labels: HashSet<String>,
30128
pub(crate) log_path: Option<RelativePathBuf>,
31129
pub(crate) file_log: bool,
130+
#[serde(default)]
131+
pub(crate) shared_log: bool,
32132
#[serde(with = "humantime_serde")]
33133
pub(crate) lifetime: Option<Duration>,
34134
#[serde(default)]
@@ -89,6 +189,10 @@ pub struct WorkerConfigCli {
89189
#[arg(long)]
90190
#[serde(skip_serializing_if = "<&bool>::not")]
91191
pub file_log: bool,
192+
/// Enable shared logging across multiple workers with daily rotation (max 3 files)
193+
#[arg(long)]
194+
#[serde(skip_serializing_if = "<&bool>::not")]
195+
pub shared_log: bool,
92196
/// The lifetime of the worker to alive (e.g., 7d, 1year)
93197
#[arg(long)]
94198
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
@@ -117,6 +221,7 @@ impl Default for WorkerConfig {
117221
labels: HashSet::new(),
118222
log_path: None,
119223
file_log: false,
224+
shared_log: false,
120225
lifetime: None,
121226
retain: false,
122227
skip_redis: false,
@@ -151,51 +256,122 @@ impl WorkerConfig {
151256
U: Into<T>,
152257
{
153258
if self.file_log {
154-
let file_logger = self
155-
.log_path
156-
.as_ref()
157-
.and_then(|p| {
158-
let path = p.relative();
159-
let dir = path.parent();
160-
let file_name = path.file_name();
161-
match (dir, file_name) {
162-
(Some(dir), Some(file_name)) => {
163-
Some(tracing_appender::rolling::never(dir, file_name))
259+
let id = worker_id.into();
260+
let id_str = id.to_string();
261+
262+
// Determine file logger based on shared_log setting
263+
let file_logger = if self.shared_log {
264+
// Shared logging: use log_path if provided, otherwise use fixed "workers.log"
265+
self.log_path
266+
.as_ref()
267+
.and_then(|p| {
268+
let path = p.relative();
269+
let dir = path.parent();
270+
let file_name = path.file_name();
271+
match (dir, file_name) {
272+
(Some(dir), Some(file_name)) => {
273+
// Use daily rotation with max 3 log files for shared log
274+
RollingFileAppender::builder()
275+
.rotation(Rotation::DAILY)
276+
.filename_prefix(file_name.to_string_lossy().to_string())
277+
.max_log_files(3)
278+
.build(dir)
279+
.ok()
280+
}
281+
_ => None,
164282
}
165-
_ => None,
166-
}
167-
})
168-
.or_else(|| {
169-
dirs::cache_dir()
170-
.map(|mut p| {
171-
p.push("mitosis");
172-
p.push("worker");
173-
p
174-
})
175-
.map(|dir| {
176-
let id = worker_id.into();
177-
tracing_appender::rolling::never(dir, format!("{id}.log"))
178-
})
179-
})
180-
.ok_or(Error::ConfigError(Box::new(figment::Error::from(
181-
"log path not valid and cache directory not found",
182-
))))?;
283+
})
284+
.or_else(|| {
285+
// Use fixed "workers.log" with daily rotation in cache directory
286+
dirs::cache_dir()
287+
.map(|mut p| {
288+
p.push("mitosis");
289+
p.push("worker");
290+
p
291+
})
292+
.and_then(|dir| {
293+
RollingFileAppender::builder()
294+
.rotation(Rotation::DAILY)
295+
.filename_prefix("workers.log")
296+
.max_log_files(3)
297+
.build(dir)
298+
.ok()
299+
})
300+
})
301+
.ok_or(Error::ConfigError(Box::new(figment::Error::from(
302+
"log path not valid and cache directory not found",
303+
))))?
304+
} else {
305+
// Non-shared logging: use per-worker log file with no rotation
306+
self.log_path
307+
.as_ref()
308+
.and_then(|p| {
309+
let path = p.relative();
310+
let dir = path.parent();
311+
let file_name = path.file_name();
312+
match (dir, file_name) {
313+
(Some(dir), Some(file_name)) => {
314+
Some(tracing_appender::rolling::never(dir, file_name))
315+
}
316+
_ => None,
317+
}
318+
})
319+
.or_else(|| {
320+
dirs::cache_dir()
321+
.map(|mut p| {
322+
p.push("mitosis");
323+
p.push("worker");
324+
p
325+
})
326+
.map(|dir| {
327+
tracing_appender::rolling::never(dir, format!("{id_str}.log"))
328+
})
329+
})
330+
.ok_or(Error::ConfigError(Box::new(figment::Error::from(
331+
"log path not valid and cache directory not found",
332+
))))?
333+
};
334+
183335
let (non_blocking, guard) = tracing_appender::non_blocking(file_logger);
184336
let env_filter = tracing_subscriber::EnvFilter::try_from_env("MITO_FILE_LOG_LEVEL")
185337
.unwrap_or_else(|_| "netmito=info".into());
186-
let coordinator_guard = tracing_subscriber::registry()
187-
.with(
188-
tracing_subscriber::fmt::layer().with_filter(
189-
tracing_subscriber::EnvFilter::try_from_default_env()
190-
.unwrap_or_else(|_| "netmito=info".into()),
191-
),
192-
)
193-
.with(
194-
tracing_subscriber::fmt::layer()
195-
.with_writer(non_blocking)
196-
.with_filter(env_filter),
197-
)
198-
.set_default();
338+
339+
// If shared_log is enabled, wrap the writer to add worker UUID prefix
340+
let coordinator_guard = if self.shared_log {
341+
let worker_writer = WorkerIdMakeWriter {
342+
inner: non_blocking,
343+
worker_id: id_str,
344+
};
345+
346+
tracing_subscriber::registry()
347+
.with(
348+
tracing_subscriber::fmt::layer().with_filter(
349+
tracing_subscriber::EnvFilter::try_from_default_env()
350+
.unwrap_or_else(|_| "netmito=info".into()),
351+
),
352+
)
353+
.with(
354+
tracing_subscriber::fmt::layer()
355+
.with_writer(worker_writer)
356+
.with_filter(env_filter),
357+
)
358+
.set_default()
359+
} else {
360+
tracing_subscriber::registry()
361+
.with(
362+
tracing_subscriber::fmt::layer().with_filter(
363+
tracing_subscriber::EnvFilter::try_from_default_env()
364+
.unwrap_or_else(|_| "netmito=info".into()),
365+
),
366+
)
367+
.with(
368+
tracing_subscriber::fmt::layer()
369+
.with_writer(non_blocking)
370+
.with_filter(env_filter),
371+
)
372+
.set_default()
373+
};
374+
199375
Ok(TracingGuard {
200376
subscriber_guard: Some(coordinator_guard),
201377
file_guard: Some(guard),

0 commit comments

Comments
 (0)