Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Upgraded `tar` to `^7.5.16`. [#1338](https://github.com/sourcebot-dev/sourcebot/pull/1338)
- Upgraded `esbuild` to `^0.28.1`. [#1342](https://github.com/sourcebot-dev/sourcebot/pull/1342)
- Enabled Next.js version skew protection to fix "Failed to load chunk" errors during rolling deploys. [#1346](https://github.com/sourcebot-dev/sourcebot/pull/1346)
- On worker startup, repositories that are marked as indexed in the database but have no zoekt shard files on disk are now detected and scheduled for re-indexing. [#1350](https://github.com/sourcebot-dev/sourcebot/pull/1350)

## [5.0.3] - 2026-06-17

Expand Down
94 changes: 94 additions & 0 deletions packages/backend/src/repoIndexManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ vi.mock('@sourcebot/shared', () => ({
path: `/test-data/repos/${repo.id}`,
isReadOnly: false,
})),
getRepoIdFromPath: vi.fn((repoPath: string) => {
const repoId = Number(repoPath.split('/').at(-1));
return Number.isNaN(repoId) ? undefined : repoId;
}),
repoMetadataSchema: {
parse: vi.fn((metadata: unknown) => metadata ?? {}),
},
Expand All @@ -36,6 +40,7 @@ vi.mock('@sourcebot/shared', () => ({

// Stub the constants module with fixed test values. The two cache dirs are the
// same strings the readdir mock in the 'Startup Reconciliation' test matches on.
vi.mock('./constants.js', () => ({
WORKER_STOP_GRACEFUL_TIMEOUT_MS: 5000,
REPOS_CACHE_DIR: 'test-data/repos',
INDEX_CACHE_DIR: 'test-data/index',
}));

Expand All @@ -56,6 +61,7 @@ vi.mock('./git.js', () => ({

// Stub the zoekt module: indexGitRepository resolves with empty stdout/stderr,
// and cleanupTempShards is a mock function with no implementation.
vi.mock('./zoekt.js', () => ({
indexGitRepository: vi.fn().mockResolvedValue({ stdout: '', stderr: '' }),
cleanupTempShards: vi.fn(),
}));

vi.mock('./posthog.js', () => ({
Expand All @@ -64,6 +70,10 @@ vi.mock('./posthog.js', () => ({

vi.mock('./utils.js', () => ({
getAuthCredentialsForRepo: vi.fn().mockResolvedValue(null),
getRepoIdFromShardFileName: vi.fn((fileName: string) => {
const match = fileName.match(/^(\d+)_(\d+)_/);
return match ? Number(match[2]) : undefined;
}),
getShardPrefix: vi.fn((orgId: number, repoId: number) => `${orgId}_${repoId}`),
measure: vi.fn(async (cb: () => Promise<unknown>) => {
const data = await cb();
Expand Down Expand Up @@ -148,6 +158,7 @@ const createMockPrisma = () => {
repo: {
findMany: vi.fn().mockResolvedValue([]),
update: vi.fn(),
updateMany: vi.fn(),
delete: vi.fn(),
},
repoIndexingJob: {
Expand Down Expand Up @@ -696,6 +707,89 @@ describe('RepoIndexManager', () => {
});
});

describe('Startup Reconciliation', () => {
    // A repo that the DB reports as indexed but that only has a temp shard on
    // disk must be reset and re-queued; a repo with a finished shard must not.
    test('marks indexed repos with missing shard files stale and queues reindex jobs', async () => {
        const indexedAtIso = '2026-01-01T00:00:00Z';
        const queuedJobId = 'reindex-job-42';

        const staleRepo = createMockRepo({
            id: 42,
            orgId: 7,
            name: 'missing-shards',
            indexedAt: new Date(indexedAtIso),
            indexedCommitHash: 'missing123',
        });
        const healthyRepo = createMockRepo({
            id: 43,
            orgId: 7,
            name: 'has-shards',
            indexedAt: new Date(indexedAtIso),
            indexedCommitHash: 'healthy123',
        });

        // Directory listings served by the readdir mock; any other path is empty.
        const directoryListings = new Map<string, string[]>([
            ['test-data/repos', []],
            ['test-data/index', ['7_43_main.zoekt', '7_42_main.zoekt.tmp']],
        ]);

        (existsSync as Mock).mockReturnValue(true);
        (readdir as Mock).mockImplementation(async (dir: string) => {
            // Hand out a fresh array per call, as a real readdir would.
            return [...(directoryListings.get(dir) ?? [])];
        });

        const findManyMock = mockPrisma.repo.findMany as Mock;
        findManyMock.mockResolvedValueOnce([{ id: 42 }, { id: 43 }]);
        findManyMock.mockResolvedValueOnce([staleRepo, healthyRepo]);

        const updateManyMock = mockPrisma.repo.updateMany as Mock;
        updateManyMock.mockResolvedValue({ count: 1 });

        const createJobsMock = mockPrisma.repoIndexingJob.createManyAndReturn as Mock;
        createJobsMock.mockResolvedValue([
            { id: queuedJobId, repo: staleRepo },
        ]);

        manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);

        await manager.startScheduler();

        // Only the repo without a finished shard has its index state reset.
        const expectedStaleUpdate = {
            where: {
                id: {
                    in: [42],
                },
            },
            data: {
                indexedAt: null,
                indexedCommitHash: null,
                latestIndexingJobStatus: RepoIndexingJobStatus.PENDING,
            },
        };
        expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith(expectedStaleUpdate);

        // Exactly one INDEX job row is requested, for that same repo.
        const expectedJobCreation = {
            data: [
                {
                    type: RepoIndexingJobType.INDEX,
                    repoId: 42,
                },
            ],
            include: {
                repo: true,
            },
        };
        expect(mockPrisma.repoIndexingJob.createManyAndReturn).toHaveBeenCalledWith(expectedJobCreation);

        // The created job is pushed onto the queue under its own id.
        const expectedQueuePayload = {
            jobId: queuedJobId,
            type: RepoIndexingJobType.INDEX,
            repoName: staleRepo.name,
            repoId: staleRepo.id,
        };
        expect(mockQueueAdd).toHaveBeenCalledWith(
            'repo-index-job',
            expectedQueuePayload,
            { jobId: queuedJobId }
        );
    });
});

describe('Cleanup Jobs', () => {
test('deletes repo directory and index shards', async () => {
const repo = createMockRepoWithConnections({ id: 5, orgId: 2 });
Expand Down
67 changes: 66 additions & 1 deletion packages/backend/src/repoIndexManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const LOG_TAG = 'repo-index-manager';
// Module-level logger tagged with LOG_TAG.
const logger = createLogger(LOG_TAG);
// Builds a logger whose tag is LOG_TAG extended with the given job id.
const createJobLogger = (jobId: string) => createLogger(`${LOG_TAG}:job:${jobId}`);
const QUEUE_NAME = 'repo-index-queue';
// Maximum number of repo ids passed to a single updateMany call when repos
// with missing shard files are marked stale during startup reconciliation.
const STALE_REPO_UPDATE_BATCH_SIZE = 500;

type JobPayload = {
type: 'INDEX' | 'CLEANUP';
Expand Down Expand Up @@ -98,8 +99,9 @@ export class RepoIndexManager {

public async startScheduler() {
logger.debug('Starting scheduler');
// Cleanup any orphaned disk resources on startup
// Reconcile DB and disk state on startup before scheduling new work.
await this.cleanupOrphanedDiskResources();
await this.scheduleMissingShardReindexJobs();
this.interval = setIntervalAsync(async () => {
await this.scheduleIndexJobs();
await this.scheduleCleanupJobs();
Expand Down Expand Up @@ -747,6 +749,69 @@ export class RepoIndexManager {
}
}

/**
 * Startup reconciliation between database state and on-disk zoekt shards.
 *
 * Collects the repo ids that have at least one finished shard file in
 * INDEX_CACHE_DIR, then looks up repos the database reports as indexed
 * (non-null `indexedAt` and `indexedCommitHash`) that do not already have a
 * PENDING or IN_PROGRESS INDEX job. Any such repo without a shard on disk has
 * its index state cleared and an INDEX job created for it.
 *
 * If INDEX_CACHE_DIR does not exist, the shard set stays empty and every
 * matching repo is treated as missing its shards.
 */
private async scheduleMissingShardReindexJobs() {
    const repoIdsWithShards = new Set<number>();

    if (existsSync(INDEX_CACHE_DIR)) {
        const entries = await readdir(INDEX_CACHE_DIR);
        for (const entry of entries) {
            // Only finished shard files count; anything that is not a
            // `.zoekt` file, or that carries a `.tmp` marker, is ignored.
            if (!entry.endsWith('.zoekt') || entry.includes('.tmp')) {
                continue;
            }

            // File names that do not yield a repo id are skipped.
            const repoId = getRepoIdFromShardFileName(entry);
            if (repoId !== undefined) {
                repoIdsWithShards.add(repoId);
            }
        }
    }

    const indexedRepos = await this.db.repo.findMany({
        where: {
            indexedAt: { not: null },
            indexedCommitHash: { not: null },
            // Leave repos alone when an INDEX job is already queued or running.
            NOT: {
                jobs: {
                    some: {
                        type: RepoIndexingJobType.INDEX,
                        status: {
                            in: [
                                RepoIndexingJobStatus.PENDING,
                                RepoIndexingJobStatus.IN_PROGRESS,
                            ]
                        }
                    }
                }
            }
        },
    });

    const reposMissingShards = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id));
    if (reposMissingShards.length === 0) {
        return;
    }

    logger.warn(`Found ${reposMissingShards.length} indexed repo(s) with missing zoekt shard files. Marking stale and scheduling reindex jobs.`);

    // Clear the index state in fixed-size batches.
    // NOTE(review): presumably batched to bound the size of the `in` list — confirm.
    for (let i = 0; i < reposMissingShards.length; i += STALE_REPO_UPDATE_BATCH_SIZE) {
        const batch = reposMissingShards.slice(i, i + STALE_REPO_UPDATE_BATCH_SIZE);
        await this.db.repo.updateMany({
            where: {
                id: {
                    in: batch.map(repo => repo.id),
                },
            },
            data: {
                indexedAt: null,
                indexedCommitHash: null,
                latestIndexingJobStatus: RepoIndexingJobStatus.PENDING,
            },
        });
    }

    await this.createJobs(reposMissingShards, RepoIndexingJobType.INDEX);
}

public async dispose() {
if (this.interval) {
clearInterval(this.interval);
Expand Down