@@ -113,7 +113,8 @@ struct cloudsync_context {
113113 int debug ;
114114 bool merge_equal_values ;
115115 void * aux_data ;
116-
116+ int step_depth ;
117+
117118 // stmts and context values
118119 dbvm_t * schema_version_stmt ;
119120 dbvm_t * data_version_stmt ;
@@ -430,6 +431,23 @@ int64_t cloudsync_pk_context_dbversion (cloudsync_pk_decode_bind_context *ctx) {
430431 return ctx -> db_version ;
431432}
432433
434+ int64_t cloudsync_pk_context_colversion (cloudsync_pk_decode_bind_context * ctx ) {
435+ return ctx -> col_version ;
436+ }
437+
438+ int64_t cloudsync_pk_context_seq (cloudsync_pk_decode_bind_context * ctx ) {
439+ return ctx -> seq ;
440+ }
441+
442+ void * cloudsync_pk_context_siteid (cloudsync_pk_decode_bind_context * ctx , int64_t * siteid_len ) {
443+ * siteid_len = ctx -> site_id_len ;
444+ return (void * )ctx -> site_id ;
445+ }
446+
447+ dbvm_t * cloudsync_pk_context_vm (cloudsync_pk_decode_bind_context * ctx ) {
448+ return ctx -> vm ;
449+ }
450+
433451// MARK: - CloudSync Context -
434452
435453int cloudsync_insync (cloudsync_context * data ) {
@@ -564,6 +582,14 @@ void cloudsync_set_auxdata (cloudsync_context *data, void *xdata) {
564582 data -> aux_data = xdata ;
565583}
566584
585+ int cloudsync_step_depth (cloudsync_context * data ) {
586+ return data -> step_depth ;
587+ }
588+
589+ void cloudsync_set_step_depth (cloudsync_context * data , int depth ) {
590+ data -> step_depth = depth ;
591+ }
592+
567593void cloudsync_set_schema (cloudsync_context * data , const char * schema ) {
568594 if (data -> current_schema && schema && strcmp (data -> current_schema , schema ) == 0 ) return ;
569595 if (data -> current_schema ) cloudsync_memory_free (data -> current_schema );
@@ -1011,6 +1037,11 @@ bool table_add_to_context (cloudsync_context *data, table_algo algo, const char
10111037 table -> npks = 1 ; // rowid
10121038 #endif
10131039 }
1040+
1041+ // NOTE: pk_name array is populated lazily (e.g. in DuckDB's
1042+ // BuildChangesSelectSQL) rather than here, because table_add_to_context
1043+ // can be called from database_exec_callback (settings load) where
1044+ // issuing another query on the same connection would recurse.
10141045
10151046 int ncols = database_count_nonpk (data , table_name , table -> schema );
10161047 if (ncols < 0 ) {cloudsync_set_dberror (data ); goto abort_add_table ;}
@@ -1093,6 +1124,23 @@ const char *table_colname (cloudsync_table_context *table, int index) {
10931124 return table -> col_name [index ];
10941125}
10951126
1127+ const char * table_name (cloudsync_table_context * table ) {
1128+ return table -> name ;
1129+ }
1130+
1131+ const char * table_metaref (cloudsync_table_context * table ) {
1132+ return table -> meta_ref ;
1133+ }
1134+
1135+ int cloudsync_table_count (cloudsync_context * data ) {
1136+ return data -> tables_count ;
1137+ }
1138+
1139+ cloudsync_table_context * cloudsync_table_at (cloudsync_context * data , int index ) {
1140+ if (index < 0 || index >= data -> tables_count ) return NULL ;
1141+ return data -> tables [index ];
1142+ }
1143+
10961144bool table_pk_exists (cloudsync_table_context * table , const char * value , size_t len ) {
10971145 // check if a row with the same primary key already exists
10981146 // if so, this means the row might have been previously deleted (sentinel)
@@ -2224,6 +2272,42 @@ int cloudsync_payload_encode_step (cloudsync_payload_context *payload, cloudsync
22242272 return DBRES_OK ;
22252273}
22262274
2275+ int cloudsync_payload_encode_combine (cloudsync_payload_context * target , cloudsync_payload_context * source ) {
2276+ if (!source || source -> nrows == 0 ) return DBRES_OK ;
2277+ if (!target ) return DBRES_ERROR ;
2278+
2279+ // If target is empty, just take over source's data
2280+ if (target -> nrows == 0 ) {
2281+ target -> buffer = source -> buffer ;
2282+ target -> bsize = source -> bsize ;
2283+ target -> balloc = source -> balloc ;
2284+ target -> bused = source -> bused ;
2285+ target -> nrows = source -> nrows ;
2286+ target -> ncols = source -> ncols ;
2287+ // Clear source so it won't free the buffer
2288+ source -> buffer = NULL ;
2289+ source -> bsize = 0 ;
2290+ source -> balloc = 0 ;
2291+ source -> bused = 0 ;
2292+ source -> nrows = 0 ;
2293+ return DBRES_OK ;
2294+ }
2295+
2296+ // Append source buffer to target
2297+ size_t needed = target -> bused + source -> bused ;
2298+ if (needed > target -> balloc ) {
2299+ size_t new_alloc = needed * 2 ;
2300+ char * new_buf = cloudsync_memory_realloc (target -> buffer , (uint64_t )new_alloc );
2301+ if (!new_buf ) return DBRES_NOMEM ;
2302+ target -> buffer = new_buf ;
2303+ target -> balloc = new_alloc ;
2304+ }
2305+ memcpy (target -> buffer + target -> bused , source -> buffer , source -> bused );
2306+ target -> bused += source -> bused ;
2307+ target -> nrows += source -> nrows ;
2308+ return DBRES_OK ;
2309+ }
2310+
22272311int cloudsync_payload_encode_final (cloudsync_payload_context * payload , cloudsync_context * data ) {
22282312 DEBUG_FUNCTION ("cloudsync_payload_encode_final" );
22292313
@@ -2559,7 +2643,7 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,
25592643 // retrieve BLOB
25602644 char sql [1024 ];
25612645 snprintf (sql , sizeof (sql ), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid()) "
2562- "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF( db_version = max_db_version, seq, 0) ) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL" , * db_version , * db_version , * seq );
2646+ "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, MAX( max_db_version) AS max_db_version, MAX(CASE WHEN db_version = max_db_version THEN seq ELSE 0 END ) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL" , * db_version , * db_version , * seq );
25632647
25642648 int64_t len = 0 ;
25652649 int rc = database_select_blob_2int (data , sql , blob , & len , new_db_version , new_seq );
@@ -2726,6 +2810,8 @@ int cloudsync_cleanup_internal (cloudsync_context *data, cloudsync_table_context
27262810 return DBRES_OK ;
27272811}
27282812
2813+ static void cloudsync_finalize_context_stmts (cloudsync_context * data );
2814+
27292815int cloudsync_cleanup (cloudsync_context * data , const char * table_name ) {
27302816 cloudsync_table_context * table = table_lookup (data , table_name );
27312817 if (!table ) return DBRES_OK ;
@@ -2742,6 +2828,7 @@ int cloudsync_cleanup (cloudsync_context *data, const char *table_name) {
27422828 // cleanup database on last table
27432829 cloudsync_reset_siteid (data );
27442830 dbutils_settings_cleanup (data );
2831+ cloudsync_finalize_context_stmts (data );
27452832 } else {
27462833 if (database_internal_table_exists (data , CLOUDSYNC_TABLE_SETTINGS_NAME ) == true) {
27472834 cloudsync_update_schema_hash (data );
@@ -2755,38 +2842,38 @@ int cloudsync_cleanup_all (cloudsync_context *data) {
27552842 return database_cleanup (data );
27562843}
27572844
2845+ // Finalize and NULL out all context-level prepared statements and cached schema.
2846+ // Shared by cloudsync_cleanup (last table) and cloudsync_terminate.
2847+ static void cloudsync_finalize_context_stmts (cloudsync_context * data ) {
2848+ if (data -> schema_version_stmt ) { databasevm_finalize (data -> schema_version_stmt ); data -> schema_version_stmt = NULL ; }
2849+ if (data -> data_version_stmt ) { databasevm_finalize (data -> data_version_stmt ); data -> data_version_stmt = NULL ; }
2850+ if (data -> db_version_stmt ) { databasevm_finalize (data -> db_version_stmt ); data -> db_version_stmt = NULL ; }
2851+ if (data -> getset_siteid_stmt ) { databasevm_finalize (data -> getset_siteid_stmt ); data -> getset_siteid_stmt = NULL ; }
2852+ if (data -> current_schema ) { cloudsync_memory_free (data -> current_schema ); data -> current_schema = NULL ; }
2853+ }
2854+
27582855int cloudsync_terminate (cloudsync_context * data ) {
27592856 // can't use for/loop here because data->tables_count is changed by table_remove
27602857 while (data -> tables_count > 0 ) {
27612858 cloudsync_table_context * t = data -> tables [data -> tables_count - 1 ];
27622859 table_remove (data , t );
27632860 table_free (t );
27642861 }
2765-
2766- if (data -> schema_version_stmt ) databasevm_finalize (data -> schema_version_stmt );
2767- if (data -> data_version_stmt ) databasevm_finalize (data -> data_version_stmt );
2768- if (data -> db_version_stmt ) databasevm_finalize (data -> db_version_stmt );
2769- if (data -> getset_siteid_stmt ) databasevm_finalize (data -> getset_siteid_stmt );
2770- if (data -> current_schema ) cloudsync_memory_free (data -> current_schema );
2771-
2772- data -> schema_version_stmt = NULL ;
2773- data -> data_version_stmt = NULL ;
2774- data -> db_version_stmt = NULL ;
2775- data -> getset_siteid_stmt = NULL ;
2776- data -> current_schema = NULL ;
2777-
2862+
2863+ cloudsync_finalize_context_stmts (data );
2864+
27782865 // reset the site_id so the cloudsync_context_init will be executed again
27792866 // if any other cloudsync function is called after terminate
27802867 data -> site_id [0 ] = 0 ;
2781-
2868+
27822869 return 1 ;
27832870}
27842871
27852872int cloudsync_init_table (cloudsync_context * data , const char * table_name , const char * algo_name , bool skip_int_pk_check ) {
27862873 // sanity check table and its primary key(s)
27872874 int rc = cloudsync_table_sanity_check (data , table_name , skip_int_pk_check );
27882875 if (rc != DBRES_OK ) return rc ;
2789-
2876+
27902877 // init cloudsync_settings
27912878 if (cloudsync_context_init (data ) == NULL ) {
27922879 return cloudsync_set_error (data , "Unable to initialize cloudsync context" , DBRES_MISUSE );
@@ -2812,7 +2899,7 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
28122899
28132900 // check if table name was already augmented
28142901 table_algo algo_current = dbutils_table_settings_get_algo (data , table_name );
2815-
2902+
28162903 // sanity check algorithm
28172904 if ((algo_new == algo_current ) && (algo_current != table_algo_none )) {
28182905 // if table algorithms and the same and not none, do nothing
@@ -2845,16 +2932,16 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
28452932 // check triggers
28462933 rc = database_create_triggers (data , table_name , algo_new , init_filter );
28472934 if (rc != DBRES_OK ) return cloudsync_set_error (data , "An error occurred while creating triggers" , DBRES_MISUSE );
2848-
2935+
28492936 // check meta-table
28502937 rc = database_create_metatable (data , table_name );
28512938 if (rc != DBRES_OK ) return cloudsync_set_error (data , "An error occurred while creating metatable" , DBRES_MISUSE );
2852-
2939+
28532940 // add prepared statements
28542941 if (cloudsync_add_dbvms (data ) != DBRES_OK ) {
28552942 return cloudsync_set_error (data , "An error occurred while trying to compile prepared SQL statements" , DBRES_MISUSE );
28562943 }
2857-
2944+
28582945 // add table to in-memory data context
28592946 if (table_add_to_context (data , algo_new , table_name ) == false) {
28602947 char buffer [1024 ];
@@ -2865,6 +2952,6 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
28652952 if (cloudsync_refill_metatable (data , table_name ) != DBRES_OK ) {
28662953 return cloudsync_set_error (data , "An error occurred while trying to fill the augmented table" , DBRES_MISUSE );
28672954 }
2868-
2955+
28692956 return DBRES_OK ;
28702957}
0 commit comments