Skip to content

Commit 8832f8a

Browse files
feat: Trigger.dev index-repo task with R2 storage (#88)
* feat: Trigger.dev index-repo task with R2 storage and webhook trigger - Scaffold apps/trigger with Trigger.dev SDK v4 (#81) - GitHub App JWT generation, installation tokens, tarball download (#82) - R2 upload helpers: tarball, file-tree.json, individual files (#83) - DrizzleSyncStorage: Merkle state via Drizzle/Neon (#84) - index-repo task: walk → chunk → embed → upsert → Merkle persist → R2 (#85) - Wire webhook to trigger index-repo after repo upsert (#86) - 170 tests passing (153 core + 17 web) * fix: address code review findings - Fix SHA extraction: use redirect:manual to capture SHA from Location URL - Bulk chunk cache inserts instead of sequential per-chunk DB round-trips - Remove redundant hashFiles call — reuse fileHashMap from computeChanges - Return 500 on webhook errors so GitHub retries delivery - Reuse S3Client singleton instead of creating one per file upload * style: fix formatting in index-repo.ts
1 parent 6912ab9 commit 8832f8a

11 files changed

Lines changed: 3086 additions & 136 deletions

File tree

apps/trigger/package.json

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"name": "@codeindexer/trigger",
3+
"version": "0.1.0",
4+
"private": true,
5+
"type": "module",
6+
"scripts": {
7+
"dev": "npx trigger.dev@latest dev",
8+
"deploy": "npx trigger.dev@latest deploy",
9+
"typecheck": "tsc --noEmit"
10+
},
11+
"dependencies": {
12+
"@trigger.dev/sdk": "^4.0.0",
13+
"@aws-sdk/client-s3": "^3.0.0",
14+
"@codeindexer/core": "workspace:*",
15+
"@codeindexer/db": "workspace:*",
16+
"tar": "^7.0.0",
17+
"uuid": "^11.0.0",
18+
"zod": "^4.3.6"
19+
},
20+
"devDependencies": {
21+
"@trigger.dev/build": "^4.0.0",
22+
"@codeindexer/tsconfig": "workspace:*",
23+
"@types/uuid": "^10.0.0",
24+
"typescript": "^5"
25+
}
26+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import type { SyncStorage } from '@codeindexer/core';
2+
import { type Database, fileHashes, dirHashes, eq, and } from '@codeindexer/db';
3+
4+
export class DrizzleSyncStorage implements SyncStorage {
5+
constructor(
6+
private db: Database,
7+
private repoId: string,
8+
) {}
9+
10+
async getFileHash(filePath: string): Promise<string | null> {
11+
const row = await this.db.query.fileHashes.findFirst({
12+
where: and(eq(fileHashes.repoId, this.repoId), eq(fileHashes.filePath, filePath)),
13+
});
14+
return row?.sha256 ?? null;
15+
}
16+
17+
async setFileHash(filePath: string, hash: string): Promise<void> {
18+
await this.db
19+
.insert(fileHashes)
20+
.values({
21+
repoId: this.repoId,
22+
filePath,
23+
sha256: hash,
24+
updatedAt: new Date(),
25+
})
26+
.onConflictDoUpdate({
27+
target: [fileHashes.repoId, fileHashes.filePath],
28+
set: { sha256: hash, updatedAt: new Date() },
29+
});
30+
}
31+
32+
async getDirHash(dirPath: string): Promise<string | null> {
33+
const row = await this.db.query.dirHashes.findFirst({
34+
where: and(eq(dirHashes.repoId, this.repoId), eq(dirHashes.dirPath, dirPath)),
35+
});
36+
return row?.merkleHash ?? null;
37+
}
38+
39+
async setDirHash(dirPath: string, hash: string): Promise<void> {
40+
await this.db
41+
.insert(dirHashes)
42+
.values({
43+
repoId: this.repoId,
44+
dirPath,
45+
merkleHash: hash,
46+
updatedAt: new Date(),
47+
})
48+
.onConflictDoUpdate({
49+
target: [dirHashes.repoId, dirHashes.dirPath],
50+
set: { merkleHash: hash, updatedAt: new Date() },
51+
});
52+
}
53+
54+
async clearDirHashes(): Promise<void> {
55+
await this.db.delete(dirHashes).where(eq(dirHashes.repoId, this.repoId));
56+
}
57+
58+
async getAllFileHashes(): Promise<Map<string, string>> {
59+
const rows = await this.db.query.fileHashes.findMany({
60+
where: eq(fileHashes.repoId, this.repoId),
61+
});
62+
const map = new Map<string, string>();
63+
for (const row of rows) {
64+
map.set(row.filePath, row.sha256);
65+
}
66+
return map;
67+
}
68+
69+
async transaction<T>(fn: () => Promise<T>): Promise<T> {
70+
// Neon HTTP driver does not support interactive transactions.
71+
// Each query is auto-committed, so we just run fn sequentially.
72+
return fn();
73+
}
74+
}

apps/trigger/src/lib/github.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { createSign, createPrivateKey } from 'node:crypto';
2+
import { pipeline } from 'node:stream/promises';
3+
import { Readable } from 'node:stream';
4+
import { createWriteStream } from 'node:fs';
5+
import { mkdir } from 'node:fs/promises';
6+
import { extract } from 'tar';
7+
8+
function createAppJWT(appId: string, privateKey: string): string {
9+
const now = Math.floor(Date.now() / 1000);
10+
const header = Buffer.from(JSON.stringify({ alg: 'RS256', typ: 'JWT' })).toString('base64url');
11+
const payload = Buffer.from(
12+
JSON.stringify({ iat: now - 60, exp: now + 600, iss: appId }),
13+
).toString('base64url');
14+
const key = createPrivateKey(privateKey);
15+
const signature = createSign('RSA-SHA256').update(`${header}.${payload}`).sign(key, 'base64url');
16+
return `${header}.${payload}.${signature}`;
17+
}
18+
19+
async function getInstallationToken(installationId: number, appJwt: string): Promise<string> {
20+
const res = await fetch(
21+
`https://api.github.com/app/installations/${installationId}/access_tokens`,
22+
{
23+
method: 'POST',
24+
headers: {
25+
Authorization: `Bearer ${appJwt}`,
26+
Accept: 'application/vnd.github+json',
27+
'User-Agent': 'CodeIndexer/1.0',
28+
'X-GitHub-Api-Version': '2022-11-28',
29+
},
30+
},
31+
);
32+
if (!res.ok) throw new Error(`Failed to get installation token: ${res.status}`);
33+
const data = (await res.json()) as { token: string };
34+
return data.token;
35+
}
36+
37+
async function downloadAndExtractTarball(
38+
fullName: string,
39+
ref: string,
40+
token: string,
41+
destDir: string,
42+
tarballPath: string,
43+
): Promise<string> {
44+
// Use redirect: 'manual' to capture the SHA from the redirect Location URL
45+
// GitHub returns 302 → codeload.github.com/.../{sha}.tar.gz
46+
const headRes = await fetch(`https://api.github.com/repos/${fullName}/tarball/${ref}`, {
47+
headers: {
48+
Authorization: `Bearer ${token}`,
49+
Accept: 'application/vnd.github+json',
50+
'User-Agent': 'CodeIndexer/1.0',
51+
},
52+
redirect: 'manual',
53+
});
54+
55+
const location = headRes.headers.get('location') ?? '';
56+
const shaMatch =
57+
location.match(/\/([a-f0-9]{40})\.tar\.gz/) ?? location.match(/\/([a-f0-9]{7,40})/);
58+
const headSha = shaMatch?.[1] ?? 'unknown';
59+
60+
if (!location) throw new Error(`Tarball download failed: no redirect (status ${headRes.status})`);
61+
62+
// Follow the redirect and save tarball to disk
63+
const res = await fetch(location);
64+
if (!res.ok) throw new Error(`Tarball download failed: ${res.status}`);
65+
66+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
67+
await pipeline(Readable.fromWeb(res.body as any), createWriteStream(tarballPath));
68+
69+
// Extract tarball with strip: 1 to remove GitHub's wrapper directory
70+
await mkdir(destDir, { recursive: true });
71+
await extract({ file: tarballPath, cwd: destDir, strip: 1 });
72+
73+
return headSha;
74+
}
75+
76+
export { createAppJWT, getInstallationToken, downloadAndExtractTarball };

apps/trigger/src/lib/r2.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
2+
import { readFile } from 'node:fs/promises';
3+
import path from 'node:path';
4+
5+
let _client: S3Client | null = null;
6+
7+
function getR2Client(): S3Client {
8+
if (!_client) {
9+
_client = new S3Client({
10+
region: 'auto',
11+
endpoint: process.env.R2_ENDPOINT!,
12+
credentials: {
13+
accessKeyId: process.env.R2_ACCESS_KEY_ID!,
14+
secretAccessKey: process.env.R2_SECRET_ACCESS_KEY!,
15+
},
16+
});
17+
}
18+
return _client;
19+
}
20+
21+
const BUCKET = () => process.env.R2_BUCKET!;
22+
23+
async function uploadBuffer(key: string, body: Buffer, contentType = 'application/octet-stream') {
24+
await getR2Client().send(
25+
new PutObjectCommand({
26+
Bucket: BUCKET(),
27+
Key: key,
28+
Body: body,
29+
ContentType: contentType,
30+
}),
31+
);
32+
}
33+
34+
async function uploadFile(key: string, filePath: string) {
35+
const body = await readFile(filePath);
36+
await uploadBuffer(key, body);
37+
}
38+
39+
async function uploadRepoFiles(
40+
repoId: string,
41+
files: string[],
42+
cloneDir: string,
43+
concurrency = 20,
44+
) {
45+
const client = getR2Client();
46+
const bucket = BUCKET();
47+
48+
for (let i = 0; i < files.length; i += concurrency) {
49+
const batch = files.slice(i, i + concurrency);
50+
await Promise.all(
51+
batch.map(async (file) => {
52+
const relativePath = path.relative(cloneDir, file);
53+
const body = await readFile(file);
54+
await client.send(
55+
new PutObjectCommand({
56+
Bucket: bucket,
57+
Key: `repos/${repoId}/files/${relativePath}`,
58+
Body: body,
59+
}),
60+
);
61+
}),
62+
);
63+
}
64+
}
65+
66+
function buildFileTree(files: string[], cloneDir: string): Record<string, unknown> {
67+
const tree: Record<string, unknown> = {};
68+
for (const file of files) {
69+
const rel = path.relative(cloneDir, file);
70+
const parts = rel.split(path.sep);
71+
let node = tree as Record<string, unknown>;
72+
for (let i = 0; i < parts.length - 1; i++) {
73+
node[parts[i]] = (node[parts[i]] as Record<string, unknown>) || {};
74+
node = node[parts[i]] as Record<string, unknown>;
75+
}
76+
node[parts[parts.length - 1]] = null; // leaf = file
77+
}
78+
return tree;
79+
}
80+
81+
export { uploadBuffer, uploadFile, uploadRepoFiles, buildFileTree, getR2Client };

0 commit comments

Comments
 (0)