Skip to content

Commit b496426

Browse files
committed
feature: implement retries on backfill-qdrant-from-pg script
1 parent 9fa9daf commit b496426

1 file changed

Lines changed: 79 additions & 19 deletions

File tree

server/src/bin/backfill-qdrant-from-pg.rs

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,37 @@ struct CollectionToPointids {
2626
point_ids: HashSet<String>,
2727
}
2828

29+
async fn retry_operation<F, Fut, T, E>(
30+
operation: F,
31+
max_retries: u32,
32+
operation_name: &str,
33+
) -> Result<T, E>
34+
where
35+
F: Fn() -> Fut,
36+
Fut: std::future::Future<Output = Result<T, E>>,
37+
E: std::fmt::Display,
38+
{
39+
for attempt in 1..=max_retries {
40+
println!("{} (attempt {}/{})", operation_name, attempt, max_retries);
41+
42+
match operation().await {
43+
Ok(result) => return Ok(result),
44+
Err(e) => {
45+
println!("Error on attempt {}: {}", attempt, e);
46+
if attempt < max_retries {
47+
println!("Retrying in 2 seconds...");
48+
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
49+
} else {
50+
println!("Max retries reached");
51+
return Err(e);
52+
}
53+
}
54+
}
55+
}
56+
57+
unreachable!("Loop should always return");
58+
}
59+
2960
#[allow(clippy::print_stdout)]
3061
#[tokio::main]
3162
async fn main() -> Result<(), ServiceError> {
@@ -71,8 +102,18 @@ async fn main() -> Result<(), ServiceError> {
71102
.unwrap_or(uuid::Uuid::max());
72103

73104
let mut offset = Some(start_offset);
105+
let mut batch_number = 0;
106+
let mut total_chunks_processed = 0;
107+
let mut total_missing_points = 0;
108+
109+
println!("Starting backfill from offset: {}", start_offset);
110+
println!("Will stop at offset: {}", stop_offset);
111+
println!("Batch size: {}", postgres_fetch_points_count);
74112

75113
while let Some(cur_offset) = offset {
114+
batch_number += 1;
115+
println!("\n=== Batch {} ===", batch_number);
116+
println!("Current offset: {}", cur_offset);
76117
use trieve_server::data::schema::chunk_group_bookmarks::dsl as chunk_group_bookmarks_columns;
77118
use trieve_server::data::schema::chunk_metadata::dsl as chunk_metadata_columns;
78119
use trieve_server::data::schema::datasets::dsl as datasets_columns;
@@ -94,7 +135,8 @@ async fn main() -> Result<(), ServiceError> {
94135
.await
95136
.expect("Failed to query chunks");
96137

97-
println!("Got {} ids", qdrant_dataset_id_pairs.len());
138+
println!("Fetched {} chunk records from PostgreSQL", qdrant_dataset_id_pairs.len());
139+
total_chunks_processed += qdrant_dataset_id_pairs.len();
98140

99141
let chunk_ids = qdrant_dataset_id_pairs
100142
.iter()
@@ -103,12 +145,13 @@ async fn main() -> Result<(), ServiceError> {
103145

104146
offset = chunk_ids.iter().max().copied().copied();
105147
if qdrant_dataset_id_pairs.len() < (postgres_fetch_points_count as usize) {
106-
println!("setting offset to None");
148+
println!("Reached end of table (batch smaller than limit)");
107149
offset = None;
108150
}
109151

110152
if let Some(new_offset) = offset {
111153
if new_offset > stop_offset {
154+
println!("Reached stop offset: {}", stop_offset);
112155
offset = None;
113156
}
114157
}
@@ -122,7 +165,8 @@ async fn main() -> Result<(), ServiceError> {
122165
.load::<(uuid::Uuid, uuid::Uuid)>(&mut conn)
123166
.await
124167
.expect("Failed to query chunk groups");
125-
// for each find qdrant collection name via dataset id
168+
169+
println!("Found {} group associations", groups_ids_to_chunk_ids.len());
126170

127171
let dataset_id_config_pair = datasets_columns::datasets
128172
.filter(
@@ -138,6 +182,8 @@ async fn main() -> Result<(), ServiceError> {
138182
.await
139183
.expect("Failed to load dataset settings");
140184

185+
println!("Loaded {} unique datasets", dataset_id_config_pair.len());
186+
141187
// Modified version that combines by collection name
142188
let collected_qdrant_ids: Vec<CollectionToPointids> = qdrant_dataset_id_pairs
143189
.into_iter()
@@ -180,21 +226,31 @@ async fn main() -> Result<(), ServiceError> {
180226
collection_pairs.qdrant_collection_name,
181227
);
182228

183-
let qdrant_point_ids_response = qdrant_client
184-
.get_points(GetPoints {
185-
collection_name: collection_pairs.qdrant_collection_name,
186-
ids: collection_pairs
187-
.point_ids
188-
.iter()
189-
.map(|point_uuid| PointId::from(point_uuid.as_str()))
190-
.collect(),
191-
with_payload: Some(false.into()),
192-
with_vectors: Some(false.into()),
193-
read_consistency: None,
194-
shard_key_selector: None,
195-
timeout: None,
196-
})
197-
.await;
229+
let collection_name = collection_pairs.qdrant_collection_name.clone();
230+
let point_ids: Vec<PointId> = collection_pairs
231+
.point_ids
232+
.iter()
233+
.map(|point_uuid| PointId::from(point_uuid.as_str()))
234+
.collect();
235+
236+
let qdrant_point_ids_response = retry_operation(
237+
|| async {
238+
qdrant_client
239+
.get_points(GetPoints {
240+
collection_name: collection_name.clone(),
241+
ids: point_ids.clone(),
242+
with_payload: Some(false.into()),
243+
with_vectors: Some(false.into()),
244+
read_consistency: None,
245+
shard_key_selector: None,
246+
timeout: None,
247+
})
248+
.await
249+
},
250+
3,
251+
"Querying Qdrant for existing points",
252+
)
253+
.await;
198254

199255
match qdrant_point_ids_response {
200256
Ok(resp) => {
@@ -309,10 +365,14 @@ async fn main() -> Result<(), ServiceError> {
309365
.map_err(|err| ServiceError::BadRequest(err.to_string()))?;
310366
}
311367
}
312-
_ => unreachable!(),
368+
Err(e) => {
369+
println!("Error querying Qdrant after retries: {}", e);
370+
println!("Skipping this collection and continuing with next batch");
371+
}
313372
}
314373
}
315374
}
316375

376+
println!("Backfill complete");
317377
Ok(())
318378
}

0 commit comments

Comments
 (0)