Commit 3b86d17

andinux and claude committed
fix: gate chunked-receive checkpoint on stream completion
The receive path advanced check_dbversion/check_seq per applied chunk to the chunk's last row, which can fall mid-db_version. Since the server's cloudsync_payload_chunks resumes on db_version > since with no seq cursor, a stop between chunks of a split db_version silently skipped the un-applied rows on the next /check (data loss).

Mirror the send path: advance the receive cursor only after the whole chunk stream is applied, to the stream watermark, never per chunk. cloudsync_payload_apply gains a C-level checkpoint argument (watermark / none / last-applied); the /check response signals watermark + final chunk, and falls back to legacy monolithic behavior when absent. The public single-arg SQL function and the send path are unchanged; re-delivered rows stay idempotent.

Adds do_test_payload_chunks_split_dbversion reproducing a single db_version split across >=2 v2 chunks with partial apply.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 0f81be8 commit 3b86d17

8 files changed

Lines changed: 350 additions & 32 deletions

API.md

Lines changed: 2 additions & 0 deletions
@@ -759,6 +759,8 @@ If a package of new changes is already available for the local site, the server
 This function is designed to be called periodically to keep the local database in sync.
 To force an update and wait for changes (with a timeout), use [`cloudsync_network_sync(wait_ms, max_retries)`].
 
+When the server delivers a download as a stream of chunks, the local receive checkpoint is advanced only after the **whole stream** has been applied — never in the middle of a source `db_version`. Mirroring the send path, the server tags the stream with its watermark and marks the final chunk; the checkpoint jumps straight to that watermark on completion. A stop between chunks therefore re-delivers the stream from the unchanged checkpoint on the next call (apply is idempotent, so re-delivered rows are harmless), so no changes can be skipped. A non-chunked (monolithic) download advances the checkpoint to the artifact's last applied position, as before.
+
 If the network is misconfigured or the remote server is unreachable, the function raises a SQL error. If the received payload cannot be applied locally (for example because of an unknown schema hash), the error is returned as a `receive.error` field in the JSON response. If the server reports an unresolved failed check job (e.g. an `encode_changes` failure), that failure is forwarded as a `receive.lastFailure` object.
 
 **Parameters:** None.
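
To make the failure mode concrete, here is a minimal, self-contained C sketch of the scenario the added API.md paragraph describes. It is not the extension's code: `sim_row`, `sim_client`, and every `sim_*` function are hypothetical names, the server is modeled as an in-memory array that resumes strictly on `db_version > since`, and chunks are assumed to hold a fixed number of rows. It compares advancing the cursor after each chunk with holding it until the stream completes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model for illustration only; not part of cloudsync. */
enum { SIM_MAX_ROWS = 16 };

typedef struct { int64_t db_version; int64_t seq; } sim_row;

typedef struct {
    bool applied[SIM_MAX_ROWS]; /* idempotent apply: a row is present or not */
    int64_t cursor;             /* durable receive checkpoint (db_version only) */
} sim_client;

/* Server resume rule from the commit message: first row with db_version > since. */
static size_t sim_first_after(const sim_row *rows, size_t n, int64_t since) {
    size_t i = 0;
    while (i < n && rows[i].db_version <= since) i++;
    return i;
}

/* Apply up to chunk_rows rows starting at `start`; return the index past the chunk. */
static size_t sim_apply_chunk(sim_client *c, const sim_row *rows, size_t n,
                              size_t start, size_t chunk_rows, bool advance_per_chunk) {
    size_t end = (n - start < chunk_rows) ? n : start + chunk_rows;
    for (size_t i = start; i < end; i++) c->applied[i] = true;
    /* Old behavior: move the cursor to the chunk's last row, possibly mid-db_version. */
    if (advance_per_chunk && end > start) c->cursor = rows[end - 1].db_version;
    return end;
}

/* Apply one chunk, stop, restart, then drain the stream.
   Returns the number of rows that were never applied. */
static size_t sim_interrupted_sync(const sim_row *rows, size_t n,
                                   size_t chunk_rows, bool advance_per_chunk) {
    assert(n <= SIM_MAX_ROWS && chunk_rows > 0);
    sim_client c = { .cursor = 0 };

    /* First /check: only the first chunk is applied before the stop. */
    size_t pos = sim_first_after(rows, n, c.cursor);
    sim_apply_chunk(&c, rows, n, pos, chunk_rows, advance_per_chunk);

    /* Restart: the server resumes from whatever the durable cursor says. */
    pos = sim_first_after(rows, n, c.cursor);
    while (pos < n) pos = sim_apply_chunk(&c, rows, n, pos, chunk_rows, advance_per_chunk);

    /* Stream complete: the gated policy now jumps to the watermark. */
    if (n > 0) c.cursor = rows[n - 1].db_version;

    size_t missing = 0;
    for (size_t i = 0; i < n; i++) if (!c.applied[i]) missing++;
    return missing;
}
```

With rows (1,0), (1,1), (1,2), (2,0) and two rows per chunk, `sim_interrupted_sync(rows, 4, 2, true)` returns 1, because row (1,2) is skipped once the cursor reads 1. Passing `false` returns 0, because the whole stream is re-delivered from the unchanged cursor.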

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 - `cloudsync_payload_apply()` now accepts legacy payloads, monolithic payloads, and v3 fragment payloads without enforcing the local `payload_max_chunk_size`, preserving compatibility between peers with different settings.
 - `cloudsync_network_send_changes()` now streams outgoing changes through `cloudsync_payload_chunks()` instead of first building one monolithic payload. This bounds transport payload size for the built-in network path and lets large rowsets or oversized BLOB/TEXT values flow through the same `/apply` endpoint as regular payloads.
+- The chunked-download receive path advances the local receive checkpoint (`check_dbversion` / `check_seq`) **only after a chunk stream has been fully applied**, jumping straight to the stream watermark — never into the middle of a source `db_version`. This mirrors the send path and ensures a stop between chunks cannot skip the un-applied rows of a `db_version` split across chunks on the next `/check` (the server resumes on `db_version > since`, with no intra-version cursor). `cloudsync_payload_apply()` no longer advances the receive checkpoint per applied chunk; the built-in network `/check` path drives it from the server's watermark and final-chunk signal, and falls back to the previous monolithic behavior when the server sends no watermark. Re-delivered rows remain idempotent.
 
 ## [1.0.20] - 2026-05-26

src/cloudsync.c

Lines changed: 72 additions & 24 deletions
@@ -186,6 +186,12 @@ struct cloudsync_context {
 
     // deferred column-batch merge (active during payload_apply)
     merge_pending_batch *pending_batch;
+
+    // last (db_version, seq) successfully applied during the current
+    // cloudsync_payload_apply call; used to resolve the
+    // CLOUDSYNC_CHECKPOINT_LAST_APPLIED receive-checkpoint mode (-1 = none yet).
+    int64_t apply_last_db_version;
+    int64_t apply_last_seq;
 };
 
 struct cloudsync_table_context {
@@ -3770,16 +3776,17 @@ static int cloudsync_payload_apply_single_decoded_row (cloudsync_context *data,
         if (rc != DBRES_OK) goto cleanup;
     }
 
-    int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
-    int seq_setting = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
-    if (db_version >= dbversion) {
-        char buf[256];
-        snprintf(buf, sizeof(buf), "%" PRId64, db_version);
-        dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
-        if (seq != seq_setting) {
-            snprintf(buf, sizeof(buf), "%" PRId64, seq);
-            dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
-        }
+    // Do NOT advance the receive cursor here: a v3 value carries a single
+    // (db_version, seq) that can be in the middle of its source db_version, and a
+    // db_version's chunks can span multiple /check artifacts. Advancing per value
+    // would leave the cursor mid-db_version. The durable cursor is advanced once,
+    // after the whole payload/stream is applied, via cloudsync_payload_apply's
+    // checkpoint argument. Record the last applied position for the
+    // CLOUDSYNC_CHECKPOINT_LAST_APPLIED mode.
+    if (db_version > data->apply_last_db_version ||
+        (db_version == data->apply_last_db_version && seq > data->apply_last_seq)) {
+        data->apply_last_db_version = db_version;
+        data->apply_last_seq = seq;
     }
 
     if (pnrows) *pnrows += 1;
@@ -3985,7 +3992,42 @@ static int cloudsync_payload_apply_fragment_row (cloudsync_context *data, clouds
 
 // #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION
 
-int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows) {
+// Advance the durable receive cursor (check_dbversion/check_seq) after a payload
+// (or a fully-applied chunk stream) has been applied. See the checkpoint-mode
+// documentation on cloudsync_payload_apply in cloudsync.h. The advance is
+// strictly monotonic so re-delivered rows never regress the cursor.
+static void cloudsync_payload_apply_checkpoint (cloudsync_context *data, int64_t checkpoint_db_version, int64_t checkpoint_seq) {
+    int64_t target_db_version;
+    int64_t target_seq;
+
+    if (checkpoint_db_version == CLOUDSYNC_CHECKPOINT_NONE) return;
+    if (checkpoint_db_version == CLOUDSYNC_CHECKPOINT_LAST_APPLIED) {
+        // Nothing applied -> nothing to checkpoint.
+        if (data->apply_last_db_version < 0) return;
+        target_db_version = data->apply_last_db_version;
+        target_seq = data->apply_last_seq;
+    } else {
+        target_db_version = checkpoint_db_version;
+        target_seq = checkpoint_seq;
+    }
+
+    int64_t cur_db_version = dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
+    int64_t cur_seq = dbutils_settings_get_int64_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
+
+    // monotonic guard: never move the cursor backwards
+    if (target_db_version < cur_db_version) return;
+    if (target_db_version == cur_db_version && target_seq <= cur_seq) return;
+
+    char buf[256];
+    snprintf(buf, sizeof(buf), "%" PRId64, target_db_version);
+    dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
+    if (target_seq != cur_seq) {
+        snprintf(buf, sizeof(buf), "%" PRId64, target_seq);
+        dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
+    }
+}
+
+int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *pnrows, int64_t checkpoint_db_version, int64_t checkpoint_seq) {
     // Guard against calling payload_apply before cloudsync_init: without this,
     // the settings lookups at the top of this function would each emit a
     // "no such table: cloudsync_settings" debug line, control would fall
@@ -4001,7 +4043,12 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
 
     // sanity check
     if (blen < (int)sizeof(cloudsync_payload_header)) return cloudsync_set_error(data, "Error on cloudsync_payload_apply: invalid payload length", DBRES_MISUSE);
-
+
+    // track the last (db_version, seq) applied by this call so the receive
+    // checkpoint can be computed once, after the whole payload is applied
+    data->apply_last_db_version = -1;
+    data->apply_last_seq = -1;
+
     // decode header
     cloudsync_payload_header header;
     memcpy(&header, payload, sizeof(cloudsync_payload_header));
@@ -4084,9 +4131,13 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
         }
         if (clone) cloudsync_memory_free(clone);
         if (pnrows) *pnrows = applied_rows;
+        // Advance the receive cursor only after the whole payload is applied,
+        // gated on the caller-supplied checkpoint (a non-final chunk passes
+        // CLOUDSYNC_CHECKPOINT_NONE and leaves the cursor untouched).
+        if (rc == DBRES_OK) cloudsync_payload_apply_checkpoint(data, checkpoint_db_version, checkpoint_seq);
         return rc;
     }
-
+
     // precompile the insert statement
     dbvm_t *vm = NULL;
     int rc = databasevm_prepare(data, SQL_CHANGES_INSERT_ROW, &vm, 0);
@@ -4099,8 +4150,6 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
     uint16_t ncols = header.ncols;
     uint32_t nrows = header.nrows;
     int64_t last_payload_db_version = -1;
-    int dbversion = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION);
-    int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
     cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
 
     // Initialize deferred column-batch merge
@@ -4213,16 +4262,15 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
 
     if (rc == DBRES_DONE) rc = DBRES_OK;
     if (rc == DBRES_OK) {
-        char buf[256];
-        if (decoded_context.db_version >= dbversion) {
-            snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.db_version);
-            dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
-
-            if (decoded_context.seq != seq) {
-                snprintf(buf, sizeof(buf), "%" PRId64, decoded_context.seq);
-                dbutils_settings_set_key_value(data, CLOUDSYNC_KEY_CHECK_SEQ, buf);
-            }
+        // Record the last applied (db_version, seq) and advance the receive cursor
+        // once, gated on the caller-supplied checkpoint. A non-final chunk passes
+        // CLOUDSYNC_CHECKPOINT_NONE so the cursor never lands mid-db_version.
+        if (decoded_context.db_version > data->apply_last_db_version ||
+            (decoded_context.db_version == data->apply_last_db_version && decoded_context.seq > data->apply_last_seq)) {
+            data->apply_last_db_version = decoded_context.db_version;
+            data->apply_last_seq = decoded_context.seq;
         }
+        cloudsync_payload_apply_checkpoint(data, checkpoint_db_version, checkpoint_seq);
     }
 
 cleanup:

src/cloudsync.h

Lines changed: 17 additions & 1 deletion
@@ -94,7 +94,23 @@ const char *cloudsync_schema (cloudsync_context *data);
 const char *cloudsync_table_schema (cloudsync_context *data, const char *table_name);
 
 // Payload
-int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *nrows);
+// Receive-checkpoint modes for cloudsync_payload_apply's checkpoint_db_version
+// argument. The receive cursor (check_dbversion/check_seq) must only ever land
+// on a complete db_version boundary, otherwise a stop between chunks of a single
+// source db_version silently skips the unapplied rows on the next /check (the
+// server's cloudsync_payload_chunks uses db_version > since with no seq cursor).
+//   >= 0                               advance the cursor to exactly this
+//                                      (watermark_db_version), with checkpoint_seq.
+//                                      Used once a chunk stream is fully applied.
+//   CLOUDSYNC_CHECKPOINT_NONE          do not advance the cursor. Used for a
+//                                      non-final chunk of a multi-chunk stream.
+//   CLOUDSYNC_CHECKPOINT_LAST_APPLIED  advance to this artifact's last applied
+//                                      (db_version, seq). Legacy/monolithic
+//                                      behavior: safe only for a complete payload
+//                                      that ends on a db_version boundary.
+#define CLOUDSYNC_CHECKPOINT_NONE (-1)
+#define CLOUDSYNC_CHECKPOINT_LAST_APPLIED (-2)
+int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *nrows, int64_t checkpoint_db_version, int64_t checkpoint_seq);
 int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync_context *data, int argc, dbvalue_t **argv);
 int cloudsync_payload_encode_final (cloudsync_payload_context *payload, cloudsync_context *data);
 char *cloudsync_payload_blob (cloudsync_payload_context *payload, int64_t *blob_size, int64_t *nrows);
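
The three checkpoint modes and the monotonic guard can be exercised in isolation. The sketch below restates the resolution logic of `cloudsync_payload_apply_checkpoint` against an in-memory cursor instead of the settings table. The `sketch_*` and `SKETCH_*` names are hypothetical and only the sentinel values (-1, -2) are taken from the header above, so treat it as an approximation of the behavior rather than the extension's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Sentinels mirror the values in cloudsync.h; the names here are illustrative. */
#define SKETCH_CHECKPOINT_NONE         (-1)
#define SKETCH_CHECKPOINT_LAST_APPLIED (-2)

/* Hypothetical stand-in for the persisted check_dbversion/check_seq pair. */
typedef struct { int64_t db_version; int64_t seq; } sketch_cursor;

/* Resolve the requested checkpoint and advance `cur` if that moves it forward.
   `last_applied` is the highest (db_version, seq) applied by the current call,
   with db_version < 0 meaning nothing was applied. Returns true if `cur` moved. */
static bool sketch_checkpoint(sketch_cursor *cur, sketch_cursor last_applied,
                              int64_t checkpoint_db_version, int64_t checkpoint_seq) {
    assert(cur != NULL);
    sketch_cursor target;

    if (checkpoint_db_version == SKETCH_CHECKPOINT_NONE) return false;
    if (checkpoint_db_version == SKETCH_CHECKPOINT_LAST_APPLIED) {
        if (last_applied.db_version < 0) return false; /* nothing to checkpoint */
        target = last_applied;
    } else {
        target.db_version = checkpoint_db_version;     /* stream watermark */
        target.seq = checkpoint_seq;
    }

    /* Monotonic guard: compare (db_version, seq) lexicographically. */
    if (target.db_version < cur->db_version) return false;
    if (target.db_version == cur->db_version && target.seq <= cur->seq) return false;

    *cur = target;
    return true;
}
```

Starting from a cursor of (5, 3), a `SKETCH_CHECKPOINT_NONE` call leaves it untouched, a last-applied position of (7, 2) moves it to (7, 2), a later watermark of 6 is rejected by the guard, and a watermark of 9 moves it to (9, 0).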

src/network/network.c

Lines changed: 48 additions & 3 deletions
@@ -726,7 +726,7 @@ int network_set_sqlite_result (sqlite3_context *context, NETWORK_RESULT *result)
 // on the sqlite3_context. This lets composite callers (cloudsync_network_sync)
 // surface apply errors as structured JSON. Endpoint/network errors always raise
 // a SQL error regardless of err_out.
-int network_download_changes (sqlite3_context *context, const char *download_url, int *pnrows, char **err_out) {
+int network_download_changes (sqlite3_context *context, const char *download_url, int *pnrows, char **err_out, int64_t checkpoint_db_version, int64_t checkpoint_seq) {
     DEBUG_FUNCTION("network_download_changes");
 
     cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
@@ -740,7 +740,7 @@ int network_download_changes (sqlite3_context *context, const char *download_url
 
     int rc = SQLITE_OK;
     if (result.code == CLOUDSYNC_NETWORK_BUFFER) {
-        rc = cloudsync_payload_apply(data, result.buffer, (int)result.blen, pnrows);
+        rc = cloudsync_payload_apply(data, result.buffer, (int)result.blen, pnrows, checkpoint_db_version, checkpoint_seq);
         if (rc != DBRES_OK) {
             const char *msg = cloudsync_errmsg(data);
             if (!msg || !msg[0]) msg = "cloudsync_payload_apply failed";
@@ -861,6 +861,28 @@ static int64_t json_extract_int(const char *json, size_t json_len, const char *k
     return strtoll(json + val->start, NULL, 10);
 }
 
+static bool json_extract_bool(const char *json, size_t json_len, const char *key, bool default_value) {
+    if (!json || json_len == 0 || !key) return default_value;
+
+    jsmn_parser parser;
+    jsmntok_t tokens[JSMN_MAX_TOKENS];
+    jsmn_init(&parser);
+    int ntokens = jsmn_parse(&parser, json, json_len, tokens, JSMN_MAX_TOKENS);
+    if (ntokens < 1 || tokens[0].type != JSMN_OBJECT) return default_value;
+
+    int i = jsmn_find_key(json, tokens, ntokens, key);
+    if (i < 0 || i + 1 >= ntokens) return default_value;
+
+    jsmntok_t *val = &tokens[i + 1];
+    if (val->type != JSMN_PRIMITIVE) return default_value;
+
+    // JSON booleans (true/false) and numeric flags (1/0) are both accepted.
+    char c = json[val->start];
+    if (c == 't' || c == 'T') return true;
+    if (c == 'f' || c == 'F' || c == 'n' || c == 'N') return false;
+    return strtoll(json + val->start, NULL, 10) != 0;
+}
+
 static int json_extract_array_size(const char *json, size_t json_len, const char *key) {
     if (!json || json_len == 0 || !key) return -1;
 
@@ -1630,7 +1652,30 @@ int cloudsync_network_check_internal(sqlite3_context *context, int *pnrows, sync
         // Branch on the presence of "url" rather than HTTP status; both shapes arrive as BUFFER.
         char *download_url = json_extract_string(result.buffer, result.blen, "url");
         if (download_url) {
-            rc = network_download_changes(context, download_url, pnrows, err_out);
+            // Receive checkpoint, mirroring the send-path watermark. The /check
+            // response signals how far the durable cursor may advance:
+            //   no "watermark"          -> legacy / monolithic artifact: advance
+            //                              to the artifact's last applied
+            //                              (db_version, seq) (CHECKPOINT_LAST_APPLIED).
+            //   "watermark" + "final":  -> chunked stream. Advance the cursor to
+            //                              watermark only on the final chunk;
+            //                              non-final chunks leave it untouched
+            //                              (CHECKPOINT_NONE) so it never lands in
+            //                              the middle of a source db_version.
+            // The server serves successive chunks of the same stream across /check
+            // calls (the durable cursor stays at "since" until "final"), tracking
+            // chunk-level progress on its side; apply is idempotent, so a stop
+            // before "final" simply re-delivers the stream from "since".
+            int64_t watermark = json_extract_int(result.buffer, result.blen, "watermark", -1);
+            int64_t checkpoint_db_version;
+            int64_t checkpoint_seq = 0;
+            if (watermark < 0) {
+                checkpoint_db_version = CLOUDSYNC_CHECKPOINT_LAST_APPLIED;
+            } else {
+                bool final_chunk = json_extract_bool(result.buffer, result.blen, "final", true);
+                checkpoint_db_version = final_chunk ? watermark : CLOUDSYNC_CHECKPOINT_NONE;
+            }
+            rc = network_download_changes(context, download_url, pnrows, err_out, checkpoint_db_version, checkpoint_seq);
             cloudsync_memory_free(download_url);
         }
         // failures.check may appear in either shape; extract opportunistically.
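
The mapping from a `/check` response to the checkpoint argument is small enough to sketch on its own. The example below assumes the two fields have already been parsed (a negative `watermark` stands for an absent key, and a missing `"final"` is taken as true, matching the default passed to `json_extract_bool` in the hunk above). `sketch_check_response` and the `sketch_*` functions are hypothetical names, and the legacy last-applied mode is deliberately not modeled in the stream walk because it depends on the payload contents.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_CHECKPOINT_NONE         (-1)
#define SKETCH_CHECKPOINT_LAST_APPLIED (-2)

/* Hypothetical, already-parsed view of the two /check response fields. */
typedef struct {
    int64_t watermark;   /* < 0 when the server sent no "watermark" */
    bool    final_chunk; /* true when "final" is set or absent */
} sketch_check_response;

/* Pick the checkpoint_db_version argument for one downloaded artifact. */
static int64_t sketch_choose_checkpoint(sketch_check_response r) {
    if (r.watermark < 0) return SKETCH_CHECKPOINT_LAST_APPLIED; /* legacy artifact */
    return r.final_chunk ? r.watermark : SKETCH_CHECKPOINT_NONE;
}

/* Walk the responses of one chunked stream and return the db_version the durable
   cursor holds afterwards, starting from `since`. Only watermark checkpoints move
   the cursor here; SKETCH_CHECKPOINT_LAST_APPLIED is left unmodeled. */
static int64_t sketch_cursor_after(const sketch_check_response *resp, size_t n,
                                   int64_t since) {
    assert(resp != NULL || n == 0);
    int64_t cursor = since;
    for (size_t i = 0; i < n; i++) {
        int64_t cp = sketch_choose_checkpoint(resp[i]);
        if (cp >= 0 && cp > cursor) cursor = cp; /* final chunk: jump to watermark */
    }
    return cursor;
}
```

For a three-chunk stream with watermark 12 and `since` equal to 4, the cursor stays at 4 after the first two chunks and becomes 12 only once the final chunk has been processed, which is the property the commit relies on.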

src/postgresql/cloudsync_postgresql.c

Lines changed: 3 additions & 1 deletion
@@ -1420,7 +1420,9 @@ Datum cloudsync_payload_decode (PG_FUNCTION_ARGS) {
 
     PG_TRY();
     {
-        rc = cloudsync_payload_apply(data, payload, blen, &nrows);
+        // PostgreSQL applies a complete monolithic payload: legacy last-applied
+        // checkpoint (ends on a db_version boundary, so it is safe).
+        rc = cloudsync_payload_apply(data, payload, blen, &nrows, CLOUDSYNC_CHECKPOINT_LAST_APPLIED, 0);
     }
     PG_CATCH();
     {

src/sqlite/cloudsync_sqlite.c

Lines changed: 10 additions & 3 deletions
@@ -1092,11 +1092,16 @@ void dbsync_payload_decode (sqlite3_context *context, int argc, sqlite3_value **
 
     // obtain payload
     const char *payload = (const char *)database_value_blob(argv[0]);
-
+
     // apply changes
+    // The public SQL function applies a single complete payload: advance the
+    // receive cursor to its last applied (db_version, seq) (legacy behavior, safe
+    // for a payload that ends on a db_version boundary). The chunked-download
+    // receive path gates cursor advancement on stream completion via the C-level
+    // checkpoint argument instead (see cloudsync_payload_apply in cloudsync.h).
     int nrows = 0;
     cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
-    int rc = cloudsync_payload_apply(data, payload, blen, &nrows);
+    int rc = cloudsync_payload_apply(data, payload, blen, &nrows, CLOUDSYNC_CHECKPOINT_LAST_APPLIED, 0);
     if (rc != SQLITE_OK) {
         sqlite3_result_error(context, cloudsync_errmsg(data), -1);
         sqlite3_result_error_code(context, rc);
@@ -1541,7 +1546,9 @@ void dbsync_payload_load (sqlite3_context *context, int argc, sqlite3_value **ar
 
     int nrows = 0;
     cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
-    int rc = cloudsync_payload_apply (data, payload, (int)payload_size, &nrows);
+    // File-based load applies a complete monolithic payload: legacy last-applied
+    // checkpoint (ends on a db_version boundary, so it is safe).
+    int rc = cloudsync_payload_apply (data, payload, (int)payload_size, &nrows, CLOUDSYNC_CHECKPOINT_LAST_APPLIED, 0);
     if (payload) cloudsync_memory_free(payload);
 
     if (rc != SQLITE_OK) {
