Skip to content

Commit 565fd84

Browse files
Harden sync push retries and stabilize auto-sync acceptance coverage
1 parent c8ddb2d commit 565fd84

6 files changed

Lines changed: 195 additions & 21 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/contextdb-engine/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@ time.workspace = true
2929
[dev-dependencies]
3030
contextdb-server = { path = "../contextdb-server", version = "0.2" }
3131
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
32+
futures-util = "0.3"
33+
async-nats = "0.46"
3234
testcontainers = "0.27"
3335
tempfile = "3"
3436
criterion = { version = "0.8", features = ["html_reports"] }

crates/contextdb-server/src/sync_client.rs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,17 +265,42 @@ impl SyncClient {
265265

266266
match tokio::time::timeout(PUSH_REQUEST_TIMEOUT, inbox_sub.next()).await {
267267
Ok(Some(msg)) => {
268-
if msg.status == Some(async_nats::StatusCode::NO_RESPONDERS)
269-
&& attempt < 4
270-
{
271-
tracing::debug!(attempt, "push got no responders, retrying");
272-
continue;
268+
if let Some(status) = msg.status {
269+
if status == async_nats::StatusCode::NO_RESPONDERS && attempt < 4 {
270+
tracing::debug!(attempt, "push got no responders, retrying");
271+
continue;
272+
}
273+
if attempt < 4 {
274+
tracing::debug!(
275+
attempt,
276+
?status,
277+
"push got status reply, retrying"
278+
);
279+
continue;
280+
}
281+
return Err(Error::SyncError(format!(
282+
"push failed with NATS status reply: {status:?}"
283+
)));
273284
}
274-
let envelope = decode(&msg.payload)
275-
.map_err(|e| Error::SyncError(e.to_string()))?;
276-
let response: PushResponse =
277-
rmp_serde::from_slice(&envelope.payload)
278-
.map_err(|e| Error::SyncError(e.to_string()))?;
285+
286+
let envelope = match decode(&msg.payload) {
287+
Ok(envelope) => envelope,
288+
Err(err) if attempt < 4 => {
289+
tracing::debug!(attempt, error = %err, "push got malformed reply envelope, retrying");
290+
continue;
291+
}
292+
Err(err) => return Err(Error::SyncError(err.to_string())),
293+
};
294+
let response: PushResponse = match rmp_serde::from_slice(
295+
&envelope.payload,
296+
) {
297+
Ok(response) => response,
298+
Err(err) if attempt < 4 => {
299+
tracing::debug!(attempt, error = %err, "push got malformed reply payload, retrying");
300+
continue;
301+
}
302+
Err(err) => return Err(Error::SyncError(err.to_string())),
303+
};
279304
if let Some(err) = response.error {
280305
return Err(Error::SyncError(err));
281306
}

crates/contextdb-server/tests/sync_integration.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use contextdb_engine::Database;
22
use contextdb_engine::sync_types::{ConflictPolicies, ConflictPolicy};
3+
use contextdb_server::protocol::{MessageType, PushResponse, WireApplyResult, encode};
34
use contextdb_server::{SyncClient, SyncServer};
45
use std::collections::HashMap;
56
use std::sync::Arc;
@@ -114,6 +115,70 @@ async fn sync_round_trip_smoke() {
114115
let _ = client.pull(&policies).await;
115116
}
116117

118+
#[tokio::test]
119+
async fn sync_00b_push_retries_malformed_reply_before_succeeding() {
120+
use contextdb_core::Value;
121+
use futures_util::StreamExt;
122+
use uuid::Uuid;
123+
124+
let nats = start_nats().await;
125+
let edge = Arc::new(Database::open_memory());
126+
let empty = HashMap::new();
127+
edge.execute("CREATE TABLE t (id UUID PRIMARY KEY, v TEXT)", &empty)
128+
.unwrap();
129+
130+
let responder = async_nats::connect(&nats.nats_url).await.unwrap();
131+
let mut sub = responder
132+
.subscribe(contextdb_server::subjects::push_subject("malformed-reply"))
133+
.await
134+
.unwrap();
135+
136+
tokio::spawn(async move {
137+
let mut attempt = 0u32;
138+
while let Some(msg) = sub.next().await {
139+
attempt += 1;
140+
if let Some(reply) = msg.reply {
141+
let payload = if attempt == 1 {
142+
vec![0x91]
143+
} else {
144+
encode(
145+
MessageType::PushResponse,
146+
&PushResponse {
147+
result: Some(WireApplyResult {
148+
applied_rows: 1,
149+
skipped_rows: 0,
150+
conflicts: Vec::new(),
151+
new_lsn: 2,
152+
}),
153+
error: None,
154+
},
155+
)
156+
.unwrap()
157+
};
158+
responder.publish(reply, payload.into()).await.unwrap();
159+
if attempt >= 2 {
160+
break;
161+
}
162+
}
163+
}
164+
});
165+
166+
let client = SyncClient::new(edge.clone(), &nats.nats_url, "malformed-reply");
167+
let id = Uuid::new_v4();
168+
let mut p = HashMap::new();
169+
p.insert("id".to_string(), Value::Uuid(id));
170+
p.insert("v".to_string(), Value::Text("retry".into()));
171+
edge.execute("INSERT INTO t (id, v) VALUES ($id, $v)", &p)
172+
.unwrap();
173+
174+
let result = client
175+
.push()
176+
.await
177+
.expect("push should retry malformed reply");
178+
assert_eq!(result.applied_rows, 1);
179+
assert!(client.push_watermark() > 0, "push watermark should advance");
180+
}
181+
117182
/// I connected the sync server to a NATS account that denies its sync subscriptions,
118183
/// and the server task stayed alive instead of panicking during bootstrap.
119184
#[tokio::test]

tests/acceptance/common.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
use contextdb_core::{Direction, Error, Value, VersionedRow};
44
use contextdb_engine::{Database, QueryResult};
5+
use contextdb_server::protocol::{MessageType, PullRequest, PullResponse, decode, encode};
6+
use contextdb_server::subjects::pull_subject;
7+
use futures_util::StreamExt;
58
use std::collections::HashMap;
69
use std::fs::File;
710
use std::io::{ErrorKind, Read, Write};
@@ -297,6 +300,71 @@ pub(crate) async fn start_nats() -> NatsFixture {
297300
}
298301
}
299302

303+
pub(crate) async fn wait_for_sync_server_ready(
304+
nats_url: &str,
305+
tenant_id: &str,
306+
timeout: Duration,
307+
) -> bool {
308+
let start = Instant::now();
309+
while start.elapsed() < timeout {
310+
let client = match async_nats::connect(nats_url).await {
311+
Ok(client) => client,
312+
Err(_) => {
313+
tokio::time::sleep(Duration::from_millis(100)).await;
314+
continue;
315+
}
316+
};
317+
318+
let inbox = client.new_inbox();
319+
let mut inbox_sub = match client.subscribe(inbox.clone()).await {
320+
Ok(sub) => sub,
321+
Err(_) => {
322+
tokio::time::sleep(Duration::from_millis(100)).await;
323+
continue;
324+
}
325+
};
326+
327+
let payload = match encode(
328+
MessageType::PullRequest,
329+
&PullRequest {
330+
since_lsn: 0,
331+
max_entries: Some(1),
332+
},
333+
) {
334+
Ok(payload) => payload,
335+
Err(_) => return false,
336+
};
337+
338+
if client
339+
.publish_with_reply(pull_subject(tenant_id), inbox.clone(), payload.into())
340+
.await
341+
.is_err()
342+
{
343+
tokio::time::sleep(Duration::from_millis(100)).await;
344+
continue;
345+
}
346+
347+
let response = tokio::time::timeout(Duration::from_millis(500), inbox_sub.next()).await;
348+
match response {
349+
Ok(Some(msg)) if msg.status == Some(async_nats::StatusCode::NO_RESPONDERS) => {}
350+
Ok(Some(msg)) if msg.status.is_some() => {}
351+
Ok(Some(msg)) => {
352+
if let Ok(envelope) = decode(&msg.payload)
353+
&& matches!(envelope.message_type, MessageType::PullResponse)
354+
&& rmp_serde::from_slice::<PullResponse>(&envelope.payload).is_ok()
355+
{
356+
return true;
357+
}
358+
}
359+
Ok(None) | Err(_) => {}
360+
}
361+
362+
tokio::time::sleep(Duration::from_millis(100)).await;
363+
}
364+
365+
false
366+
}
367+
300368
pub(crate) fn wait_until(timeout: Duration, mut predicate: impl FnMut() -> bool) -> bool {
301369
let start = Instant::now();
302370
while start.elapsed() < timeout {

tests/acceptance/sync.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,10 @@ async fn f12c_auto_sync_pushes_deletes() {
721721
let ws_url = &nats.ws_url;
722722
let tenant = "f12c_auto_sync_pushes_deletes";
723723
let mut server = spawn_server(&server_path, tenant, nats_url);
724+
assert!(
725+
wait_for_sync_server_ready(nats_url, tenant, Duration::from_secs(15)).await,
726+
"sync server should be ready before f12c setup push begins"
727+
);
724728

725729
let mut child = spawn_cli(&edge_path, &["--tenant-id", tenant, "--nats-url", ws_url]);
726730

@@ -735,23 +739,31 @@ async fn f12c_auto_sync_pushes_deletes() {
735739
DELETE FROM sensors WHERE id = '00000000-0000-0000-0000-000000000002'\n",
736740
);
737741

738-
// Wait for DELETE to propagate — server should have 1 row, not 2
739-
// Each checker uses a unique path to avoid stale local data from previous pulls
740-
let mut checker_idx = 0u32;
741-
let mut last_stdout = String::new();
742+
let checker_path = edge_path.with_file_name("f12c-checker.db");
743+
let checker_setup = run_cli_script(
744+
&checker_path,
745+
&["--tenant-id", tenant, "--nats-url", ws_url],
746+
"CREATE TABLE sensors (id UUID PRIMARY KEY, name TEXT)\n.quit\n",
747+
);
748+
assert!(
749+
checker_setup.status.success(),
750+
"checker edge setup must succeed"
751+
);
752+
753+
// Wait for DELETE to propagate while the writer CLI is still running.
754+
// Reuse a single checker edge so the probe stays end-to-end without creating a new
755+
// edge process and schema bootstrap on every polling iteration.
756+
let mut last_checker_stdout = String::new();
742757
let found = wait_until(Duration::from_secs(30), || {
743-
checker_idx += 1;
744-
let fresh_path = edge_path.with_file_name(format!("f12c-checker-{checker_idx}.db"));
745758
let check = run_cli_script(
746-
&fresh_path,
759+
&checker_path,
747760
&["--tenant-id", tenant, "--nats-url", ws_url],
748-
"CREATE TABLE sensors (id UUID PRIMARY KEY, name TEXT)\n\
749-
.sync pull\n\
761+
".sync pull\n\
750762
SELECT count(*) FROM sensors\n\
751763
.quit\n",
752764
);
753765
let stdout = output_string(&check.stdout);
754-
last_stdout = stdout.clone();
766+
last_checker_stdout = stdout.clone();
755767
stdout.contains("| 1")
756768
});
757769

@@ -761,7 +773,7 @@ async fn f12c_auto_sync_pushes_deletes() {
761773

762774
assert!(
763775
found,
764-
"DELETE must auto-sync to server while CLI is still running; child stdout={}; child stderr={}; last checker stdout={last_stdout}",
776+
"DELETE must auto-sync to server while CLI is still running; child stdout={}; child stderr={}; last checker stdout={last_checker_stdout}",
765777
output_string(&output.stdout),
766778
output_string(&output.stderr),
767779
);

0 commit comments

Comments
 (0)