Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions apps/nestjs-backend/src/db-provider/integrity-query/abstract.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@ export abstract class IntegrityQueryAbstract {
isMultiValue: boolean;
}): string;

/**
* Atomically detect and fix InvalidLinkReference issues in a single SQL
* statement, eliminating the window between detection and fix where
* concurrent writes can re-introduce or worsen desyncs.
*
* Returns null if the database engine does not support atomic detect+fix
* (e.g., SQLite), in which case the caller should fall back to the
* two-step checkLinks() + fixLinks() approach.
*
* Currently implemented for PostgreSQL only.
*/
atomicFixLinks(_params: {
dbTableName: string;
foreignDbTableName: string;
fkHostTableName: string;
lookupDbFieldName: string;
selfKeyName: string;
foreignKeyName: string;
linkDbFieldName: string;
isMultiValue: boolean;
}): string | null {
// Default: not supported. Subclasses override for specific databases.
return null;
}

/**
* Deprecated: Do NOT use in new code.
* Link fields do not persist a display JSON column; their values are derived
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,140 @@ export class IntegrityQueryPostgres extends IntegrityQueryAbstract {
.toQuery();
}

/**
* Atomically detect and fix InvalidLinkReference issues in a single SQL
* statement for PostgreSQL.
*
* Combines the detection query (find records where JSONB diverges from
* junction/FK source of truth) with the fix query (rebuild JSONB from
* junction/FK data) into one UPDATE ... WHERE __id IN (SELECT ...).
*
* This eliminates the window between detection and fix where concurrent
* writes can re-introduce desyncs — a known race condition under high
* write concurrency (see: https://github.com/teableio/teable/issues/2680).
*/
override atomicFixLinks({
dbTableName,
foreignDbTableName,
fkHostTableName,
lookupDbFieldName,
selfKeyName,
foreignKeyName,
linkDbFieldName,
isMultiValue,
}: {
dbTableName: string;
foreignDbTableName: string;
fkHostTableName: string;
lookupDbFieldName: string;
selfKeyName: string;
foreignKeyName: string;
linkDbFieldName: string;
isMultiValue: boolean;
}): string | null {
if (isMultiValue) {
// Multi-value (ManyMany, OneMany): rebuild JSONB array from junction table
// Detection is inlined in the WHERE clause via count comparison
const detectionSubquery = this.knex
.select('sub.__id')
.from(`${dbTableName} as sub`)
.leftJoin(
this.knex(fkHostTableName)
.select({ id: selfKeyName })
.count('* as jc')
.whereNotNull(selfKeyName)
.groupBy(selfKeyName)
.as('jct'),
'sub.__id',
'jct.id'
)
.whereRaw(
`jsonb_array_length(COALESCE(sub."${linkDbFieldName}", '[]'::jsonb)) != COALESCE(jct.jc, 0)`
);

return this.knex(dbTableName)
.update({
[linkDbFieldName]: this.knex
.select(
this.knex.raw(
"jsonb_agg(jsonb_build_object('id', ??, 'title', ??) ORDER BY ??)",
[
`fk.${foreignKeyName}`,
`ft.${lookupDbFieldName}`,
`fk.${foreignKeyName}`,
]
)
)
.from(`${fkHostTableName} as fk`)
.join(`${foreignDbTableName} as ft`, `ft.__id`, `fk.${foreignKeyName}`)
.where('fk.' + selfKeyName, `${dbTableName}.__id`),
})
.whereIn('__id', detectionSubquery)
.toQuery();
}

if (fkHostTableName === dbTableName) {
// Single-value, FK in same table (ManyOne/OneOne): rebuild from FK column
const detectionSubquery = this.knex
.select('sub.__id')
.from(`${dbTableName} as sub`)
.where(function () {
this.whereRaw(`sub."${foreignKeyName}" IS NULL`)
.whereRaw(`sub."${linkDbFieldName}" IS NOT NULL`)
.orWhere(function () {
this.whereRaw(`sub."${linkDbFieldName}" IS NOT NULL`).andWhereRaw(
`(sub."${linkDbFieldName}"->>'id')::text != sub."${foreignKeyName}"::text`
);
});
});

return this.knex(dbTableName)
.update({
[linkDbFieldName]: this.knex.raw(
`CASE WHEN ?? IS NULL THEN NULL
ELSE jsonb_build_object('id', ??, 'title', (SELECT ?? FROM ?? WHERE __id = ??))
END`,
[foreignKeyName, foreignKeyName, lookupDbFieldName, foreignDbTableName, foreignKeyName]
),
})
.whereIn('__id', detectionSubquery)
.toQuery();
}

// Single-value, FK in host table (OneOne cross-table): rebuild via join
const detectionSubquery = this.knex
.select('sub.__id')
.from(`${dbTableName} as sub`)
.leftJoin(`${fkHostTableName} as t2`, `t2.${selfKeyName}`, 'sub.__id')
.where(function () {
this.whereRaw(`t2."${foreignKeyName}" IS NULL`)
.whereRaw(`sub."${linkDbFieldName}" IS NOT NULL`)
.orWhere(function () {
this.whereRaw(`sub."${linkDbFieldName}" IS NOT NULL`).andWhereRaw(
`(sub."${linkDbFieldName}"->>'id')::text != t2."${foreignKeyName}"::text`
);
});
});

return this.knex(dbTableName)
.update({
[linkDbFieldName]: this.knex
.select(
this.knex.raw(
`CASE WHEN t2.?? IS NULL THEN NULL
ELSE jsonb_build_object('id', t2.??, 'title', t2.??)
END`,
[foreignKeyName, foreignKeyName, lookupDbFieldName]
)
)
.from(`${fkHostTableName} as t2`)
.where(`t2.${foreignKeyName}`, `${dbTableName}.__id`)
.limit(1),
})
.whereIn('__id', detectionSubquery)
.toQuery();
}

/**
* Deprecated: Do NOT use in new code.
* Link fields typically do not persist a display JSON column in Postgres;
Expand Down
15 changes: 15 additions & 0 deletions apps/nestjs-backend/src/features/integrity/link-field.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ export class LinkFieldIntegrityService {
selfKeyName: string;
}) {
try {
// Prefer atomic detect+fix when supported (PostgreSQL).
// This eliminates the window between detection and fix where concurrent
// writes can re-introduce or worsen JSONB/junction desyncs.
const atomicQuery = this.dbProvider.integrityQuery().atomicFixLinks(params);
if (atomicQuery) {
const updatedCount = await this.prismaService.$executeRawUnsafe(atomicQuery);
if (updatedCount > 0) {
this.logger.debug(
`Atomically fixed ${updatedCount} records in ${params.dbTableName}`
);
}
return updatedCount;
}

// Fallback: two-step check-then-fix (SQLite and other engines).
const inconsistentRecords = await this.checkLinks(params);

if (inconsistentRecords.length > 0) {
Expand Down
Loading