Skip to content

Commit 6b12ebc

Browse files
add re-try logic
1 parent 7d4500a commit 6b12ebc

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

packages/backend/src/connectionManager.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export class ConnectionManager {
4747
defaultJobOptions: {
4848
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
4949
removeOnFail: env.REDIS_REMOVE_ON_FAIL,
50+
attempts: 2,
5051
},
5152
});
5253

@@ -61,10 +62,10 @@ export class ConnectionManager {
6162
);
6263

6364
this.worker.on('completed', this.onJobCompleted.bind(this));
64-
this.worker.on('failed', this.onJobFailed.bind(this));
65+
this.worker.on('failed', this.onJobMaybeFailed.bind(this));
6566
this.worker.on('stalled', (jobId) => {
6667
// Just log - BullMQ will automatically retry the job (up to maxStalledCount times).
67-
// If all retries fail, onJobFailed will handle marking it as failed.
68+
// If all retries fail, onJobMaybeFailed will handle marking it as failed.
6869
logger.warn(`Job ${jobId} stalled - BullMQ will retry`);
6970
});
7071
this.worker.on('error', (error) => {
@@ -339,15 +340,22 @@ export class ConnectionManager {
339340
}
340341
}
341342

342-
private async onJobFailed(job: Job<JobPayload> | undefined, error: Error) {
343+
private async onJobMaybeFailed(job: Job<JobPayload> | undefined, error: Error) {
343344
try {
344345
if (!job) {
345346
logger.error(`Job failed but job object is undefined. Error: ${error.message}`);
346347
return;
347348
}
348-
349349
const jobLogger = createJobLogger(job.id!);
350350

351+
// @note: we need to check the job state to determine if the job failed,
352+
// or if it is being retried.
353+
const jobState = await job.getState();
354+
if (jobState !== 'failed') {
355+
jobLogger.warn(`Job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) failed. Retrying...`);
356+
return;
357+
}
358+
351359
const { connection } = await this.db.connectionSyncJob.update({
352360
where: { id: job.id },
353361
data: {
@@ -371,7 +379,7 @@ export class ConnectionManager {
371379
});
372380
} catch (err) {
373381
Sentry.captureException(err);
374-
logger.error(`Exception thrown while executing lifecycle function \`onJobFailed\`.`, err);
382+
logger.error(`Exception thrown while executing lifecycle function \`onJobMaybeFailed\`.`, err);
375383
}
376384
}
377385

packages/backend/src/repoIndexManager.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export class RepoIndexManager {
6666
defaultJobOptions: {
6767
removeOnComplete: env.REDIS_REMOVE_ON_COMPLETE,
6868
removeOnFail: env.REDIS_REMOVE_ON_FAIL,
69+
attempts: 2,
6970
},
7071
});
7172

@@ -85,10 +86,10 @@ export class RepoIndexManager {
8586
);
8687

8788
this.worker.on('completed', this.onJobCompleted.bind(this));
88-
this.worker.on('failed', this.onJobFailed.bind(this));
89+
this.worker.on('failed', this.onJobMaybeFailed.bind(this));
8990
this.worker.on('stalled', (jobId) => {
9091
// Just log - BullMQ will automatically retry the job (up to maxStalledCount times).
91-
// If all retries fail, onJobFailed will handle marking it as failed.
92+
// If all retries fail, onJobMaybeFailed will handle marking it as failed.
9293
logger.warn(`Job ${jobId} stalled - BullMQ will retry`);
9394
});
9495
this.worker.on('error', (error) => {
@@ -572,7 +573,7 @@ export class RepoIndexManager {
572573
}
573574
}
574575

575-
private async onJobFailed(job: Job<JobPayload> | undefined, error: Error) {
576+
private async onJobMaybeFailed(job: Job<JobPayload> | undefined, error: Error) {
576577
try {
577578
if (!job) {
578579
logger.error(`Job failed but job object is undefined. Error: ${error.message}`);
@@ -582,6 +583,14 @@ export class RepoIndexManager {
582583
const jobLogger = createJobLogger(job.data.jobId);
583584
const jobTypeLabel = getJobTypePrometheusLabel(job.data.type);
584585

586+
// @note: we need to check the job state to determine if the job failed,
587+
// or if it is being retried.
588+
const jobState = await job.getState();
589+
if (jobState !== 'failed') {
590+
jobLogger.warn(`Job ${job.id} for repo ${job.data.repoName} (id: ${job.data.repoId}) failed. Retrying...`);
591+
return;
592+
}
593+
585594
const { repo } = await this.db.repoIndexingJob.update({
586595
where: { id: job.data.jobId },
587596
data: {
@@ -604,7 +613,7 @@ export class RepoIndexManager {
604613

605614
} catch (err) {
606615
Sentry.captureException(err);
607-
logger.error(`Exception thrown while executing lifecycle function \`onJobFailed\`.`, err);
616+
logger.error(`Exception thrown while executing lifecycle function \`onJobMaybeFailed\`.`, err);
608617
}
609618
}
610619

0 commit comments

Comments
 (0)