|
1 | 1 | import * as Sentry from '@sentry/node'; |
2 | 2 | import { PrismaClient, Repo, RepoIndexingJobStatus, RepoIndexingJobType } from "@sourcebot/db"; |
3 | | -import { createLogger, env, getRepoPath, Logger, RepoIndexingJobMetadata, repoIndexingJobMetadataSchema, RepoMetadata, repoMetadataSchema } from "@sourcebot/shared"; |
| 3 | +import { createLogger, env, getRepoPath, Logger, getRepoIdFromPath, RepoIndexingJobMetadata, repoIndexingJobMetadataSchema, RepoMetadata, repoMetadataSchema } from "@sourcebot/shared"; |
4 | 4 | import { DelayedError, Job, Queue, Worker } from "bullmq"; |
5 | 5 | import { existsSync } from 'fs'; |
6 | 6 | import { readdir, rm } from 'fs/promises'; |
7 | 7 | import { Redis } from 'ioredis'; |
8 | 8 | import micromatch from 'micromatch'; |
9 | 9 | import Redlock, { ExecutionError } from 'redlock'; |
10 | | -import { INDEX_CACHE_DIR, WORKER_STOP_GRACEFUL_TIMEOUT_MS } from './constants.js'; |
| 10 | +import { INDEX_CACHE_DIR, REPOS_CACHE_DIR, WORKER_STOP_GRACEFUL_TIMEOUT_MS } from './constants.js'; |
11 | 11 | import { cloneRepository, fetchRepository, getBranches, getCommitHashForRefName, getLatestCommitTimestamp, getLocalDefaultBranch, getTags, isPathAValidGitRepoRoot, isRepoEmpty, unsetGitConfig, upsertGitConfig } from './git.js'; |
12 | 12 | import { captureEvent } from './posthog.js'; |
13 | 13 | import { PromClient } from './promClient.js'; |
14 | 14 | import { RepoWithConnections, Settings } from "./types.js"; |
15 | | -import { getAuthCredentialsForRepo, getShardPrefix, measure, setIntervalAsync } from './utils.js'; |
| 15 | +import { getAuthCredentialsForRepo, getRepoIdFromShardFileName, getShardPrefix, measure, setIntervalAsync } from './utils.js'; |
16 | 16 | import { cleanupTempShards, indexGitRepository } from './zoekt.js'; |
17 | 17 |
|
18 | 18 | const LOG_TAG = 'repo-index-manager'; |
@@ -170,6 +170,8 @@ export class RepoIndexManager { |
170 | 170 | } |
171 | 171 |
|
172 | 172 | private async scheduleCleanupJobs() { |
| 173 | + await this.cleanupOrphanedDiskResources(); |
| 174 | + |
173 | 175 | const gcGracePeriodMs = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs); |
174 | 176 | const timeoutDate = new Date(Date.now() - this.settings.repoIndexTimeoutMs); |
175 | 177 |
|
@@ -637,6 +639,48 @@ export class RepoIndexManager { |
637 | 639 | } |
638 | 640 | } |
639 | 641 |
|
// Scans the repos and index directories on disk and removes any entries
// that have no corresponding Repo record in the database. This handles
// edge cases where the DB and disk resources are out of sync.
//
// Existence lookups are memoized per repoId for the duration of a single
// call: several shard files can share one repoId (see the shard naming
// pattern below), and the same repoId can show up in both directories, so
// without the cache the same query would be issued many times per run.
// Entries whose name does not yield a repoId are left untouched.
private async cleanupOrphanedDiskResources() {
    // repoId -> whether a Repo row was found. Scoped to this call so every
    // invocation re-checks the database from scratch.
    const repoExistsById = new Map<Repo['id'], boolean>();

    const repoExists = async (repoId: Repo['id']): Promise<boolean> => {
        const cached = repoExistsById.get(repoId);
        if (cached !== undefined) {
            return cached;
        }

        const repo = await this.db.repo.findUnique({ where: { id: repoId } });
        const exists = Boolean(repo);
        repoExistsById.set(repoId, exists);
        return exists;
    };

    // --- Repo directories ---
    // Dirs are named by repoId: DATA_CACHE_DIR/repos/<repoId>/
    if (existsSync(REPOS_CACHE_DIR)) {
        const entries = await readdir(REPOS_CACHE_DIR);
        for (const entry of entries) {
            const repoPath = `${REPOS_CACHE_DIR}/${entry}`;
            const repoId = getRepoIdFromPath(repoPath);
            if (repoId === undefined) {
                continue;
            }

            if (await repoExists(repoId)) {
                continue;
            }

            logger.info(`Removing orphaned repo directory with no DB record: ${repoPath}`);
            await rm(repoPath, { recursive: true, force: true });
        }
    }

    // --- Index shards ---
    // Shard files are prefixed with <orgId>_<repoId>: DATA_CACHE_DIR/index/<orgId>_<repoId>_*.zoekt
    if (existsSync(INDEX_CACHE_DIR)) {
        const entries = await readdir(INDEX_CACHE_DIR);
        for (const entry of entries) {
            const repoId = getRepoIdFromShardFileName(entry);
            if (repoId === undefined) {
                continue;
            }

            if (await repoExists(repoId)) {
                continue;
            }

            const shardPath = `${INDEX_CACHE_DIR}/${entry}`;
            logger.info(`Removing orphaned index shard with no DB record: ${shardPath}`);
            await rm(shardPath, { force: true });
        }
    }
}
| 683 | + |
640 | 684 | public async dispose() { |
641 | 685 | if (this.interval) { |
642 | 686 | clearInterval(this.interval); |
|
0 commit comments