|
1 | 1 | import * as Sentry from '@sentry/node'; |
2 | 2 | import { PrismaClient, Repo, RepoIndexingJobStatus, RepoIndexingJobType } from "@sourcebot/db"; |
3 | | -import { createLogger, env, getRepoPath, Logger, RepoIndexingJobMetadata, repoIndexingJobMetadataSchema, RepoMetadata, repoMetadataSchema } from "@sourcebot/shared"; |
| 3 | +import { createLogger, env, getRepoPath, Logger, getRepoIdFromPath, RepoIndexingJobMetadata, repoIndexingJobMetadataSchema, RepoMetadata, repoMetadataSchema } from "@sourcebot/shared"; |
4 | 4 | import { DelayedError, Job, Queue, Worker } from "bullmq"; |
5 | 5 | import { existsSync } from 'fs'; |
6 | 6 | import { readdir, rm } from 'fs/promises'; |
7 | 7 | import { Redis } from 'ioredis'; |
8 | 8 | import micromatch from 'micromatch'; |
9 | 9 | import Redlock, { ExecutionError } from 'redlock'; |
10 | | -import { INDEX_CACHE_DIR, WORKER_STOP_GRACEFUL_TIMEOUT_MS } from './constants.js'; |
| 10 | +import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, WORKER_STOP_GRACEFUL_TIMEOUT_MS } from './constants.js'; |
11 | 11 | import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getLatestCommitTimestamp, getLocalDefaultBranch, getTags, isPathAValidGitRepoRoot, isRepoEmpty, unsetGitConfig, upsertGitConfig } from './git.js'; |
12 | 12 | import { captureEvent } from './posthog.js'; |
13 | 13 | import { PromClient } from './promClient.js'; |
14 | 14 | import { RepoWithConnections, Settings } from "./types.js"; |
15 | | -import { getAuthCredentialsForRepo, getShardPrefix, measure, setIntervalAsync } from './utils.js'; |
| 15 | +import { getAuthCredentialsForRepo, getRepoIdFromShardFileName, getShardPrefix, measure, setIntervalAsync } from './utils.js'; |
16 | 16 | import { cleanupTempShards, indexGitRepository } from './zoekt.js'; |
17 | 17 |
|
18 | 18 | const LOG_TAG = 'repo-index-manager'; |
@@ -96,8 +96,10 @@ export class RepoIndexManager { |
96 | 96 | }); |
97 | 97 | } |
98 | 98 |
|
99 | | - public startScheduler() { |
| 99 | + public async startScheduler() { |
100 | 100 | logger.debug('Starting scheduler'); |
| 101 | + // Cleanup any orphaned disk resources on startup |
| 102 | + await this.cleanupOrphanedDiskResources(); |
101 | 103 | this.interval = setIntervalAsync(async () => { |
102 | 104 | await this.scheduleIndexJobs(); |
103 | 105 | await this.scheduleCleanupJobs(); |
@@ -637,6 +639,71 @@ export class RepoIndexManager { |
637 | 639 | } |
638 | 640 | } |
639 | 641 |
|
/**
 * Scans the repos and index directories on disk and removes any entries
 * that have no corresponding Repo record in the database. This handles
 * edge cases where the DB and disk resources are out of sync.
 *
 * Deletions are best-effort: a failure to remove one entry is logged and
 * the sweep continues, so a single undeletable path cannot abort the
 * caller (this runs during scheduler startup). Errors from listing the
 * directories or querying the database still propagate.
 */
private async cleanupOrphanedDiskResources() {
    // --- Repo directories ---
    // Dirs are named by repoId: DATA_CACHE_DIR/repos/<repoId>/
    if (existsSync(REPOS_CACHE_DIR)) {
        const entries = await readdir(REPOS_CACHE_DIR);
        const repoIdToPath = new Map<number, string>();
        for (const entry of entries) {
            const repoPath = `${REPOS_CACHE_DIR}/${entry}`;
            const repoId = getRepoIdFromPath(repoPath);
            // Entries for which no repo id can be derived are left untouched.
            if (repoId !== undefined) {
                repoIdToPath.set(repoId, repoPath);
            }
        }

        const orphanedIds = await this.findRepoIdsWithoutDbRecord([...repoIdToPath.keys()]);
        for (const [repoId, repoPath] of repoIdToPath) {
            if (!orphanedIds.has(repoId)) {
                continue;
            }
            logger.info(`Removing orphaned repo directory with no DB record: ${repoPath}`);
            await this.removeOrphanedDiskEntry(repoPath, { recursive: true, force: true });
        }
    }

    // --- Index shards ---
    // Shard files are prefixed with <orgId>_<repoId>: DATA_CACHE_DIR/index/<orgId>_<repoId>_*.zoekt
    if (existsSync(INDEX_CACHE_DIR)) {
        const entries = await readdir(INDEX_CACHE_DIR);
        // A single repo may own several shard files, so group them by repo id.
        const repoIdToShards = new Map<number, string[]>();
        for (const entry of entries) {
            const repoId = getRepoIdFromShardFileName(entry);
            if (repoId !== undefined) {
                const shards = repoIdToShards.get(repoId) ?? [];
                shards.push(entry);
                repoIdToShards.set(repoId, shards);
            }
        }

        const orphanedIds = await this.findRepoIdsWithoutDbRecord([...repoIdToShards.keys()]);
        for (const [repoId, shards] of repoIdToShards) {
            if (!orphanedIds.has(repoId)) {
                continue;
            }
            for (const entry of shards) {
                const shardPath = `${INDEX_CACHE_DIR}/${entry}`;
                logger.info(`Removing orphaned index shard with no DB record: ${shardPath}`);
                await this.removeOrphanedDiskEntry(shardPath, { force: true });
            }
        }
    }
}

/**
 * Returns the subset of `repoIds` for which no Repo row exists.
 * Skips the database round trip entirely when `repoIds` is empty.
 */
private async findRepoIdsWithoutDbRecord(repoIds: number[]): Promise<Set<number>> {
    if (repoIds.length === 0) {
        return new Set();
    }

    const existingRepos = await this.db.repo.findMany({
        where: { id: { in: repoIds } },
        select: { id: true },
    });
    const existingIds = new Set(existingRepos.map(r => r.id));
    return new Set(repoIds.filter(repoId => !existingIds.has(repoId)));
}

/**
 * Removes a single orphaned path, logging (rather than throwing) on failure
 * so that the remaining orphans are still processed.
 */
private async removeOrphanedDiskEntry(targetPath: string, options: { recursive?: boolean, force?: boolean }) {
    try {
        await rm(targetPath, options);
    } catch (error) {
        logger.error(`Failed to remove orphaned disk resource ${targetPath}: ${error}`);
    }
}
| 706 | + |
640 | 707 | public async dispose() { |
641 | 708 | if (this.interval) { |
642 | 709 | clearInterval(this.interval); |
|
0 commit comments