Skip to content

Commit 9ea552f

Browse files
committed
Merge branch 'main' into feat/batch-merge-and-rls
2 parents b063154 + 1da92bd commit 9ea552f

File tree

13 files changed

+322
-133
lines changed

13 files changed

+322
-133
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ jniLibs/
5050
.DS_Store
5151
Thumbs.db
5252
CLAUDE.md
53+
*.o

src/cloudsync.c

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ struct cloudsync_context {
146146
void *aux_data;
147147

148148
// stmts and context values
149-
bool pragma_checked; // we need to check PRAGMAs only once per transaction
150149
dbvm_t *schema_version_stmt;
151150
dbvm_t *data_version_stmt;
152151
dbvm_t *db_version_stmt;
@@ -289,13 +288,15 @@ const char *cloudsync_algo_name (table_algo algo) {
289288
// MARK: - DBVM Utils -
290289

291290
DBVM_VALUE dbvm_execute (dbvm_t *stmt, cloudsync_context *data) {
291+
if (!stmt) return DBVM_VALUE_ERROR;
292+
292293
int rc = databasevm_step(stmt);
293294
if (rc != DBRES_ROW && rc != DBRES_DONE) {
294295
if (data) DEBUG_DBERROR(rc, "stmt_execute", data);
295296
databasevm_reset(stmt);
296297
return DBVM_VALUE_ERROR;
297298
}
298-
299+
299300
DBVM_VALUE result = DBVM_VALUE_CHANGED;
300301
if (stmt == data->data_version_stmt) {
301302
int version = (int)database_column_int(stmt, 0);
@@ -399,12 +400,17 @@ int cloudsync_dbversion_rebuild (cloudsync_context *data) {
399400
int cloudsync_dbversion_rerun (cloudsync_context *data) {
400401
DBVM_VALUE schema_changed = dbvm_execute(data->schema_version_stmt, data);
401402
if (schema_changed == DBVM_VALUE_ERROR) return -1;
402-
403+
403404
if (schema_changed == DBVM_VALUE_CHANGED) {
404405
int rc = cloudsync_dbversion_rebuild(data);
405406
if (rc != DBRES_OK) return -1;
406407
}
407-
408+
409+
if (!data->db_version_stmt) {
410+
data->db_version = CLOUDSYNC_MIN_DB_VERSION;
411+
return 0;
412+
}
413+
408414
DBVM_VALUE rc = dbvm_execute(data->db_version_stmt, data);
409415
if (rc == DBVM_VALUE_ERROR) return -1;
410416
return 0;
@@ -593,7 +599,7 @@ void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
593599
}
594600

595601
void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
596-
if (data->current_schema == schema) return;
602+
if (data->current_schema && schema && strcmp(data->current_schema, schema) == 0) return;
597603
if (data->current_schema) cloudsync_memory_free(data->current_schema);
598604
data->current_schema = NULL;
599605
if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
@@ -782,7 +788,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
782788
if (rc != DBRES_OK) goto cleanup;
783789

784790
// precompile the insert local sentinel statement
785-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
791+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref, table->meta_ref, table->meta_ref);
786792
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
787793
DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
788794

@@ -954,37 +960,44 @@ int table_remove (cloudsync_context *data, cloudsync_table_context *table) {
954960
int table_add_to_context_cb (void *xdata, int ncols, char **values, char **names) {
955961
cloudsync_table_context *table = (cloudsync_table_context *)xdata;
956962
cloudsync_context *data = table->context;
957-
963+
958964
int index = table->ncols;
959965
for (int i=0; i<ncols; i+=2) {
960966
const char *name = values[i];
961967
int cid = (int)strtol(values[i+1], NULL, 0);
962-
968+
963969
table->col_id[index] = cid;
964970
table->col_name[index] = cloudsync_string_dup_lowercase(name);
965-
if (!table->col_name[index]) return 1;
966-
971+
if (!table->col_name[index]) goto error;
972+
967973
char *sql = table_build_mergeinsert_sql(table, name);
968-
if (!sql) return DBRES_NOMEM;
974+
if (!sql) goto error;
969975
DEBUG_SQL("col_merge_stmt[%d]: %s", index, sql);
970-
976+
971977
int rc = databasevm_prepare(data, sql, (void **)&table->col_merge_stmt[index], DBFLAG_PERSISTENT);
972978
cloudsync_memory_free(sql);
973-
if (rc != DBRES_OK) return rc;
974-
if (!table->col_merge_stmt[index]) return DBRES_MISUSE;
975-
979+
if (rc != DBRES_OK) goto error;
980+
if (!table->col_merge_stmt[index]) goto error;
981+
976982
sql = table_build_value_sql(table, name);
977-
if (!sql) return DBRES_NOMEM;
983+
if (!sql) goto error;
978984
DEBUG_SQL("col_value_stmt[%d]: %s", index, sql);
979-
985+
980986
rc = databasevm_prepare(data, sql, (void **)&table->col_value_stmt[index], DBFLAG_PERSISTENT);
981987
cloudsync_memory_free(sql);
982-
if (rc != DBRES_OK) return rc;
983-
if (!table->col_value_stmt[index]) return DBRES_MISUSE;
988+
if (rc != DBRES_OK) goto error;
989+
if (!table->col_value_stmt[index]) goto error;
984990
}
985991
table->ncols += 1;
986-
992+
987993
return 0;
994+
995+
error:
996+
// clean up partially-initialized entry at index
997+
if (table->col_name[index]) {cloudsync_memory_free(table->col_name[index]); table->col_name[index] = NULL;}
998+
if (table->col_merge_stmt[index]) {databasevm_finalize(table->col_merge_stmt[index]); table->col_merge_stmt[index] = NULL;}
999+
if (table->col_value_stmt[index]) {databasevm_finalize(table->col_value_stmt[index]); table->col_value_stmt[index] = NULL;}
1000+
return 1;
9881001
}
9891002

9901003
bool table_ensure_capacity (cloudsync_context *data) {
@@ -1026,7 +1039,7 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
10261039
table->npks = count;
10271040
if (table->npks == 0) {
10281041
#if CLOUDSYNC_DISABLE_ROWIDONLY_TABLES
1029-
return false;
1042+
goto abort_add_table;
10301043
#else
10311044
table->rowid_only = true;
10321045
table->npks = 1; // rowid
@@ -1073,7 +1086,8 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
10731086

10741087
dbvm_t *cloudsync_colvalue_stmt (cloudsync_context *data, const char *tbl_name, bool *persistent) {
10751088
dbvm_t *vm = NULL;
1076-
1089+
*persistent = false;
1090+
10771091
cloudsync_table_context *table = table_lookup(data, tbl_name);
10781092
if (table) {
10791093
char *col_name = NULL;
@@ -1116,7 +1130,7 @@ const char *table_colname (cloudsync_table_context *table, int index) {
11161130
bool table_pk_exists (cloudsync_table_context *table, const char *value, size_t len) {
11171131
// check if a row with the same primary key already exists
11181132
// if so, this means the row might have been previously deleted (sentinel)
1119-
return (bool)dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB);
1133+
return (dbvm_count(table->meta_pkexists_stmt, value, len, DBTYPE_BLOB) > 0);
11201134
}
11211135

11221136
char **table_pknames (cloudsync_table_context *table) {
@@ -1642,6 +1656,10 @@ int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table,
16421656
rc = databasevm_step(vm);
16431657
if (rc == DBRES_ROW) {
16441658
const void *local_site_id = database_column_blob(vm, 0);
1659+
if (!local_site_id) {
1660+
dbvm_reset(vm);
1661+
return cloudsync_set_error(data, "NULL site_id in cloudsync table, table is probably corrupted", DBRES_ERROR);
1662+
}
16451663
ret = memcmp(site_id, local_site_id, site_len);
16461664
*didwin_flag = (ret > 0);
16471665
dbvm_reset(vm);
@@ -2222,6 +2240,7 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
22222240
rc = databasevm_step(vm);
22232241
if (rc == DBRES_ROW) {
22242242
const char *pk = (const char *)database_column_text(vm, 0);
2243+
if (!pk) { rc = DBRES_ERROR; break; }
22252244
size_t pklen = strlen(pk);
22262245
rc = local_mark_insert_or_update_meta(table, pk, pklen, col_name, db_version, cloudsync_bumpseq(data));
22272246
} else if (rc == DBRES_DONE) {
@@ -2761,11 +2780,8 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
27612780
if (rc != DBRES_OK) {
27622781
merge_pending_free_entries(&batch);
27632782
data->pending_batch = NULL;
2764-
if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
2765-
if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
2766-
if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
2767-
if (clone) cloudsync_memory_free(clone);
2768-
return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
2783+
cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to release a savepoint", rc);
2784+
goto cleanup;
27692785
}
27702786
in_savepoint = false;
27712787
}
@@ -2775,11 +2791,8 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
27752791
if (rc != DBRES_OK) {
27762792
merge_pending_free_entries(&batch);
27772793
data->pending_batch = NULL;
2778-
if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
2779-
if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
2780-
if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
2781-
if (clone) cloudsync_memory_free(clone);
2782-
return cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
2794+
cloudsync_set_error(data, "Error on cloudsync_payload_apply: unable to start a transaction", rc);
2795+
goto cleanup;
27832796
}
27842797
in_savepoint = true;
27852798
}
@@ -2812,9 +2825,6 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
28122825
if (flush_rc != DBRES_OK && rc == DBRES_OK) rc = flush_rc;
28132826
}
28142827
data->pending_batch = NULL;
2815-
if (batch.cached_vm) databasevm_finalize(batch.cached_vm);
2816-
if (batch.cached_col_names) cloudsync_memory_free(batch.cached_col_names);
2817-
if (batch.entries) cloudsync_memory_free(batch.entries);
28182828

28192829
if (in_savepoint) {
28202830
int rc1 = database_commit_savepoint(data, "cloudsync_payload_apply");
@@ -2841,6 +2851,11 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
28412851
}
28422852

28432853
cleanup:
2854+
// cleanup merge_pending_batch
2855+
if (batch.cached_vm) { databasevm_finalize(batch.cached_vm); batch.cached_vm = NULL; }
2856+
if (batch.cached_col_names) { cloudsync_memory_free(batch.cached_col_names); batch.cached_col_names = NULL; }
2857+
if (batch.entries) { cloudsync_memory_free(batch.entries); batch.entries = NULL; }
2858+
28442859
// cleanup vm
28452860
if (vm) databasevm_finalize(vm);
28462861

@@ -2876,7 +2891,7 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,
28762891
if (rc != DBRES_OK) return rc;
28772892

28782893
// exit if there is no data to send
2879-
if (blob == NULL || *blob_size == 0) return DBRES_OK;
2894+
if (*blob == NULL || *blob_size == 0) return DBRES_OK;
28802895
return rc;
28812896
}
28822897

src/cloudsync.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
extern "C" {
1818
#endif
1919

20-
#define CLOUDSYNC_VERSION "0.9.110"
20+
#define CLOUDSYNC_VERSION "0.9.111"
2121
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2222

2323
#define CLOUDSYNC_VALUE_NOTSET -1

src/dbutils.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -411,14 +411,16 @@ int dbutils_settings_init (cloudsync_context *data) {
411411
if (rc != DBRES_OK) return rc;
412412

413413
// library version
414-
char sql[1024];
415-
snprintf(sql, sizeof(sql), SQL_INSERT_SETTINGS_STR_FORMAT, CLOUDSYNC_KEY_LIBVERSION, CLOUDSYNC_VERSION);
414+
char *sql = cloudsync_memory_mprintf(SQL_INSERT_SETTINGS_STR_FORMAT, CLOUDSYNC_KEY_LIBVERSION, CLOUDSYNC_VERSION);
415+
if (!sql) return DBRES_NOMEM;
416416
rc = database_exec(data, sql);
417+
cloudsync_memory_free(sql);
417418
if (rc != DBRES_OK) return rc;
418-
419+
419420
// schema version
420-
snprintf(sql, sizeof(sql), SQL_INSERT_SETTINGS_INT_FORMAT, CLOUDSYNC_KEY_SCHEMAVERSION, (long long)database_schema_version(data));
421-
rc = database_exec(data, sql);
421+
char sql_int[1024];
422+
snprintf(sql_int, sizeof(sql_int), SQL_INSERT_SETTINGS_INT_FORMAT, CLOUDSYNC_KEY_SCHEMAVERSION, (long long)database_schema_version(data));
423+
rc = database_exec(data, sql_int);
422424
if (rc != DBRES_OK) return rc;
423425
}
424426

src/network.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,7 @@ void cloudsync_network_logout (sqlite3_context *context, int argc, sqlite3_value
942942
}
943943

944944
// run everything in a savepoint
945-
rc = database_begin_savepoint(data, "cloudsync_logout_savepoint;");
945+
rc = database_begin_savepoint(data, "cloudsync_logout_savepoint");
946946
if (rc != SQLITE_OK) {
947947
errmsg = cloudsync_memory_mprintf("Unable to create cloudsync_logout savepoint %s", cloudsync_errmsg(data));
948948
goto finalize;

src/postgresql/cloudsync_postgresql.c

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ Datum pg_cloudsync_terminate (PG_FUNCTION_ARGS) {
473473
PG_END_TRY();
474474

475475
if (spi_connected) SPI_finish();
476-
PG_RETURN_INT32(rc);
476+
PG_RETURN_BOOL(rc == DBRES_OK);
477477
}
478478

479479
// MARK: - Settings Functions -
@@ -820,8 +820,7 @@ Datum cloudsync_payload_encode_transfn (PG_FUNCTION_ARGS) {
820820
// Get or allocate aggregate state
821821
if (PG_ARGISNULL(0)) {
822822
MemoryContext oldContext = MemoryContextSwitchTo(aggContext);
823-
payload = (cloudsync_payload_context *)cloudsync_memory_alloc(cloudsync_payload_context_size(NULL));
824-
memset(payload, 0, cloudsync_payload_context_size(NULL));
823+
payload = (cloudsync_payload_context *)palloc0(cloudsync_payload_context_size(NULL));
825824
MemoryContextSwitchTo(oldContext);
826825
} else {
827826
payload = (cloudsync_payload_context *)PG_GETARG_POINTER(0);
@@ -1819,13 +1818,16 @@ static Oid get_column_oid(const char *schema, const char *table_name, const char
18191818
pfree(DatumGetPointer(values[1]));
18201819
if (schema) pfree(DatumGetPointer(values[2]));
18211820

1822-
if (ret != SPI_OK_SELECT || SPI_processed == 0) return InvalidOid;
1821+
if (ret != SPI_OK_SELECT || SPI_processed == 0) {
1822+
if (SPI_tuptable) SPI_freetuptable(SPI_tuptable);
1823+
return InvalidOid;
1824+
}
18231825

18241826
bool isnull;
18251827
Datum col_oid = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull);
1826-
if (isnull) return InvalidOid;
1827-
1828-
return DatumGetObjectId(col_oid);
1828+
Oid result = isnull ? InvalidOid : DatumGetObjectId(col_oid);
1829+
SPI_freetuptable(SPI_tuptable);
1830+
return result;
18291831
}
18301832

18311833
// Decode encoded bytea into a pgvalue_t with the decoded base type.
@@ -1958,23 +1960,34 @@ Datum cloudsync_col_value(PG_FUNCTION_ARGS) {
19581960
}
19591961

19601962
// execute vm
1961-
Datum d = (Datum)0;
19621963
int rc = databasevm_step(vm);
19631964
if (rc == DBRES_DONE) {
1964-
rc = DBRES_OK;
1965-
PG_RETURN_CSTRING(CLOUDSYNC_RLS_RESTRICTED_VALUE);
1965+
databasevm_reset(vm);
1966+
// row not found (RLS or genuinely missing) — return the RLS sentinel as bytea
1967+
const char *rls = CLOUDSYNC_RLS_RESTRICTED_VALUE;
1968+
size_t rls_len = strlen(rls);
1969+
bytea *result = (bytea *)palloc(VARHDRSZ + rls_len);
1970+
SET_VARSIZE(result, VARHDRSZ + rls_len);
1971+
memcpy(VARDATA(result), rls, rls_len);
1972+
PG_RETURN_BYTEA_P(result);
19661973
} else if (rc == DBRES_ROW) {
1967-
// store value result
1968-
rc = DBRES_OK;
1969-
d = database_column_datum(vm, 0);
1970-
}
1971-
1972-
if (rc != DBRES_OK) {
1974+
// copy value before reset invalidates SPI tuple memory
1975+
const void *blob = database_column_blob(vm, 0);
1976+
int blob_len = database_column_bytes(vm, 0);
1977+
bytea *result = NULL;
1978+
if (blob && blob_len > 0) {
1979+
result = (bytea *)palloc(VARHDRSZ + blob_len);
1980+
SET_VARSIZE(result, VARHDRSZ + blob_len);
1981+
memcpy(VARDATA(result), blob, blob_len);
1982+
}
19731983
databasevm_reset(vm);
1974-
ereport(ERROR, (errmsg("cloudsync_col_value error: %s", cloudsync_errmsg(data))));
1984+
if (result) PG_RETURN_BYTEA_P(result);
1985+
PG_RETURN_NULL();
19751986
}
1987+
19761988
databasevm_reset(vm);
1977-
PG_RETURN_DATUM(d);
1989+
ereport(ERROR, (errmsg("cloudsync_col_value error: %s", cloudsync_errmsg(data))));
1990+
PG_RETURN_NULL(); // unreachable, silences compiler
19781991
}
19791992

19801993
// Track SRF execution state across calls

0 commit comments

Comments
 (0)