Skip to content

Commit ab73a2d

Browse files
authored
perf(pds): stream getRepo to fix OOM on large repos (#131)
* perf(pds): stream getRepo response to fix OOM on large repos

  Route getRepo through the DO's fetch() handler instead of RPC, enabling a streaming Response. Lazily iterates SQLite rows and pipes them through writeCarStream, dropping memory usage from O(repo_size) to O(single_block).

* ci: add changeset for streaming getRepo
1 parent 764fef3 commit ab73a2d

4 files changed

Lines changed: 117 additions & 28 deletions

File tree

.changeset/real-teams-lead.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@getcirrus/pds": patch
3+
---
4+
5+
Stream getRepo response to fix OOM on large repos

packages/pds/src/account-do.ts

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import {
44
WriteOpAction,
55
BlockMap,
66
blocksToCarFile,
7+
writeCarStream,
78
readCarWithRoot,
89
getRecords,
10+
type CarBlock,
911
type RecordCreateOp,
1012
type RecordUpdateOp,
1113
type RecordDeleteOp,
@@ -738,31 +740,50 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
738740
}
739741

740742
/**
741-
* RPC method: Export repo as CAR
743+
* Handle streaming getRepo via fetch (not RPC, to enable streaming response).
742744
*/
743-
async rpcGetRepoCar(): Promise<Uint8Array> {
745+
private async handleGetRepo(): Promise<Response> {
744746
const storage = await this.getStorage();
745747
const root = await storage.getRoot();
746748

747749
if (!root) {
748-
throw new Error("No repository root found");
750+
return Response.json(
751+
{ error: "RepoNotFound", message: "No repository root found" },
752+
{ status: 404 },
753+
);
749754
}
750755

751-
// Get all blocks from SQLite storage
752-
const rows = this.ctx.storage.sql
753-
.exec("SELECT cid, bytes FROM blocks")
754-
.toArray();
755-
756-
// Build BlockMap
757-
const blocks = new BlockMap();
758-
for (const row of rows) {
759-
const cid = CID.parse(row.cid as string);
760-
const bytes = new Uint8Array(row.bytes as ArrayBuffer);
761-
blocks.set(cid, bytes);
756+
// Lazily iterate SQLite rows — the cursor is already lazy,
757+
// only .toArray() would materialize everything in memory.
758+
const cursor = this.ctx.storage.sql.exec(
759+
"SELECT cid, bytes FROM blocks",
760+
);
761+
762+
async function* blocks(): AsyncGenerator<CarBlock> {
763+
for (const row of cursor) {
764+
yield {
765+
cid: CID.parse(row.cid as string),
766+
bytes: new Uint8Array(row.bytes as ArrayBuffer),
767+
};
768+
}
762769
}
763770

764-
// Use the official CAR builder
765-
return blocksToCarFile(root, blocks);
771+
const carIter = writeCarStream(root, blocks())[Symbol.asyncIterator]();
772+
773+
const stream = new ReadableStream<Uint8Array>({
774+
async pull(controller) {
775+
const { value, done } = await carIter.next();
776+
if (done) {
777+
controller.close();
778+
} else {
779+
controller.enqueue(value);
780+
}
781+
},
782+
});
783+
784+
return new Response(stream, {
785+
headers: { "Content-Type": "application/vnd.ipld.car" },
786+
});
766787
}
767788

768789
/**
@@ -1541,15 +1562,18 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
15411562
}
15421563

15431564
/**
1544-
* HTTP fetch handler for WebSocket upgrades.
1545-
* This is used instead of RPC to avoid WebSocket serialization errors.
1565+
* HTTP fetch handler for WebSocket upgrades and streaming responses.
1566+
* Used instead of RPC when the response can't be serialized (WebSocket)
1567+
* or when streaming is needed to avoid buffering large payloads (getRepo).
15461568
*/
15471569
override async fetch(request: Request): Promise<Response> {
1548-
// Only handle WebSocket upgrades via fetch
15491570
const url = new URL(request.url);
15501571
if (url.pathname === "/xrpc/com.atproto.sync.subscribeRepos") {
15511572
return this.handleFirehoseUpgrade(request);
15521573
}
1574+
if (url.pathname === "/xrpc/com.atproto.sync.getRepo") {
1575+
return this.handleGetRepo();
1576+
}
15531577

15541578
// All other requests should use RPC methods, not fetch
15551579
return new Response("Method not allowed", { status: 405 });

packages/pds/src/xrpc/sync.ts

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,10 @@ export async function getRepo(
3838
);
3939
}
4040

41-
const carBytes = await accountDO.rpcGetRepoCar();
42-
43-
return new Response(carBytes, {
44-
status: 200,
45-
headers: {
46-
"Content-Type": "application/vnd.ipld.car",
47-
"Content-Length": carBytes.length.toString(),
48-
},
49-
});
41+
// Stream through the DO's fetch handler to avoid buffering the entire CAR
42+
return accountDO.fetch(
43+
new Request("https://do/xrpc/com.atproto.sync.getRepo"),
44+
);
5045
}
5146

5247
export async function getRepoStatus(

packages/pds/test/xrpc.test.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,6 +1140,71 @@ describe("XRPC Endpoints", () => {
11401140
expect(blocks.length).toBeGreaterThan(0);
11411141
});
11421142

1143+
it("should stream getRepo response without Content-Length", async () => {
1144+
const { CarReader } = await import("@ipld/car");
1145+
1146+
// Ensure repo has content
1147+
await worker.fetch(
1148+
new Request("http://pds.test/xrpc/com.atproto.repo.createRecord", {
1149+
method: "POST",
1150+
headers: {
1151+
"Content-Type": "application/json",
1152+
Authorization: `Bearer ${env.AUTH_TOKEN}`,
1153+
},
1154+
body: JSON.stringify({
1155+
repo: env.DID,
1156+
collection: "app.bsky.feed.post",
1157+
rkey: "stream-test",
1158+
record: {
1159+
$type: "app.bsky.feed.post",
1160+
text: "Streaming test",
1161+
createdAt: new Date().toISOString(),
1162+
},
1163+
}),
1164+
}),
1165+
env,
1166+
);
1167+
1168+
const response = await worker.fetch(
1169+
new Request(
1170+
`http://pds.test/xrpc/com.atproto.sync.getRepo?did=${env.DID}`,
1171+
),
1172+
env,
1173+
);
1174+
expect(response.status).toBe(200);
1175+
1176+
// Streaming: no Content-Length header
1177+
expect(response.headers.get("Content-Length")).toBeNull();
1178+
1179+
// Body is a ReadableStream, not a fixed buffer
1180+
expect(response.body).toBeInstanceOf(ReadableStream);
1181+
1182+
// Read incrementally to verify chunked delivery
1183+
const reader = response.body!.getReader();
1184+
const chunks: Uint8Array[] = [];
1185+
for (;;) {
1186+
const { value, done } = await reader.read();
1187+
if (done) break;
1188+
chunks.push(value);
1189+
}
1190+
1191+
// Should arrive in multiple chunks (header + blocks)
1192+
expect(chunks.length).toBeGreaterThan(1);
1193+
1194+
// Reassembled bytes are a valid CAR
1195+
const totalLength = chunks.reduce((n, c) => n + c.byteLength, 0);
1196+
const carBytes = new Uint8Array(totalLength);
1197+
let offset = 0;
1198+
for (const chunk of chunks) {
1199+
carBytes.set(chunk, offset);
1200+
offset += chunk.byteLength;
1201+
}
1202+
1203+
const car = await CarReader.fromBytes(carBytes);
1204+
const roots = await car.getRoots();
1205+
expect(roots).toHaveLength(1);
1206+
});
1207+
11431208
it("should export CAR file for empty repo", async () => {
11441209
const { CarReader } = await import("@ipld/car");
11451210

0 commit comments

Comments
 (0)