Skip to content

Commit 2f6f2ac

Browse files
move config syncing into backend
1 parent d4a99de commit 2f6f2ac

File tree

17 files changed

+240
-213
lines changed

17 files changed

+240
-213
lines changed

LICENSE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ Copyright (c) 2025 Taqla Inc.
22

33
Portions of this software are licensed as follows:
44

5-
- All content that resides under the "ee/", "packages/web/src/ee/", "packages/backend/src/ee/", and "packages/shared/src/ee/" directories of this repository, if these directories exist, is licensed under the license defined in "ee/LICENSE".
5+
- All content that resides under the "ee/", "packages/web/src/ee/" and "packages/backend/src/ee/" directories of this repository, if these directories exist, is licensed under the license defined in "ee/LICENSE".
66
- All third party components incorporated into the Sourcebot Software are licensed under the original license provided by the owner of the applicable component.
77
- Content outside of the above mentioned directories or restrictions above is available under the "Functional Source License" as defined below.
88

packages/backend/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
"argparse": "^2.0.1",
4141
"azure-devops-node-api": "^15.1.1",
4242
"bullmq": "^5.34.10",
43+
"chokidar": "^4.0.3",
4344
"cross-fetch": "^4.0.0",
4445
"dotenv": "^16.4.5",
4546
"express": "^4.21.2",
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import { Prisma, PrismaClient } from "@sourcebot/db";
2+
import { createLogger } from "@sourcebot/logger";
3+
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
4+
import { loadConfig } from "@sourcebot/shared";
5+
import chokidar, { FSWatcher } from 'chokidar';
6+
import { ConnectionManager } from "./connectionManager.js";
7+
import { SINGLE_TENANT_ORG_ID } from "./constants.js";
8+
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
9+
10+
const logger = createLogger('config-manager');
11+
12+
export class ConfigManager {
13+
private watcher: FSWatcher;
14+
15+
constructor(
16+
private db: PrismaClient,
17+
private connectionManager: ConnectionManager,
18+
configPath: string,
19+
) {
20+
this.watcher = chokidar.watch(configPath, {
21+
ignoreInitial: true, // Don't fire events for existing files
22+
awaitWriteFinish: {
23+
stabilityThreshold: 100, // File size stable for 100ms
24+
pollInterval: 100 // Check every 100ms
25+
},
26+
atomic: true // Handle atomic writes (temp file + rename)
27+
});
28+
29+
this.watcher.on('change', async () => {
30+
logger.info(`Config file ${configPath} changed. Syncing config.`);
31+
try {
32+
await this.syncConfig(configPath);
33+
} catch (error) {
34+
logger.error(`Failed to sync config: ${error}`);
35+
}
36+
});
37+
38+
this.syncConfig(configPath);
39+
}
40+
41+
private syncConfig = async (configPath: string) => {
42+
const config = await loadConfig(configPath);
43+
44+
await this.syncConnections(config.connections);
45+
await syncSearchContexts({
46+
contexts: config.contexts,
47+
orgId: SINGLE_TENANT_ORG_ID,
48+
db: this.db,
49+
});
50+
}
51+
52+
private syncConnections = async (connections?: { [key: string]: ConnectionConfig }) => {
53+
if (connections) {
54+
for (const [key, newConnectionConfig] of Object.entries(connections)) {
55+
const existingConnection = await this.db.connection.findUnique({
56+
where: {
57+
name_orgId: {
58+
name: key,
59+
orgId: SINGLE_TENANT_ORG_ID,
60+
}
61+
}
62+
});
63+
64+
65+
const existingConnectionConfig = existingConnection ? existingConnection.config as unknown as ConnectionConfig : undefined;
66+
const connectionNeedsSyncing =
67+
!existingConnection ||
68+
(JSON.stringify(existingConnectionConfig) !== JSON.stringify(newConnectionConfig));
69+
70+
// Either update the existing connection or create a new one.
71+
const connection = existingConnection ?
72+
await this.db.connection.update({
73+
where: {
74+
id: existingConnection.id,
75+
},
76+
data: {
77+
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
78+
isDeclarative: true,
79+
}
80+
}) :
81+
await this.db.connection.create({
82+
data: {
83+
name: key,
84+
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
85+
connectionType: newConnectionConfig.type,
86+
isDeclarative: true,
87+
org: {
88+
connect: {
89+
id: SINGLE_TENANT_ORG_ID,
90+
}
91+
}
92+
}
93+
});
94+
95+
if (connectionNeedsSyncing) {
96+
const [jobId] = await this.connectionManager.createJobs([connection]);
97+
logger.info(`Change detected for connection '${key}' (id: ${connection.id}). Created sync job ${jobId}.`);
98+
}
99+
}
100+
}
101+
102+
// Delete any connections that are no longer in the config.
103+
const deletedConnections = await this.db.connection.findMany({
104+
where: {
105+
isDeclarative: true,
106+
name: {
107+
notIn: Object.keys(connections ?? {}),
108+
},
109+
orgId: SINGLE_TENANT_ORG_ID,
110+
}
111+
});
112+
113+
for (const connection of deletedConnections) {
114+
logger.info(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
115+
await this.db.connection.delete({
116+
where: {
117+
id: connection.id,
118+
}
119+
})
120+
}
121+
}
122+
123+
public dispose = async () => {
124+
await this.watcher.close();
125+
}
126+
}

packages/backend/src/connectionManager.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@ import * as Sentry from "@sentry/node";
22
import { Connection, ConnectionSyncJobStatus, PrismaClient } from "@sourcebot/db";
33
import { createLogger } from "@sourcebot/logger";
44
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
5-
import { loadConfig, syncSearchContexts } from "@sourcebot/shared";
5+
import { loadConfig } from "@sourcebot/shared";
66
import { Job, Queue, ReservedJob, Worker } from "groupmq";
77
import { Redis } from 'ioredis';
88
import { env } from "./env.js";
99
import { compileAzureDevOpsConfig, compileBitbucketConfig, compileGenericGitHostConfig, compileGerritConfig, compileGiteaConfig, compileGithubConfig, compileGitlabConfig } from "./repoCompileUtils.js";
1010
import { Settings } from "./types.js";
1111
import { groupmqLifecycleExceptionWrapper } from "./utils.js";
12+
import { syncSearchContexts } from "./ee/syncSearchContexts.js";
13+
import { captureEvent } from "./posthog.js";
1214

1315
const LOG_TAG = 'connection-manager';
1416
const logger = createLogger(LOG_TAG);
@@ -136,6 +138,8 @@ export class ConnectionManager {
136138
jobId: job.id,
137139
});
138140
}
141+
142+
return jobs.map(job => job.id);
139143
}
140144

141145
private async runJob(job: ReservedJob<JobPayload>): Promise<JobResult> {
@@ -297,10 +301,11 @@ export class ConnectionManager {
297301

298302
logger.info(`Connection sync job ${job.id} for connection ${job.data.connectionName} (id: ${job.data.connectionId}) completed`);
299303

300-
// captureEvent('backend_connection_sync_job_completed', {
301-
// connectionId: connectionId,
302-
// repoCount: result.repoCount,
303-
// });
304+
const result = job.returnvalue as JobResult;
305+
captureEvent('backend_connection_sync_job_completed', {
306+
connectionId: connectionId,
307+
repoCount: result.repoCount,
308+
});
304309
});
305310

306311
private onJobFailed = async (job: Job<JobPayload>) =>
@@ -332,10 +337,10 @@ export class ConnectionManager {
332337
logger.warn(`Failed job ${job.id} for connection ${connection.name} (id: ${connection.id}). Attempt ${attempt} / ${job.opts.attempts}. Retrying.`);
333338
}
334339

335-
// captureEvent('backend_connection_sync_job_failed', {
336-
// connectionId: connectionId,
337-
// error: err instanceof BackendException ? err.code : 'UNKNOWN',
338-
// });
340+
captureEvent('backend_connection_sync_job_failed', {
341+
connectionId: job.data.connectionId,
342+
error: job.failedReason,
343+
});
339344
});
340345

341346
private onJobStalled = async (jobId: string) =>
@@ -354,6 +359,11 @@ export class ConnectionManager {
354359
});
355360

356361
logger.error(`Job ${jobId} stalled for connection ${connection.name} (id: ${connection.id})`);
362+
363+
captureEvent('backend_connection_sync_job_failed', {
364+
connectionId: connection.id,
365+
error: 'Job stalled',
366+
});
357367
});
358368

359369
private async onWorkerError(error: Error) {

packages/backend/src/constants.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { env } from "./env.js";
22
import path from "path";
33

4+
export const SINGLE_TENANT_ORG_ID = 1;
5+
46
export const PERMISSION_SYNC_SUPPORTED_CODE_HOST_TYPES = [
57
'github',
68
];

packages/shared/src/ee/syncSearchContexts.ts renamed to packages/backend/src/ee/syncSearchContexts.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import micromatch from "micromatch";
22
import { createLogger } from "@sourcebot/logger";
33
import { PrismaClient } from "@sourcebot/db";
4-
import { getPlan, hasEntitlement } from "../entitlements.js";
5-
import { SOURCEBOT_SUPPORT_EMAIL } from "../constants.js";
4+
import { getPlan, hasEntitlement, SOURCEBOT_SUPPORT_EMAIL } from "@sourcebot/shared";
65
import { SearchContext } from "@sourcebot/schemas/v3/index.type";
76

87
const logger = createLogger('sync-search-contexts');

packages/backend/src/env.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ export const env = createEnv({
4747
DEBUG_ENABLE_GROUPMQ_LOGGING: booleanSchema.default('false'),
4848

4949
DATABASE_URL: z.string().url().default("postgresql://postgres:postgres@localhost:5432/postgres"),
50-
CONFIG_PATH: z.string().optional(),
50+
CONFIG_PATH: z.string(),
5151

5252
CONNECTION_MANAGER_UPSERT_TIMEOUT_MS: numberSchema.default(300000),
5353
REPO_SYNC_RETRY_BASE_SLEEP_SECONDS: numberSchema.default(60),
@@ -56,6 +56,8 @@ export const env = createEnv({
5656

5757
EXPERIMENT_EE_PERMISSION_SYNC_ENABLED: booleanSchema.default('false'),
5858
AUTH_EE_GITHUB_BASE_URL: z.string().optional(),
59+
60+
FORCE_ENABLE_ANONYMOUS_ACCESS: booleanSchema.default('false'),
5961
},
6062
runtimeEnv: process.env,
6163
emptyStringAsUndefined: true,

packages/backend/src/github.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ const getReposOwnedByUsers = async (users: string[], octokit: Octokit, signal: A
256256
const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSignal, url?: string) => {
257257
const results = await Promise.allSettled(orgs.map(async (org) => {
258258
try {
259-
logger.info(`Fetching repository info for org ${org}...`);
259+
logger.debug(`Fetching repository info for org ${org}...`);
260260

261261
const octokitToUse = await getOctokitWithGithubApp(octokit, org, url, `org ${org}`);
262262
const { durationMs, data } = await measure(async () => {
@@ -271,7 +271,7 @@ const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSi
271271
return fetchWithRetry(fetchFn, `org ${org}`, logger);
272272
});
273273

274-
logger.info(`Found ${data.length} in org ${org} in ${durationMs}ms.`);
274+
logger.debug(`Found ${data.length} in org ${org} in ${durationMs}ms.`);
275275
return {
276276
type: 'valid' as const,
277277
data
@@ -305,7 +305,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna
305305
const results = await Promise.allSettled(repoList.map(async (repo) => {
306306
try {
307307
const [owner, repoName] = repo.split('/');
308-
logger.info(`Fetching repository info for ${repo}...`);
308+
logger.debug(`Fetching repository info for ${repo}...`);
309309

310310
const octokitToUse = await getOctokitWithGithubApp(octokit, owner, url, `repo ${repo}`);
311311
const { durationMs, data: result } = await measure(async () => {
@@ -320,7 +320,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna
320320
return fetchWithRetry(fetchFn, repo, logger);
321321
});
322322

323-
logger.info(`Found info for repository ${repo} in ${durationMs}ms`);
323+
logger.debug(`Found info for repository ${repo} in ${durationMs}ms`);
324324
return {
325325
type: 'valid' as const,
326326
data: [result.data]

packages/backend/src/index.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@ import { getConfigSettings, hasEntitlement } from '@sourcebot/shared';
66
import { existsSync } from 'fs';
77
import { mkdir } from 'fs/promises';
88
import { Redis } from 'ioredis';
9+
import { ConfigManager } from "./configManager.js";
910
import { ConnectionManager } from './connectionManager.js';
1011
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
12+
import { GithubAppManager } from "./ee/githubAppManager.js";
1113
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
1214
import { UserPermissionSyncer } from "./ee/userPermissionSyncer.js";
13-
import { GithubAppManager } from "./ee/githubAppManager.js";
1415
import { env } from "./env.js";
15-
import { RepoIndexManager } from "./repoIndexManager.js";
1616
import { PromClient } from './promClient.js';
17+
import { RepoIndexManager } from "./repoIndexManager.js";
1718

1819

1920
const logger = createLogger('backend-entrypoint');
@@ -53,6 +54,7 @@ const connectionManager = new ConnectionManager(prisma, settings, redis);
5354
const repoPermissionSyncer = new RepoPermissionSyncer(prisma, settings, redis);
5455
const userPermissionSyncer = new UserPermissionSyncer(prisma, settings, redis);
5556
const repoIndexManager = new RepoIndexManager(prisma, settings, redis, promClient);
57+
const configManager = new ConfigManager(prisma, connectionManager, env.CONFIG_PATH);
5658

5759
connectionManager.startScheduler();
5860
repoIndexManager.startScheduler();
@@ -66,11 +68,13 @@ else if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && hasEntitlement(
6668
userPermissionSyncer.startScheduler();
6769
}
6870

71+
logger.info('Worker started.');
72+
6973
const cleanup = async (signal: string) => {
7074
logger.info(`Received ${signal}, cleaning up...`);
7175

7276
const shutdownTimeout = 30000; // 30 seconds
73-
77+
7478
try {
7579
await Promise.race([
7680
Promise.all([
@@ -79,8 +83,9 @@ const cleanup = async (signal: string) => {
7983
repoPermissionSyncer.dispose(),
8084
userPermissionSyncer.dispose(),
8185
promClient.dispose(),
86+
configManager.dispose(),
8287
]),
83-
new Promise((_, reject) =>
88+
new Promise((_, reject) =>
8489
setTimeout(() => reject(new Error('Shutdown timeout')), shutdownTimeout)
8590
)
8691
]);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
Warnings:
3+
4+
- You are about to drop the column `syncStatus` on the `Connection` table. All the data in the column will be lost.
5+
- You are about to drop the column `syncStatusMetadata` on the `Connection` table. All the data in the column will be lost.
6+
7+
*/
8+
-- CreateEnum
9+
CREATE TYPE "ConnectionSyncJobStatus" AS ENUM ('PENDING', 'IN_PROGRESS', 'COMPLETED', 'FAILED');
10+
11+
-- AlterTable
12+
ALTER TABLE "Connection" DROP COLUMN "syncStatus",
13+
DROP COLUMN "syncStatusMetadata";
14+
15+
-- CreateTable
16+
CREATE TABLE "ConnectionSyncJob" (
17+
"id" TEXT NOT NULL,
18+
"status" "ConnectionSyncJobStatus" NOT NULL DEFAULT 'PENDING',
19+
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
20+
"updatedAt" TIMESTAMP(3) NOT NULL,
21+
"completedAt" TIMESTAMP(3),
22+
"warningMessages" TEXT[],
23+
"errorMessage" TEXT,
24+
"connectionId" INTEGER NOT NULL,
25+
26+
CONSTRAINT "ConnectionSyncJob_pkey" PRIMARY KEY ("id")
27+
);
28+
29+
-- AddForeignKey
30+
ALTER TABLE "ConnectionSyncJob" ADD CONSTRAINT "ConnectionSyncJob_connectionId_fkey" FOREIGN KEY ("connectionId") REFERENCES "Connection"("id") ON DELETE CASCADE ON UPDATE CASCADE;

0 commit comments

Comments
 (0)