Skip to content

Commit 8dce997

Browse files
ascorbicclaude
andcommitted
feat(pds): implement R2 blob storage with XRPC endpoints (#5)
- Add BlobStore class for R2 blob upload/retrieval using CID-based addressing - Implement com.atproto.repo.uploadBlob endpoint with 5MB size limit - Implement com.atproto.sync.getBlob endpoint with direct R2 access - Add RPC methods to AccountDurableObject for blob operations - Export BlobStore and BlobRef types for advanced users - Fix TypeScript errors in sequencer (RecordWriteOp import, LexValue types) - Remove unused storage variable in createRecord method - Add comprehensive blob storage tests (10 tests) - Update EDGE_PDS_PLAN.md to mark Phase 5 complete - Update CLAUDE.md with @atproto library usage guidance 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent dca8830 commit 8dce997

11 files changed

Lines changed: 657 additions & 31 deletions

File tree

CLAUDE.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,34 +110,41 @@ Required environment variables (validated at module load using `cloudflare:worke
110110

111111
### Protocol Helpers and Dependencies
112112

113-
The codebase uses official @atproto packages for all protocol operations:
113+
**CRITICAL: Always use @atproto libraries instead of low-level dependencies where available.**
114+
115+
The codebase uses official @atproto packages for all protocol operations. When implementing new features:
116+
117+
- **Always prefer @atproto packages** over direct use of `multiformats`, `uint8arrays`, `cborg`, etc.
118+
- **Reference the atproto monorepo** at `~/Repos/atproto` to understand available functions and patterns
119+
- The @atproto packages provide stable, tested abstractions over low-level primitives
114120

115121
**Encoding and Data Structures:**
116122

117-
- `@atproto/lex-cbor` - CBOR encoding/decoding with `encode()` and `cidForCbor()`
123+
- `@atproto/lex-cbor` - CBOR encoding/decoding with `encode()`, `cidForCbor()`, `cidForRawBytes()`
118124
- `@atproto/lex-data` - CID operations via stable interface wrapping multiformats
119125
- `@atproto/repo` - Repository operations, `BlockMap`, `blocksToCarFile()`
120126

121127
**Protocol Utilities:**
122128

123129
- `@atproto/common-web` - `TID.nextStr()` for record key generation
124130
- `@atproto/syntax` - `AtUri.make()`, `ensureValidDid()`, `ensureValidHandle()`
125-
- `@atproto/crypto` - `Secp256k1Keypair` for signing operations
131+
- `@atproto/crypto` - `Secp256k1Keypair` for signing operations, `sha256()` for hashing
126132
- `@atproto/lexicon` - Schema validation and type definitions
127133

128134
**Important Notes:**
129135

130136
- Never manually construct AT URIs - use `AtUri.make(did, collection, rkey).toString()`
131137
- Never manually generate record keys - use `TID.nextStr()`
132138
- Always validate DIDs and handles using `ensureValidDid()` / `ensureValidHandle()`
139+
- Use `cidForRawBytes()` from `@atproto/lex-cbor` for blob CID generation
133140
- Use `@atproto/lex-cbor` for test fixtures instead of direct `@ipld/dag-cbor`
134141
- CAR file export uses `blocksToCarFile()` from `@atproto/repo`
135142

136143
### Vitest Configuration Notes
137144

138145
- **Module Shimming**: Uses `resolve: { conditions: ["node", "require"] }` to force CJS builds for multiformats
139146
- **BlockMap/CidSet**: Access internal Map/Set via `(blocks as unknown as { map: Map<...> }).map` when iterating
140-
- **Test Count**: 48 tests (16 storage tests, 26 XRPC tests, 6 firehose tests)
147+
- **Test Count**: 58 tests (16 storage tests, 26 XRPC tests, 6 firehose tests, 10 blob tests)
141148

142149
### Firehose Implementation
143150

EDGE_PDS_PLAN.md

Lines changed: 132 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Build a single-user AT Protocol Personal Data Server (PDS) on Cloudflare Workers
1212

1313
**Live at: https://pds.mk.gg**
1414

15-
### Completed (Phase 1 + Phase 2 + Phase 3 + Phase 4 + Phase 6 + Phase 7)
15+
### Completed (Phase 1 + Phase 2 + Phase 3 + Phase 4 + Phase 5 + Phase 6 + Phase 7)
1616

1717
-**Storage Layer** (Phase 1) - `SqliteRepoStorage` implementing `@atproto/repo` RepoStorage interface
1818
-**Durable Object** (Phase 2) - `AccountDurableObject` with Repo integration
@@ -41,17 +41,25 @@ Build a single-user AT Protocol Personal Data Server (PDS) on Cloudflare Workers
4141
-**Deployment** - Custom domain `pds.mk.gg` with auto-provisioned DNS
4242
-**Signing Keys** - secp256k1 keypair generated and configured
4343
-**Environment Validation** - Module-scope validation using `cloudflare:workers` env import
44-
-**Testing** - Migrated to vitest 4, all 48 tests passing
44+
-**Blob Storage** (Phase 5) - R2 integration for image/media uploads
45+
- `BlobStore` class using `cidForRawBytes()` from `@atproto/lex-cbor`
46+
- `com.atproto.repo.uploadBlob` endpoint (authenticated, 5MB limit)
47+
- `com.atproto.sync.getBlob` endpoint (public read access)
48+
- Direct R2 access in endpoint (R2ObjectBody cannot be serialized across RPC)
49+
- Blobs stored with DID prefix for isolation
50+
-**Testing** - Migrated to vitest 4, all 58 tests passing
4551
- 16 storage tests
4652
- 26 XRPC tests (auth, concurrency, error handling, CAR validation)
4753
- 6 firehose tests (event sequencing, cursor validation, backfill)
54+
- 10 blob tests (upload, retrieval, size limits, content types)
4855
-**TypeScript** - All diagnostic errors resolved, proper type declarations for cloudflare:test
4956
-**Protocol Helpers** - All protocol operations use official @atproto utilities
5057
- Record keys: `TID.nextStr()` from `@atproto/common-web`
5158
- AT URI construction: `AtUri.make()` from `@atproto/syntax`
5259
- DID validation: `ensureValidDid()` from `@atproto/syntax`
5360
- Handle validation: `ensureValidHandle()` from `@atproto/syntax`
5461
- CBOR encoding: `@atproto/lex-cbor`
62+
- Blob CID generation: `cidForRawBytes()` from `@atproto/lex-cbor`
5563
- CAR export: `blocksToCarFile()` from `@atproto/repo`
5664
-**Dependency Optimization** - Removed 6 low-level dependencies, added 3 @atproto helpers
5765
- Removed: `varint`, `@types/varint`, `cborg`, `uint8arrays`, `@ipld/dag-cbor`, `multiformats`
@@ -60,7 +68,7 @@ Build a single-user AT Protocol Personal Data Server (PDS) on Cloudflare Workers
6068

6169
### Not Started
6270

63-
- **Blob Storage** (Phase 5) - R2 integration (R2 needs enabling in dashboard)
71+
- None! All planned phases are complete.
6472

6573
### Testing & Development Notes
6674

@@ -89,6 +97,8 @@ for (const [cidStr, bytes] of internalMap) { ... }
8997

9098
**Durable Object RPC Types**: Using `Rpc.Serializable<any>` for RPC method return types to ensure TypeScript correctly infers serializable types instead of `never`.
9199

100+
**R2 Blob Retrieval**: The `getBlob` endpoint accesses R2 directly rather than going through DO RPC because `R2ObjectBody` cannot be serialized (contains ReadableStream). Upload operations still use DO RPC since they only need to pass Uint8Array and return serializable metadata.
101+
92102
---
93103

94104
## Architecture
@@ -1720,6 +1730,125 @@ These can all be added later.
17201730

17211731
---
17221732

1733+
## Deployment Architecture
1734+
1735+
### Design Decision: Zero-Code Re-Export Pattern
1736+
1737+
For maximum simplicity, users deploying a PDS should not need to write any code. The `@ascorbic/pds-worker` package provides everything needed, and users simply re-export it.
1738+
1739+
#### User's Worker (Minimal)
1740+
1741+
```typescript
1742+
// src/index.ts
1743+
export { default, AccountDurableObject } from '@ascorbic/pds-worker'
1744+
```
1745+
1746+
That's it. No additional code required.
1747+
1748+
#### Package Exports
1749+
1750+
The `@ascorbic/pds-worker` package exports:
1751+
1752+
```typescript
1753+
// Core exports for advanced users
1754+
export { SqliteRepoStorage } from "./storage"
1755+
export { AccountDurableObject } from "./account-do"
1756+
export { BlobStore, type BlobRef } from "./blobs"
1757+
export { Sequencer } from "./sequencer"
1758+
1759+
// Default export: configured Hono app
1760+
export default app
1761+
```
1762+
1763+
#### Configuration
1764+
1765+
All configuration is via environment variables and secrets:
1766+
1767+
**Required environment variables:**
1768+
- `PDS_HOSTNAME` - Public hostname (set in wrangler.jsonc)
1769+
1770+
**Required secrets:**
1771+
- `DID` - Account's DID (e.g., "did:web:pds.example.com")
1772+
- `HANDLE` - Account's handle (e.g., "alice.pds.example.com")
1773+
- `AUTH_TOKEN` - Bearer token for write operations
1774+
- `SIGNING_KEY` - Private key for signing commits (secp256k1 JWK)
1775+
- `SIGNING_KEY_PUBLIC` - Public key for DID document (multibase)
1776+
1777+
**Resource bindings:**
1778+
- `ACCOUNT` - DurableObjectNamespace binding
1779+
- `BLOBS` - R2Bucket binding
1780+
1781+
#### Deployment Workflow
1782+
1783+
1. **Scaffold** (future: via `npm create @ascorbic/pds`)
1784+
- Creates project directory with re-export pattern
1785+
- Generates wrangler.jsonc with bindings
1786+
- Provides setup script for key generation
1787+
1788+
2. **Setup** (via setup script)
1789+
- Interactive prompts for hostname and handle
1790+
- Generates secp256k1 keypair
1791+
- Creates DID (did:web based on hostname)
1792+
- Generates random AUTH_TOKEN
1793+
- Writes to `.dev.vars` for local dev
1794+
1795+
3. **Local Development**
1796+
```bash
1797+
wrangler dev
1798+
```
1799+
1800+
4. **Production Deployment**
1801+
```bash
1802+
# Create R2 bucket
1803+
wrangler r2 bucket create pds-blobs
1804+
1805+
# Set secrets
1806+
wrangler secret put DID
1807+
wrangler secret put HANDLE
1808+
wrangler secret put AUTH_TOKEN
1809+
wrangler secret put SIGNING_KEY
1810+
wrangler secret put SIGNING_KEY_PUBLIC
1811+
1812+
# Deploy
1813+
wrangler deploy
1814+
```
1815+
1816+
#### Demo Structure
1817+
1818+
```
1819+
demos/pds/
1820+
├── src/
1821+
│ └── index.ts # Re-exports @ascorbic/pds-worker
1822+
├── wrangler.jsonc # Worker config with bindings
1823+
├── package.json # Dependencies
1824+
├── .env.example # Template for required vars
1825+
└── README.md # Setup instructions
1826+
```
1827+
1828+
#### Future: create-pds CLI
1829+
1830+
```bash
1831+
npm create @ascorbic/pds my-pds
1832+
cd my-pds
1833+
npm install
1834+
npm run dev
1835+
```
1836+
1837+
This will scaffold a complete deployment with:
1838+
- Project structure
1839+
- Generated keys and configuration
1840+
- Pre-configured wrangler.jsonc
1841+
- Setup instructions
1842+
1843+
#### Rationale
1844+
1845+
1. **Single-user PDS**: Not a multi-tenant platform - each deployment serves one account
1846+
2. **Configuration via environment**: All customization is environment-based
1847+
3. **No code needed**: Users shouldn't need to understand Hono/Workers/DOs to deploy
1848+
4. **Future-proof**: Can add factory function later for customization without breaking changes
1849+
1850+
---
1851+
17231852
## Reference Material
17241853

17251854
- AT Protocol specs: https://atproto.com/specs

packages/pds/src/account-do.ts

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { AtUri } from "@atproto/syntax";
1515
import { encode as cborEncode } from "@atproto/lex-cbor";
1616
import { SqliteRepoStorage } from "./storage";
1717
import { Sequencer, type SeqEvent, type CommitData } from "./sequencer";
18+
import { BlobStore, type BlobRef } from "./blobs";
1819

1920
/**
2021
* Account Durable Object - manages a single user's AT Protocol repository.
@@ -30,6 +31,7 @@ export class AccountDurableObject extends DurableObject<Env> {
3031
private repo: Repo | null = null;
3132
private keypair: Secp256k1Keypair | null = null;
3233
private sequencer: Sequencer | null = null;
34+
private blobStore: BlobStore | null = null;
3335
private storageInitialized = false;
3436
private repoInitialized = false;
3537

@@ -43,6 +45,11 @@ export class AccountDurableObject extends DurableObject<Env> {
4345
if (!env.DID) {
4446
throw new Error("Missing required environment variable: DID");
4547
}
48+
49+
// Initialize BlobStore if R2 bucket is available
50+
if (env.BLOBS) {
51+
this.blobStore = new BlobStore(env.BLOBS, env.DID);
52+
}
4653
}
4754

4855
/**
@@ -242,7 +249,6 @@ export class AccountDurableObject extends DurableObject<Env> {
242249
}> {
243250
const repo = await this.getRepo();
244251
const keypair = await this.getKeypair();
245-
const storage = await this.getStorage();
246252

247253
const actualRkey = rkey || TID.nextStr();
248254
const createOp: RecordCreateOp = {
@@ -461,6 +467,37 @@ export class AccountDurableObject extends DurableObject<Env> {
461467
return blocksToCarFile(root, blocks);
462468
}
463469

470+
/**
471+
* RPC method: Upload a blob to R2
472+
*/
473+
async rpcUploadBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> {
474+
if (!this.blobStore) {
475+
throw new Error("Blob storage not configured");
476+
}
477+
478+
// Enforce size limit (5MB)
479+
const MAX_BLOB_SIZE = 5 * 1024 * 1024;
480+
if (bytes.length > MAX_BLOB_SIZE) {
481+
throw new Error(
482+
`Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`,
483+
);
484+
}
485+
486+
return this.blobStore.putBlob(bytes, mimeType);
487+
}
488+
489+
/**
490+
* RPC method: Get a blob from R2
491+
*/
492+
async rpcGetBlob(cidStr: string): Promise<R2ObjectBody | null> {
493+
if (!this.blobStore) {
494+
throw new Error("Blob storage not configured");
495+
}
496+
497+
const cid = CID.parse(cidStr);
498+
return this.blobStore.getBlob(cid);
499+
}
500+
464501
/**
465502
* Encode a firehose frame (header + body CBOR).
466503
*/
@@ -495,10 +532,7 @@ export class AccountDurableObject extends DurableObject<Env> {
495532
/**
496533
* Backfill firehose events from a cursor.
497534
*/
498-
private async backfillFirehose(
499-
ws: WebSocket,
500-
cursor: number,
501-
): Promise<void> {
535+
private async backfillFirehose(ws: WebSocket, cursor: number): Promise<void> {
502536
if (!this.sequencer) {
503537
throw new Error("Sequencer not initialized");
504538
}
@@ -594,7 +628,10 @@ export class AccountDurableObject extends DurableObject<Env> {
594628
/**
595629
* WebSocket message handler (hibernation API).
596630
*/
597-
override webSocketMessage(_ws: WebSocket, _message: string | ArrayBuffer): void {
631+
override webSocketMessage(
632+
_ws: WebSocket,
633+
_message: string | ArrayBuffer,
634+
): void {
598635
// Firehose is server-push only, ignore client messages
599636
}
600637

packages/pds/src/blobs.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { CID } from "@atproto/lex-data";
2+
import { cidForRawBytes } from "@atproto/lex-cbor";
3+
4+
export interface BlobRef {
5+
$type: "blob";
6+
ref: { $link: string };
7+
mimeType: string;
8+
size: number;
9+
}
10+
11+
/**
12+
* BlobStore manages blob storage in R2.
13+
* Blobs are stored with CID-based keys prefixed by the account's DID.
14+
*/
15+
export class BlobStore {
16+
constructor(
17+
private r2: R2Bucket,
18+
private did: string,
19+
) {}
20+
21+
/**
22+
* Upload a blob to R2 and return a BlobRef.
23+
*/
24+
async putBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> {
25+
// Compute CID using SHA-256 (RAW codec)
26+
const cid = await cidForRawBytes(bytes);
27+
28+
// Store in R2 with DID prefix for isolation
29+
const key = `${this.did}/${cid.toString()}`;
30+
await this.r2.put(key, bytes, {
31+
httpMetadata: { contentType: mimeType },
32+
});
33+
34+
return {
35+
$type: "blob",
36+
ref: { $link: cid.toString() },
37+
mimeType,
38+
size: bytes.length,
39+
};
40+
}
41+
42+
/**
43+
* Retrieve a blob from R2 by CID.
44+
*/
45+
async getBlob(cid: CID): Promise<R2ObjectBody | null> {
46+
const key = `${this.did}/${cid.toString()}`;
47+
return this.r2.get(key);
48+
}
49+
50+
/**
51+
* Check if a blob exists in R2.
52+
*/
53+
async hasBlob(cid: CID): Promise<boolean> {
54+
const key = `${this.did}/${cid.toString()}`;
55+
const head = await this.r2.head(key);
56+
return head !== null;
57+
}
58+
}

0 commit comments

Comments
 (0)