4949#define CLOUDSYNC_INIT_NTABLES 64
5050#define CLOUDSYNC_MIN_DB_VERSION 0
5151
52- #define CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK 1
5352#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE (512*1024)
5453#define CLOUDSYNC_PAYLOAD_SIGNATURE 0x434C5359 /* 'C','L','S','Y' */
5554#define CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL 1
5655#define CLOUDSYNC_PAYLOAD_VERSION_1 CLOUDSYNC_PAYLOAD_VERSION_ORIGNAL
5756#define CLOUDSYNC_PAYLOAD_VERSION_2 2
57+ #define CLOUDSYNC_PAYLOAD_VERSION_LATEST CLOUDSYNC_PAYLOAD_VERSION_2
5858#define CLOUDSYNC_PAYLOAD_MIN_VERSION_WITH_CHECKSUM CLOUDSYNC_PAYLOAD_VERSION_2
5959
6060#ifndef MAX
6363
6464#define DEBUG_DBERROR (_rc , _fn , _data ) do {if (_rc != DBRES_OK) printf("Error in %s: %s\n", _fn, database_errmsg(_data));} while (0)
6565
66- #if CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
67- bool schema_hash_disabled = true;
68- #endif
69-
7066typedef enum {
7167 CLOUDSYNC_PK_INDEX_TBL = 0 ,
7268 CLOUDSYNC_PK_INDEX_PK = 1 ,
@@ -1208,18 +1204,20 @@ int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, c
12081204 return rc ;
12091205 }
12101206
1211- // bind value
1207+ // bind value (always bind all expected parameters for correct prepared statement handling)
12121208 if (col_value ) {
12131209 rc = databasevm_bind_value (vm , table -> npks + 1 , col_value );
12141210 if (rc == DBRES_OK ) rc = databasevm_bind_value (vm , table -> npks + 2 , col_value );
1215- if (rc != DBRES_OK ) {
1216- cloudsync_set_dberror (data );
1217- dbvm_reset (vm );
1218- return rc ;
1219- }
1220-
1211+ } else {
1212+ rc = databasevm_bind_null (vm , table -> npks + 1 );
1213+ if (rc == DBRES_OK ) rc = databasevm_bind_null (vm , table -> npks + 2 );
12211214 }
1222-
1215+ if (rc != DBRES_OK ) {
1216+ cloudsync_set_dberror (data );
1217+ dbvm_reset (vm );
1218+ return rc ;
1219+ }
1220+
12231221 // perform real operation and disable triggers
12241222
12251223 // in case of GOS we reused the table->col_merge_stmt statement
@@ -1794,13 +1792,104 @@ int cloudsync_commit_alter (cloudsync_context *data, const char *table_name) {
17941792 return rc ;
17951793}
17961794
1795+ // MARK: - Filter Rewrite -
1796+
1797+ // Replace bare column names in a filter expression with prefix-qualified names.
1798+ // E.g., filter="user_id = 42", prefix="NEW", columns=["user_id","id"] → "NEW.\"user_id\" = 42"
// Matching is whole-token: the filter is tokenized into identifier tokens and each
// token is compared against the column list exactly, so a column name can never match
// part of a longer identifier; no ordering of the columns array is required.
1800+ // Skips content inside single-quoted string literals.
1801+ // Returns a newly allocated string (caller must free with cloudsync_memory_free), or NULL on error.
// Helper: report whether the identifier token (NOT NUL-terminated, length
// token_len) is an exact, whole-token match for one of the ncols column names.
static bool filter_is_column (const char *token, size_t token_len, char **columns, int ncols) {
    int i = 0;
    while (i < ncols) {
        const char *name = columns[i++];
        // Equal prefix plus a terminator at token_len means the lengths match too.
        if (strncmp(name, token, token_len) == 0 && name[token_len] == '\0') return true;
    }
    return false;
}
1810+
// Helper: true for characters that may appear in an unquoted SQL identifier
// (ASCII letters, digits, underscore).
static bool filter_is_ident_char (char c) {
    if (c == '_') return true;
    if (c >= '0' && c <= '9') return true;
    return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z');
}
1816+
// Rewrite a filter expression so every bare reference to one of the table's
// columns is qualified with the given row prefix, e.g. with prefix "NEW":
//   "user_id = 42"  ->  "NEW.\"user_id\" = 42"
//
// filter:  boolean SQL expression over the table's columns
// prefix:  row alias to prepend ("NEW"/"OLD")
// columns: array of ncols column names; a token is rewritten only on an exact,
//          whole-token match
// Returns a newly allocated string (caller must free with cloudsync_memory_free),
// or NULL on invalid arguments / allocation failure.
//
// Single-quoted string literals and double-quoted identifiers are copied
// verbatim (including the '' and "" escape forms), so text that merely mentions
// a column inside a literal — or a column the caller already quoted — is never
// rewritten. A token immediately preceded by '.' is already qualified and is
// left alone. Bracket/backtick quoting is not handled.
char *cloudsync_filter_add_row_prefix (const char *filter, const char *prefix, char **columns, int ncols) {
    if (!filter || !prefix || !columns || ncols <= 0) return NULL;

    size_t filter_len = strlen(filter);
    size_t prefix_len = strlen(prefix);

    // Each rewritten identifier grows the output by prefix_len + 3 bytes
    // (prefix, '.', and two double quotes). Identifiers are at least one
    // character long and must be separated by at least one non-identifier
    // character, so there can be at most (filter_len / 2 + 1) matches.
    size_t max_growth = (filter_len / 2 + 1) * (prefix_len + 3);
    size_t cap = filter_len + max_growth + 64;
    char *result = (char *)cloudsync_memory_alloc(cap);
    if (!result) return NULL;
    size_t out = 0;

    // Single pass: quoted regions, identifier tokens, everything else.
    size_t i = 0;
    while (i < filter_len) {
        char c = filter[i];

        // Copy quoted regions verbatim: '...' string literals and "..."
        // quoted identifiers. A doubled quote ('' or "") is an escape and
        // does not terminate the region.
        if (c == '\'' || c == '"') {
            char quote = c;
            result[out++] = filter[i++];
            while (i < filter_len) {
                if (filter[i] == quote) {
                    result[out++] = filter[i++];
                    if (i < filter_len && filter[i] == quote) {
                        result[out++] = filter[i++]; // escaped quote — keep going
                        continue;
                    }
                    break; // closing quote ends the region
                }
                result[out++] = filter[i++];
            }
            continue;
        }

        // Extract an identifier token and compare it whole against the columns.
        if (filter_is_ident_char(c)) {
            size_t start = i;
            while (i < filter_len && filter_is_ident_char(filter[i])) ++i;
            size_t token_len = i - start;

            // A token right after '.' is already qualified (e.g. "t.user_id").
            bool already_qualified = (start > 0 && filter[start - 1] == '.');

            if (!already_qualified && filter_is_column(&filter[start], token_len, columns, ncols)) {
                // Emit PREFIX."column_name"
                memcpy(&result[out], prefix, prefix_len); out += prefix_len;
                result[out++] = '.';
                result[out++] = '"';
                memcpy(&result[out], &filter[start], token_len); out += token_len;
                result[out++] = '"';
            } else {
                // Not a (bare) column — copy as-is
                memcpy(&result[out], &filter[start], token_len); out += token_len;
            }
            continue;
        }

        // Operators, whitespace, punctuation — pass through unchanged.
        result[out++] = filter[i++];
    }

    result[out] = '\0';
    return result;
}
1880+
17971881int cloudsync_refill_metatable (cloudsync_context * data , const char * table_name ) {
17981882 cloudsync_table_context * table = table_lookup (data , table_name );
17991883 if (!table ) return DBRES_ERROR ;
1800-
1884+
18011885 dbvm_t * vm = NULL ;
18021886 int64_t db_version = cloudsync_dbversion_next (data , CLOUDSYNC_VALUE_NOTSET );
18031887
1888+ // Read row-level filter from settings (if any)
1889+ char filter_buf [2048 ];
1890+ int frc = dbutils_table_settings_get_value (data , table_name , "*" , "filter" , filter_buf , sizeof (filter_buf ));
1891+ const char * filter = (frc == DBRES_OK && filter_buf [0 ]) ? filter_buf : NULL ;
1892+
18041893 const char * schema = table -> schema ? table -> schema : "" ;
18051894 char * sql = sql_build_pk_collist_query (schema , table_name );
18061895 char * pkclause_identifiers = NULL ;
@@ -1810,18 +1899,22 @@ int cloudsync_refill_metatable (cloudsync_context *data, const char *table_name)
18101899 char * pkvalues_identifiers = (pkclause_identifiers ) ? pkclause_identifiers : "rowid" ;
18111900
18121901 // Use database-specific query builder to handle type differences in composite PKs
1813- sql = sql_build_insert_missing_pks_query (schema , table_name , pkvalues_identifiers , table -> base_ref , table -> meta_ref );
1902+ sql = sql_build_insert_missing_pks_query (schema , table_name , pkvalues_identifiers , table -> base_ref , table -> meta_ref , filter );
18141903 if (!sql ) {rc = DBRES_NOMEM ; goto finalize ;}
18151904 rc = database_exec (data , sql );
18161905 cloudsync_memory_free (sql );
18171906 if (rc != DBRES_OK ) goto finalize ;
1818-
1907+
18191908 // fill missing colums
18201909 // for each non-pk column:
18211910 // The new query does 1 encode per source row and one indexed NOT-EXISTS probe.
1822- // The old plan does many decodes per candidate and can’t use an index to rule out matches quickly—so it burns CPU and I/O.
1823-
1824- sql = cloudsync_memory_mprintf (SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL , pkvalues_identifiers , table -> base_ref , table -> meta_ref );
1911+ // The old plan does many decodes per candidate and can't use an index to rule out matches quickly—so it burns CPU and I/O.
1912+
1913+ if (filter ) {
1914+ sql = cloudsync_memory_mprintf (SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL_FILTERED , pkvalues_identifiers , table -> base_ref , filter , table -> meta_ref );
1915+ } else {
1916+ sql = cloudsync_memory_mprintf (SQL_CLOUDSYNC_SELECT_PKS_NOT_IN_SYNC_FOR_COL , pkvalues_identifiers , table -> base_ref , table -> meta_ref );
1917+ }
18251918 rc = databasevm_prepare (data , sql , (void * * )& vm , DBFLAG_PERSISTENT );
18261919 cloudsync_memory_free (sql );
18271920 if (rc != DBRES_OK ) goto finalize ;
@@ -2263,15 +2356,17 @@ int cloudsync_payload_apply (cloudsync_context *data, const char *payload, int b
22632356 header .nrows = ntohl (header .nrows );
22642357 header .schema_hash = ntohll (header .schema_hash );
22652358
2266- #if !CLOUDSYNC_PAYLOAD_SKIP_SCHEMA_HASH_CHECK
2267- if (!data || header .schema_hash != data -> schema_hash ) {
2268- if (!database_check_schema_hash (data , header .schema_hash )) {
2269- char buffer [1024 ];
2270- snprintf (buffer , sizeof (buffer ), "Cannot apply the received payload because the schema hash is unknown %llu." , header .schema_hash );
2271- return cloudsync_set_error (data , buffer , DBRES_MISUSE );
2359+ // compare schema_hash only if not disabled and if the received payload was created with the current header version
2360+ // to avoid schema hash mismatch when processed by a peer with a different extension version during software updates.
2361+ if (dbutils_settings_get_int64_value (data , CLOUDSYNC_KEY_SKIP_SCHEMA_HASH_CHECK ) == 0 && header .version == CLOUDSYNC_PAYLOAD_VERSION_LATEST ) {
2362+ if (header .schema_hash != data -> schema_hash ) {
2363+ if (!database_check_schema_hash (data , header .schema_hash )) {
2364+ char buffer [1024 ];
2365+ snprintf (buffer , sizeof (buffer ), "Cannot apply the received payload because the schema hash is unknown %llu." , header .schema_hash );
2366+ return cloudsync_set_error (data , buffer , DBRES_MISUSE );
2367+ }
22722368 }
22732369 }
2274- #endif
22752370
22762371 // sanity check header
22772372 if ((header .signature != CLOUDSYNC_PAYLOAD_SIGNATURE ) || (header .ncols == 0 )) {
@@ -2444,8 +2539,8 @@ int cloudsync_payload_get (cloudsync_context *data, char **blob, int *blob_size,
24442539
24452540 // retrieve BLOB
24462541 char sql [1024 ];
2447- snprintf (sql , sizeof (sql ), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes) "
2448- "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, NULL )) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL" , * db_version , * db_version , * seq );
2542+ snprintf (sql , sizeof (sql ), "WITH max_db_version AS (SELECT MAX(db_version) AS max_db_version FROM cloudsync_changes WHERE site_id=cloudsync_siteid() ) "
2543+ "SELECT * FROM (SELECT cloudsync_payload_encode(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) AS payload, max_db_version AS max_db_version, MAX(IIF(db_version = max_db_version, seq, 0 )) FROM cloudsync_changes, max_db_version WHERE site_id=cloudsync_siteid() AND (db_version>%d OR (db_version=%d AND seq>%d))) WHERE payload IS NOT NULL" , * db_version , * db_version , * seq );
24492544
24502545 int64_t len = 0 ;
24512546 int rc = database_select_blob_2int (data , sql , blob , & len , new_db_version , new_seq );
@@ -2723,8 +2818,13 @@ int cloudsync_init_table (cloudsync_context *data, const char *table_name, const
27232818 // sync algo with table (unused in this version)
27242819 // cloudsync_sync_table_key(data, table_name, "*", CLOUDSYNC_KEY_ALGO, crdt_algo_name(algo_new));
27252820
2821+ // read row-level filter from settings (if any)
2822+ char init_filter_buf [2048 ];
2823+ int init_frc = dbutils_table_settings_get_value (data , table_name , "*" , "filter" , init_filter_buf , sizeof (init_filter_buf ));
2824+ const char * init_filter = (init_frc == DBRES_OK && init_filter_buf [0 ]) ? init_filter_buf : NULL ;
2825+
27262826 // check triggers
2727- rc = database_create_triggers (data , table_name , algo_new );
2827+ rc = database_create_triggers (data , table_name , algo_new , init_filter );
27282828 if (rc != DBRES_OK ) return cloudsync_set_error (data , "An error occurred while creating triggers" , DBRES_MISUSE );
27292829
27302830 // check meta-table
0 commit comments