Skip to content

Commit dad870a

Browse files
authored
empty find_missing_blobs can return immediately (TraceMachina#2217)
1 parent 6b6efcf commit dad870a

File tree

6 files changed

+155
-1
lines changed

6 files changed

+155
-1
lines changed

nativelink-store/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ rust_test_suite(
118118
"tests/filesystem_store_test.rs",
119119
"tests/gcs_client_test.rs",
120120
"tests/gcs_store_test.rs",
121+
"tests/grpc_store_test.rs",
121122
"tests/memory_store_test.rs",
122123
"tests/mongo_store_test.rs",
123124
"tests/ontap_s3_existence_cache_store_test.rs",
@@ -167,6 +168,7 @@ rust_test_suite(
167168
"@crates//:tempfile",
168169
"@crates//:tokio",
169170
"@crates//:tokio-stream",
171+
"@crates//:tonic",
170172
"@crates//:tracing",
171173
"@crates//:tracing-test",
172174
"@crates//:uuid",

nativelink-store/src/grpc_store.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,23 @@ impl GrpcStore {
141141
);
142142

143143
let mut request = grpc_request.into_inner();
144+
145+
// Some builds (Chromium for example) do lots of empty requests for some reason, so shortcut them
146+
if request.blob_digests.is_empty() {
147+
return Ok(Response::new(FindMissingBlobsResponse {
148+
missing_blob_digests: vec![],
149+
}));
150+
}
151+
144152
request.instance_name.clone_from(&self.instance_name);
145153
self.perform_request(request, |request| async move {
146154
let channel = self
147155
.connection_manager
148-
.connection(format!("find_missing_blobs: {:?}", request.blob_digests))
156+
.connection(format!(
157+
"find_missing_blobs: ({}) {:?}",
158+
request.blob_digests.len(),
159+
request.blob_digests
160+
))
149161
.await
150162
.err_tip(|| "in find_missing_blobs")?;
151163
ContentAddressableStorageClient::new(channel)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use core::time::Duration;
2+
3+
use nativelink_config::stores::{GrpcEndpoint, GrpcSpec, Retry, StoreType};
4+
use nativelink_error::Error;
5+
use nativelink_macro::nativelink_test;
6+
use nativelink_proto::build::bazel::remote::execution::v2::{
7+
FindMissingBlobsRequest, digest_function,
8+
};
9+
use nativelink_store::grpc_store::GrpcStore;
10+
use tokio::time::timeout;
11+
use tonic::Request;
12+
13+
#[nativelink_test]
14+
async fn fast_find_missing_blobs() -> Result<(), Error> {
15+
let spec = GrpcSpec {
16+
instance_name: String::new(),
17+
endpoints: vec![GrpcEndpoint {
18+
address: "http://foobar".into(),
19+
tls_config: None,
20+
concurrency_limit: None,
21+
connect_timeout_s: 0,
22+
tcp_keepalive_s: 0,
23+
http2_keepalive_interval_s: 0,
24+
http2_keepalive_timeout_s: 0,
25+
}],
26+
store_type: StoreType::Cas,
27+
retry: Retry::default(),
28+
max_concurrent_requests: 0,
29+
connections_per_endpoint: 0,
30+
rpc_timeout_s: 1,
31+
};
32+
let store = GrpcStore::new(&spec).await?;
33+
let request = Request::new(FindMissingBlobsRequest {
34+
instance_name: String::new(),
35+
blob_digests: vec![],
36+
digest_function: digest_function::Value::Sha256.into(),
37+
});
38+
let res = timeout(Duration::from_secs(1), async move {
39+
store.find_missing_blobs(request).await
40+
})
41+
.await??;
42+
let inner_res = res.into_inner();
43+
assert_eq!(inner_res.missing_blob_digests.len(), 0);
44+
Ok(())
45+
}

nativelink-util/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ rust_test_suite(
108108
"tests/proto_stream_utils_test.rs",
109109
"tests/resource_info_test.rs",
110110
"tests/retry_test.rs",
111+
"tests/store_trait_test.rs",
111112
"tests/telemetry_test.rs",
112113
"tests/tls_utils_test.rs",
113114
],
@@ -124,6 +125,7 @@ rust_test_suite(
124125
":nativelink-util",
125126
"//nativelink-config",
126127
"//nativelink-error",
128+
"//nativelink-metric",
127129
"//nativelink-proto",
128130
"@crates//:axum",
129131
"@crates//:bytes",

nativelink-util/src/store_trait.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use core::borrow::{Borrow, BorrowMut};
1616
use core::convert::Into;
1717
use core::fmt::{self, Debug, Display};
18+
use core::future;
1819
use core::hash::{Hash, Hasher};
1920
use core::ops::{Bound, RangeBounds};
2021
use core::pin::Pin;
@@ -455,6 +456,9 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
455456
&'a self,
456457
digests: &'a [StoreKey<'a>],
457458
) -> impl Future<Output = Result<Vec<Option<u64>>, Error>> + Send + 'a {
459+
if digests.is_empty() {
460+
return future::ready(Ok(vec![])).boxed();
461+
}
458462
self.as_store_driver_pin().has_many(digests)
459463
}
460464

@@ -466,6 +470,9 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
466470
digests: &'a [StoreKey<'a>],
467471
results: &'a mut [Option<u64>],
468472
) -> impl Future<Output = Result<(), Error>> + Send + 'a {
473+
if digests.is_empty() {
474+
return future::ready(Ok(())).boxed();
475+
}
469476
self.as_store_driver_pin()
470477
.has_with_results(digests, results)
471478
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use core::pin::Pin;
2+
use std::sync::Arc;
3+
4+
use nativelink_error::Error;
5+
use nativelink_macro::nativelink_test;
6+
use nativelink_metric::MetricsComponent;
7+
use nativelink_util::buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
8+
use nativelink_util::default_health_status_indicator;
9+
use nativelink_util::health_utils::HealthStatusIndicator;
10+
use nativelink_util::store_trait::{
11+
RemoveItemCallback, Store, StoreDriver, StoreKey, StoreLike, UploadSizeInfo,
12+
};
13+
use tonic::async_trait;
14+
15+
#[derive(Debug, MetricsComponent)]
16+
struct FakeStore {}
17+
18+
#[async_trait]
19+
#[allow(clippy::todo)]
20+
impl StoreDriver for FakeStore {
21+
async fn has_with_results(
22+
self: Pin<&Self>,
23+
_keys: &[StoreKey<'_>],
24+
_results: &mut [Option<u64>],
25+
) -> Result<(), Error> {
26+
todo!();
27+
}
28+
29+
async fn update(
30+
self: Pin<&Self>,
31+
_key: StoreKey<'_>,
32+
_reader: DropCloserReadHalf,
33+
_size_info: UploadSizeInfo,
34+
) -> Result<(), Error> {
35+
todo!();
36+
}
37+
38+
async fn get_part(
39+
self: Pin<&Self>,
40+
_key: StoreKey<'_>,
41+
_writer: &mut DropCloserWriteHalf,
42+
_offset: u64,
43+
_length: Option<u64>,
44+
) -> Result<(), Error> {
45+
todo!();
46+
}
47+
48+
fn inner_store(&self, _digest: Option<StoreKey>) -> &dyn StoreDriver {
49+
self
50+
}
51+
52+
fn as_any(&self) -> &(dyn core::any::Any + Sync + Send + 'static) {
53+
self
54+
}
55+
56+
fn as_any_arc(self: Arc<Self>) -> Arc<dyn core::any::Any + Sync + Send + 'static> {
57+
self
58+
}
59+
60+
fn register_remove_callback(
61+
self: Arc<Self>,
62+
_callback: Arc<dyn RemoveItemCallback>,
63+
) -> Result<(), Error> {
64+
todo!();
65+
}
66+
}
67+
68+
default_health_status_indicator!(FakeStore);
69+
70+
#[nativelink_test]
71+
async fn fast_has_with_results() -> Result<(), Error> {
72+
let store = Store::new(Arc::new(FakeStore {}));
73+
let mut results: [Option<u64>; 0] = [];
74+
store.has_with_results(&[], &mut results).await?;
75+
76+
Ok(())
77+
}
78+
79+
#[nativelink_test]
80+
async fn fast_has_many() -> Result<(), Error> {
81+
let store = Store::new(Arc::new(FakeStore {}));
82+
let res = store.has_many(&[]).await?;
83+
assert!(res.is_empty());
84+
85+
Ok(())
86+
}

0 commit comments

Comments
 (0)