Skip to content

Commit 32c68e7

Browse files
wip
1 parent 775b87a commit 32c68e7

2 files changed

Lines changed: 67 additions & 59 deletions

File tree

packages/backend/src/indexSyncer.ts

Lines changed: 50 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1-
import { createBullBoard } from '@bull-board/api';
2-
import { ExpressAdapter } from '@bull-board/express';
31
import * as Sentry from '@sentry/node';
42
import { PrismaClient, Repo, RepoJobStatus, RepoJobType } from "@sourcebot/db";
53
import { createLogger, Logger } from "@sourcebot/logger";
6-
import express from 'express';
7-
import { BullBoardGroupMQAdapter, Job, Queue, ReservedJob, Worker } from "groupmq";
8-
import { Redis } from 'ioredis';
9-
import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
10-
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, measure } from './utils.js';
114
import { existsSync } from 'fs';
5+
import { readdir, rm } from 'fs/promises';
6+
import { Job, Queue, ReservedJob, Worker } from "groupmq";
7+
import { Redis } from 'ioredis';
8+
import { env } from './env.js';
129
import { cloneRepository, fetchRepository, isPathAValidGitRepoRoot, unsetGitConfig, upsertGitConfig } from './git.js';
10+
import { AppContext, repoMetadataSchema, RepoWithConnections, Settings } from "./types.js";
11+
import { getAuthCredentialsForRepo, getRepoPath, getShardPrefix, groupmqLifecycleExceptionWrapper, measure } from './utils.js';
1312
import { indexGitRepository } from './zoekt.js';
14-
import { rm, readdir } from 'fs/promises';
1513

1614
const LOG_TAG = 'index-syncer';
1715
const logger = createLogger(LOG_TAG);
@@ -26,17 +24,6 @@ type JobPayload = {
2624

2725
const JOB_TIMEOUT_MS = 1000 * 60 * 60 * 6; // 6 hour indexing timeout
2826

29-
30-
const groupmqLifecycleExceptionWrapper = async (name: string, fn: () => Promise<void>) => {
31-
try {
32-
await fn();
33-
} catch (error) {
34-
Sentry.captureException(error);
35-
logger.error(`Exception thrown while executing lifecycle function \`${name}\`.`, error);
36-
}
37-
}
38-
39-
4027
export class IndexSyncer {
4128
private interval?: NodeJS.Timeout;
4229
private queue: Queue<JobPayload>;
@@ -52,34 +39,26 @@ export class IndexSyncer {
5239
redis,
5340
namespace: 'index-sync-queue',
5441
jobTimeoutMs: JOB_TIMEOUT_MS,
55-
logger,
56-
maxAttempts: 1,
42+
maxAttempts: 3,
43+
...(env.SOURCEBOT_LOG_LEVEL === 'debug' ? {
44+
logger,
45+
}: {}),
5746
});
5847

5948
this.worker = new Worker<JobPayload>({
6049
queue: this.queue,
6150
maxStalledCount: 1,
6251
handler: this.runJob.bind(this),
6352
concurrency: this.settings.maxRepoIndexingJobConcurrency,
64-
logger,
53+
...(env.SOURCEBOT_LOG_LEVEL === 'debug' ? {
54+
logger,
55+
}: {}),
6556
});
6657

6758
this.worker.on('completed', this.onJobCompleted.bind(this));
6859
this.worker.on('failed', this.onJobFailed.bind(this));
6960
this.worker.on('stalled', this.onJobStalled.bind(this));
7061
this.worker.on('error', this.onWorkerError.bind(this));
71-
72-
// @nocheckin
73-
const app = express();
74-
const serverAdapter = new ExpressAdapter();
75-
76-
createBullBoard({
77-
queues: [new BullBoardGroupMQAdapter(this.queue, { displayName: 'Index Sync' })],
78-
serverAdapter,
79-
});
80-
81-
app.use('/', serverAdapter.getRouter());
82-
app.listen(3070);
8362
}
8463

8564
public async startScheduler() {
@@ -215,7 +194,7 @@ export class IndexSyncer {
215194

216195
for (const job of jobs) {
217196
await this.queue.add({
218-
groupId: `repo:${job.repoId}`,
197+
groupId: `repo:${job.repoId}_${job.repo.name}`,
219198
data: {
220199
jobId: job.id,
221200
type,
@@ -230,7 +209,7 @@ export class IndexSyncer {
230209
private async runJob(job: ReservedJob<JobPayload>) {
231210
const id = job.data.jobId;
232211
const logger = createJobLogger(id);
233-
logger.info(`Running job ${id} for repo ${job.data.repoName}`);
212+
logger.info(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId}) (attempt ${job.attempts + 1} / ${job.maxAttempts})`);
234213

235214
const { repo, type: jobType } = await this.db.repoJob.update({
236215
where: {
@@ -286,35 +265,35 @@ export class IndexSyncer {
286265
// @see: https://github.com/sourcebot-dev/sourcebot/pull/483
287266
await unsetGitConfig(repoPath, ["remote.origin.url"]);
288267

289-
logger.info(`Fetching ${repo.displayName}...`);
268+
logger.info(`Fetching ${repo.name} (id: ${repo.id})...`);
290269
const { durationMs } = await measure(() => fetchRepository({
291270
cloneUrl: cloneUrlMaybeWithToken,
292271
authHeader,
293272
path: repoPath,
294273
onProgress: ({ method, stage, progress }) => {
295-
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
274+
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`)
296275
}
297276
}));
298277
const fetchDuration_s = durationMs / 1000;
299278

300279
process.stdout.write('\n');
301-
logger.info(`Fetched ${repo.displayName} in ${fetchDuration_s}s`);
280+
logger.info(`Fetched ${repo.name} (id: ${repo.id}) in ${fetchDuration_s}s`);
302281

303282
} else if (!isReadOnly) {
304-
logger.info(`Cloning ${repo.displayName}...`);
283+
logger.info(`Cloning ${repo.name} (id: ${repo.id})...`);
305284

306285
const { durationMs } = await measure(() => cloneRepository({
307286
cloneUrl: cloneUrlMaybeWithToken,
308287
authHeader,
309288
path: repoPath,
310289
onProgress: ({ method, stage, progress }) => {
311-
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.displayName}`)
290+
logger.debug(`git.${method} ${stage} stage ${progress}% complete for ${repo.name} (id: ${repo.id})`)
312291
}
313292
}));
314293
const cloneDuration_s = durationMs / 1000;
315294

316295
process.stdout.write('\n');
317-
logger.info(`Cloned ${repo.displayName} in ${cloneDuration_s}s`);
296+
logger.info(`Cloned ${repo.name} (id: ${repo.id}) in ${cloneDuration_s}s`);
318297
}
319298

320299
// Regardless of clone or fetch, always upsert the git config for the repo.
@@ -324,10 +303,10 @@ export class IndexSyncer {
324303
await upsertGitConfig(repoPath, metadata.gitConfig);
325304
}
326305

327-
logger.info(`Indexing ${repo.displayName}...`);
306+
logger.info(`Indexing ${repo.name} (id: ${repo.id})...`);
328307
const { durationMs } = await measure(() => indexGitRepository(repo, this.settings, this.ctx));
329308
const indexDuration_s = durationMs / 1000;
330-
logger.info(`Indexed ${repo.displayName} in ${indexDuration_s}s`);
309+
logger.info(`Indexed ${repo.name} (id: ${repo.id}) in ${indexDuration_s}s`);
331310
}
332311

333312
private async cleanupRepository(repo: Repo, logger: Logger) {
@@ -347,7 +326,7 @@ export class IndexSyncer {
347326
}
348327

349328
private onJobCompleted = async (job: Job<JobPayload>) =>
350-
groupmqLifecycleExceptionWrapper('onJobCompleted', async () => {
329+
groupmqLifecycleExceptionWrapper('onJobCompleted', logger, async () => {
351330
const logger = createJobLogger(job.data.jobId);
352331
const jobData = await this.db.repoJob.update({
353332
where: { id: job.data.jobId },
@@ -365,35 +344,47 @@ export class IndexSyncer {
365344
}
366345
});
367346

368-
logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name}`);
347+
logger.info(`Completed index job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
369348
}
370349
else if (jobData.type === RepoJobType.CLEANUP) {
371350
const repo = await this.db.repo.delete({
372351
where: { id: jobData.repoId },
373352
});
374353

375-
logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name}`);
354+
logger.info(`Completed cleanup job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id})`);
376355
}
377356
});
378357

379358
private onJobFailed = async (job: Job<JobPayload>) =>
380-
groupmqLifecycleExceptionWrapper('onJobFailed', async () => {
359+
groupmqLifecycleExceptionWrapper('onJobFailed', logger, async () => {
381360
const logger = createJobLogger(job.data.jobId);
382361

383-
const { repo } = await this.db.repoJob.update({
384-
where: { id: job.data.jobId },
385-
data: {
386-
completedAt: new Date(),
387-
errorMessage: job.failedReason,
388-
},
389-
select: { repo: true }
390-
});
362+
const attempt = job.attemptsMade + 1;
363+
const wasLastAttempt = attempt >= job.opts.attempts;
391364

392-
logger.error(`Failed job ${job.data.jobId} for repo ${repo.name}`);
365+
if (wasLastAttempt) {
366+
const { repo } = await this.db.repoJob.update({
367+
where: { id: job.data.jobId },
368+
data: {
369+
status: RepoJobStatus.FAILED,
370+
completedAt: new Date(),
371+
errorMessage: job.failedReason,
372+
},
373+
select: { repo: true }
374+
});
375+
376+
logger.error(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Failing job.`);
377+
} else {
378+
const repo = await this.db.repo.findUniqueOrThrow({
379+
where: { id: job.data.repoId },
380+
});
381+
382+
logger.warn(`Failed job ${job.data.jobId} for repo ${repo.name} (id: ${repo.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
383+
}
393384
});
394385

395386
private onJobStalled = async (jobId: string) =>
396-
groupmqLifecycleExceptionWrapper('onJobStalled', async () => {
387+
groupmqLifecycleExceptionWrapper('onJobStalled', logger, async () => {
397388
const logger = createJobLogger(jobId);
398389
const { repo } = await this.db.repoJob.update({
399390
where: { id: jobId },
@@ -405,7 +396,7 @@ export class IndexSyncer {
405396
select: { repo: true }
406397
});
407398

408-
logger.error(`Job ${jobId} stalled for repo ${repo.name}`);
399+
logger.error(`Job ${jobId} stalled for repo ${repo.name} (id: ${repo.id})`);
409400
});
410401

411402
private async onWorkerError(error: Error) {

packages/backend/src/utils.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,20 @@ const createGitCloneUrlWithToken = (cloneUrl: string, credentials: { username?:
241241
}
242242
return url.toString();
243243
}
244+
245+
246+
/**
247+
* Wraps groupmq worker lifecycle callbacks with exception handling. This prevents
248+
* uncaught exceptions (e.g., like a RepoJob not existing in the DB) from crashing
249+
* the app.
250+
* @see: https://openpanel-dev.github.io/groupmq/api-worker/#events
251+
*/
252+
export const groupmqLifecycleExceptionWrapper = async (name: string, logger: Logger, fn: () => Promise<void>) => {
253+
try {
254+
await fn();
255+
} catch (error) {
256+
Sentry.captureException(error);
257+
logger.error(`Exception thrown while executing lifecycle function \`${name}\`.`, error);
258+
}
259+
}
260+

0 commit comments

Comments
 (0)