Skip to content

Commit d9696e6

Browse files
committed
feat: instrumenting vss store with tracing
1 parent 5d0fb14 commit d9696e6

6 files changed

Lines changed: 167 additions & 0 deletions

File tree

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
7575
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
7676
log = { version = "0.4.22", default-features = false, features = ["std"]}
7777

78+
tracing ={ version = "0.1.43" , default-features = false, features = ["log", "attributes"]}
79+
tracing-subscriber = {version = "0.3.22", default-features = false, features = ["std", "fmt", "json"]}
80+
tracing-opentelemetry ={ version = "0.32.0", default-features = false}
81+
opentelemetry ={ version = "0.31.0", default-features = false}
82+
opentelemetry_sdk ={ version = "0.31.0", default-features = false}
83+
opentelemetry-otlp ={ version = "0.31.0", default-features = false, features = ["trace", "grpc-tonic"]}
84+
opentelemetry-stdout = {version = "0.31.0"}
85+
7886
vss-client = { package = "vss-client-ng", version = "0.4" }
7987
prost = { version = "0.11.6", default-features = false}
8088

docker-compose.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ services:
4848
networks:
4949
- bitcoin-electrs
5050

51+
jaeger:
52+
image: jaegertracing/all-in-one:1.54
53+
container_name: jaeger
54+
ports:
55+
- "6831:6831/udp"
56+
- "16686:16686"
57+
- "14268:14268"
58+
- "4317:4317"
59+
- "4318:4318"
60+
5161
networks:
5262
bitcoin-electrs:
5363
driver: bridge

src/io/vss_store.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use lightning::util::persist::{KVStore, KVStoreSync};
2727
use lightning::util::ser::{Readable, Writeable};
2828
use prost::Message;
2929
use rand::RngCore;
30+
use tracing::instrument;
3031
use vss_client::client::VssClient;
3132
use vss_client::error::VssError;
3233
use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
@@ -43,6 +44,7 @@ use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
4344

4445
use crate::entropy::NodeEntropy;
4546
use crate::io::utils::check_namespace_key_validity;
47+
use crate::tracing::TracingHeaderProvider;
4648

4749
type CustomRetryPolicy = FilteredRetryPolicy<
4850
JitteredRetryPolicy<
@@ -135,6 +137,7 @@ impl VssStore {
135137
})?;
136138

137139
let async_retry_policy = retry_policy();
140+
let header_provider = Arc::new(TracingHeaderProvider::new(header_provider));
138141
let async_client =
139142
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
140143

@@ -495,6 +498,7 @@ impl VssStoreInner {
495498
Ok(keys)
496499
}
497500

501+
#[instrument(name = "vss.read", skip(self, client,), err(level = "error"))]
498502
async fn read_internal(
499503
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
500504
secondary_namespace: String, key: String,
@@ -531,6 +535,14 @@ impl VssStoreInner {
531535
Ok(decrypted)
532536
}
533537

538+
#[instrument(
539+
name = "vss.write",
540+
skip(self, client, buf, inner_lock_ref),
541+
fields(
542+
vss.payload_size_bytes = buf.len(),
543+
),
544+
err(level = "error")
545+
)]
534546
async fn write_internal(
535547
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
536548
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
@@ -575,6 +587,7 @@ impl VssStoreInner {
575587
.await
576588
}
577589

590+
#[instrument(name = "vss.remove", skip(self, client, inner_lock_ref), err(level = "error"))]
578591
async fn remove_internal(
579592
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
580593
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
@@ -608,6 +621,7 @@ impl VssStoreInner {
608621
.await
609622
}
610623

624+
#[instrument(name = "vss.list", skip(self, client), err(level = "error"))]
611625
async fn list_internal(
612626
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
613627
secondary_namespace: String,

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub mod payment;
102102
mod peer_store;
103103
mod runtime;
104104
mod scoring;
105+
mod tracing;
105106
mod tx_broadcaster;
106107
mod types;
107108
mod wallet;
@@ -111,6 +112,7 @@ use std::net::ToSocketAddrs;
111112
use std::sync::{Arc, Mutex, RwLock};
112113
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
113114

115+
pub use crate::tracing::{configure_tracer, TracingLogWriter};
114116
pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance};
115117
use bitcoin::secp256k1::PublicKey;
116118
use bitcoin::{Address, Amount};

src/tracing.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
use std::pin::Pin;
2+
use std::sync::Arc;
3+
use std::{collections::HashMap, future::Future};
4+
5+
use opentelemetry::propagation::Injector;
6+
use opentelemetry::propagation::TextMapPropagator;
7+
use opentelemetry::trace::TracerProvider;
8+
use opentelemetry_otlp::{SpanExporter as OtlpExporter, WithExportConfig};
9+
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider, Resource};
10+
use opentelemetry_stdout::SpanExporter as StdoutExporter;
11+
12+
use tracing::{debug, error, info, level_filters::LevelFilter, trace, warn};
13+
use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt};
14+
use tracing_subscriber::{filter::Targets, fmt, layer::SubscriberExt, util::SubscriberInitExt};
15+
use vss_client::headers::{VssHeaderProvider, VssHeaderProviderError};
16+
17+
use crate::logger::{LogRecord, LogWriter};
18+
19+
/// Adapter that bridges our `Logger` to the `tracing` ecosystem.
20+
///
21+
/// This allows existing `log_*!` macros to emit events as tracing spans/events
22+
/// without requiring a migration to tracing-specific macros.
23+
pub struct TracingLogWriter {}
24+
25+
impl LogWriter for TracingLogWriter {
26+
fn log(&self, record: LogRecord) {
27+
match record.level {
28+
lightning::util::logger::Level::Gossip => {
29+
trace!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
30+
},
31+
lightning::util::logger::Level::Trace => {
32+
trace!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
33+
},
34+
lightning::util::logger::Level::Debug => {
35+
debug!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
36+
},
37+
lightning::util::logger::Level::Info => {
38+
info!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
39+
},
40+
lightning::util::logger::Level::Warn => {
41+
warn!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
42+
},
43+
lightning::util::logger::Level::Error => {
44+
error!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
45+
},
46+
}
47+
}
48+
}
49+
50+
pub(crate) struct TracingHeaderProvider {
51+
inner: Arc<dyn VssHeaderProvider>,
52+
propagator: TraceContextPropagator,
53+
}
54+
55+
impl TracingHeaderProvider {
56+
pub fn new(inner: Arc<dyn VssHeaderProvider>) -> Self {
57+
Self { inner, propagator: TraceContextPropagator::new() }
58+
}
59+
}
60+
61+
impl VssHeaderProvider for TracingHeaderProvider {
62+
fn get_headers<'life0, 'life1, 'async_trait>(
63+
&'life0 self, request: &'life1 [u8],
64+
) -> Pin<
65+
Box<
66+
dyn Future<Output = Result<HashMap<String, String>, VssHeaderProviderError>>
67+
+ Send
68+
+ 'async_trait,
69+
>,
70+
>
71+
where
72+
'life0: 'async_trait,
73+
'life1: 'async_trait,
74+
Self: 'async_trait,
75+
{
76+
let inner = Arc::clone(&self.inner);
77+
let request = request.to_vec();
78+
let propagator = self.propagator.clone();
79+
80+
Box::pin(async move {
81+
let mut headers = inner.get_headers(&request).await?;
82+
83+
let cx = tracing::Span::current().context();
84+
propagator.inject_context(&cx, &mut HeaderInjector(&mut headers));
85+
86+
Ok(headers)
87+
})
88+
}
89+
}
90+
91+
struct HeaderInjector<'a>(&'a mut HashMap<String, String>);
92+
93+
impl Injector for HeaderInjector<'_> {
94+
fn set(&mut self, key: &str, value: String) {
95+
self.0.insert(key.to_string(), value);
96+
}
97+
}
98+
99+
/// Initialize tracing subscriber for Jaeger and `stdout` backends.
100+
pub fn configure_tracer() {
101+
let otlp_jaeger_exporter = OtlpExporter::builder()
102+
.with_tonic()
103+
.with_endpoint("http://localhost:4317")
104+
.build()
105+
.expect("Failed to create OTLP exporter");
106+
let stdout_exporter = StdoutExporter::default();
107+
108+
let tracer_provider = SdkTracerProvider::builder()
109+
.with_batch_exporter(otlp_jaeger_exporter)
110+
.with_batch_exporter(stdout_exporter)
111+
.with_resource(Resource::builder().with_service_name("ldk-node").build())
112+
.build();
113+
114+
let tracer = tracer_provider.tracer("ldk_node");
115+
116+
tracing_subscriber::registry()
117+
.with(
118+
Targets::new()
119+
.with_default(LevelFilter::WARN)
120+
.with_target("ldk_node", LevelFilter::INFO),
121+
)
122+
.with(fmt::layer().json())
123+
.with(OpenTelemetryLayer::new(tracer))
124+
.init();
125+
}

tests/integration_tests_vss.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,27 @@
1010
mod common;
1111

1212
use std::collections::HashMap;
13+
use std::sync::Arc;
1314

1415
use ldk_node::entropy::NodeEntropy;
1516
use ldk_node::Builder;
17+
use ldk_node::{configure_tracer, TracingLogWriter};
1618
use rand::{rng, Rng};
1719

1820
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1921
async fn channel_full_cycle_with_vss_store() {
2022
let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd();
2123
println!("== Node A ==");
2224
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
25+
26+
configure_tracer();
27+
tracing::info!("tracing initialized");
2328
let config_a = common::random_config(true);
29+
let tracing_writer = Arc::new(TracingLogWriter {});
30+
2431
let mut builder_a = Builder::from_config(config_a.node_config);
2532
builder_a.set_chain_source_esplora(esplora_url.clone(), None);
33+
builder_a.set_custom_logger(tracing_writer);
2634
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
2735
let node_a = builder_a
2836
.build_with_vss_store_and_fixed_headers(

0 commit comments

Comments
 (0)