Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed incorrect shutdown of PostHog SDK in the worker. [#609](https://github.com/sourcebot-dev/sourcebot/pull/609)
- Fixed race condition in job schedulers. [#607](https://github.com/sourcebot-dev/sourcebot/pull/607)

### Added
- Added force resync buttons for connections and repositories. [#610](https://github.com/sourcebot-dev/sourcebot/pull/610)

## [4.9.1] - 2025-11-07

### Added
Expand Down
1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"@types/micromatch": "^4.0.9",
"@types/node": "^22.7.5",
"cross-env": "^7.0.3",
"express-async-errors": "^3.1.1",
Comment thread
brendan-kellam marked this conversation as resolved.
Outdated
"json-schema-to-typescript": "^15.0.4",
"tsc-watch": "^6.2.0",
"tsx": "^4.19.1",
Expand Down
103 changes: 103 additions & 0 deletions packages/backend/src/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { PrismaClient, RepoIndexingJobType } from '@sourcebot/db';
import { createLogger } from '@sourcebot/shared';
import express, { Request, Response } from 'express';
import 'express-async-errors';
import * as http from "http";
import z from 'zod';
import { ConnectionManager } from './connectionManager.js';
import { PromClient } from './promClient.js';
import { RepoIndexManager } from './repoIndexManager.js';

const logger = createLogger('api');
const PORT = 3060;
Comment thread
brendan-kellam marked this conversation as resolved.

export class Api {
private server: http.Server;

constructor(
promClient: PromClient,
private prisma: PrismaClient,
private connectionManager: ConnectionManager,
private repoIndexManager: RepoIndexManager,
) {
const app = express();
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// Prometheus metrics endpoint
app.use('/metrics', async (_req: Request, res: Response) => {
res.set('Content-Type', promClient.registry.contentType);
const metrics = await promClient.registry.metrics();
res.end(metrics);
});

app.post('/api/sync-connection', this.syncConnection.bind(this));
app.post('/api/index-repo', this.indexRepo.bind(this));
Comment thread
brendan-kellam marked this conversation as resolved.

this.server = app.listen(PORT, () => {
logger.info(`API server is running on port ${PORT}`);
});
}

private async syncConnection(req: Request, res: Response) {
const schema = z.object({
connectionId: z.number(),
}).strict();

const parsed = schema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({ error: parsed.error.message });
return;
}

const { connectionId } = parsed.data;
const connection = await this.prisma.connection.findUnique({
where: {
id: connectionId,
}
});

if (!connection) {
res.status(404).json({ error: 'Connection not found' });
return;
}

const [jobId] = await this.connectionManager.createJobs([connection]);

res.status(200).json({ jobId });
}

private async indexRepo(req: Request, res: Response) {
const schema = z.object({
repoId: z.number(),
}).strict();

const parsed = schema.safeParse(req.body);
if (!parsed.success) {
res.status(400).json({ error: parsed.error.message });
return;
}

const { repoId } = parsed.data;
const repo = await this.prisma.repo.findUnique({
where: { id: repoId },
});

if (!repo) {
res.status(404).json({ error: 'Repo not found' });
return;
}

const [jobId] = await this.repoIndexManager.createJobs([repo], RepoIndexingJobType.INDEX);
res.status(200).json({ jobId });
}

public async dispose() {
return new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err);
else resolve(undefined);
});
});
}
}
19 changes: 13 additions & 6 deletions packages/backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import "./instrument.js";

import { PrismaClient } from "@sourcebot/db";
import { createLogger } from "@sourcebot/shared";
import { env, getConfigSettings, hasEntitlement, getDBConnectionString } from '@sourcebot/shared';
import { createLogger, env, getConfigSettings, getDBConnectionString, hasEntitlement } from "@sourcebot/shared";
import 'express-async-errors';
import { existsSync } from 'fs';
import { mkdir } from 'fs/promises';
import { Redis } from 'ioredis';
import { Api } from "./api.js";
import { ConfigManager } from "./configManager.js";
import { ConnectionManager } from './connectionManager.js';
import { INDEX_CACHE_DIR, REPOS_CACHE_DIR } from './constants.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { GithubAppManager } from "./ee/githubAppManager.js";
import { RepoPermissionSyncer } from './ee/repoPermissionSyncer.js';
import { AccountPermissionSyncer } from "./ee/accountPermissionSyncer.js";
import { shutdownPosthog } from "./posthog.js";
import { PromClient } from './promClient.js';
import { RepoIndexManager } from "./repoIndexManager.js";
import { shutdownPosthog } from "./posthog.js";


const logger = createLogger('backend-entrypoint');

Expand Down Expand Up @@ -74,6 +74,13 @@ else if (env.EXPERIMENT_EE_PERMISSION_SYNC_ENABLED === 'true' && hasEntitlement(
accountPermissionSyncer.startScheduler();
}

const api = new Api(
promClient,
prisma,
connectionManager,
repoIndexManager,
);

logger.info('Worker started.');

const cleanup = async (signal: string) => {
Expand All @@ -88,7 +95,6 @@ const cleanup = async (signal: string) => {
connectionManager.dispose(),
repoPermissionSyncer.dispose(),
accountPermissionSyncer.dispose(),
promClient.dispose(),
configManager.dispose(),
]),
new Promise((_, reject) =>
Expand All @@ -102,6 +108,7 @@ const cleanup = async (signal: string) => {

await prisma.$disconnect();
await redis.quit();
await api.dispose();
await shutdownPosthog();
}

Expand Down
33 changes: 1 addition & 32 deletions packages/backend/src/promClient.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
import express, { Request, Response } from 'express';
import { Server } from 'http';
import client, { Registry, Counter, Gauge } from 'prom-client';
import { createLogger } from "@sourcebot/shared";

const logger = createLogger('prometheus-client');

export class PromClient {
private registry: Registry;
private app: express.Application;
private server: Server;
public registry: Registry;

public activeRepoIndexJobs: Gauge<string>;
public pendingRepoIndexJobs: Gauge<string>;
Expand All @@ -22,8 +14,6 @@ export class PromClient {
public connectionSyncJobFailTotal: Counter<string>;
public connectionSyncJobSuccessTotal: Counter<string>;

public readonly PORT = 3060;

constructor() {
this.registry = new Registry();

Expand Down Expand Up @@ -100,26 +90,5 @@ export class PromClient {
client.collectDefaultMetrics({
register: this.registry,
});

this.app = express();
this.app.get('/metrics', async (req: Request, res: Response) => {
res.set('Content-Type', this.registry.contentType);

const metrics = await this.registry.metrics();
res.end(metrics);
});

this.server = this.app.listen(this.PORT, () => {
logger.info(`Prometheus metrics server is running on port ${this.PORT}`);
});
}

async dispose() {
return new Promise<void>((resolve, reject) => {
this.server.close((err) => {
if (err) reject(err);
else resolve();
});
});
}
}
4 changes: 3 additions & 1 deletion packages/backend/src/repoIndexManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ export class RepoIndexManager {
}
}

private async createJobs(repos: Repo[], type: RepoIndexingJobType) {
public async createJobs(repos: Repo[], type: RepoIndexingJobType) {
// @note: we don't perform this in a transaction because
// we want to avoid the situation where a job is created and run
// prior to the transaction being committed.
Expand Down Expand Up @@ -221,6 +221,8 @@ export class RepoIndexManager {
const jobTypeLabel = getJobTypePrometheusLabel(type);
this.promClient.pendingRepoIndexJobs.inc({ repo: job.repo.name, type: jobTypeLabel });
}

return jobs.map(job => job.id);
}

private async runJob(job: ReservedJob<JobPayload>) {
Expand Down
14 changes: 12 additions & 2 deletions packages/web/src/app/[domain]/repos/[id]/page.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { sew } from "@/actions"
import { getCurrentUserRole, sew } from "@/actions"
import { Badge } from "@/components/ui/badge"
import { Button } from "@/components/ui/button"
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"
Expand All @@ -19,6 +19,7 @@ import { BackButton } from "../../components/backButton"
import { DisplayDate } from "../../components/DisplayDate"
import { RepoBranchesTable } from "../components/repoBranchesTable"
import { RepoJobsTable } from "../components/repoJobsTable"
import { OrgRole } from "@sourcebot/db"

export default async function RepoDetailPage({ params }: { params: Promise<{ id: string }> }) {
const { id } = await params
Expand Down Expand Up @@ -51,6 +52,11 @@ export default async function RepoDetailPage({ params }: { params: Promise<{ id:

const repoMetadata = repoMetadataSchema.parse(repo.metadata);

const userRole = await getCurrentUserRole(SINGLE_TENANT_ORG_DOMAIN);
if (isServiceError(userRole)) {
throw new ServiceErrorException(userRole);
}

return (
<>
<div className="mb-6">
Expand Down Expand Up @@ -172,7 +178,11 @@ export default async function RepoDetailPage({ params }: { params: Promise<{ id:
</CardHeader>
<CardContent>
<Suspense fallback={<Skeleton className="h-96 w-full" />}>
<RepoJobsTable data={repo.jobs} />
<RepoJobsTable
data={repo.jobs}
repoId={repo.id}
isIndexButtonVisible={userRole === OrgRole.OWNER}
/>
</Suspense>
</CardContent>
</Card>
Expand Down
Loading
Loading