Skip to content

Commit 6e689cc

Browse files
committed
Added support for schema
1 parent ff4966b commit 6e689cc

File tree

11 files changed

+409
-164
lines changed

11 files changed

+409
-164
lines changed

src/cloudsync.c

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@
6363

6464
#define DEBUG_DBERROR(_rc, _fn, _data) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
6565

66+
#if CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
67+
bool schema_hash_disabled = true;
68+
#endif
69+
6670
typedef enum {
6771
CLOUDSYNC_PK_INDEX_TBL = 0,
6872
CLOUDSYNC_PK_INDEX_PK = 1,
@@ -131,6 +135,9 @@ struct cloudsync_context {
131135
// used to set an order inside each transaction
132136
int seq;
133137

138+
// optional schema_name to be set in the cloudsync_table_context
139+
char *current_schema;
140+
134141
// augmented tables are stored in-memory so we do not need to retrieve information about
135142
// col_names and cid from the disk each time a write statement is performed
136143
// we do also not need to use an hash map here because for few tables the direct
@@ -145,6 +152,9 @@ struct cloudsync_context {
145152
struct cloudsync_table_context {
146153
table_algo algo; // CRDT algoritm associated to the table
147154
char *name; // table name
155+
char *schema; // table schema
156+
char *meta_ref; // schema-qualified meta table name (e.g. "schema"."name_cloudsync")
157+
char *base_ref; // schema-qualified base table name (e.g. "schema"."name")
148158
char **col_name; // array of column names
149159
dbvm_t **col_merge_stmt; // array of merge insert stmt (indexed by col_name)
150160
dbvm_t **col_value_stmt; // array of column value stmt (indexed by col_name)
@@ -552,6 +562,16 @@ void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
552562
data->aux_data = xdata;
553563
}
554564

565+
void cloudsync_set_schema (cloudsync_context *data, const char *schema) {
566+
if (data->current_schema) cloudsync_memory_free(data->current_schema);
567+
data->current_schema = NULL;
568+
if (schema) data->current_schema = cloudsync_string_dup_lowercase(schema);
569+
}
570+
571+
const char *cloudsync_schema (cloudsync_context *data) {
572+
return data->current_schema;
573+
}
574+
555575
// MARK: - Table Utils -
556576

557577
void table_pknames_free (char **names, int nrows) {
@@ -618,10 +638,13 @@ cloudsync_table_context *table_create (cloudsync_context *data, const char *name
618638
table->context = data;
619639
table->algo = algo;
620640
table->name = cloudsync_string_dup_lowercase(name);
641+
table->schema = (data->current_schema) ? cloudsync_string_dup(data->current_schema) : NULL;
621642
if (!table->name) {
622643
cloudsync_memory_free(table);
623644
return NULL;
624645
}
646+
table->meta_ref = database_build_meta_ref(table->schema, table->name);
647+
table->base_ref = database_build_base_ref(table->schema, table->name);
625648
table->enabled = true;
626649

627650
return table;
@@ -656,6 +679,9 @@ void table_free (cloudsync_table_context *table) {
656679
}
657680

658681
if (table->name) cloudsync_memory_free(table->name);
682+
if (table->schema) cloudsync_memory_free(table->schema);
683+
if (table->meta_ref) cloudsync_memory_free(table->meta_ref);
684+
if (table->base_ref) cloudsync_memory_free(table->base_ref);
659685
if (table->pk_name) table_pknames_free(table->pk_name, table->npks);
660686
if (table->meta_pkexists_stmt) databasevm_finalize(table->meta_pkexists_stmt);
661687
if (table->meta_sentinel_update_stmt) databasevm_finalize(table->meta_sentinel_update_stmt);
@@ -689,16 +715,16 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
689715
// precompile the pk exists statement
690716
// we do not need an index on the pk column because it is already covered by the fact that it is part of the prikeys
691717
// EXPLAIN QUERY PLAN reports: SEARCH table_name USING PRIMARY KEY (pk=?)
692-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->name);
718+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_ROW_EXISTS_BY_PK, table->meta_ref);
693719
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
694720
DEBUG_SQL("meta_pkexists_stmt: %s", sql);
695-
721+
696722
rc = databasevm_prepare(data, sql, (void **)&table->meta_pkexists_stmt, DBFLAG_PERSISTENT);
697723
cloudsync_memory_free(sql);
698724
if (rc != DBRES_OK) goto cleanup;
699-
725+
700726
// precompile the update local sentinel statement
701-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
727+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPDATE_COL_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
702728
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
703729
DEBUG_SQL("meta_sentinel_update_stmt: %s", sql);
704730

@@ -707,7 +733,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
707733
if (rc != DBRES_OK) goto cleanup;
708734

709735
// precompile the insert local sentinel statement
710-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
736+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_COL_INIT_OR_BUMP_VERSION, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
711737
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
712738
DEBUG_SQL("meta_sentinel_insert_stmt: %s", sql);
713739

@@ -716,7 +742,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
716742
if (rc != DBRES_OK) goto cleanup;
717743

718744
// precompile the insert/update local row statement
719-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->name, table->name);
745+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_UPSERT_RAW_COLVERSION, table->meta_ref, table->meta_ref);
720746
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
721747
DEBUG_SQL("meta_row_insert_update_stmt: %s", sql);
722748

@@ -725,10 +751,10 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
725751
if (rc != DBRES_OK) goto cleanup;
726752

727753
// precompile the delete rows from meta
728-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
754+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
729755
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
730756
DEBUG_SQL("meta_row_drop_stmt: %s", sql);
731-
757+
732758
rc = databasevm_prepare(data, sql, (void **)&table->meta_row_drop_stmt, DBFLAG_PERSISTENT);
733759
cloudsync_memory_free(sql);
734760
if (rc != DBRES_OK) goto cleanup;
@@ -744,7 +770,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
744770
if (rc != DBRES_OK) goto cleanup;
745771

746772
// local cl
747-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->name, CLOUDSYNC_TOMBSTONE_VALUE, table->name);
773+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GET_COL_VERSION_OR_ROW_EXISTS, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, table->meta_ref);
748774
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
749775
DEBUG_SQL("meta_local_cl_stmt: %s", sql);
750776

@@ -753,24 +779,24 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
753779
if (rc != DBRES_OK) goto cleanup;
754780

755781
// rowid of the last inserted/updated row in the meta table
756-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->name);
782+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_RETURN_CHANGE_ID, table->meta_ref);
757783
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
758784
DEBUG_SQL("meta_winner_clock_stmt: %s", sql);
759785

760786
rc = databasevm_prepare(data, sql, (void **)&table->meta_winner_clock_stmt, DBFLAG_PERSISTENT);
761787
cloudsync_memory_free(sql);
762788
if (rc != DBRES_OK) goto cleanup;
763789

764-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
790+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
765791
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
766792
DEBUG_SQL("meta_merge_delete_drop: %s", sql);
767-
793+
768794
rc = databasevm_prepare(data, sql, (void **)&table->meta_merge_delete_drop, DBFLAG_PERSISTENT);
769795
cloudsync_memory_free(sql);
770796
if (rc != DBRES_OK) goto cleanup;
771797

772798
// zero clock
773-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
799+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_TOMBSTONE_PK_EXCEPT_COL, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE);
774800
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
775801
DEBUG_SQL("meta_zero_clock_stmt: %s", sql);
776802

@@ -779,7 +805,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
779805
if (rc != DBRES_OK) goto cleanup;
780806

781807
// col_version
782-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->name);
808+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_COL_VERSION_BY_PK_COL, table->meta_ref);
783809
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
784810
DEBUG_SQL("meta_col_version_stmt: %s", sql);
785811

@@ -788,7 +814,7 @@ int table_add_stmts (cloudsync_table_context *table, int ncols) {
788814
if (rc != DBRES_OK) goto cleanup;
789815

790816
// site_id
791-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->name);
817+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_SITE_ID_BY_PK_COL, table->meta_ref);
792818
if (!sql) {rc = DBRES_NOMEM; goto cleanup;}
793819
DEBUG_SQL("meta_site_id_stmt: %s", sql);
794820

@@ -1046,6 +1072,10 @@ bool table_algo_isgos (cloudsync_table_context *table) {
10461072
return (table->algo == table_algo_crdt_gos);
10471073
}
10481074

1075+
const char *table_schema (cloudsync_table_context *table) {
1076+
return table->schema;
1077+
}
1078+
10491079
// MARK: - Merge Insert -
10501080

10511081
int64_t merge_get_local_cl (cloudsync_table_context *table, const char *pk, int pklen) {
@@ -1512,6 +1542,11 @@ void cloudsync_sync_key (cloudsync_context *data, const char *key, const char *v
15121542
if (value && (value[0] != 0) && (value[0] != '0')) data->debug = 1;
15131543
return;
15141544
}
1545+
1546+
if (strcmp(key, "schema") == 0) {
1547+
cloudsync_set_schema(data, value);
1548+
return;
1549+
}
15151550
}
15161551

15171552
#if 0
@@ -1627,7 +1662,7 @@ int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *
16271662

16281663
if (pk_diff) {
16291664
// drop meta-table, it will be recreated
1630-
char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->name);
1665+
char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
16311666
rc = database_exec(data, sql);
16321667
cloudsync_memory_free(sql);
16331668
if (rc != DBRES_OK) {
@@ -1637,7 +1672,7 @@ int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *
16371672
} else {
16381673
// compact meta-table
16391674
// delete entries for removed columns
1640-
char *sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_COLS_NOT_IN_SCHEMA_OR_PKCOL, table->name, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
1675+
char *sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_DELETE_COLS_NOT_IN_SCHEMA_OR_PKCOL, table->meta_ref, table->name, CLOUDSYNC_TOMBSTONE_VALUE);
16411676
rc = database_exec(data, sql);
16421677
cloudsync_memory_free(sql);
16431678
if (rc != DBRES_OK) {
@@ -1657,7 +1692,7 @@ int cloudsync_finalize_alter (cloudsync_context *data, cloudsync_table_context *
16571692
char *pkvalues = (pkclause) ? pkclause : "rowid";
16581693

16591694
// delete entries related to rows that no longer exist in the original table, but preserve tombstone
1660-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->name, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->name, table->name, pkvalues);
1695+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_GC_DELETE_ORPHANED_PK, table->meta_ref, CLOUDSYNC_TOMBSTONE_VALUE, CLOUDSYNC_TOMBSTONE_VALUE, table->base_ref, table->meta_ref, pkvalues);
16611696
rc = database_exec(data, sql);
16621697
if (pkclause) cloudsync_memory_free(pkclause);
16631698
cloudsync_memory_free(sql);
@@ -1748,7 +1783,7 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
17481783
if (rc != DBRES_OK) goto finalize;
17491784
char *pkdecodeval = (pkdecode) ? pkdecode : "cloudsync_pk_decode(pk, 1) AS rowid";
17501785

1751-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_MISSING_PKS_FROM_BASE_EXCEPT_SYNC, table_name, pkvalues_identifiers, pkvalues_identifiers, table_name, pkdecodeval, table_name);
1786+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_INSERT_MISSING_PKS_FROM_BASE_EXCEPT_SYNC, table_name, pkvalues_identifiers, pkvalues_identifiers, table->base_ref, pkdecodeval, table->meta_ref);
17521787
rc = database_exec(data, sql);
17531788
cloudsync_memory_free(sql);
17541789
if (rc != DBRES_OK) goto finalize;
@@ -1758,7 +1793,7 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
17581793
// The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
17591794
// The old plan does many decodes per candidate and can’t use an index to rule out matches quickly—so it burns CPU and I/O.
17601795

1761-
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table_name, table_name);
1796+
sql = cloudsync_memory_mprintf(SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL, pkvalues_identifiers, table->base_ref, table->meta_ref);
17621797
rc = databasevm_prepare(data, sql, (void **)&vm, DBFLAG_PERSISTENT);
17631798
cloudsync_memory_free(sql);
17641799
if (rc != DBRES_OK) goto finalize;
@@ -2521,7 +2556,7 @@ int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context
25212556

25222557
// drop meta-table
25232558
const char *table_name = table->name;
2524-
char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table_name);
2559+
char *sql = cloudsync_memory_mprintf(SQL_DROP_CLOUDSYNC_TABLE, table->meta_ref);
25252560
int rc = database_exec(data, sql);
25262561
cloudsync_memory_free(sql);
25272562
if (rc != DBRES_OK) {
@@ -2595,11 +2630,13 @@ int cloudsync_terminate (cloudsync_context *data) {
25952630
if (data->data_version_stmt) databasevm_finalize(data->data_version_stmt);
25962631
if (data->db_version_stmt) databasevm_finalize(data->db_version_stmt);
25972632
if (data->getset_siteid_stmt) databasevm_finalize(data->getset_siteid_stmt);
2633+
if (data->current_schema) cloudsync_memory_free(data->current_schema);
25982634

25992635
data->schema_version_stmt = NULL;
26002636
data->data_version_stmt = NULL;
26012637
data->db_version_stmt = NULL;
26022638
data->getset_siteid_stmt = NULL;
2639+
data->current_schema = NULL;
26032640

26042641
// reset the site_id so the cloudsync_context_init will be executed again
26052642
// if any other cloudsync function is called after terminate

src/cloudsync.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
extern "C" {
1818
#endif
1919

20-
#define CLOUDSYNC_VERSION "0.9.6"
20+
#define CLOUDSYNC_VERSION "0.9.7"
2121
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2222

2323
#define CLOUDSYNC_VALUE_NOTSET -1
@@ -77,6 +77,8 @@ int cloudsync_errcode (cloudsync_context *data);
7777
void cloudsync_reset_error (cloudsync_context *data);
7878
int cloudsync_commit_hook (void *ctx);
7979
void cloudsync_rollback_hook (void *ctx);
80+
void cloudsync_set_schema (cloudsync_context *data, const char *schema);
81+
const char *cloudsync_schema (cloudsync_context *data);
8082

8183
// Payload
8284
int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int blen, int *nrows);
@@ -100,6 +102,7 @@ const char *table_colname (cloudsync_table_context *table, int index);
100102
char **table_pknames (cloudsync_table_context *table);
101103
void table_set_pknames (cloudsync_table_context *table, char **pknames);
102104
bool table_algo_isgos (cloudsync_table_context *table);
105+
const char *table_schema (cloudsync_table_context *table);
103106
int table_remove (cloudsync_context *data, cloudsync_table_context *table);
104107
void table_free (cloudsync_table_context *table);
105108

src/database.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ char *sql_build_upsert_pk_and_col (cloudsync_context *data, const char *table_na
144144
char *sql_build_select_cols_by_pk (cloudsync_context *data, const char *table_name, const char *colname);
145145
char *sql_build_rekey_pk_and_reset_version_except_col (cloudsync_context *data, const char *table_name, const char *except_col);
146146

147+
char *database_build_meta_ref(const char *schema, const char *table_name);
148+
char *database_build_base_ref(const char *schema, const char *table_name);
149+
147150
// USED ONLY by SQLite Cloud to implement RLS
148151
typedef struct cloudsync_pk_decode_bind_context cloudsync_pk_decode_bind_context;
149152
typedef bool (*cloudsync_payload_apply_callback_t)(void **xdata, cloudsync_pk_decode_bind_context *decoded_change, void *db, void *data, int step, int rc);

src/postgresql/cloudsync--1.0.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,3 +258,16 @@ CREATE OR REPLACE TRIGGER cloudsync_changes_insert
258258
INSTEAD OF INSERT ON cloudsync_changes
259259
FOR EACH ROW
260260
EXECUTE FUNCTION cloudsync_changes_insert_trigger();
261+
262+
-- Set current schema name
263+
CREATE OR REPLACE FUNCTION cloudsync_set_schema(schema text)
264+
RETURNS boolean
265+
AS 'MODULE_PATHNAME', 'pg_cloudsync_set_schema'
266+
LANGUAGE C VOLATILE;
267+
268+
-- Get current schema name (if any)
269+
CREATE OR REPLACE FUNCTION cloudsync_schema()
270+
RETURNS text
271+
AS 'MODULE_PATHNAME', 'pg_cloudsync_schema'
272+
LANGUAGE C VOLATILE;
273+

0 commit comments

Comments
 (0)