Skip to content

Commit 34beba4

Browse files
committed
review note - finishing a job may occur when items are still marked as processing
1 parent e27fcce commit 34beba4

2 files changed

Lines changed: 20 additions & 10 deletions

File tree

src/cleanup/worker.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { getLogger } from "@logtape/logtape";
2-
import { and, eq, inArray, sql } from "drizzle-orm";
2+
import { and, eq, inArray, or, sql } from "drizzle-orm";
33

44
import db, { type Transaction } from "../db";
55
import * as schema from "../schema";
@@ -122,25 +122,30 @@ async function processJobItems(
122122
return;
123123
}
124124

125-
// Get pending items for this job with lock
126-
const pendingItems = await tx
125+
// Get unfinished items for this job with lock
126+
const unfinishedItems = await tx
127127
.select()
128128
.from(schema.cleanupJobItems)
129129
.where(
130130
and(
131131
eq(schema.cleanupJobItems.jobId, job.id),
132-
eq(schema.cleanupJobItems.status, "pending"),
132+
or(
133+
eq(schema.cleanupJobItems.status, "pending"),
134+
eq(schema.cleanupJobItems.status, "processing"),
135+
),
133136
),
134137
)
135138
.limit(BATCH_SIZE)
136139
.for("update", { skipLocked: true });
137140

138-
if (pendingItems.length === 0) {
141+
if (unfinishedItems.length === 0) {
139142
// No more items - mark job as completed
140143
await finalizeJob(tx, job, "completed");
141144
return;
142145
}
143146

147+
const pendingItems = unfinishedItems.filter((i) => i.status == "pending");
148+
144149
// Mark items as processing within the transaction
145150
const itemsToProcess = pendingItems.slice(0, CONCURRENT_ITEMS);
146151
const itemIds = itemsToProcess.map((item) => item.id);

src/import/worker.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { getLogger } from "@logtape/logtape";
2-
import { and, eq, inArray, sql } from "drizzle-orm";
2+
import { and, eq, inArray, or, sql } from "drizzle-orm";
33

44
import db, { type Transaction } from "../db";
55
import federation from "../federation/federation";
@@ -126,25 +126,30 @@ async function processJobItems(
126126
return;
127127
}
128128

129-
// Get pending items for this job with lock
130-
const pendingItems = await tx
129+
// Get unfinished items for this job with lock
130+
const unfinishedItems = await tx
131131
.select()
132132
.from(schema.importJobItems)
133133
.where(
134134
and(
135135
eq(schema.importJobItems.jobId, job.id),
136-
eq(schema.importJobItems.status, "pending"),
136+
or(
137+
eq(schema.importJobItems.status, "pending"),
138+
eq(schema.importJobItems.status, "processing"),
139+
),
137140
),
138141
)
139142
.limit(BATCH_SIZE)
140143
.for("update", { skipLocked: true });
141144

142-
if (pendingItems.length === 0) {
145+
if (unfinishedItems.length === 0) {
143146
// No more items - mark job as completed
144147
await finalizeJob(tx, job, "completed");
145148
return;
146149
}
147150

151+
const pendingItems = unfinishedItems.filter((i) => i.status == "pending");
152+
148153
// Get account owner for this job
149154
const accountOwner = await tx.query.accountOwners.findFirst({
150155
where: eq(schema.accountOwners.id, job.accountOwnerId),

0 commit comments

Comments
 (0)