Skip to content

Commit 6b6efcf

Browse files
authored
Add debug info to connection manager queues (TraceMachina#2188)
* Add debug info to connection manager queues * Don't need all the logging for update_action_result * Add CAS speed check
1 parent 36a8238 commit 6b6efcf

File tree

8 files changed

+183
-48
lines changed

8 files changed

+183
-48
lines changed

BUILD.bazel

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,25 @@ rust_binary(
5959
],
6060
)
6161

62+
rust_binary(
63+
name = "cas_speed_check",
64+
srcs = [
65+
"src/bin/cas_speed_check.rs",
66+
],
67+
deps = [
68+
"//nativelink-error",
69+
"//nativelink-proto",
70+
"//nativelink-util",
71+
"@crates//:clap",
72+
"@crates//:hex",
73+
"@crates//:rand",
74+
"@crates//:sha2",
75+
"@crates//:tokio",
76+
"@crates//:tonic",
77+
"@crates//:tracing",
78+
],
79+
)
80+
6281
filegroup(
6382
name = "docs",
6483
srcs = [

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ nix = ["nativelink-worker/nix"]
3232
[dependencies]
3333
nativelink-config = { path = "nativelink-config" }
3434
nativelink-error = { path = "nativelink-error" }
35+
nativelink-proto = { path = "nativelink-proto" }
3536
nativelink-scheduler = { path = "nativelink-scheduler" }
3637
nativelink-service = { path = "nativelink-service" }
3738
nativelink-store = { path = "nativelink-store" }
@@ -51,6 +52,7 @@ clap = { version = "4.5.35", features = [
5152
"usage",
5253
], default-features = false }
5354
futures = { version = "0.3.31", default-features = false }
55+
hex = { version = "0.4.3", default-features = false }
5456
hyper = { version = "1.6.0", default-features = false }
5557
hyper-util = { version = "0.1.11", default-features = false, features = [
5658
"tracing",
@@ -62,6 +64,7 @@ rand = { version = "0.9.0", default-features = false, features = [
6264
rustls-pki-types = { version = "1.13.1", features = [
6365
"std",
6466
], default-features = false }
67+
sha2 = { version = "0.10.8", default-features = false }
6568
tokio = { version = "1.44.1", features = [
6669
"fs",
6770
"io-util",

nativelink-scheduler/src/grpc_scheduler.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ impl GrpcScheduler {
216216
// Not in the cache, lookup the capabilities with the upstream.
217217
let channel = self
218218
.connection_manager
219-
.connection()
219+
.connection("get_known_properties".into())
220220
.await
221221
.err_tip(|| "in get_platform_property_manager()")?;
222222
let capabilities_result = CapabilitiesClient::new(channel)
@@ -274,7 +274,7 @@ impl GrpcScheduler {
274274
.perform_request(request, |request| async move {
275275
let channel = self
276276
.connection_manager
277-
.connection()
277+
.connection(format!("add_action: {:?}", request.action_digest))
278278
.await
279279
.err_tip(|| "in add_action()")?;
280280
ExecutionClient::new(channel)
@@ -309,7 +309,7 @@ impl GrpcScheduler {
309309
.perform_request(request, |request| async move {
310310
let channel = self
311311
.connection_manager
312-
.connection()
312+
.connection(format!("filter_operations: {}", request.name))
313313
.await
314314
.err_tip(|| "in find_by_client_operation_id()")?;
315315
ExecutionClient::new(channel)

nativelink-service/src/ac_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ impl ActionCache for AcServer {
201201

202202
#[instrument(
203203
err,
204-
ret(level = Level::INFO),
204+
ret(level = Level::TRACE),
205205
level = Level::ERROR,
206206
skip_all,
207207
fields(request = ?grpc_request.get_ref())

nativelink-store/src/grpc_store.rs

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ impl GrpcStore {
145145
self.perform_request(request, |request| async move {
146146
let channel = self
147147
.connection_manager
148-
.connection()
148+
.connection(format!("find_missing_blobs: {:?}", request.blob_digests))
149149
.await
150150
.err_tip(|| "in find_missing_blobs")?;
151151
ContentAddressableStorageClient::new(channel)
@@ -170,7 +170,7 @@ impl GrpcStore {
170170
self.perform_request(request, |request| async move {
171171
let channel = self
172172
.connection_manager
173-
.connection()
173+
.connection("batch_update_blobs".into())
174174
.await
175175
.err_tip(|| "in batch_update_blobs")?;
176176
ContentAddressableStorageClient::new(channel)
@@ -195,7 +195,7 @@ impl GrpcStore {
195195
self.perform_request(request, |request| async move {
196196
let channel = self
197197
.connection_manager
198-
.connection()
198+
.connection("batch_read_blobs".into())
199199
.await
200200
.err_tip(|| "in batch_read_blobs")?;
201201
ContentAddressableStorageClient::new(channel)
@@ -220,7 +220,7 @@ impl GrpcStore {
220220
self.perform_request(request, |request| async move {
221221
let channel = self
222222
.connection_manager
223-
.connection()
223+
.connection(format!("get_tree: {:?}", request.root_digest))
224224
.await
225225
.err_tip(|| "in get_tree")?;
226226
ContentAddressableStorageClient::new(channel)
@@ -247,7 +247,7 @@ impl GrpcStore {
247247
) -> Result<impl Stream<Item = Result<ReadResponse, Status>> + use<>, Error> {
248248
let channel = self
249249
.connection_manager
250-
.connection()
250+
.connection(format!("read_internal: {}", request.resource_name))
251251
.await
252252
.err_tip(|| "in read_internal")?;
253253
let mut response = ByteStreamClient::new(channel)
@@ -325,34 +325,36 @@ impl GrpcStore {
325325
"GrpcStore::write: requesting connection from pool",
326326
);
327327
let conn_start = std::time::Instant::now();
328-
let rpc_fut = self.connection_manager.connection().and_then(|channel| {
329-
let conn_elapsed = conn_start.elapsed();
330-
let instance_for_rpc = instance_name.clone();
331-
let conn_elapsed_ms =
332-
u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
333-
trace!(
334-
instance_name = %instance_for_rpc,
335-
conn_elapsed_ms,
336-
"GrpcStore::write: got connection, starting ByteStream.Write RPC",
337-
);
338-
let rpc_start = std::time::Instant::now();
339-
let local_state_for_rpc = local_state.clone();
340-
async move {
341-
let res = ByteStreamClient::new(channel)
342-
.write(WriteStateWrapper::new(local_state_for_rpc))
343-
.await
344-
.err_tip(|| "in GrpcStore::write");
345-
let rpc_elapsed_ms =
346-
u64::try_from(rpc_start.elapsed().as_millis()).unwrap_or(u64::MAX);
328+
let rpc_fut = self.connection_manager.connection("write".into()).and_then(
329+
|channel| {
330+
let conn_elapsed = conn_start.elapsed();
331+
let instance_for_rpc = instance_name.clone();
332+
let conn_elapsed_ms =
333+
u64::try_from(conn_elapsed.as_millis()).unwrap_or(u64::MAX);
347334
trace!(
348335
instance_name = %instance_for_rpc,
349-
rpc_elapsed_ms,
350-
success = res.is_ok(),
351-
"GrpcStore::write: ByteStream.Write RPC returned",
336+
conn_elapsed_ms,
337+
"GrpcStore::write: got connection, starting ByteStream.Write RPC",
352338
);
353-
res
354-
}
355-
});
339+
let rpc_start = std::time::Instant::now();
340+
let local_state_for_rpc = local_state.clone();
341+
async move {
342+
let res = ByteStreamClient::new(channel)
343+
.write(WriteStateWrapper::new(local_state_for_rpc))
344+
.await
345+
.err_tip(|| "in GrpcStore::write");
346+
let rpc_elapsed_ms = u64::try_from(rpc_start.elapsed().as_millis())
347+
.unwrap_or(u64::MAX);
348+
trace!(
349+
instance_name = %instance_for_rpc,
350+
rpc_elapsed_ms,
351+
success = res.is_ok(),
352+
"GrpcStore::write: ByteStream.Write RPC returned",
353+
);
354+
res
355+
}
356+
},
357+
);
356358

357359
let result = if rpc_timeout > Duration::ZERO {
358360
match tokio::time::timeout(rpc_timeout, rpc_fut).await {
@@ -444,7 +446,7 @@ impl GrpcStore {
444446
self.perform_request(request, |request| async move {
445447
let channel = self
446448
.connection_manager
447-
.connection()
449+
.connection(format!("query_write_status: {}", request.resource_name))
448450
.await
449451
.err_tip(|| "in query_write_status")?;
450452
ByteStreamClient::new(channel)
@@ -464,7 +466,7 @@ impl GrpcStore {
464466
self.perform_request(request, |request| async move {
465467
let channel = self
466468
.connection_manager
467-
.connection()
469+
.connection(format!("get_action_result: {:?}", request.action_digest))
468470
.await
469471
.err_tip(|| "in get_action_result")?;
470472
ActionCacheClient::new(channel)
@@ -484,7 +486,7 @@ impl GrpcStore {
484486
self.perform_request(request, |request| async move {
485487
let channel = self
486488
.connection_manager
487-
.connection()
489+
.connection(format!("update_action_result: {:?}", request.action_digest))
488490
.await
489491
.err_tip(|| "in update_action_result")?;
490492
ActionCacheClient::new(channel)

nativelink-util/src/connection_manager.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::retry::{self, Retrier, RetryResult};
3434
#[derive(Debug)]
3535
pub struct ConnectionManager {
3636
// The channel to request connections from the worker.
37-
worker_tx: mpsc::Sender<oneshot::Sender<Connection>>,
37+
worker_tx: mpsc::Sender<(String, oneshot::Sender<Connection>)>,
3838
}
3939

4040
/// The index into `ConnectionManagerWorker::endpoints`.
@@ -101,8 +101,8 @@ struct ConnectionManagerWorker {
101101
connecting_channels: FuturesUnordered<Pin<Box<dyn Future<Output = IndexedChannel> + Send>>>,
102102
/// Connected channels that are available for use.
103103
available_channels: VecDeque<EstablishedChannel>,
104-
/// Requests for a Channel when available.
105-
waiting_connections: VecDeque<oneshot::Sender<Connection>>,
104+
/// Requests for a Channel when available - (reason, request)
105+
waiting_connections: VecDeque<(String, oneshot::Sender<Connection>)>,
106106
/// The retry configuration for connecting to an Endpoint, on failure will
107107
/// restart the retrier after a 1 second delay.
108108
retrier: Retrier,
@@ -165,10 +165,10 @@ impl ConnectionManager {
165165
/// Get a Connection that can be used as a `tonic::Channel`, except it
166166
/// performs some additional counting to reconnect on error and restrict
167167
/// the number of concurrent connections.
168-
pub async fn connection(&self) -> Result<Connection, Error> {
168+
pub async fn connection(&self, reason: String) -> Result<Connection, Error> {
169169
let (tx, rx) = oneshot::channel();
170170
self.worker_tx
171-
.send(tx)
171+
.send((reason, tx))
172172
.await
173173
.map_err(|err| make_err!(Code::Unavailable, "Requesting a new connection: {err:?}"))?;
174174
rx.await
@@ -180,7 +180,7 @@ impl ConnectionManagerWorker {
180180
async fn service_requests(
181181
mut self,
182182
connections_per_endpoint: usize,
183-
mut worker_rx: mpsc::Receiver<oneshot::Sender<Connection>>,
183+
mut worker_rx: mpsc::Receiver<(String, oneshot::Sender<Connection>)>,
184184
mut connection_rx: mpsc::UnboundedReceiver<ConnectionRequest>,
185185
) {
186186
// Make the initial set of connections, connection failures will be
@@ -199,12 +199,12 @@ impl ConnectionManagerWorker {
199199
loop {
200200
tokio::select! {
201201
request = worker_rx.recv() => {
202-
let Some(request) = request else {
202+
let Some((reason, request)) = request else {
203203
// The ConnectionManager was dropped, shut down the
204204
// worker.
205205
break;
206206
};
207-
self.handle_worker(request);
207+
self.handle_worker(reason, request);
208208
}
209209
maybe_request = connection_rx.recv() => {
210210
if let Some(request) = maybe_request {
@@ -308,20 +308,22 @@ impl ConnectionManagerWorker {
308308
}
309309

310310
// This must never be made async otherwise the select may cancel it.
311-
fn handle_worker(&mut self, tx: oneshot::Sender<Connection>) {
311+
fn handle_worker(&mut self, reason: String, tx: oneshot::Sender<Connection>) {
312312
if let Some(channel) = (self.available_connections > 0)
313313
.then_some(())
314314
.and_then(|()| self.available_channels.pop_front())
315315
{
316+
debug!(reason, "ConnectionManager: request running");
316317
self.provide_channel(channel, tx);
317318
} else {
318319
debug!(
319320
available_connections = self.available_connections,
320321
available_channels = self.available_channels.len(),
321322
waiting_connections = self.waiting_connections.len(),
323+
reason,
322324
"ConnectionManager: no connection available, request queued",
323325
);
324-
self.waiting_connections.push_back(tx);
326+
self.waiting_connections.push_back((reason, tx));
325327
}
326328
}
327329

@@ -342,7 +344,8 @@ impl ConnectionManagerWorker {
342344
&& !self.available_channels.is_empty()
343345
{
344346
if let Some(channel) = self.available_channels.pop_front() {
345-
if let Some(tx) = self.waiting_connections.pop_front() {
347+
if let Some((reason, tx)) = self.waiting_connections.pop_front() {
348+
debug!(reason, "ConnectionManager: channel available, running");
346349
self.provide_channel(channel, tx);
347350
} else {
348351
// This should never happen, but better than an unwrap.

0 commit comments

Comments
 (0)