Skip to content

Commit d95cd4a

Browse files
dahliacodex
andcommitted
Serialize follower writes with cleanup
Acquire follower-level advisory locks in addFollower() before writing followers and follow_requests so cleanup and inserts cannot race on the same actor key. Add a regression test that reproduces concurrent reassignment plus new follow insertion and verifies valid follower state is retained. #19 (comment) Co-Authored-By: OpenAI Codex <codex@openai.com>
1 parent 658351d commit d95cd4a

2 files changed

Lines changed: 176 additions & 0 deletions

File tree

packages/botkit-postgres/src/mod.test.ts

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,168 @@ if (postgresUrl == null) {
542542
}
543543
});
544544

545+
test("does not drop valid followers during concurrent cleanup and add", async () => {
546+
const schema = createSchemaName();
547+
const adminSql = createSql(postgresUrl);
548+
const sqlA = createSql(postgresUrl);
549+
const sqlB = createSql(postgresUrl);
550+
const oldFollower = new Person({
551+
id: new URL("https://example.com/ap/actor/concurrent-grace"),
552+
preferredUsername: "concurrent-grace",
553+
});
554+
const reassignedFollower = new Person({
555+
id: new URL("https://example.com/ap/actor/concurrent-harper"),
556+
preferredUsername: "concurrent-harper",
557+
});
558+
const followA = new URL(
559+
"https://example.com/ap/follow/ea47ae4b-8e5d-4f5f-b1c8-9d66d5d11d83",
560+
);
561+
const followB = new URL(
562+
"https://example.com/ap/follow/8f65dab0-2a7e-4c6a-b53e-99f3e2521c0e",
563+
);
564+
let resolveCleanupReady!: () => void;
565+
let resolveInsertedNewRequest!: () => void;
566+
const cleanupReady = new Promise<void>((resolve) => {
567+
resolveCleanupReady = resolve;
568+
});
569+
const insertedNewRequest = new Promise<void>((resolve) => {
570+
resolveInsertedNewRequest = resolve;
571+
});
572+
const barrierTimeout = 50;
573+
const wrapCleanupSql = (sql: postgres.Sql): postgres.Sql =>
574+
new Proxy(sql, {
575+
get(target, property, receiver) {
576+
if (property === "begin") {
577+
return async (
578+
callback: (transactionSql: postgres.TransactionSql) => unknown,
579+
) =>
580+
await target.begin(async (transactionSql) =>
581+
await callback(
582+
new Proxy(transactionSql, {
583+
get(txTarget, txProperty, txReceiver) {
584+
if (txProperty === "unsafe") {
585+
return async <T extends object[]>(
586+
query: string,
587+
parameters?: postgres.ParameterOrJSON<never>[],
588+
options?: postgres.UnsafeQueryOptions,
589+
): Promise<T> => {
590+
if (
591+
query.includes(
592+
`DELETE FROM "${schema}"."followers"`,
593+
) &&
594+
parameters?.[0] === oldFollower.id!.href
595+
) {
596+
resolveCleanupReady();
597+
await Promise.race([
598+
insertedNewRequest,
599+
new Promise<void>((resolve) =>
600+
setTimeout(resolve, barrierTimeout)
601+
),
602+
]);
603+
}
604+
return await txTarget.unsafe<T>(
605+
query,
606+
parameters,
607+
options,
608+
);
609+
};
610+
}
611+
return Reflect.get(txTarget, txProperty, txReceiver);
612+
},
613+
}),
614+
)
615+
);
616+
}
617+
return Reflect.get(target, property, receiver);
618+
},
619+
});
620+
const wrapAddSql = (sql: postgres.Sql): postgres.Sql =>
621+
new Proxy(sql, {
622+
get(target, property, receiver) {
623+
if (property === "begin") {
624+
return async (
625+
callback: (transactionSql: postgres.TransactionSql) => unknown,
626+
) =>
627+
await target.begin(async (transactionSql) =>
628+
await callback(
629+
new Proxy(transactionSql, {
630+
get(txTarget, txProperty, txReceiver) {
631+
if (txProperty === "unsafe") {
632+
return async <T extends object[]>(
633+
query: string,
634+
parameters?: postgres.ParameterOrJSON<never>[],
635+
options?: postgres.UnsafeQueryOptions,
636+
): Promise<T> => {
637+
const result = await txTarget.unsafe<T>(
638+
query,
639+
parameters,
640+
options,
641+
);
642+
if (
643+
query.includes(
644+
`INSERT INTO "${schema}"."follow_requests"`,
645+
) &&
646+
parameters?.[0] === followB.href &&
647+
parameters?.[1] === oldFollower.id!.href
648+
) {
649+
resolveInsertedNewRequest();
650+
await new Promise<void>((resolve) =>
651+
setTimeout(resolve, barrierTimeout)
652+
);
653+
}
654+
return result;
655+
};
656+
}
657+
return Reflect.get(txTarget, txProperty, txReceiver);
658+
},
659+
}),
660+
)
661+
);
662+
}
663+
return Reflect.get(target, property, receiver);
664+
},
665+
});
666+
try {
667+
await initializePostgresRepositorySchema(adminSql, schema);
668+
const setupRepo = new PostgresRepository({ sql: adminSql, schema });
669+
await setupRepo.addFollower(followA, oldFollower);
670+
671+
const repoA = new PostgresRepository({
672+
sql: wrapCleanupSql(sqlA),
673+
schema,
674+
});
675+
const repoB = new PostgresRepository({ sql: wrapAddSql(sqlB), schema });
676+
677+
const reassignPromise = repoA.addFollower(followA, reassignedFollower);
678+
await cleanupReady;
679+
const addPromise = repoB.addFollower(followB, oldFollower);
680+
await Promise.all([reassignPromise, addPromise]);
681+
682+
const followers = await Promise.all(
683+
(await Array.fromAsync(repoA.getFollowers())).map((follower) =>
684+
follower.toJsonLd()
685+
),
686+
);
687+
assert.deepStrictEqual(await repoA.countFollowers(), 2);
688+
assert.ok(await repoA.hasFollower(oldFollower.id!));
689+
assert.ok(await repoA.hasFollower(reassignedFollower.id!));
690+
assert.deepStrictEqual(followers, [
691+
await oldFollower.toJsonLd(),
692+
await reassignedFollower.toJsonLd(),
693+
]);
694+
assert.deepStrictEqual(
695+
await (await repoA.removeFollower(followB, oldFollower.id!))
696+
?.toJsonLd(),
697+
await oldFollower.toJsonLd(),
698+
);
699+
assert.deepStrictEqual(await repoA.countFollowers(), 1);
700+
assert.deepStrictEqual(await repoA.hasFollower(oldFollower.id!), false);
701+
} finally {
702+
await adminSql.unsafe(`DROP SCHEMA IF EXISTS "${schema}" CASCADE`);
703+
await Promise.all([sqlA.end(), sqlB.end(), adminSql.end()]);
704+
}
705+
});
706+
545707
test("repository operations and persistence", async () => {
546708
const harness = createHarness();
547709
try {

packages/botkit-postgres/src/mod.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,10 @@ export class PostgresRepository implements Repository, AsyncDisposable {
483483
[followId.href],
484484
);
485485
const previousFollowerId = rows[0]?.follower_id;
486+
await this.lockFollowers(sql, [
487+
followerId.href,
488+
...(previousFollowerId == null ? [] : [previousFollowerId]),
489+
]);
486490
await this.query(
487491
sql,
488492
`INSERT INTO ${this.table("followers")} (follower_id, actor_json)
@@ -741,6 +745,16 @@ export class PostgresRepository implements Repository, AsyncDisposable {
741745
);
742746
}
743747

748+
private async lockFollowers(
749+
sql: Queryable,
750+
followerIds: readonly string[],
751+
): Promise<void> {
752+
const uniqueFollowerIds = [...new Set(followerIds)].sort();
753+
for (const followerId of uniqueFollowerIds) {
754+
await this.lockFollower(sql, followerId);
755+
}
756+
}
757+
744758
private async cleanupFollower(
745759
sql: Queryable,
746760
followerId: string,

0 commit comments

Comments
 (0)