Skip to content

Commit eca3634

Browse files
ascorbicclaude
andauthored
fix(pds): correct firehose event format for relay compatibility (#12)
- Use revision TID for 'since' field instead of CID (per AT Protocol spec) - Include record CID in ops for create/update operations - Add listBlobs sync endpoint - Route chat.bsky.* lexicons to api.bsky.chat - Add firehose verification script - Configure R2 blob storage binding 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent e03131c commit eca3634

9 files changed

Lines changed: 503 additions & 145 deletions

File tree

packages/pds/scripts/generate-keys.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,21 @@
44
*/
55

66
import { Secp256k1Keypair } from "@atproto/crypto"
7-
import * as uint8arrays from "uint8arrays"
87

98
async function main() {
109
// Generate a new secp256k1 keypair
1110
const keypair = await Secp256k1Keypair.create({ exportable: true })
1211

13-
// Export the private key
12+
// Export the private key as hex
1413
const privateKeyBytes = await keypair.export()
15-
const privateKeyHex = uint8arrays.toString(privateKeyBytes, "hex")
14+
const privateKeyHex = Buffer.from(privateKeyBytes).toString("hex")
1615

1716
// Get the public key in did:key format
1817
const did = keypair.did()
1918

20-
// Get the public key bytes for DID document
21-
const publicKeyBytes = keypair.publicKeyBytes()
22-
const publicKeyMultibase = "z" + uint8arrays.toString(publicKeyBytes, "base58btc")
19+
// Get the public key multibase from did:key (includes multicodec prefix)
20+
// This is the correct format for DID document verificationMethod
21+
const publicKeyMultibase = did.replace("did:key:", "")
2322

2423
console.log("=== Edge PDS Signing Keys ===\n")
2524
console.log("Set these secrets with wrangler:\n")
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
#!/usr/bin/env npx tsx
2+
/**
3+
* Firehose verification script
4+
* Connects to the PDS firehose and verifies event data integrity
5+
*/
6+
7+
import WebSocket from "ws";
8+
import { decodeAll } from "@atproto/lex-cbor";
9+
import { CarReader } from "@ipld/car";
10+
import type { Cid } from "@atproto/lex-data";
11+
12+
const PDS_URL = process.env.PDS_URL || "wss://pds.mk.gg";
13+
const CURSOR = process.env.CURSOR || "0";
14+
const MAX_EVENTS = parseInt(process.env.MAX_EVENTS || "20", 10);
15+
16+
interface FrameHeader {
17+
op: number;
18+
t?: string;
19+
}
20+
21+
interface CommitOp {
22+
action: "create" | "update" | "delete";
23+
path: string;
24+
cid: Cid | null;
25+
}
26+
27+
interface CommitEvent {
28+
seq: number;
29+
rebase: boolean;
30+
tooBig: boolean;
31+
repo: string;
32+
commit: Cid;
33+
rev: string;
34+
since: string | null;
35+
blocks: Uint8Array;
36+
ops: CommitOp[];
37+
blobs: Cid[];
38+
time: string;
39+
}
40+
41+
function decodeFrame(data: Uint8Array): { header: FrameHeader; body: unknown } {
42+
// AT Protocol frame: header CBOR + body CBOR concatenated
43+
const parts = [...decodeAll(data)];
44+
if (parts.length < 2) {
45+
throw new Error(`Expected 2 CBOR values in frame, got ${parts.length}`);
46+
}
47+
return { header: parts[0] as FrameHeader, body: parts[1] };
48+
}
49+
50+
async function verifyCommitEvent(
51+
event: CommitEvent,
52+
eventNum: number,
53+
): Promise<{ valid: boolean; errors: string[] }> {
54+
const errors: string[] = [];
55+
56+
console.log(`\n--- Event ${eventNum} (seq: ${event.seq}) ---`);
57+
console.log(` Repo: ${event.repo}`);
58+
console.log(` Rev: ${event.rev}`);
59+
console.log(` Time: ${event.time}`);
60+
console.log(` Ops: ${event.ops.length}`);
61+
62+
for (const op of event.ops) {
63+
console.log(` - ${op.action}: ${op.path} (cid: ${op.cid?.toString() || "null"})`);
64+
}
65+
66+
// Check blocks
67+
console.log(` Blocks field length: ${event.blocks?.length || 0} bytes`);
68+
69+
if (!event.blocks || event.blocks.length === 0) {
70+
errors.push("blocks field is empty!");
71+
return { valid: false, errors };
72+
}
73+
74+
// Try to parse as CAR
75+
let reader: CarReader;
76+
try {
77+
reader = await CarReader.fromBytes(event.blocks);
78+
} catch (e) {
79+
errors.push(`Failed to parse blocks as CAR: ${e}`);
80+
return { valid: false, errors };
81+
}
82+
83+
// Check roots
84+
const roots = await reader.getRoots();
85+
console.log(` CAR roots: ${roots.length}`);
86+
if (roots.length === 0) {
87+
errors.push("CAR has no roots!");
88+
}
89+
90+
// Count blocks in CAR
91+
const blockCids: string[] = [];
92+
for await (const block of reader.blocks()) {
93+
blockCids.push(block.cid.toString());
94+
}
95+
console.log(` CAR blocks: ${blockCids.length}`);
96+
97+
if (blockCids.length === 0) {
98+
errors.push("CAR contains no blocks!");
99+
}
100+
101+
// Verify commit CID is in blocks
102+
const commitCidStr = event.commit?.toString();
103+
if (commitCidStr) {
104+
const hasCommit = blockCids.includes(commitCidStr);
105+
console.log(` Commit CID in blocks: ${hasCommit ? "✓" : "✗"}`);
106+
if (!hasCommit) {
107+
errors.push(`Commit CID ${commitCidStr} not found in blocks`);
108+
}
109+
}
110+
111+
// Verify record CIDs from ops are in blocks (for create/update)
112+
for (const op of event.ops) {
113+
if (op.action !== "delete" && op.cid) {
114+
const cidStr = op.cid.toString();
115+
const hasRecord = blockCids.includes(cidStr);
116+
console.log(` Record CID ${cidStr.slice(0, 20)}... in blocks: ${hasRecord ? "✓" : "✗"}`);
117+
if (!hasRecord) {
118+
errors.push(`Record CID ${cidStr} for ${op.path} not found in blocks`);
119+
}
120+
}
121+
}
122+
123+
// Check tooBig flag
124+
if (event.tooBig) {
125+
console.log(" ⚠️ tooBig flag is set - blocks may be truncated");
126+
}
127+
128+
return { valid: errors.length === 0, errors };
129+
}
130+
131+
async function main() {
132+
console.log(`Connecting to ${PDS_URL}/xrpc/com.atproto.sync.subscribeRepos?cursor=${CURSOR}`);
133+
console.log(`Will verify up to ${MAX_EVENTS} events\n`);
134+
135+
const ws = new WebSocket(
136+
`${PDS_URL}/xrpc/com.atproto.sync.subscribeRepos?cursor=${CURSOR}`,
137+
);
138+
139+
let eventCount = 0;
140+
let validCount = 0;
141+
let invalidCount = 0;
142+
const allErrors: string[] = [];
143+
144+
return new Promise<void>((resolve, reject) => {
145+
ws.on("open", () => {
146+
console.log("✓ Connected to firehose\n");
147+
});
148+
149+
ws.on("message", async (data: Buffer) => {
150+
try {
151+
eventCount++;
152+
const { header, body } = decodeFrame(new Uint8Array(data));
153+
154+
if (header.op === -1) {
155+
// Error frame
156+
console.log("Error frame:", body);
157+
return;
158+
}
159+
160+
if (header.t === "#commit") {
161+
const result = await verifyCommitEvent(
162+
body as CommitEvent,
163+
eventCount,
164+
);
165+
if (result.valid) {
166+
validCount++;
167+
console.log(" ✓ Event valid");
168+
} else {
169+
invalidCount++;
170+
console.log(" ✗ Event INVALID:");
171+
for (const err of result.errors) {
172+
console.log(` - ${err}`);
173+
allErrors.push(`Event ${eventCount}: ${err}`);
174+
}
175+
}
176+
} else {
177+
console.log(`Unknown frame type: ${header.t}`);
178+
}
179+
180+
if (eventCount >= MAX_EVENTS) {
181+
ws.close();
182+
}
183+
} catch (e) {
184+
console.error("Error processing message:", e);
185+
invalidCount++;
186+
}
187+
});
188+
189+
ws.on("error", (err) => {
190+
console.error("WebSocket error:", err);
191+
reject(err);
192+
});
193+
194+
ws.on("close", () => {
195+
console.log("\n" + "=".repeat(50));
196+
console.log("SUMMARY");
197+
console.log("=".repeat(50));
198+
console.log(`Total events: ${eventCount}`);
199+
console.log(`Valid: ${validCount}`);
200+
console.log(`Invalid: ${invalidCount}`);
201+
202+
if (allErrors.length > 0) {
203+
console.log("\nAll errors:");
204+
for (const err of allErrors) {
205+
console.log(` - ${err}`);
206+
}
207+
}
208+
209+
if (invalidCount > 0) {
210+
console.log("\n❌ FIREHOSE HAS ISSUES");
211+
process.exit(1);
212+
} else if (eventCount === 0) {
213+
console.log("\n⚠️ No events received (firehose may be empty)");
214+
process.exit(0);
215+
} else {
216+
console.log("\n✅ FIREHOSE OK");
217+
process.exit(0);
218+
}
219+
220+
resolve();
221+
});
222+
223+
// Timeout after 30 seconds
224+
setTimeout(() => {
225+
console.log("\nTimeout reached, closing connection...");
226+
ws.close();
227+
}, 30000);
228+
});
229+
}
230+
231+
main().catch(console.error);

0 commit comments

Comments
 (0)