Skip to content

Commit 52a426b

Browse files
ascorbicclaude
andauthored
feat(pds): implement com.atproto.repo.applyWrites endpoint (#9)
Adds support for batch record operations (create/update/delete) in a single request. This endpoint is required by Bluesky clients for efficient repository mutations. - Add rpcApplyWrites method to AccountDurableObject - Add applyWrites handler and route - Add 6 tests covering batch operations 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 5bf8917 commit 52a426b

4 files changed

Lines changed: 567 additions & 0 deletions

File tree

packages/pds/src/account-do.ts

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import {
55
BlockMap,
66
blocksToCarFile,
77
type RecordCreateOp,
8+
type RecordUpdateOp,
89
type RecordDeleteOp,
10+
type RecordWriteOp,
911
} from "@atproto/repo";
1012
import type { RepoRecord } from "@atproto/lexicon";
1113
import { Secp256k1Keypair } from "@atproto/crypto";
@@ -425,6 +427,202 @@ export class AccountDurableObject extends DurableObject<Env> {
425427
};
426428
}
427429

430+
/**
431+
* RPC method: Apply multiple writes (batch create/update/delete)
432+
*/
433+
async rpcApplyWrites(
434+
writes: Array<{
435+
$type: string;
436+
collection: string;
437+
rkey?: string;
438+
value?: unknown;
439+
}>,
440+
): Promise<{
441+
commit: { cid: string; rev: string };
442+
results: Array<{
443+
$type: string;
444+
uri?: string;
445+
cid?: string;
446+
validationStatus?: string;
447+
}>;
448+
}> {
449+
const repo = await this.getRepo();
450+
const keypair = await this.getKeypair();
451+
452+
// Convert input writes to RecordWriteOp format
453+
const ops: RecordWriteOp[] = [];
454+
const results: Array<{
455+
$type: string;
456+
uri?: string;
457+
cid?: string;
458+
validationStatus?: string;
459+
collection: string;
460+
rkey: string;
461+
action: WriteOpAction;
462+
}> = [];
463+
464+
for (const write of writes) {
465+
if (write.$type === "com.atproto.repo.applyWrites#create") {
466+
const rkey = write.rkey || TID.nextStr();
467+
const op: RecordCreateOp = {
468+
action: WriteOpAction.Create,
469+
collection: write.collection,
470+
rkey,
471+
record: write.value as RepoRecord,
472+
};
473+
ops.push(op);
474+
results.push({
475+
$type: "com.atproto.repo.applyWrites#createResult",
476+
collection: write.collection,
477+
rkey,
478+
action: WriteOpAction.Create,
479+
});
480+
} else if (write.$type === "com.atproto.repo.applyWrites#update") {
481+
if (!write.rkey) {
482+
throw new Error("Update requires rkey");
483+
}
484+
const op: RecordUpdateOp = {
485+
action: WriteOpAction.Update,
486+
collection: write.collection,
487+
rkey: write.rkey,
488+
record: write.value as RepoRecord,
489+
};
490+
ops.push(op);
491+
results.push({
492+
$type: "com.atproto.repo.applyWrites#updateResult",
493+
collection: write.collection,
494+
rkey: write.rkey,
495+
action: WriteOpAction.Update,
496+
});
497+
} else if (write.$type === "com.atproto.repo.applyWrites#delete") {
498+
if (!write.rkey) {
499+
throw new Error("Delete requires rkey");
500+
}
501+
const op: RecordDeleteOp = {
502+
action: WriteOpAction.Delete,
503+
collection: write.collection,
504+
rkey: write.rkey,
505+
};
506+
ops.push(op);
507+
results.push({
508+
$type: "com.atproto.repo.applyWrites#deleteResult",
509+
collection: write.collection,
510+
rkey: write.rkey,
511+
action: WriteOpAction.Delete,
512+
});
513+
} else {
514+
throw new Error(`Unknown write type: ${write.$type}`);
515+
}
516+
}
517+
518+
const prevCid = repo.cid;
519+
const updatedRepo = await repo.applyWrites(ops, keypair);
520+
this.repo = updatedRepo;
521+
522+
// Build final results with CIDs
523+
const finalResults: Array<{
524+
$type: string;
525+
uri?: string;
526+
cid?: string;
527+
validationStatus?: string;
528+
}> = [];
529+
530+
for (const result of results) {
531+
if (result.action === WriteOpAction.Delete) {
532+
finalResults.push({
533+
$type: result.$type,
534+
});
535+
} else {
536+
// Get the CID for create/update
537+
const dataKey = `${result.collection}/${result.rkey}`;
538+
const recordCid = await this.repo.data.get(dataKey);
539+
finalResults.push({
540+
$type: result.$type,
541+
uri: AtUri.make(this.repo.did, result.collection, result.rkey).toString(),
542+
cid: recordCid?.toString(),
543+
validationStatus: "valid",
544+
});
545+
}
546+
}
547+
548+
// Sequence the commit for firehose
549+
if (this.sequencer) {
550+
const newBlocks = new BlockMap();
551+
const rows = this.ctx.storage.sql
552+
.exec(
553+
"SELECT cid, bytes FROM blocks WHERE rev = ?",
554+
this.repo.cid.toString(),
555+
)
556+
.toArray();
557+
558+
for (const row of rows) {
559+
const cid = CID.parse(row.cid as string);
560+
const bytes = new Uint8Array(row.bytes as ArrayBuffer);
561+
newBlocks.set(cid, bytes);
562+
}
563+
564+
const commitData: CommitData = {
565+
did: this.repo.did,
566+
commit: this.repo.cid,
567+
rev: this.repo.cid.toString(),
568+
since: prevCid.toString(),
569+
newBlocks,
570+
ops,
571+
};
572+
573+
const seq = await this.sequencer.sequenceCommit(commitData);
574+
575+
// Build ops for firehose event
576+
const firehoseOps = await Promise.all(
577+
results.map(async (result) => {
578+
if (result.action === WriteOpAction.Delete) {
579+
return {
580+
action: "delete" as const,
581+
path: `${result.collection}/${result.rkey}`,
582+
cid: null,
583+
};
584+
}
585+
const dataKey = `${result.collection}/${result.rkey}`;
586+
const cid = await this.repo!.data.get(dataKey);
587+
return {
588+
action: result.action === WriteOpAction.Create ? "create" as const : "update" as const,
589+
path: `${result.collection}/${result.rkey}`,
590+
cid,
591+
};
592+
}),
593+
);
594+
595+
const event: SeqEvent = {
596+
seq,
597+
type: "commit",
598+
event: {
599+
seq,
600+
rebase: false,
601+
tooBig: false,
602+
repo: this.repo.did,
603+
commit: this.repo.cid,
604+
rev: this.repo.cid.toString(),
605+
since: prevCid.toString(),
606+
blocks: new Uint8Array(),
607+
ops: firehoseOps as any,
608+
blobs: [],
609+
time: new Date().toISOString(),
610+
},
611+
time: new Date().toISOString(),
612+
};
613+
614+
await this.broadcastCommit(event);
615+
}
616+
617+
return {
618+
commit: {
619+
cid: this.repo.cid.toString(),
620+
rev: this.repo.cid.toString(),
621+
},
622+
results: finalResults,
623+
};
624+
}
625+
428626
/**
429627
* RPC method: Get repo status
430628
*/

packages/pds/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,9 @@ app.post("/xrpc/com.atproto.repo.deleteRecord", requireAuth, (c) =>
142142
app.post("/xrpc/com.atproto.repo.uploadBlob", requireAuth, (c) =>
143143
repo.uploadBlob(c, getAccountDO(c.env)),
144144
);
145+
app.post("/xrpc/com.atproto.repo.applyWrites", requireAuth, (c) =>
146+
repo.applyWrites(c, getAccountDO(c.env)),
147+
);
145148

146149
// Server identity
147150
app.get("/xrpc/com.atproto.server.describeServer", server.describeServer);

packages/pds/src/xrpc/repo.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,57 @@ export async function deleteRecord(
253253
return c.json(result);
254254
}
255255

256+
export async function applyWrites(
257+
c: Context<{ Bindings: Env }>,
258+
accountDO: DurableObjectStub<AccountDurableObject>,
259+
): Promise<Response> {
260+
const body = await c.req.json();
261+
const { repo, writes } = body;
262+
263+
if (!repo || !writes || !Array.isArray(writes)) {
264+
return c.json(
265+
{
266+
error: "InvalidRequest",
267+
message: "Missing required parameters: repo, writes",
268+
},
269+
400,
270+
);
271+
}
272+
273+
if (repo !== c.env.DID) {
274+
return c.json(
275+
{
276+
error: "InvalidRepo",
277+
message: `Invalid repository: ${repo}`,
278+
},
279+
400,
280+
);
281+
}
282+
283+
if (writes.length > 200) {
284+
return c.json(
285+
{
286+
error: "InvalidRequest",
287+
message: "Too many writes. Max: 200",
288+
},
289+
400,
290+
);
291+
}
292+
293+
try {
294+
const result = await accountDO.rpcApplyWrites(writes);
295+
return c.json(result);
296+
} catch (err) {
297+
return c.json(
298+
{
299+
error: "InvalidRequest",
300+
message: err instanceof Error ? err.message : String(err),
301+
},
302+
400,
303+
);
304+
}
305+
}
306+
256307
export async function uploadBlob(
257308
c: Context<{ Bindings: Env }>,
258309
accountDO: DurableObjectStub<AccountDurableObject>,

0 commit comments

Comments
 (0)