Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/sync-list-repos-by-collection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@getcirrus/pds": minor
---

Implement `com.atproto.sync.listReposByCollection`.

Relays and crawlers use this endpoint to discover which PDSes host repos that contain a given record collection. The PDS now answers with `{ repos: [{ did }] }` when its account has at least one record in the requested collection, or an empty list otherwise. Invalid or missing `collection` parameters return `InvalidRequest`.
20 changes: 20 additions & 0 deletions packages/pds/src/account-do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,26 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
};
}

/**
* RPC method: Check whether the repo contains any records in a collection.
*/
async rpcHasRecordsInCollection(collection: string): Promise<boolean> {
const repo = await this.getRepo();
const storage = await this.getStorage();

if (!storage.hasCollections() && (await storage.getRoot())) {
const seen = new Set<string>();
for await (const record of repo.walkRecords()) {
if (!seen.has(record.collection)) {
seen.add(record.collection);
storage.addCollection(record.collection);
}
}
}

return storage.getCollections().includes(collection);
}

/**
* RPC method: Get a single record
*/
Expand Down
3 changes: 3 additions & 0 deletions packages/pds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ app.get("/xrpc/com.atproto.sync.getBlob", (c) =>
app.get("/xrpc/com.atproto.sync.listRepos", (c) =>
sync.listRepos(c, getAccountDO(c.env)),
);
app.get("/xrpc/com.atproto.sync.listReposByCollection", (c) =>
sync.listReposByCollection(c, getAccountDO(c.env)),
);
app.get("/xrpc/com.atproto.sync.listBlobs", (c) =>
sync.listBlobs(c, getAccountDO(c.env)),
);
Expand Down
33 changes: 33 additions & 0 deletions packages/pds/src/xrpc/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,39 @@ export async function listRepos(
});
}

export async function listReposByCollection(
c: Context<AppEnv>,
accountDO: DurableObjectStub<AccountDurableObject>,
): Promise<Response> {
const collection = c.req.query("collection");

if (!collection) {
return c.json(
{
error: "InvalidRequest",
message: "Missing required parameter: collection",
},
400,
);
}

if (!isNsid(collection)) {
return c.json(
{
error: "InvalidRequest",
message: "Invalid collection format (must be NSID)",
},
400,
);
}

const hasRecords = await accountDO.rpcHasRecordsInCollection(collection);

return c.json({
repos: hasRecords ? [{ did: c.env.DID }] : [],
});
}

export async function getLatestCommit(
c: Context<AppEnv>,
accountDO: DurableObjectStub<AccountDurableObject>,
Expand Down
85 changes: 85 additions & 0 deletions packages/pds/test/list-repos-by-collection.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { describe, it, expect } from "vitest";
import { env, worker } from "./helpers";

describe("com.atproto.sync.listReposByCollection", () => {
it("returns this PDS's DID for a collection with records", async () => {
const createResponse = await worker.fetch(
new Request(`http://pds.test/xrpc/com.atproto.repo.createRecord`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${env.AUTH_TOKEN}`,
},
body: JSON.stringify({
repo: env.DID,
collection: "app.bsky.feed.post",
record: {
$type: "app.bsky.feed.post",
text: "Hello listReposByCollection",
createdAt: new Date().toISOString(),
},
}),
}),
env,
);
expect(createResponse.status).toBe(200);

const response = await worker.fetch(
new Request(
`http://pds.test/xrpc/com.atproto.sync.listReposByCollection?collection=app.bsky.feed.post`,
),
env,
);

expect(response.status).toBe(200);
const body = (await response.json()) as {
repos: Array<{ did: string }>;
cursor?: string;
};
expect(body.repos).toEqual([{ did: env.DID }]);
expect(body.cursor).toBeUndefined();
});

it("returns an empty list for a collection with no records", async () => {
const response = await worker.fetch(
new Request(
`http://pds.test/xrpc/com.atproto.sync.listReposByCollection?collection=app.bsky.graph.block`,
),
env,
);

expect(response.status).toBe(200);
const body = (await response.json()) as {
repos: Array<{ did: string }>;
};
expect(body.repos).toEqual([]);
});

it("rejects requests missing the collection parameter", async () => {
const response = await worker.fetch(
new Request(
`http://pds.test/xrpc/com.atproto.sync.listReposByCollection`,
),
env,
);

expect(response.status).toBe(400);
const body = (await response.json()) as Record<string, unknown>;
expect(body.error).toBe("InvalidRequest");
expect(body.message).toContain("collection");
});

it("rejects an invalid collection NSID", async () => {
const response = await worker.fetch(
new Request(
`http://pds.test/xrpc/com.atproto.sync.listReposByCollection?collection=not-an-nsid`,
),
env,
);

expect(response.status).toBe(400);
const body = (await response.json()) as Record<string, unknown>;
expect(body.error).toBe("InvalidRequest");
expect(body.message).toContain("collection");
});
});
Loading