@@ -26,11 +26,12 @@ interface JobConfig {
2626
2727interface MutationTask extends Document {
2828 _id : ObjectId | string ;
29- type : 'delete' | 'update' ;
29+ type : 'delete' | 'update' | 'native_ch' ;
3030 db : string ;
3131 collection : string ;
3232 query : Record < string , unknown > ;
3333 update ?: Record < string , unknown > ;
34+ native_sql ?: string ;
3435 running : boolean ;
3536 status : string ;
3637 hb ?: number ;
@@ -272,7 +273,7 @@ class MutationManagerJob extends Job {
272273 */
273274 async processTask ( task : MutationTask , summary : SummaryEntry [ ] , jobConfig : JobConfig = jobConfigState || DEFAULT_JOB_CONFIG ) : Promise < void > {
274275 const type = task . type ;
275- if ( type !== 'delete' && type !== 'update' ) {
276+ if ( type !== 'delete' && type !== 'update' && type !== 'native_ch' ) {
276277 await common . db . collection ( 'mutation_manager' ) . updateOne (
277278 { _id : task . _id } ,
278279 {
@@ -288,7 +289,7 @@ class MutationManagerJob extends Job {
288289 const clickhouseEnabled = mutationManager . isClickhouseEnabled ( ) ;
289290 const hasClickhouseDelete = clickhouseEnabled && ! ! ( clickHouseRunner && clickHouseRunner . deleteGranularDataByQuery ) ;
290291 const hasClickhouseUpdate = clickhouseEnabled && ! ! ( clickHouseRunner && clickHouseRunner . updateGranularDataByQuery ) ;
291- const hasClickhouse = ( type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete ) ;
292+ const hasClickhouse = type === 'native_ch' ? clickhouseEnabled : ( type === 'update' ? hasClickhouseUpdate : hasClickhouseDelete ) ;
292293
293294 if ( ! mongoDb && ! hasClickhouse ) {
294295 const reason = `mongo_db_unavailable:${ task . db || 'missing' } ` ;
@@ -317,7 +318,11 @@ class MutationManagerJob extends Job {
317318 }
318319
319320 let mongoOk = true ;
320- if ( mongoDb ) {
321+ if ( type === 'native_ch' ) {
322+ // Native CH mutations skip MongoDB entirely
323+ log . d ( 'Native CH mutation - skipping MongoDB' , { taskId : task . _id } ) ;
324+ }
325+ else if ( mongoDb ) {
321326 if ( type === 'update' ) {
322327 mongoOk = await this . updateMongo ( task , mongoDb ) ;
323328 }
@@ -330,7 +335,10 @@ class MutationManagerJob extends Job {
330335 }
331336
332337 let chScheduledOk = true ;
333- if ( type === 'update' && hasClickhouseUpdate ) {
338+ if ( type === 'native_ch' && clickhouseEnabled ) {
339+ chScheduledOk = await this . executeNativeClickhouse ( task ) ;
340+ }
341+ else if ( type === 'update' && hasClickhouseUpdate ) {
334342 chScheduledOk = await this . updateClickhouse ( task ) ;
335343 }
336344 else if ( type === 'delete' && hasClickhouseDelete ) {
@@ -422,8 +430,10 @@ class MutationManagerJob extends Job {
422430 for ( const task of awaiting ) {
423431 try {
424432 if ( chHealth && typeof chHealth . getMutationStatus === 'function' ) {
425- // In cluster mode, mutations target _local tables, so validation must check _local
426- const validationTable = isClusterMode ? task . collection + '_local' : task . collection ;
433+ // In cluster mode, mutations target _local tables, so validation must check _local.
434+ // native_ch tasks may already have _local in collection name — avoid doubling.
435+ const needsLocalSuffix = isClusterMode && ! task . collection . endsWith ( '_local' ) ;
436+ const validationTable = needsLocalSuffix ? task . collection + '_local' : task . collection ;
427437 const status = await chHealth . getMutationStatus ( { validation_command_id : task . validation_command_id , table : validationTable , database : task . db } ) ;
428438 if ( status && status . is_done ) {
429439 await common . db . collection ( 'mutation_manager' ) . updateOne (
@@ -677,6 +687,95 @@ class MutationManagerJob extends Job {
677687 }
678688 }
679689
690+ /**
691+ * Build validated ClickHouse mutation SQL with an embedded command-id for tracking.
692+ * - Strips trailing semicolon
693+ * - Validates ALTER TABLE ... DELETE/UPDATE ... WHERE ... shape
694+ * - Injects tautological AND before any SETTINGS clause
695+ * @returns Final SQL string, or null if the shape is invalid
696+ */
697+ private buildValidatedNativeClickhouseSql ( baseSql : string , commandId : string ) : string | null {
698+ if ( ! baseSql || typeof baseSql !== 'string' ) {
699+ return null ;
700+ }
701+ let sql = baseSql . trim ( ) ;
702+ if ( sql . endsWith ( ';' ) ) {
703+ sql = sql . slice ( 0 , - 1 ) . trimEnd ( ) ;
704+ }
705+ const upper = sql . toUpperCase ( ) ;
706+ if ( ! upper . startsWith ( 'ALTER TABLE ' ) ) {
707+ return null ;
708+ }
709+ if ( ! / \b ( D E L E T E | U P D A T E ) \b / . test ( upper ) ) {
710+ return null ;
711+ }
712+ if ( ! / \b W H E R E \b / . test ( upper ) ) {
713+ return null ;
714+ }
715+ // Find SETTINGS clause (if any) — inject command-id BEFORE it
716+ const settingsMatch = upper . match ( / \b S E T T I N G S \b / ) ;
717+ const settingsIdx = settingsMatch ?. index ?? - 1 ;
718+ const injection = ` AND '${ commandId } ' = '${ commandId } '` ;
719+ if ( settingsIdx !== - 1 ) {
720+ return sql . slice ( 0 , settingsIdx ) + injection + sql . slice ( settingsIdx ) ;
721+ }
722+ return sql + injection ;
723+ }
724+
725+ /**
726+ * Executes a native ClickHouse SQL mutation directly.
727+ * Used for complex mutations (e.g., deduplication) that cannot be expressed as Mongo-style queries.
728+ * Embeds validation_command_id for tracking via system.mutations.
729+ * @param task - The mutation task with native_sql field
730+ */
731+ async executeNativeClickhouse ( task : MutationTask ) : Promise < boolean > {
732+ if ( ! task . native_sql || typeof task . native_sql !== 'string' ) {
733+ log . e ( 'Skipping native CH mutation (empty sql)' , { taskId : task . _id } ) ;
734+ await this . markFailedOrRetry ( task , 'empty_native_sql' ) ;
735+ return false ;
736+ }
737+
738+ if ( ! common . clickhouseQueryService ) {
739+ log . e ( 'ClickHouse query service not available for native mutation' , { taskId : task . _id } ) ;
740+ await this . markFailedOrRetry ( task , 'ch_query_service_unavailable' ) ;
741+ return false ;
742+ }
743+
744+ try {
745+ const retryIndex = Number ( task . fail_count || 0 ) ;
746+ const commandId = `nm_${ String ( task . _id ) } _${ retryIndex } ` ;
747+
748+ const sql = this . buildValidatedNativeClickhouseSql ( task . native_sql , commandId ) ;
749+ if ( ! sql ) {
750+ log . e ( 'Skipping native CH mutation (invalid SQL shape)' , {
751+ taskId : task . _id ,
752+ native_sql : task . native_sql
753+ } ) ;
754+ await this . markFailedOrRetry ( task , 'invalid_native_sql_shape' ) ;
755+ return false ;
756+ }
757+
758+ // Persist command_id BEFORE executing mutation (crash safety: if we crash
759+ // between execution and this update, validation can still find the command_id)
760+ await common . db . collection ( 'mutation_manager' ) . updateOne (
761+ { _id : task . _id } ,
762+ { $set : { validation_command_id : commandId } }
763+ ) ;
764+
765+ await common . clickhouseQueryService . executeMutation ( { query : sql } ) ;
766+ log . d ( 'Native CH mutation scheduled' , { taskId : task . _id , commandId } ) ;
767+ return true ;
768+ }
769+ catch ( err ) {
770+ log . e ( 'Native CH mutation failed' , {
771+ taskId : task . _id ,
772+ error : ( err as Error ) ?. message || String ( err )
773+ } ) ;
774+ await this . markFailedOrRetry ( task , 'native_ch_error: ' + ( ( err as Error ) ?. message || err + '' ) ) ;
775+ return false ;
776+ }
777+ }
778+
680779 /**
681780 * Marks a task as failed or schedules it for a retry based on the number of previous failures.
682781 * @param task - The task object to update.
0 commit comments