@@ -70,12 +70,17 @@ INNER JOIN information_schema.tables t
7070 AND c.table_schema NOT IN ('pg_catalog', 'information_schema')" ;
7171
7272 pkSql = @"
73- SELECT tc.table_schema AS TableSchema, tc.table_name AS TableName, kcu.column_name AS ColumnName
73+ SELECT tc.table_schema AS TableSchema, tc.table_name AS TableName, kcu.column_name AS ColumnName,
74+ col.data_type AS DataType, col.udt_name AS UdtName
7475 FROM information_schema.table_constraints tc
7576 INNER JOIN information_schema.key_column_usage kcu
7677 ON tc.constraint_name = kcu.constraint_name
7778 AND tc.table_schema = kcu.table_schema
7879 AND tc.table_name = kcu.table_name
80+ INNER JOIN information_schema.columns col
81+ ON col.table_schema = kcu.table_schema
82+ AND col.table_name = kcu.table_name
83+ AND col.column_name = kcu.column_name
7984 WHERE tc.constraint_type = 'PRIMARY KEY'
8085 AND tc.table_schema NOT IN ('pg_catalog', 'information_schema')" ;
8186 }
@@ -89,12 +94,17 @@ INNER JOIN INFORMATION_SCHEMA.TABLES t
8994 WHERE c.DATA_TYPE IN ('char', 'varchar', 'nchar', 'nvarchar', 'text', 'ntext')" ;
9095
9196 pkSql = @"
92- SELECT tc.TABLE_SCHEMA AS TableSchema, tc.TABLE_NAME AS TableName, kcu.COLUMN_NAME AS ColumnName
97+ SELECT tc.TABLE_SCHEMA AS TableSchema, tc.TABLE_NAME AS TableName, kcu.COLUMN_NAME AS ColumnName,
98+ col.DATA_TYPE AS DataType
9399 FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
94100 INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
95101 ON tc.CONSTRAINT_NAME = kcu.CONSTRAINT_NAME
96102 AND tc.TABLE_SCHEMA = kcu.TABLE_SCHEMA
97103 AND tc.TABLE_NAME = kcu.TABLE_NAME
104+ INNER JOIN INFORMATION_SCHEMA.COLUMNS col
105+ ON col.TABLE_SCHEMA = kcu.TABLE_SCHEMA
106+ AND col.TABLE_NAME = kcu.TABLE_NAME
107+ AND col.COLUMN_NAME = kcu.COLUMN_NAME
98108 WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'" ;
99109 }
100110
@@ -109,7 +119,7 @@ INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
109119 var singlePk = pkRows
110120 . GroupBy ( p => p . TableSchema + "." + p . TableName )
111121 . Where ( g => g . Count ( ) == 1 )
112- . ToDictionary ( g => g . Key , g => g . First ( ) . ColumnName , StringComparer . OrdinalIgnoreCase ) ;
122+ . ToDictionary ( g => g . Key , g => g . First ( ) , StringComparer . OrdinalIgnoreCase ) ;
113123
114124 var targets = new List < Utf8TextColumnTarget > ( ) ;
115125
@@ -120,13 +130,20 @@ INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
120130 if ( ExcludedTables . Contains ( first . TableName ) )
121131 continue ;
122132
123- if ( ! singlePk . TryGetValue ( group . Key , out var pkColumn ) )
133+ if ( ! singlePk . TryGetValue ( group . Key , out var pk ) )
124134 continue ; // no single-column PK -> cannot page safely
125135
136+ // The keyset cursor is a string re-bound to the PK's native type; only text,
137+ // integer and uuid PKs are supported. Skip anything else (e.g. date/decimal PKs)
138+ // so we never emit a query that compares an incompatible type against the cursor.
139+ var keyType = ClassifyKeyType ( pk ) ;
140+ if ( keyType == null )
141+ continue ;
142+
126143 // Never clean the primary-key column itself.
127144 var textColumns = group
128145 . Select ( c => c . ColumnName )
129- . Where ( name => ! string . Equals ( name , pkColumn , StringComparison . OrdinalIgnoreCase ) )
146+ . Where ( name => ! string . Equals ( name , pk . ColumnName , StringComparison . OrdinalIgnoreCase ) )
130147 . ToList ( ) ;
131148
132149 if ( textColumns . Count == 0 )
@@ -136,7 +153,8 @@ INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
136153 {
137154 Schema = first . TableSchema ,
138155 TableName = first . TableName ,
139- PrimaryKeyColumn = pkColumn ,
156+ PrimaryKeyColumn = pk . ColumnName ,
157+ PrimaryKeyType = keyType . Value ,
140158 TextColumns = textColumns
141159 } ) ;
142160 }
@@ -168,7 +186,7 @@ public async Task<Utf8RowBatch> GetRowBatchAsync(Utf8TextColumnTarget target, st
168186 var dynamicParameters = new DynamicParameters ( ) ;
169187 dynamicParameters . Add ( "batchSize" , batchSize ) ;
170188 if ( hasCursor )
171- dynamicParameters . Add ( "lastKey" , lastKey ) ;
189+ dynamicParameters . Add ( "lastKey" , ConvertKey ( lastKey , target . PrimaryKeyType ) ) ;
172190
173191 using ( DbConnection conn = _connectionProvider . Create ( ) )
174192 {
@@ -238,7 +256,7 @@ public async Task<int> UpdateRowsAsync(Utf8TextColumnTarget target, IReadOnlyLis
238256 dynamicParameters . Add ( paramName , ( object ) column . Value ?? DBNull . Value ) ;
239257 }
240258
241- dynamicParameters . Add ( "rg_key" , row . Key ) ;
259+ dynamicParameters . Add ( "rg_key" , ConvertKey ( row . Key , target . PrimaryKeyType ) ) ;
242260
243261 var sql = $ "UPDATE { table } SET { string . Join ( ", " , setClauses ) } WHERE { pk } = @rg_key";
244262
@@ -289,24 +307,45 @@ public async Task SaveProgressAsync(Utf8CleanupProgress progress, CancellationTo
289307 {
290308 try
291309 {
292- var updateSql = $@ "
293- UPDATE { ProgressTable }
294- SET lastprocessedkey = @LastProcessedKey, lastcompletedutc = @LastCompletedUtc,
295- rowsscanned = @RowsScanned, rowsfixed = @RowsFixed, updatedonutc = @UpdatedOnUtc
296- WHERE tablename = @TableName" ;
310+ // Atomic upsert: overlapping cleanup runs (long sweeps or multiple workers) can save the
311+ // first watermark for the same table concurrently, which the old UPDATE-then-INSERT raced
312+ // on (duplicate-PK on the second INSERT). The tablename PK makes both forms below race-safe.
313+ string upsertSql ;
297314
298- var insertSql = $@ "
299- INSERT INTO { ProgressTable } (tablename, lastprocessedkey, lastcompletedutc, rowsscanned, rowsfixed, updatedonutc)
300- VALUES (@TableName, @LastProcessedKey, @LastCompletedUtc, @RowsScanned, @RowsFixed, @UpdatedOnUtc)" ;
315+ if ( IsPostgres )
316+ {
317+ upsertSql = $@ "
318+ INSERT INTO { ProgressTable } (tablename, lastprocessedkey, lastcompletedutc, rowsscanned, rowsfixed, updatedonutc)
319+ VALUES (@TableName, @LastProcessedKey, @LastCompletedUtc, @RowsScanned, @RowsFixed, @UpdatedOnUtc)
320+ ON CONFLICT (tablename) DO UPDATE SET
321+ lastprocessedkey = EXCLUDED.lastprocessedkey,
322+ lastcompletedutc = EXCLUDED.lastcompletedutc,
323+ rowsscanned = EXCLUDED.rowsscanned,
324+ rowsfixed = EXCLUDED.rowsfixed,
325+ updatedonutc = EXCLUDED.updatedonutc" ;
326+ }
327+ else
328+ {
329+ // UPDLOCK + SERIALIZABLE takes a key-range lock so the row-absent case serializes:
330+ // one run inserts, the other blocks then updates. No MERGE (its upsert races are well known).
331+ upsertSql = $@ "
332+ SET XACT_ABORT ON;
333+ BEGIN TRANSACTION;
334+ UPDATE { ProgressTable } WITH (UPDLOCK, SERIALIZABLE)
335+ SET lastprocessedkey = @LastProcessedKey, lastcompletedutc = @LastCompletedUtc,
336+ rowsscanned = @RowsScanned, rowsfixed = @RowsFixed, updatedonutc = @UpdatedOnUtc
337+ WHERE tablename = @TableName;
338+ IF @@ROWCOUNT = 0
339+ INSERT INTO { ProgressTable } (tablename, lastprocessedkey, lastcompletedutc, rowsscanned, rowsfixed, updatedonutc)
340+ VALUES (@TableName, @LastProcessedKey, @LastCompletedUtc, @RowsScanned, @RowsFixed, @UpdatedOnUtc);
341+ COMMIT TRANSACTION;" ;
342+ }
301343
302344 using ( DbConnection conn = _connectionProvider . Create ( ) )
303345 {
304346 await conn . OpenAsync ( cancellationToken ) ;
305347
306- var affected = await conn . ExecuteAsync ( new CommandDefinition ( updateSql , progress , cancellationToken : cancellationToken ) ) ;
307-
308- if ( affected == 0 )
309- await conn . ExecuteAsync ( new CommandDefinition ( insertSql , progress , cancellationToken : cancellationToken ) ) ;
348+ await conn . ExecuteAsync ( new CommandDefinition ( upsertSql , progress , cancellationToken : cancellationToken ) ) ;
310349 }
311350 }
312351 catch ( Exception ex )
@@ -316,11 +355,72 @@ public async Task SaveProgressAsync(Utf8CleanupProgress progress, CancellationTo
316355 }
317356 }
318357
358+ /// <summary>
359+ /// Maps a primary key column's declared type to the pageable category used for keyset cursors,
360+ /// or null when the type cannot be paged with a string cursor (e.g. date/decimal PKs).
361+ /// </summary>
362+ private static Utf8PrimaryKeyType ? ClassifyKeyType ( MetaColumn pk )
363+ {
364+ var udtName = ( pk . UdtName ?? string . Empty ) . ToLowerInvariant ( ) ;
365+ if ( udtName == "citext" )
366+ return Utf8PrimaryKeyType . Text ;
367+
368+ var dataType = ( pk . DataType ?? string . Empty ) . ToLowerInvariant ( ) ;
369+ switch ( dataType )
370+ {
371+ case "char" :
372+ case "varchar" :
373+ case "nchar" :
374+ case "nvarchar" :
375+ case "text" :
376+ case "ntext" :
377+ case "character" :
378+ case "character varying" :
379+ return Utf8PrimaryKeyType . Text ;
380+
381+ case "tinyint" :
382+ case "smallint" :
383+ case "int" :
384+ case "integer" :
385+ case "bigint" :
386+ return Utf8PrimaryKeyType . Integer ;
387+
388+ case "uniqueidentifier" :
389+ case "uuid" :
390+ return Utf8PrimaryKeyType . Guid ;
391+
392+ default :
393+ return null ; // unsupported PK type -> table is skipped
394+ }
395+ }
396+
397+ /// <summary>
398+ /// Re-binds a string cursor to the primary key's native CLR type so the keyset <c>></c> / <c>=</c>
399+ /// predicates compare like types (PostgreSQL has no implicit text-to-integer/uuid coercion).
400+ /// </summary>
401+ private static object ConvertKey ( string key , Utf8PrimaryKeyType keyType )
402+ {
403+ if ( string . IsNullOrEmpty ( key ) )
404+ return key ;
405+
406+ switch ( keyType )
407+ {
408+ case Utf8PrimaryKeyType . Integer :
409+ return long . Parse ( key , CultureInfo . InvariantCulture ) ;
410+ case Utf8PrimaryKeyType . Guid :
411+ return Guid . Parse ( key ) ;
412+ default :
413+ return key ;
414+ }
415+ }
416+
319417 private class MetaColumn
320418 {
321419 public string TableSchema { get ; set ; }
322420 public string TableName { get ; set ; }
323421 public string ColumnName { get ; set ; }
422+ public string DataType { get ; set ; }
423+ public string UdtName { get ; set ; }
324424 }
325425 }
326426}
0 commit comments