Skip to content

Commit e27ca2c

Browse files
Frandoclaude
andcommitted
feat(logs): opt-in log collection over iroh-services
Lets a node forward its tracing events to a connected service and have that service drive the runtime level remotely. Cloud is the single source of truth for the active directive — there is no install-time directive on the node side. - LogsCap (Push / SetLevel) gates which peers may push logs and which may change the level. - LogCollector wraps a tracing-subscriber reload::Layer that starts off; install() registers it as a layer with no global subscriber side-effects, so callers compose it next to their own fmt layer. - Push path: bounded ring buffer with rate limiting; flush task retries with backoff on transient send errors instead of self-destructing. - Callback path: SetLogLevel { directives, expires_in_secs, revert_to }; revert_to=None means revert to off. - Wire types are postcard-friendly (closed FieldValue enum, Vec<(String, FieldValue)> rather than serde_json::Value or #[serde(flatten)]). - examples/logs.rs runs forever, composes the push layer with a default fmt layer, and emits an INFO event every 2s. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 88b028c commit e27ca2c

8 files changed

Lines changed: 874 additions & 12 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,6 @@ default = []
6060

6161
[[example]]
6262
name = "net_diagnostics"
63+
64+
[[example]]
65+
name = "logs"

examples/logs.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//! Log collection example.
2+
//!
3+
//! Demonstrates how to install the iroh-services log collector alongside the
4+
//! standard `tracing-subscriber` `fmt` layer, ship records to the cloud over
5+
//! the iroh-services RPC channel, and let the cloud override the local
6+
//! filter at runtime via `SetLogLevel`.
7+
//!
8+
//! The level filter starts at `off`. The cloud pushes a level after this
9+
//! endpoint is opted into log collection from the dashboard or REST API; both
10+
//! the buffered cloud-shipping layer and the stderr fmt layer respect that
11+
//! level. Anything emitted before the cloud responds is silently dropped.
12+
//!
13+
//! Run with: cargo run --example logs
14+
15+
use std::time::Duration;
16+
use tracing_subscriber::prelude::*;
17+
18+
use anyhow::Result;
19+
use iroh::{Endpoint, endpoint::presets, protocol::Router};
20+
use iroh_services::{
21+
API_SECRET_ENV_VAR_NAME, ApiSecret, CLIENT_HOST_ALPN, Client, ClientHost, caps::LogsCap, logs,
22+
};
23+
use tracing::info;
24+
25+
#[tokio::main]
26+
async fn main() -> Result<()> {
27+
// 1. Build the buffer layer and compose it with a stderr fmt layer behind
28+
// the same reloadable filter, so local console output mirrors what
29+
// gets shipped. The cloud raises the filter from `off` after this
30+
// endpoint is opted in via the dashboard.
31+
let (collector, log_layer) = logs::layer();
32+
tracing_subscriber::registry()
33+
.with(log_layer)
34+
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
35+
.try_init()
36+
.map_err(|e| anyhow::anyhow!("failed to install tracing subscriber: {e}"))?;
37+
38+
// 2. Create the endpoint and parse the API secret so we know which
39+
// cloud endpoint to grant SetLevel to.
40+
let endpoint = Endpoint::bind(presets::N0).await?;
41+
let secret = ApiSecret::from_env_var(API_SECRET_ENV_VAR_NAME)?;
42+
let cloud_id = secret.addr().id;
43+
44+
let name = format!("logs-example-{}", &endpoint.id().to_string()[..8]);
45+
46+
// 3. Build the client. `with_log_collection(collector.clone())` starts a
47+
// background task that drains the buffer every second and ships the
48+
// batch as a `PutLogs` RPC.
49+
let client = Client::builder(&endpoint)
50+
.api_secret(secret)?
51+
.name(name)?
52+
.with_log_collection(collector.clone())
53+
.build()
54+
.await?;
55+
56+
// 4. Grant `LogsCap::SetLevel` so the cloud can dial us back and apply a
57+
// filter override. Spawned so a momentarily-down cloud does not block
58+
// startup.
59+
let client_for_grant = client.clone();
60+
let grant_task = tokio::spawn(async move {
61+
if let Err(err) = client_for_grant
62+
.grant_capability(cloud_id, [LogsCap::SetLevel])
63+
.await
64+
{
65+
eprintln!("failed to grant LogsCap::SetLevel: {err:?}");
66+
}
67+
});
68+
69+
// 5. Accept the cloud's callback connections on `CLIENT_HOST_ALPN`. The
70+
// `ClientHost` needs the same collector so the `SetLogLevel` request
71+
// can hot-reload the filter.
72+
let host = ClientHost::new(&endpoint).with_log_collector(collector);
73+
let router = Router::builder(endpoint)
74+
.accept(CLIENT_HOST_ALPN, host)
75+
.spawn();
76+
77+
// 6. Emit an info log every other second forever. These will surface on
78+
// the dashboard's Logs page (and on this process's stderr) once the
79+
// endpoint is opted into log collection at `info` level or above.
80+
println!("emitting logs; ctrl+c to exit.");
81+
let mut tick = tokio::time::interval(Duration::from_secs(2));
82+
let mut counter: u64 = 0;
83+
loop {
84+
tokio::select! {
85+
_ = tick.tick() => {
86+
counter += 1;
87+
info!(counter, "logs example heartbeat");
88+
}
89+
_ = tokio::signal::ctrl_c() => break,
90+
}
91+
}
92+
93+
grant_task.abort();
94+
router.endpoint().close().await;
95+
Ok(())
96+
}

src/caps.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ pub enum Cap {
8181
Metrics(MetricsCap),
8282
#[strum(to_string = "net-diagnostics:{0}")]
8383
NetDiagnostics(NetDiagnosticsCap),
84+
#[strum(to_string = "logs:{0}")]
85+
Logs(LogsCap),
8486
}
8587

8688
impl FromStr for Cap {
@@ -94,6 +96,7 @@ impl FromStr for Cap {
9496
"metrics" => Self::Metrics(MetricsCap::from_str(inner)?),
9597
"relay" => Self::Relay(RelayCap::from_str(inner)?),
9698
"net-diagnostics" => Self::NetDiagnostics(NetDiagnosticsCap::from_str(inner)?),
99+
"logs" => Self::Logs(LogsCap::from_str(inner)?),
97100
_ => bail!("invalid cap domain"),
98101
})
99102
} else {
@@ -121,6 +124,16 @@ cap_enum!(
121124
}
122125
);
123126

127+
cap_enum!(
128+
/// Capabilities for the log collection feature.
///
/// Carried by the `Cap::Logs` variant, which renders as `logs:{0}`. Each
/// variant permits only itself; see the `Capability` impl for `LogsCap`.
129+
pub enum LogsCap {
130+
/// Permits the bearer to push log lines to the cloud.
131+
Push,
132+
/// Permits the bearer to set the log level filter on the issuer at runtime.
133+
SetLevel,
134+
}
135+
);
136+
124137
impl Caps {
125138
pub fn new(caps: impl IntoIterator<Item = impl Into<Cap>>) -> Self {
126139
Self::V0(CapSet::new(caps))
@@ -179,6 +192,7 @@ impl Capability for Cap {
179192
(Cap::Relay(slf), Cap::Relay(other)) => slf.permits(other),
180193
(Cap::Metrics(slf), Cap::Metrics(other)) => slf.permits(other),
181194
(Cap::NetDiagnostics(slf), Cap::NetDiagnostics(other)) => slf.permits(other),
195+
(Cap::Logs(slf), Cap::Logs(other)) => slf.permits(other),
182196
(_, _) => false,
183197
}
184198
}
@@ -192,6 +206,8 @@ fn client_capabilities(other: &Cap) -> bool {
192206
Cap::Metrics(MetricsCap::PutAny) => true,
193207
Cap::NetDiagnostics(NetDiagnosticsCap::PutAny) => true,
194208
Cap::NetDiagnostics(NetDiagnosticsCap::GetAny) => true,
209+
Cap::Logs(LogsCap::Push) => true,
210+
Cap::Logs(LogsCap::SetLevel) => true,
195211
}
196212
}
197213

@@ -221,6 +237,16 @@ impl Capability for NetDiagnosticsCap {
221237
}
222238
}
223239

240+
impl Capability for LogsCap {
241+
fn permits(&self, other: &Self) -> bool {
242+
match (self, other) {
243+
(LogsCap::Push, LogsCap::Push) => true,
244+
(LogsCap::SetLevel, LogsCap::SetLevel) => true,
245+
(_, _) => false,
246+
}
247+
}
248+
}
249+
224250
/// A set of capabilities
225251
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
226252
pub struct CapSet<C: Capability + Ord>(BTreeSet<C>);

src/client.rs

Lines changed: 136 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ use uuid::Uuid;
1717
use crate::{
1818
api_secret::ApiSecret,
1919
caps::Caps,
20+
logs::LogCollector,
2021
net_diagnostics::{DiagnosticsReport, checks::run_diagnostics},
2122
protocol::{
22-
ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutMetrics,
23+
ALPN, Auth, IrohServicesClient, NameEndpoint, Ping, Pong, PutLogs, PutMetrics,
2324
PutNetworkDiagnostics, RemoteError,
2425
},
2526
};
@@ -54,6 +55,7 @@ pub struct Client {
5455
endpoint: Endpoint,
5556
message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
5657
_actor_task: Arc<AbortOnDropHandle<()>>,
58+
_log_flush_task: Option<Arc<AbortOnDropHandle<()>>>,
5759
}
5860

5961
/// ClientBuilder configures and builds an iroh-services client, typically
@@ -67,11 +69,19 @@ pub struct ClientBuilder {
6769
metrics_interval: Option<Duration>,
6870
remote: Option<EndpointAddr>,
6971
registry: Registry,
72+
log_collector: Option<LogCollector>,
73+
log_flush_interval: Duration,
74+
log_max_batch: usize,
7075
}
7176

7277
const DEFAULT_CAP_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24 * 30); // 1 month
7378
pub const API_SECRET_ENV_VAR_NAME: &str = "IROH_SERVICES_API_SECRET";
7479

80+
/// Default interval between log batch flushes when log collection is enabled.
///
/// This is the value `ClientBuilder::new` starts with; override it with
/// `ClientBuilder::log_flush_interval`.
81+
pub const DEFAULT_LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
82+
/// Default maximum batch size pushed in a single PutLogs request.
///
/// This is the value `ClientBuilder::new` starts with; override it with
/// `ClientBuilder::log_max_batch`.
83+
pub const DEFAULT_LOG_MAX_BATCH: usize = 200;
84+
7585
impl ClientBuilder {
7686
pub fn new(endpoint: &Endpoint) -> Self {
7787
let mut registry = Registry::default();
@@ -85,9 +95,35 @@ impl ClientBuilder {
8595
metrics_interval: Some(Duration::from_secs(60)),
8696
remote: None,
8797
registry,
98+
log_collector: None,
99+
log_flush_interval: DEFAULT_LOG_FLUSH_INTERVAL,
100+
log_max_batch: DEFAULT_LOG_MAX_BATCH,
88101
}
89102
}
90103

104+
/// Enables periodic shipment of buffered log lines to iroh-services.
105+
///
106+
/// The collector is shared with [`crate::client_host::ClientHost`] when
107+
/// runtime log-level overrides are needed; clone it before passing so both
108+
/// sides hold a handle.
109+
pub fn with_log_collection(mut self, collector: LogCollector) -> Self {
110+
self.log_collector = Some(collector);
111+
self
112+
}
113+
114+
/// Override the log batch flush interval. Defaults to one second.
115+
pub fn log_flush_interval(mut self, interval: Duration) -> Self {
116+
self.log_flush_interval = interval;
117+
self
118+
}
119+
120+
/// Override the maximum number of lines included in a single PutLogs
121+
/// request. Defaults to [`DEFAULT_LOG_MAX_BATCH`].
122+
pub fn log_max_batch(mut self, max: usize) -> Self {
123+
self.log_max_batch = max;
124+
self
125+
}
126+
91127
/// Register a metrics group to forward to iroh-services
92128
///
93129
/// The default registered metrics uses only the endpoint
@@ -213,22 +249,37 @@ impl ClientBuilder {
213249
let conn = IrohLazyRemoteConnection::new(self.endpoint.clone(), remote, ALPN.to_vec());
214250
let irpc_client = IrohServicesClient::boxed(conn);
215251

216-
let (tx, rx) = tokio::sync::mpsc::channel(1);
252+
let session_id = Uuid::new_v4();
253+
// The actor mailbox is only used for control-plane messages (auth,
254+
// ping, name, grant_cap) plus the periodic metrics + log flush. A
255+
// small buffer is enough but `1` head-of-line-blocks log flushes
256+
// behind metrics ticks, so leave a little room.
257+
let (tx, rx) = tokio::sync::mpsc::channel(8);
217258
let actor_task = AbortOnDropHandle::new(n0_future::task::spawn(
218259
ClientActor {
219260
capabilities,
220261
client: irpc_client,
221262
name: self.name.clone(),
222-
session_id: Uuid::new_v4(),
263+
session_id,
223264
authorized: false,
224265
}
225266
.run(self.name, self.registry, self.metrics_interval, rx),
226267
));
227268

269+
let log_flush_task = self.log_collector.map(|collector| {
270+
let message_channel = tx.clone();
271+
let interval = self.log_flush_interval;
272+
let max_batch = self.log_max_batch;
273+
Arc::new(AbortOnDropHandle::new(n0_future::task::spawn(
274+
run_log_flush(message_channel, collector, interval, max_batch, session_id),
275+
)))
276+
});
277+
228278
Ok(Client {
229279
endpoint: self.endpoint,
230280
message_channel: tx,
231281
_actor_task: Arc::new(actor_task),
282+
_log_flush_task: log_flush_task,
232283
})
233284
}
234285
}
@@ -425,6 +476,10 @@ enum ClientActorMessage {
425476
report: Box<DiagnosticsReport>,
426477
done: oneshot::Sender<Result<(), Error>>,
427478
},
479+
PutLogs {
480+
request: PutLogs,
481+
done: oneshot::Sender<Result<(), Error>>,
482+
},
428483
ReadName {
429484
done: oneshot::Sender<Option<String>>,
430485
},
@@ -505,6 +560,13 @@ impl ClientActor {
505560
warn!("failed to publish network diagnostics: {:#?}", err);
506561
}
507562
}
563+
ClientActorMessage::PutLogs{ request, done } => {
564+
let res = self.put_logs(request).await;
565+
if let Err(err) = done.send(res) {
566+
debug!("failed to publish logs: {:#?}", err);
567+
self.authorized = false;
568+
}
569+
}
508570
}
509571
}
510572
_ = async {
@@ -613,6 +675,77 @@ impl ClientActor {
613675

614676
Ok(())
615677
}
678+
679+
async fn put_logs(&mut self, request: PutLogs) -> Result<(), Error> {
680+
trace!(
681+
lines = request.lines.len(),
682+
dropped = request.dropped,
683+
"client actor put logs"
684+
);
685+
self.auth().await?;
686+
687+
self.client
688+
.rpc(request)
689+
.await
690+
.map_err(|_| RemoteError::InternalServerError)??;
691+
692+
Ok(())
693+
}
694+
}
695+
696+
async fn run_log_flush(
697+
message_channel: tokio::sync::mpsc::Sender<ClientActorMessage>,
698+
collector: LogCollector,
699+
interval: Duration,
700+
max_batch: usize,
701+
session_id: Uuid,
702+
) {
703+
const INITIAL_BACKOFF: Duration = Duration::from_millis(500);
704+
const MAX_BACKOFF: Duration = Duration::from_secs(30);
705+
706+
let mut ticker = n0_future::time::interval(interval);
707+
// After a slow RPC the default `Burst` behavior would fire several
708+
// ticks back-to-back; `Delay` waits a full interval from the previous
709+
// completed tick.
710+
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
711+
let mut backoff = INITIAL_BACKOFF;
712+
loop {
713+
ticker.tick().await;
714+
let (lines, dropped) = collector.drain(max_batch);
715+
if lines.is_empty() && dropped == 0 {
716+
backoff = INITIAL_BACKOFF;
717+
continue;
718+
}
719+
let request = PutLogs {
720+
session_id,
721+
lines,
722+
dropped,
723+
};
724+
let (tx, rx) = oneshot::channel();
725+
if message_channel
726+
.send(ClientActorMessage::PutLogs { request, done: tx })
727+
.await
728+
.is_err()
729+
{
730+
// Mailbox closed only when the actor task has terminated; that
731+
// means the entire client is gone and there is nothing to do.
732+
debug!("log flush stopped: client actor channel closed");
733+
return;
734+
}
735+
match rx.await {
736+
Ok(Ok(())) => {
737+
backoff = INITIAL_BACKOFF;
738+
}
739+
// Either the RPC failed (Ok(Err)) or the actor dropped the
740+
// response sender mid-handoff (Err(_)). Both are transient: keep
741+
// ticking and back off so the next attempt happens later.
742+
other => {
743+
debug!(?other, ?backoff, "log flush attempt failed; backing off");
744+
n0_future::time::sleep(backoff).await;
745+
backoff = (backoff * 2).min(MAX_BACKOFF);
746+
}
747+
}
748+
}
616749
}
617750

618751
async fn set_name_inner(

0 commit comments

Comments
 (0)