Skip to content

Commit dd64739

Browse files
fix(worker): re-index repos whose zoekt shards are missing from disk
When the index directory is lost (e.g., it lives on ephemeral storage in a k8s deployment) while the database still marks repos as indexed, search silently returns empty results and nothing triggers a rebuild until the reindex interval elapses. Add a reconciliation step that runs on scheduler startup and on every scheduler poll: repos marked as indexed in the DB that have no index shards on disk get their indexedAt cleared, so the existing scheduler re-indexes them with its usual dedup and backoff guards.
1 parent 1387c46 commit dd64739

2 files changed

Lines changed: 193 additions & 0 deletions

File tree

packages/backend/src/repoIndexManager.test.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ vi.mock('@sourcebot/shared', () => ({
3737
vi.mock('./constants.js', () => ({
3838
WORKER_STOP_GRACEFUL_TIMEOUT_MS: 5000,
3939
INDEX_CACHE_DIR: 'test-data/index',
40+
REPOS_CACHE_DIR: 'test-data/repos',
4041
}));
4142

4243
vi.mock('./git.js', () => ({
@@ -65,6 +66,13 @@ vi.mock('./posthog.js', () => ({
6566
vi.mock('./utils.js', () => ({
6667
getAuthCredentialsForRepo: vi.fn().mockResolvedValue(null),
6768
getShardPrefix: vi.fn((orgId: number, repoId: number) => `${orgId}_${repoId}`),
69+
getRepoIdFromShardFileName: vi.fn((fileName: string) => {
70+
const match = fileName.match(/^(\d+)_(\d+)_/);
71+
if (!match) {
72+
return undefined;
73+
}
74+
return parseInt(match[2], 10);
75+
}),
6876
measure: vi.fn(async (cb: () => Promise<unknown>) => {
6977
const data = await cb();
7078
return { data, durationMs: 100 };
@@ -148,6 +156,7 @@ const createMockPrisma = () => {
148156
repo: {
149157
findMany: vi.fn().mockResolvedValue([]),
150158
update: vi.fn(),
159+
updateMany: vi.fn(),
151160
delete: vi.fn(),
152161
},
153162
repoIndexingJob: {
@@ -783,6 +792,128 @@ describe('RepoIndexManager', () => {
783792
});
784793
});
785794

795+
describe('Missing Shard Reconciliation', () => {
796+
const indexedRepo = (id: number, name: string) => createMockRepo({
797+
id,
798+
name,
799+
indexedAt: new Date(),
800+
indexedCommitHash: 'abc123',
801+
});
802+
803+
test('clears indexedAt for indexed repos whose shard files are missing on startup', async () => {
804+
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
805+
// Repo 1 has a shard on disk; repo 2 does not.
806+
(readdir as Mock).mockResolvedValue(['1_1_v16.00000.zoekt']);
807+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
808+
indexedRepo(1, 'repo-with-shard'),
809+
indexedRepo(2, 'repo-missing-shard'),
810+
]);
811+
812+
manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
813+
await manager.startScheduler();
814+
815+
expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
816+
where: { id: { in: [2] } },
817+
data: { indexedAt: null },
818+
});
819+
});
820+
821+
test('does not touch repos when all shards are present', async () => {
822+
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
823+
(readdir as Mock).mockResolvedValue(['1_1_v16.00000.zoekt', '1_2_v16.00000.zoekt']);
824+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
825+
indexedRepo(1, 'repo-1'),
826+
indexedRepo(2, 'repo-2'),
827+
]);
828+
829+
manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
830+
await manager.startScheduler();
831+
832+
expect(mockPrisma.repo.updateMany).not.toHaveBeenCalled();
833+
});
834+
835+
test('marks all indexed repos as stale when the index directory is missing', async () => {
836+
(existsSync as Mock).mockReturnValue(false);
837+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
838+
indexedRepo(1, 'repo-1'),
839+
indexedRepo(2, 'repo-2'),
840+
]);
841+
842+
manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
843+
await manager.startScheduler();
844+
845+
expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
846+
where: { id: { in: [1, 2] } },
847+
data: { indexedAt: null },
848+
});
849+
});
850+
851+
test('does not count temporary shard files as valid shards', async () => {
852+
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
853+
(readdir as Mock).mockResolvedValue(['1_2_v16.00000.zoekt123.tmp']);
854+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
855+
indexedRepo(2, 'repo-with-only-tmp-shard'),
856+
]);
857+
858+
manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
859+
await manager.startScheduler();
860+
861+
expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
862+
where: { id: { in: [2] } },
863+
data: { indexedAt: null },
864+
});
865+
});
866+
867+
test('only considers repos that are indexed, non-empty, and connected', async () => {
868+
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
869+
(readdir as Mock).mockResolvedValue([]);
870+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([]);
871+
872+
manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
873+
await manager.startScheduler();
874+
875+
// The reconciliation query must exclude unindexed repos (nothing to mark),
876+
// empty repos (indexing completes without producing a shard), and
877+
// unconnected repos (clearing indexedAt would bypass the GC grace period).
878+
expect(mockPrisma.repo.findMany).toHaveBeenCalledWith(
879+
expect.objectContaining({
880+
where: expect.objectContaining({
881+
indexedAt: { not: null },
882+
indexedCommitHash: { not: null },
883+
connections: { some: {} },
884+
}),
885+
})
886+
);
887+
888+
expect(mockPrisma.repo.updateMany).not.toHaveBeenCalled();
889+
});
890+
891+
test('reconciles on every scheduler poll, not just startup', async () => {
892+
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
893+
(readdir as Mock).mockResolvedValue(['1_1_v16.00000.zoekt']);
894+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([]);
895+
896+
manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
897+
await manager.startScheduler();
898+
899+
// Simulate the index directory being wiped while the worker is running,
900+
// with repo 1 still marked as indexed in the DB.
901+
(readdir as Mock).mockResolvedValue([]);
902+
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
903+
indexedRepo(1, 'repo-1'),
904+
]);
905+
906+
const { setIntervalAsync } = await import('./utils.js');
907+
const tick = (setIntervalAsync as Mock).mock.calls[0][0];
908+
await tick();
909+
910+
expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
911+
where: { id: { in: [1] } },
912+
data: { indexedAt: null },
913+
});
914+
});
915+
});
916+
786917
describe('latestIndexingJobStatus Updates', () => {
787918
test('sets latestIndexingJobStatus to IN_PROGRESS when job starts', async () => {
788919
const repo = createMockRepoWithConnections();

packages/backend/src/repoIndexManager.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ export class RepoIndexManager {
100100
logger.debug('Starting scheduler');
101101
// Cleanup any orphaned disk resources on startup
102102
await this.cleanupOrphanedDiskResources();
103+
await this.markReposWithMissingShardsAsStale();
103104
this.interval = setIntervalAsync(async () => {
105+
await this.markReposWithMissingShardsAsStale();
104106
await this.scheduleIndexJobs();
105107
await this.scheduleCleanupJobs();
106108
}, this.settings.reindexRepoPollingIntervalMs);
@@ -682,6 +684,66 @@ export class RepoIndexManager {
682684
}
683685
}
684686

687+
// Detects repos that are marked as indexed in the database but have no
688+
// index shards on disk (e.g., because the index directory lives on
689+
// ephemeral storage and was lost), and clears their `indexedAt` so the
690+
// scheduler re-indexes them. This is the inverse of
691+
// `cleanupOrphanedDiskResources`.
692+
private async markReposWithMissingShardsAsStale() {
693+
// @note: the DB is queried *before* the disk is scanned so that a repo
694+
// whose first index job completes between the two reads is not falsely
695+
// marked as stale (its shard is guaranteed to be visible by the time it
696+
// appears in the query result).
697+
//
698+
// Empty repositories are excluded (via `indexedCommitHash`) since they
699+
// complete indexing without producing a shard. Unconnected repos are
700+
// excluded since clearing `indexedAt` would bypass the garbage
701+
// collection grace period in `scheduleCleanupJobs`.
702+
const indexedRepos = await this.db.repo.findMany({
703+
where: {
704+
indexedAt: { not: null },
705+
indexedCommitHash: { not: null },
706+
connections: { some: {} },
707+
},
708+
select: {
709+
id: true,
710+
name: true,
711+
},
712+
});
713+
714+
if (indexedRepos.length === 0) {
715+
return;
716+
}
717+
718+
const repoIdsWithShards = new Set<number>();
719+
if (existsSync(INDEX_CACHE_DIR)) {
720+
const entries = await readdir(INDEX_CACHE_DIR);
721+
for (const entry of entries) {
722+
// Ignore temporary files (e.g., `.tmp` files from in-flight or
723+
// failed indexing operations) - only completed shards count.
724+
if (!entry.endsWith('.zoekt')) {
725+
continue;
726+
}
727+
const repoId = getRepoIdFromShardFileName(entry);
728+
if (repoId !== undefined) {
729+
repoIdsWithShards.add(repoId);
730+
}
731+
}
732+
}
733+
734+
const staleRepos = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id));
735+
if (staleRepos.length === 0) {
736+
return;
737+
}
738+
739+
logger.warn(`Found ${staleRepos.length} repo(s) marked as indexed but with no index shards on disk. Marking as stale for re-indexing: ${staleRepos.map(repo => repo.name).join(', ')}`);
740+
741+
await this.db.repo.updateMany({
742+
where: { id: { in: staleRepos.map(repo => repo.id) } },
743+
data: { indexedAt: null },
744+
});
745+
}
746+
685747
// Scans the repos and index directories on disk and removes any entries
686748
// that have no corresponding Repo record in the database. This handles
687749
// edge cases where the DB and disk resources are out of sync.

0 commit comments

Comments
 (0)