From 0203863b0350ac03a8f9127ed0a304892b549236 Mon Sep 17 00:00:00 2001 From: Matt Kane Date: Sun, 24 May 2026 20:44:10 +0100 Subject: [PATCH] feat(pds): implement com.atproto.sync.listReposByCollection Relays use this endpoint to discover which PDSes host repos for a given collection. Cirrus is single-account, so the answer reduces to "does our repo contain any record in the requested collection?". Uses the existing `collections` SQLite cache (with the same lazy backfill as `rpcDescribeRepo`) to answer without walking the MST. --- .changeset/sync-list-repos-by-collection.md | 7 ++ packages/pds/src/account-do.ts | 20 +++++ packages/pds/src/index.ts | 3 + packages/pds/src/xrpc/sync.ts | 33 +++++++ .../pds/test/list-repos-by-collection.test.ts | 85 +++++++++++++++++++ 5 files changed, 148 insertions(+) create mode 100644 .changeset/sync-list-repos-by-collection.md create mode 100644 packages/pds/test/list-repos-by-collection.test.ts diff --git a/.changeset/sync-list-repos-by-collection.md b/.changeset/sync-list-repos-by-collection.md new file mode 100644 index 00000000..7eebfb13 --- /dev/null +++ b/.changeset/sync-list-repos-by-collection.md @@ -0,0 +1,7 @@ +--- +"@getcirrus/pds": minor +--- + +Implement `com.atproto.sync.listReposByCollection`. + +Relays and crawlers use this endpoint to discover which PDSes host repos that contain a given record collection. The PDS now answers with `{ repos: [{ did }] }` when its account has at least one record in the requested collection, or an empty list otherwise. Invalid or missing `collection` parameters return `InvalidRequest`. diff --git a/packages/pds/src/account-do.ts b/packages/pds/src/account-do.ts index 3efe42dd..f82c83b7 100644 --- a/packages/pds/src/account-do.ts +++ b/packages/pds/src/account-do.ts @@ -248,6 +248,26 @@ export class AccountDurableObject extends DurableObject { }; } + /** + * RPC method: Check whether the repo contains any records in a collection. + */ + async rpcHasRecordsInCollection(collection: string): Promise { + const repo = await this.getRepo(); + const storage = await this.getStorage(); + + if (!storage.hasCollections() && (await storage.getRoot())) { + const seen = new Set(); + for await (const record of repo.walkRecords()) { + if (!seen.has(record.collection)) { + seen.add(record.collection); + storage.addCollection(record.collection); + } + } + } + + return storage.getCollections().includes(collection); + } + /** * RPC method: Get a single record */ diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts index fe6a67f7..4ef2a862 100644 --- a/packages/pds/src/index.ts +++ b/packages/pds/src/index.ts @@ -224,6 +224,9 @@ app.get("/xrpc/com.atproto.sync.getBlob", (c) => app.get("/xrpc/com.atproto.sync.listRepos", (c) => sync.listRepos(c, getAccountDO(c.env)), ); +app.get("/xrpc/com.atproto.sync.listReposByCollection", (c) => + sync.listReposByCollection(c, getAccountDO(c.env)), +); app.get("/xrpc/com.atproto.sync.listBlobs", (c) => sync.listBlobs(c, getAccountDO(c.env)), ); diff --git a/packages/pds/src/xrpc/sync.ts b/packages/pds/src/xrpc/sync.ts index 709626ae..283649c9 100644 --- a/packages/pds/src/xrpc/sync.ts +++ b/packages/pds/src/xrpc/sync.ts @@ -117,6 +117,39 @@ export async function listRepos( }); } +export async function listReposByCollection( + c: Context, + accountDO: DurableObjectStub, +): Promise { + const collection = c.req.query("collection"); + + if (!collection) { + return c.json( + { + error: "InvalidRequest", + message: "Missing required parameter: collection", + }, + 400, + ); + } + + if (!isNsid(collection)) { + return c.json( + { + error: "InvalidRequest", + message: "Invalid collection format (must be NSID)", + }, + 400, + ); + } + + const hasRecords = await accountDO.rpcHasRecordsInCollection(collection); + + return c.json({ + repos: hasRecords ? [{ did: c.env.DID }] : [], + }); +} + export async function getLatestCommit( c: Context, accountDO: DurableObjectStub, diff --git a/packages/pds/test/list-repos-by-collection.test.ts b/packages/pds/test/list-repos-by-collection.test.ts new file mode 100644 index 00000000..cd42d6bb --- /dev/null +++ b/packages/pds/test/list-repos-by-collection.test.ts @@ -0,0 +1,85 @@ +import { describe, it, expect } from "vitest"; +import { env, worker } from "./helpers"; + +describe("com.atproto.sync.listReposByCollection", () => { + it("returns this PDS's DID for a collection with records", async () => { + const createResponse = await worker.fetch( + new Request(`http://pds.test/xrpc/com.atproto.repo.createRecord`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${env.AUTH_TOKEN}`, + }, + body: JSON.stringify({ + repo: env.DID, + collection: "app.bsky.feed.post", + record: { + $type: "app.bsky.feed.post", + text: "Hello listReposByCollection", + createdAt: new Date().toISOString(), + }, + }), + }), + env, + ); + expect(createResponse.status).toBe(200); + + const response = await worker.fetch( + new Request( + `http://pds.test/xrpc/com.atproto.sync.listReposByCollection?collection=app.bsky.feed.post`, + ), + env, + ); + + expect(response.status).toBe(200); + const body = (await response.json()) as { + repos: Array<{ did: string }>; + cursor?: string; + }; + expect(body.repos).toEqual([{ did: env.DID }]); + expect(body.cursor).toBeUndefined(); + }); + + it("returns an empty list for a collection with no records", async () => { + const response = await worker.fetch( + new Request( + `http://pds.test/xrpc/com.atproto.sync.listReposByCollection?collection=app.bsky.graph.block`, + ), + env, + ); + + expect(response.status).toBe(200); + const body = (await response.json()) as { + repos: Array<{ did: string }>; + }; + expect(body.repos).toEqual([]); + }); + + it("rejects requests missing the collection parameter", async () => { + const response = await worker.fetch( + new Request( + `http://pds.test/xrpc/com.atproto.sync.listReposByCollection`, + ), + env, + ); + + expect(response.status).toBe(400); + const body = (await response.json()) as Record; + expect(body.error).toBe("InvalidRequest"); + expect(body.message).toContain("collection"); + }); + + it("rejects an invalid collection NSID", async () => { + const response = await worker.fetch( + new Request( + `http://pds.test/xrpc/com.atproto.sync.listReposByCollection?collection=not-an-nsid`, + ), + env, + ); + + expect(response.status).toBe(400); + const body = (await response.json()) as Record; + expect(body.error).toBe("InvalidRequest"); + expect(body.message).toContain("collection"); + }); +});