Skip to content

Commit ce9b221

Browse files
andinuxclaude
andcommitted
feat(payload): download spool for chunked /check
Add a server-side spool so the /check download path can page a window's chunk stream one chunk per call, instead of having the network driver (libpq / sqlitecloud-go) re-materialize the whole stream into memory. - cloudsync_payload_spool table + cloudsync_payload_spool_fill/_drop on both engines (SQLite C, lazy-create; PG plpgsql, table created at install). fill generates a window's chunks once (idempotent, atomic), marks the last chunk is_final, and self-GCs abandoned streams (24h TTL). - cloudsync_network_check_internal echoes a best-effort page cursor to/from /check so the stateless server serves the next spool page; retrocompatible (optional response field, sent in every request). - Tests: do_test_payload_spool (SQLite) and a spool block in 52_payload_chunks.sql (PG) covering byte-identity vs direct generation, idempotent re-fill, empty window, drop, and stale-GC. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 3b86d17 commit ce9b221

8 files changed

Lines changed: 599 additions & 6 deletions

File tree

src/network/network.c

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ struct network_data {
8383
char *apply_endpoint;
8484
char *status_endpoint;
8585
int ticket_enabled;
86+
// Best-effort page cursor for the chunked /check download drain. The durable
87+
// receive cursor (check_dbversion/check_seq) is frozen at "since" for the whole
88+
// drain, so the server (which is stateless across /check calls) needs the client
89+
// to echo which spool page to serve next. In-memory only: losing it just
90+
// restarts the drain from page 0, which is safe because apply is idempotent.
91+
int64_t check_cursor; // next page index to request (0 = fresh drain)
92+
int64_t check_cursor_since; // the check_dbversion check_cursor belongs to
8693
#ifndef CLOUDSYNC_OMIT_CURL
8794
CURL *api_curl;
8895
CURL *artifact_curl;
@@ -1636,11 +1643,22 @@ int cloudsync_network_check_internal(sqlite3_context *context, int *pnrows, sync
16361643
int seq = dbutils_settings_get_int_value(data, CLOUDSYNC_KEY_CHECK_SEQ);
16371644
if (seq<0) {sqlite3_result_error(context, "Unable to retrieve seq.", -1); return -1;}
16381645

1646+
// Restart paging whenever the durable receive window changes: the page cursor
1647+
// is only meaningful within a single drain (check_dbversion held at "since").
1648+
if (netdata->check_cursor_since != db_version) {
1649+
netdata->check_cursor = 0;
1650+
netdata->check_cursor_since = db_version;
1651+
}
1652+
16391653
// Capture local db_version before download so we can query cloudsync_changes afterwards
16401654
int64_t prev_dbv = cloudsync_dbversion(data);
16411655

1656+
// "cursor" is the spool page to serve. Old/legacy servers ignore the unknown
1657+
// request field and omit it from the response; the client then never self-pages
1658+
// (check_cursor stays 0), preserving current behavior.
16421659
char json_payload[2024];
1643-
snprintf(json_payload, sizeof(json_payload), "{\"dbVersion\":%lld, \"seq\":%d}", (long long)db_version, seq);
1660+
snprintf(json_payload, sizeof(json_payload), "{\"dbVersion\":%lld, \"seq\":%d, \"cursor\":%lld}",
1661+
(long long)db_version, seq, (long long)netdata->check_cursor);
16441662

16451663
NETWORK_RESULT result = network_receive_buffer(netdata, netdata->check_endpoint, netdata->authentication, true, true, json_payload, cloudsync_check_headers, ARRAY_LEN(cloudsync_check_headers));
16461664
int rc = SQLITE_OK;
@@ -1663,20 +1681,36 @@ int cloudsync_network_check_internal(sqlite3_context *context, int *pnrows, sync
16631681
// (CHECKPOINT_NONE) so it never lands in
16641682
// the middle of a source db_version.
16651683
// The server serves successive chunks of the same stream across /check
1666-
// calls (the durable cursor stays at "since" until "final"), tracking
1667-
// chunk-level progress on its side; apply is idempotent, so a stop
1668-
// before "final" simply re-delivers the stream from "since".
1684+
// calls (the durable cursor stays at "since" until "final"). The server
1685+
// is stateless between calls, so it tells us the next spool page via
1686+
// "cursor" and we echo it back (netdata->check_cursor); apply is
1687+
// idempotent, so a stop before "final" simply re-delivers from "since".
16691688
int64_t watermark = json_extract_int(result.buffer, result.blen, "watermark", -1);
16701689
int64_t checkpoint_db_version;
16711690
int64_t checkpoint_seq = 0;
1691+
bool final_chunk = true;
16721692
if (watermark < 0) {
16731693
checkpoint_db_version = CLOUDSYNC_CHECKPOINT_LAST_APPLIED;
16741694
} else {
1675-
bool final_chunk = json_extract_bool(result.buffer, result.blen, "final", true);
1695+
final_chunk = json_extract_bool(result.buffer, result.blen, "final", true);
16761696
checkpoint_db_version = final_chunk ? watermark : CLOUDSYNC_CHECKPOINT_NONE;
16771697
}
1698+
// Optional next-page cursor. Absent on legacy/old servers (-1 sentinel)
1699+
// -> the client does not self-page.
1700+
int64_t next_cursor = json_extract_int(result.buffer, result.blen, "cursor", -1);
16781701
rc = network_download_changes(context, download_url, pnrows, err_out, checkpoint_db_version, checkpoint_seq);
16791702
cloudsync_memory_free(download_url);
1703+
if (rc == SQLITE_OK) {
1704+
// Advance only after the page is applied/staged, mirroring the durable
1705+
// cursor: a non-final chunk that carried a cursor pages forward;
1706+
// final or no-cursor ends the drain and resets paging to 0. On a
1707+
// download/apply failure we leave check_cursor unchanged so a retry
1708+
// re-requests the same page.
1709+
netdata->check_cursor = (next_cursor >= 0 && !final_chunk) ? next_cursor : 0;
1710+
}
1711+
} else {
1712+
// 202 / "up to date": no artifact in flight -> reset paging.
1713+
netdata->check_cursor = 0;
16801714
}
16811715
// failures.check may appear in either shape; extract opportunistically.
16821716
if (out) {

src/postgresql/cloudsync.sql.in

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,97 @@ RETURNS bytea
180180
AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob'
181181
LANGUAGE C IMMUTABLE;
182182

183+
-- Download spool (server-side /check chunk staging)
184+
--
185+
-- The /check endpoint runs on a separate host and reaches the tenant DB over a
186+
-- network driver, which materializes the whole result set. Streaming chunks
187+
-- directly with SELECT * FROM cloudsync_payload_chunks(...) would therefore pull
188+
-- every chunk into the server's memory at once. Instead the server fills a
189+
-- window's chunk stream once into this table and pages it out one chunk per call.
190+
--
191+
-- UNLOGGED: this is regenerable scratch state, so skip WAL. Created at install
192+
-- time (not lazily inside the function) to avoid the plpgsql create-then-use
193+
-- cached-plan pitfall.
194+
CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (
195+
stream_id text NOT NULL,
196+
chunk_index bigint NOT NULL,
197+
payload bytea NOT NULL,
198+
payload_size bigint NOT NULL,
199+
db_version_min bigint NOT NULL,
200+
db_version_max bigint NOT NULL,
201+
watermark bigint NOT NULL,
202+
is_final boolean NOT NULL DEFAULT false,
203+
created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint,
204+
PRIMARY KEY (stream_id, chunk_index)
205+
);
206+
207+
-- cloudsync_payload_spool_fill(stream_id, since, filter_site_id, exclude)
208+
-- Generate the whole chunk stream for a window once into cloudsync_payload_spool.
209+
-- Returns the number of chunks spooled. Idempotent: a prior complete fill is kept.
210+
-- Parameters are p_-prefixed so they never collide with the table's columns
211+
-- (an unqualified `stream_id` would otherwise be ambiguous with the parameter).
212+
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill(
213+
p_stream_id text,
214+
p_since_db_version bigint,
215+
p_filter_site_id bytea DEFAULT NULL,
216+
p_exclude_filter_site_id boolean DEFAULT false
217+
) RETURNS bigint AS $$
218+
DECLARE
219+
existing bigint;
220+
cnt bigint := 0;
221+
rec record;
222+
BEGIN
223+
-- Stale-GC of abandoned streams. fill runs once per stream (coarse-grained),
224+
-- so unlike per-fragment cleanup there is no O(n^2) risk and no throttle.
225+
DELETE FROM cloudsync_payload_spool
226+
WHERE stream_id IN (
227+
SELECT s.stream_id FROM cloudsync_payload_spool s
228+
GROUP BY s.stream_id
229+
HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400);
230+
231+
-- Idempotent: a prior complete fill stays as-is (fill is atomic, so rows
232+
-- present == complete stream).
233+
SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
234+
IF existing > 0 THEN
235+
RETURN existing;
236+
END IF;
237+
238+
-- Generate the stream one chunk at a time. A cursor FOR loop fetches in
239+
-- batches rather than materializing the whole SRF into a tuplestore.
240+
FOR rec IN
241+
SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version
242+
FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c
243+
LOOP
244+
INSERT INTO cloudsync_payload_spool
245+
(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final)
246+
VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size,
247+
rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false);
248+
cnt := cnt + 1;
249+
END LOOP;
250+
251+
IF cnt > 0 THEN
252+
UPDATE cloudsync_payload_spool SET is_final = true
253+
WHERE stream_id = p_stream_id
254+
AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x
255+
WHERE x.stream_id = p_stream_id);
256+
END IF;
257+
258+
RETURN cnt;
259+
END;
260+
$$ LANGUAGE plpgsql VOLATILE;
261+
262+
-- cloudsync_payload_spool_drop(stream_id) -> number of chunks removed.
263+
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text)
264+
RETURNS bigint AS $$
265+
DECLARE
266+
deleted bigint := 0;
267+
BEGIN
268+
DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
269+
GET DIAGNOSTICS deleted = ROW_COUNT;
270+
RETURN deleted;
271+
END;
272+
$$ LANGUAGE plpgsql VOLATILE;
273+
183274
-- Payload decoding and application
184275
CREATE OR REPLACE FUNCTION cloudsync_payload_decode(payload bytea)
185276
RETURNS integer

src/postgresql/migrations/cloudsync--1.0--1.1.sql

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,74 @@ CREATE OR REPLACE FUNCTION cloudsync_uuid_blob(uuid text)
3434
RETURNS bytea
3535
AS 'MODULE_PATHNAME', 'cloudsync_uuid_blob'
3636
LANGUAGE C IMMUTABLE;
37+
38+
-- Download spool: the /check path fills a window's chunk stream once and pages it
39+
-- out one chunk per call so the network driver never re-materializes the whole
40+
-- stream. See cloudsync.sql.in for the rationale.
41+
CREATE TABLE IF NOT EXISTS cloudsync_payload_spool (
42+
stream_id text NOT NULL,
43+
chunk_index bigint NOT NULL,
44+
payload bytea NOT NULL,
45+
payload_size bigint NOT NULL,
46+
db_version_min bigint NOT NULL,
47+
db_version_max bigint NOT NULL,
48+
watermark bigint NOT NULL,
49+
is_final boolean NOT NULL DEFAULT false,
50+
created_at bigint NOT NULL DEFAULT extract(epoch FROM now())::bigint,
51+
PRIMARY KEY (stream_id, chunk_index)
52+
);
53+
54+
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_fill(
55+
p_stream_id text,
56+
p_since_db_version bigint,
57+
p_filter_site_id bytea DEFAULT NULL,
58+
p_exclude_filter_site_id boolean DEFAULT false
59+
) RETURNS bigint AS $$
60+
DECLARE
61+
existing bigint;
62+
cnt bigint := 0;
63+
rec record;
64+
BEGIN
65+
DELETE FROM cloudsync_payload_spool
66+
WHERE stream_id IN (
67+
SELECT s.stream_id FROM cloudsync_payload_spool s
68+
GROUP BY s.stream_id
69+
HAVING max(s.created_at) < extract(epoch FROM now())::bigint - 86400);
70+
71+
SELECT count(*) INTO existing FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
72+
IF existing > 0 THEN
73+
RETURN existing;
74+
END IF;
75+
76+
FOR rec IN
77+
SELECT c.payload, c.chunk_index, c.payload_size, c.db_version_min, c.db_version_max, c.watermark_db_version
78+
FROM cloudsync_payload_chunks(p_since_db_version, p_filter_site_id, NULL, p_exclude_filter_site_id) c
79+
LOOP
80+
INSERT INTO cloudsync_payload_spool
81+
(stream_id, chunk_index, payload, payload_size, db_version_min, db_version_max, watermark, is_final)
82+
VALUES (p_stream_id, rec.chunk_index, rec.payload, rec.payload_size,
83+
rec.db_version_min, rec.db_version_max, rec.watermark_db_version, false);
84+
cnt := cnt + 1;
85+
END LOOP;
86+
87+
IF cnt > 0 THEN
88+
UPDATE cloudsync_payload_spool SET is_final = true
89+
WHERE stream_id = p_stream_id
90+
AND chunk_index = (SELECT max(x.chunk_index) FROM cloudsync_payload_spool x
91+
WHERE x.stream_id = p_stream_id);
92+
END IF;
93+
94+
RETURN cnt;
95+
END;
96+
$$ LANGUAGE plpgsql VOLATILE;
97+
98+
CREATE OR REPLACE FUNCTION cloudsync_payload_spool_drop(p_stream_id text)
99+
RETURNS bigint AS $$
100+
DECLARE
101+
deleted bigint := 0;
102+
BEGIN
103+
DELETE FROM cloudsync_payload_spool WHERE stream_id = p_stream_id;
104+
GET DIAGNOSTICS deleted = ROW_COUNT;
105+
RETURN deleted;
106+
END;
107+
$$ LANGUAGE plpgsql VOLATILE;

src/sql.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ extern const char * const SQL_PAYLOAD_FRAGMENTS_SELECT;
7474
extern const char * const SQL_PAYLOAD_FRAGMENTS_DELETE;
7575
extern const char * const SQL_PAYLOAD_FRAGMENTS_CLEANUP_STALE;
7676

77+
extern const char * const SQL_PAYLOAD_SPOOL_CREATE_TABLE;
78+
extern const char * const SQL_PAYLOAD_SPOOL_COUNT;
79+
extern const char * const SQL_PAYLOAD_SPOOL_FILL_INSERT;
80+
extern const char * const SQL_PAYLOAD_SPOOL_MARK_FINAL;
81+
extern const char * const SQL_PAYLOAD_SPOOL_DELETE;
82+
extern const char * const SQL_PAYLOAD_SPOOL_CLEANUP_STALE;
83+
7784
// BLOCKS (block-level LWW)
7885
extern const char * const SQL_BLOCKS_CREATE_TABLE;
7986
extern const char * const SQL_BLOCKS_UPSERT;

0 commit comments

Comments
 (0)