Skip to content

Commit a5204f8

Browse files
committed
Implement count_by_index in redis store.
1 parent 4b9d0cd commit a5204f8

File tree

3 files changed

+154
-1
lines changed

3 files changed

+154
-1
lines changed

nativelink-store/src/redis_store.rs

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use uuid::Uuid;
6565
use crate::cas_utils::is_zero_digest;
6666
use crate::redis_utils::{
6767
FtAggregateCursor, FtAggregateOptions, FtCreateOptions, SearchSchema, ft_aggregate, ft_create,
68+
ft_search_count,
6869
};
6970

7071
/// The default size of the read chunk when reading data from Redis.
@@ -1632,7 +1633,80 @@ where
16321633
where
16331634
K: SchedulerIndexProvider + Send,
16341635
{
1635-
Err(make_err!(Code::Unimplemented, "Not implemented"))
1636+
if index.is_empty() {
1637+
return Ok(Vec::new());
1638+
}
1639+
let index_name = format!(
1640+
"{}",
1641+
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1642+
);
1643+
let mut counts = Vec::with_capacity(index.len());
1644+
for idx in index {
1645+
let index_value = idx.index_value();
1646+
let sanitized_field = try_sanitize(index_value.as_ref()).err_tip(|| {
1647+
format!("In RedisStore::count_by_index::try_sanitize - {index_value:?}")
1648+
})?;
1649+
let query = if sanitized_field.is_empty() {
1650+
"*".to_string()
1651+
} else {
1652+
format!("@{}:{{ {} }}", K::INDEX_NAME, sanitized_field)
1653+
};
1654+
let run_ft_search = || {
1655+
Ok::<_, Error>(async {
1656+
let mut client = self.get_client().await?;
1657+
ft_search_count(
1658+
&mut client.connection_manager,
1659+
index_name.as_str(),
1660+
query.as_str(),
1661+
)
1662+
.await
1663+
})
1664+
};
1665+
let count = run_ft_search()?
1666+
.or_else(|_| async {
1667+
let mut schema = vec![SearchSchema {
1668+
field_name: K::INDEX_NAME.into(),
1669+
sortable: false,
1670+
}];
1671+
if let Some(sort_key) = K::MAYBE_SORT_KEY {
1672+
schema.push(SearchSchema {
1673+
field_name: sort_key.into(),
1674+
sortable: true,
1675+
});
1676+
}
1677+
let create_result = ft_create(
1678+
self.connection_manager.get_connection().await?.0,
1679+
format!(
1680+
"{}",
1681+
get_index_name!(K::KEY_PREFIX, K::INDEX_NAME, K::MAYBE_SORT_KEY)
1682+
),
1683+
FtCreateOptions {
1684+
prefixes: vec![K::KEY_PREFIX.into()],
1685+
nohl: true,
1686+
nofields: true,
1687+
nofreqs: true,
1688+
nooffsets: true,
1689+
temporary: Some(INDEX_TTL_S),
1690+
},
1691+
schema,
1692+
)
1693+
.await
1694+
.err_tip(|| {
1695+
format!(
1696+
"Error with ft_create in RedisStore::count_by_index({index_name})",
1697+
)
1698+
});
1699+
let count_result = run_ft_search()?.await.err_tip(|| {
1700+
format!(
1701+
"Error with second FT.SEARCH count in RedisStore::count_by_index({index_name})",
1702+
)
1703+
});
1704+
count_result.or_else(|e| create_result.merge(Err(e)))
1705+
})
1706+
.await?;
1707+
counts.push(count);
1708+
}
1709+
Ok(counts)
16361710
}
16371711

16381712
async fn search_by_index_prefix<K>(
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2024-2026 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// See LICENSE file for details
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use nativelink_error::{Code, Error, ResultExt, make_err};
16+
use redis::aio::ConnectionLike;
17+
use redis::Value;
18+
19+
/// Returns the number of documents matching `query` on RediSearch index `index`, without loading
20+
/// document payloads (`LIMIT 0 0`).
21+
pub(crate) async fn ft_search_count<C: ConnectionLike + Send>(
22+
connection_manager: &mut C,
23+
index: &str,
24+
query: &str,
25+
) -> Result<usize, Error> {
26+
let res: Value = redis::cmd("FT.SEARCH")
27+
.arg(index)
28+
.arg(query)
29+
.arg("LIMIT")
30+
.arg(0_i64)
31+
.arg(0_i64)
32+
.query_async(connection_manager)
33+
.await
34+
.err_tip(|| format!("FT.SEARCH count index={index} query={query:?}"))?;
35+
36+
parse_ft_search_total(res).err_tip(|| format!("parse FT.SEARCH total index={index}"))
37+
}
38+
39+
fn parse_ft_search_total(value: Value) -> Result<usize, Error> {
40+
match value {
41+
Value::Array(arr) if !arr.is_empty() => int_to_document_count(&arr[0]),
42+
Value::Map(ref entries) => {
43+
for (k, v) in entries.iter() {
44+
let Value::SimpleString(key) = k else {
45+
continue;
46+
};
47+
if key == "total_results" {
48+
return int_to_document_count(v);
49+
}
50+
}
51+
Err(make_err!(
52+
Code::Internal,
53+
"FT.SEARCH map missing total_results: {value:?}"
54+
))
55+
}
56+
other => Err(make_err!(
57+
Code::Internal,
58+
"Unexpected FT.SEARCH response: {other:?}"
59+
)),
60+
}
61+
}
62+
63+
fn int_to_document_count(v: &Value) -> Result<usize, Error> {
64+
let Value::Int(n) = v else {
65+
return Err(make_err!(
66+
Code::Internal,
67+
"FT.SEARCH count field not integer: {v:?}"
68+
));
69+
};
70+
if *n < 0 {
71+
return Err(make_err!(
72+
Code::Internal,
73+
"Negative document count from FT.SEARCH: {n}"
74+
));
75+
}
76+
Ok(*n as usize)
77+
}

nativelink-store/src/redis_utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,7 @@ mod aggregate_types;
1616
mod ft_aggregate;
1717
mod ft_create;
1818
mod ft_cursor_read;
19+
mod ft_search_count;
1920
pub(crate) use ft_aggregate::{FtAggregateCursor, FtAggregateOptions, ft_aggregate};
2021
pub(crate) use ft_create::{FtCreateOptions, SearchSchema, ft_create};
22+
pub(crate) use ft_search_count::ft_search_count;

0 commit comments

Comments
 (0)