Skip to content

Commit 629f825

Browse files
committed
feat: instrumenting vss store
1 parent 5d0fb14 commit 629f825

6 files changed

Lines changed: 164 additions & 0 deletions

File tree

Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
7575
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
7676
log = { version = "0.4.22", default-features = false, features = ["std"]}
7777

78+
tracing ={ version = "0.1.43" , default-features = false, features = ["log", "attributes"]}
79+
tracing-subscriber = {version = "0.3.22", default-features = false, features = ["std", "fmt", "json"]}
80+
tracing-opentelemetry ={ version = "0.32.0", default-features = false}
81+
opentelemetry ={ version = "0.31.0", default-features = false}
82+
opentelemetry_sdk ={ version = "0.31.0", default-features = false}
83+
opentelemetry-otlp ={ version = "0.31.0", default-features = false, features = ["trace", "grpc-tonic"]}
84+
opentelemetry-stdout = {version = "0.31.0"}
85+
7886
vss-client = { package = "vss-client-ng", version = "0.4" }
7987
prost = { version = "0.11.6", default-features = false}
8088

docker-compose.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ services:
4848
networks:
4949
- bitcoin-electrs
5050

51+
jaeger:
52+
image: jaegertracing/all-in-one:1.54
53+
container_name: jaeger
54+
ports:
55+
- "6831:6831/udp"
56+
- "16686:16686"
57+
- "14268:14268"
58+
- "4317:4317"
59+
- "4318:4318"
60+
5161
networks:
5262
bitcoin-electrs:
5363
driver: bridge

src/io/vss_store.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use lightning::util::persist::{KVStore, KVStoreSync};
2727
use lightning::util::ser::{Readable, Writeable};
2828
use prost::Message;
2929
use rand::RngCore;
30+
use tracing::instrument;
3031
use vss_client::client::VssClient;
3132
use vss_client::error::VssError;
3233
use vss_client::headers::{FixedHeaders, LnurlAuthToJwtProvider, VssHeaderProvider};
@@ -43,6 +44,7 @@ use vss_client::util::storable_builder::{EntropySource, StorableBuilder};
4344

4445
use crate::entropy::NodeEntropy;
4546
use crate::io::utils::check_namespace_key_validity;
47+
use crate::tracing::TracingHeaderProvider;
4648

4749
type CustomRetryPolicy = FilteredRetryPolicy<
4850
JitteredRetryPolicy<
@@ -135,6 +137,7 @@ impl VssStore {
135137
})?;
136138

137139
let async_retry_policy = retry_policy();
140+
let header_provider = Arc::new(TracingHeaderProvider::new(header_provider));
138141
let async_client =
139142
VssClient::new_with_headers(base_url, async_retry_policy, header_provider);
140143

@@ -495,6 +498,7 @@ impl VssStoreInner {
495498
Ok(keys)
496499
}
497500

501+
#[instrument(name = "vss.read", skip(self, client,), err(level = "error"))]
498502
async fn read_internal(
499503
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
500504
secondary_namespace: String, key: String,
@@ -531,6 +535,14 @@ impl VssStoreInner {
531535
Ok(decrypted)
532536
}
533537

538+
#[instrument(
539+
name = "vss.write",
540+
skip(self, client, buf, inner_lock_ref),
541+
fields(
542+
vss.payload_size_bytes = buf.len(),
543+
),
544+
err(level = "error")
545+
)]
534546
async fn write_internal(
535547
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
536548
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
@@ -575,6 +587,7 @@ impl VssStoreInner {
575587
.await
576588
}
577589

590+
#[instrument(name = "vss.remove", skip(self, client, inner_lock_ref), err(level = "error"))]
578591
async fn remove_internal(
579592
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
580593
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
@@ -608,6 +621,7 @@ impl VssStoreInner {
608621
.await
609622
}
610623

624+
#[instrument(name = "vss.list", skip(self, client), err(level = "error"))]
611625
async fn list_internal(
612626
&self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String,
613627
secondary_namespace: String,

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub mod payment;
102102
mod peer_store;
103103
mod runtime;
104104
mod scoring;
105+
mod tracing;
105106
mod tx_broadcaster;
106107
mod types;
107108
mod wallet;
@@ -111,6 +112,7 @@ use std::net::ToSocketAddrs;
111112
use std::sync::{Arc, Mutex, RwLock};
112113
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
113114

115+
pub use crate::tracing::{configure_tracer, TracingLogWriter};
114116
pub use balance::{BalanceDetails, LightningBalance, PendingSweepBalance};
115117
use bitcoin::secp256k1::PublicKey;
116118
use bitcoin::{Address, Amount};

src/tracing.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
use std::pin::Pin;
2+
use std::sync::Arc;
3+
use std::{collections::HashMap, future::Future};
4+
5+
use opentelemetry::propagation::Injector;
6+
use opentelemetry::propagation::TextMapPropagator;
7+
use opentelemetry::trace::TracerProvider;
8+
use opentelemetry_otlp::{SpanExporter as OtlpExporter, WithExportConfig};
9+
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider, Resource};
10+
use opentelemetry_stdout::SpanExporter as StdoutExporter;
11+
12+
use tracing::{debug, error, info, level_filters::LevelFilter, trace, warn};
13+
use tracing_opentelemetry::{OpenTelemetryLayer, OpenTelemetrySpanExt};
14+
use tracing_subscriber::{filter::Targets, fmt, layer::SubscriberExt, util::SubscriberInitExt};
15+
use vss_client::headers::{VssHeaderProvider, VssHeaderProviderError};
16+
17+
use crate::logger::{LogRecord, LogWriter};
18+
19+
/// Tracing adapter
20+
pub struct TracingLogWriter {}
21+
22+
impl LogWriter for TracingLogWriter {
23+
fn log(&self, record: LogRecord) {
24+
match record.level {
25+
lightning::util::logger::Level::Gossip => {
26+
trace!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
27+
},
28+
lightning::util::logger::Level::Trace => {
29+
trace!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
30+
},
31+
lightning::util::logger::Level::Debug => {
32+
debug!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
33+
},
34+
lightning::util::logger::Level::Info => {
35+
info!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
36+
},
37+
lightning::util::logger::Level::Warn => {
38+
warn!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
39+
},
40+
lightning::util::logger::Level::Error => {
41+
error!(target: "ldk_node", module = record.module_path, line = record.line, "{}", record.args)
42+
},
43+
}
44+
}
45+
}
46+
47+
pub(crate) struct TracingHeaderProvider {
48+
inner: Arc<dyn VssHeaderProvider>,
49+
propagator: TraceContextPropagator,
50+
}
51+
52+
impl TracingHeaderProvider {
53+
pub fn new(inner: Arc<dyn VssHeaderProvider>) -> Self {
54+
Self { inner, propagator: TraceContextPropagator::new() }
55+
}
56+
}
57+
58+
impl VssHeaderProvider for TracingHeaderProvider {
59+
fn get_headers<'life0, 'life1, 'async_trait>(
60+
&'life0 self, request: &'life1 [u8],
61+
) -> Pin<
62+
Box<
63+
dyn Future<Output = Result<HashMap<String, String>, VssHeaderProviderError>>
64+
+ Send
65+
+ 'async_trait,
66+
>,
67+
>
68+
where
69+
'life0: 'async_trait,
70+
'life1: 'async_trait,
71+
Self: 'async_trait,
72+
{
73+
let inner = Arc::clone(&self.inner);
74+
let request = request.to_vec();
75+
let propagator = self.propagator.clone();
76+
77+
Box::pin(async move {
78+
let mut headers = inner.get_headers(&request).await?;
79+
80+
let cx = tracing::Span::current().context();
81+
propagator.inject_context(&cx, &mut HeaderInjector(&mut headers));
82+
83+
Ok(headers)
84+
})
85+
}
86+
}
87+
88+
struct HeaderInjector<'a>(&'a mut HashMap<String, String>);
89+
90+
impl Injector for HeaderInjector<'_> {
91+
fn set(&mut self, key: &str, value: String) {
92+
self.0.insert(key.to_string(), value);
93+
}
94+
}
95+
96+
/// Initialize tracing subscriber with Jaeger backend
97+
pub fn configure_tracer() {
98+
let otlp_jaeger_exporter = OtlpExporter::builder()
99+
.with_tonic()
100+
.with_endpoint("http://localhost:4317")
101+
.build()
102+
.expect("Failed to create OTLP exporter");
103+
let stdout_exporter = StdoutExporter::default();
104+
105+
let tracer_provider = SdkTracerProvider::builder()
106+
.with_batch_exporter(otlp_jaeger_exporter)
107+
.with_batch_exporter(stdout_exporter)
108+
.with_resource(Resource::builder().with_service_name("ldk-node").build())
109+
.build();
110+
111+
let tracer = tracer_provider.tracer("ldk_node");
112+
113+
tracing_subscriber::registry()
114+
.with(
115+
Targets::new()
116+
.with_default(LevelFilter::WARN)
117+
.with_target("ldk_node", LevelFilter::INFO),
118+
)
119+
.with(fmt::layer().json())
120+
.with(OpenTelemetryLayer::new(tracer))
121+
.init();
122+
}

tests/integration_tests_vss.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,27 @@
1010
mod common;
1111

1212
use std::collections::HashMap;
13+
use std::sync::Arc;
1314

1415
use ldk_node::entropy::NodeEntropy;
1516
use ldk_node::Builder;
17+
use ldk_node::{configure_tracer, TracingLogWriter};
1618
use rand::{rng, Rng};
1719

1820
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1921
async fn channel_full_cycle_with_vss_store() {
2022
let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd();
2123
println!("== Node A ==");
2224
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
25+
26+
configure_tracer();
27+
tracing::info!("tracing initialized");
2328
let config_a = common::random_config(true);
29+
let tracing_writer = Arc::new(TracingLogWriter {});
30+
2431
let mut builder_a = Builder::from_config(config_a.node_config);
2532
builder_a.set_chain_source_esplora(esplora_url.clone(), None);
33+
builder_a.set_custom_logger(tracing_writer);
2634
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
2735
let node_a = builder_a
2836
.build_with_vss_store_and_fixed_headers(

0 commit comments

Comments
 (0)