Skip to content

Commit a492bf7

Browse files
authored
feat(pds): align lexicon validation with reference PDS (#160)
* feat(pds): tighten lexicon validation to match reference PDS. Honor the `validate` flag, return `validationStatus`, validate rkeys against schema `keySchema`, reject legacy blob refs, and broaden the bundled schema set. Move authoritative rkey allocation into the DO with a collision-retry loop so worker-isolate clockid collisions can't surface as 500s. Translate client-supplied rkey collisions to 409 and intra-batch duplicates to 400. * test(pds): generate TID-format rkeys in e2e helpers. Now that the validator enforces the schema's keySchema (e.g. tidString for app.bsky.feed.post), uniqueRkey() must return a valid TID.
1 parent 5eb1b6b commit a492bf7

10 files changed

Lines changed: 1091 additions & 301 deletions

File tree

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
"@getcirrus/pds": minor
3+
---
4+
5+
Lexicon validation now matches the reference PDS more closely:
6+
7+
- `createRecord`, `putRecord`, and `applyWrites` honor the `validate` flag from the request body. `true` requires a known schema, `false` skips schema validation, and leaving it unset (`undefined`) validates records whose schema is known while still accepting records in unknown collections.
8+
- Responses include `validationStatus` (`"valid"` for known, `"unknown"` for unknown collections; omitted when `validate: false`). Per-write `validationStatus` is returned in `applyWrites` results.
9+
- The record's `$type` is filled in from `collection` when missing and rejected on mismatch.
10+
- Generic record-key shape (`isRecordKey`) is enforced for any provided rkey, regardless of the `validate` flag. This closes a hole where empty-string and path-traversal-style rkeys could reach the repo.
11+
- Schema-specific record keys are validated against the schema's `keySchema` for known collections (e.g. `app.bsky.feed.post` requires a TID, `app.bsky.actor.profile` requires `self`).
12+
- Legacy `{ cid, mimeType }` blob refs are rejected.
13+
- Bundled schema set broadened to include `com.atproto.lexicon.schema`, `app.bsky.actor.status`, `app.bsky.notification.declaration`, and `chat.bsky.actor.declaration`.
14+
- The Durable Object is now the authoritative rkey allocator: when the client doesn't supply an rkey, the worker validates against a candidate (so restrictive `keySchema`s still reject early) and the DO picks the final rkey against its MST state, with a small retry loop to defeat any worker-isolate clockid collisions.
15+
- Client-supplied rkey collisions return `409 RecordAlreadyExists` instead of a generic 500.
16+
- Intra-batch duplicate rkeys in `applyWrites` return `400 InvalidRequest` (distinguished from the 409 above).
17+
- Missing rkey for `applyWrites#update`/`#delete` returns `400 InvalidRequest`.
18+
- Non-boolean `validate` flag values return `400 InvalidRequest`.
19+
- Non-string `rkey` values (including `null`) return `400 InvalidRequest`.

packages/pds/e2e/helpers.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { AtpAgent } from "@atproto/api";
2+
import { now as tidNow } from "@atcute/tid";
23

34
export function getPort(): number {
45
return (
@@ -15,10 +16,12 @@ export function createAgent(): AtpAgent {
1516
}
1617

1718
/**
18-
* Generate a unique rkey for test isolation
19+
* Generate a unique TID-format rkey for test isolation. Most app.bsky.*
20+
* record collections constrain the rkey to TID format, so tests can't use
21+
* arbitrary strings.
1922
*/
2023
export function uniqueRkey(): string {
21-
return `test-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
24+
return tidNow();
2225
}
2326

2427
export const TEST_DID = "did:web:test.local";

packages/pds/src/account-do.ts

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
import { BlobStore, type BlobRef } from "./blobs";
3232
import { jsonToLex } from "@atproto/lex-json";
3333
import type { PDSEnv } from "./types";
34+
import { RecordAlreadyExistsError, type ValidationStatus } from "./validation";
3435

3536
/**
3637
* Account Durable Object - manages a single user's AT Protocol repository.
@@ -322,16 +323,42 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
322323
collection: string,
323324
rkey: string | undefined,
324325
record: unknown,
326+
validationStatus?: ValidationStatus,
325327
): Promise<{
326328
uri: string;
327329
cid: string;
328330
commit: { cid: string; rev: string };
331+
validationStatus?: ValidationStatus;
329332
}> {
330333
await this.ensureActive();
331334
const repo = await this.getRepo();
332335
const keypair = await this.getKeypair();
333336

334-
const actualRkey = rkey || tidNow();
337+
// Auto-generate rkey here (not in the worker) so the candidate is
338+
// chosen against this DO's authoritative MST state, eliminating the
339+
// 1/1024 collision risk between worker isolates picking the same
340+
// timestamp+clockid in the same ms. For client-supplied rkeys, throw
341+
// a structured RecordAlreadyExistsError if it collides. Use the
342+
// MST's CID lookup (data.get) instead of repo.getRecord to avoid
343+
// fetching and decoding the full record block on every write.
344+
const autoGenerated = rkey === undefined;
345+
let actualRkey = rkey ?? tidNow();
346+
for (let attempt = 0; attempt < 5; attempt++) {
347+
const existingCid = await repo.data.get(`${collection}/${actualRkey}`);
348+
if (!existingCid) break;
349+
if (!autoGenerated) {
350+
throw new RecordAlreadyExistsError(
351+
`${collection}/${actualRkey}`,
352+
);
353+
}
354+
if (attempt === 4) {
355+
throw new Error(
356+
`Failed to allocate unique rkey for ${collection} after 5 attempts`,
357+
);
358+
}
359+
actualRkey = tidNow();
360+
}
361+
335362
const createOp: RecordCreateOp = {
336363
action: WriteOpAction.Create,
337364
collection,
@@ -394,6 +421,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
394421
cid: this.repo.cid.toString(),
395422
rev: this.repo.commit.rev,
396423
},
424+
...(validationStatus !== undefined ? { validationStatus } : {}),
397425
};
398426
}
399427

@@ -466,11 +494,12 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
466494
collection: string,
467495
rkey: string,
468496
record: unknown,
497+
validationStatus?: ValidationStatus,
469498
): Promise<{
470499
uri: string;
471500
cid: string;
472501
commit: { cid: string; rev: string };
473-
validationStatus: string;
502+
validationStatus?: ValidationStatus;
474503
}> {
475504
await this.ensureActive();
476505
const repo = await this.getRepo();
@@ -548,7 +577,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
548577
cid: this.repo.cid.toString(),
549578
rev: this.repo.commit.rev,
550579
},
551-
validationStatus: "valid",
580+
...(validationStatus !== undefined ? { validationStatus } : {}),
552581
};
553582
}
554583

@@ -561,14 +590,15 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
561590
collection: string;
562591
rkey?: string;
563592
value?: unknown;
593+
validationStatus?: ValidationStatus;
564594
}>,
565595
): Promise<{
566596
commit: { cid: string; rev: string };
567597
results: Array<{
568598
$type: string;
569599
uri?: string;
570600
cid?: string;
571-
validationStatus?: string;
601+
validationStatus?: ValidationStatus;
572602
}>;
573603
}> {
574604
await this.ensureActive();
@@ -581,15 +611,38 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
581611
$type: string;
582612
uri?: string;
583613
cid?: string;
584-
validationStatus?: string;
614+
validationStatus?: ValidationStatus;
585615
collection: string;
586616
rkey: string;
587617
action: WriteOpAction;
588618
}> = [];
589619

620+
// Track rkeys this batch will write so two auto-generated creates in
621+
// the same batch don't pick the same rkey.
622+
const reservedRkeys = new Set<string>();
623+
590624
for (const write of writes) {
591625
if (write.$type === "com.atproto.repo.applyWrites#create") {
592-
const rkey = write.rkey || tidNow();
626+
const autoGenerated = write.rkey === undefined;
627+
let rkey = write.rkey ?? tidNow();
628+
for (let attempt = 0; attempt < 5; attempt++) {
629+
const composite = `${write.collection}/${rkey}`;
630+
const collidesInBatch = reservedRkeys.has(composite);
631+
const collidesInRepo =
632+
!collidesInBatch &&
633+
(await repo.data.get(composite)) !== null;
634+
if (!collidesInBatch && !collidesInRepo) break;
635+
if (!autoGenerated) {
636+
throw new RecordAlreadyExistsError(composite);
637+
}
638+
if (attempt === 4) {
639+
throw new Error(
640+
`Failed to allocate unique rkey for ${write.collection} after 5 attempts`,
641+
);
642+
}
643+
rkey = tidNow();
644+
}
645+
reservedRkeys.add(`${write.collection}/${rkey}`);
593646
const op: RecordCreateOp = {
594647
action: WriteOpAction.Create,
595648
collection: write.collection,
@@ -602,6 +655,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
602655
collection: write.collection,
603656
rkey,
604657
action: WriteOpAction.Create,
658+
validationStatus: write.validationStatus,
605659
});
606660
} else if (write.$type === "com.atproto.repo.applyWrites#update") {
607661
if (!write.rkey) {
@@ -619,6 +673,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
619673
collection: write.collection,
620674
rkey: write.rkey,
621675
action: WriteOpAction.Update,
676+
validationStatus: write.validationStatus,
622677
});
623678
} else if (write.$type === "com.atproto.repo.applyWrites#delete") {
624679
if (!write.rkey) {
@@ -657,7 +712,7 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
657712
$type: string;
658713
uri?: string;
659714
cid?: string;
660-
validationStatus?: string;
715+
validationStatus?: ValidationStatus;
661716
}> = [];
662717
const opsWithCids: Array<RecordWriteOp & { cid?: CID | null }> = [];
663718

@@ -678,7 +733,9 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
678733
$type: result.$type,
679734
uri: `at://${this.repo.did}/${result.collection}/${result.rkey}`,
680735
cid: recordCid?.toString(),
681-
validationStatus: "valid",
736+
...(result.validationStatus !== undefined
737+
? { validationStatus: result.validationStatus }
738+
: {}),
682739
});
683740
// Include the record CID in the op for the firehose
684741
opsWithCids.push({ ...op, cid: recordCid });

0 commit comments

Comments
 (0)