Commit 5e058c8

fix(pds): store blobs from Worker, not the firehose-holding DO (#165)
* fix(pds): store blobs from Worker, not the firehose-holding DO

  uploadBlob computed the CID and did the R2 put inside the AccountDurableObject. That DO is single-threaded and also holds the relay's subscribeRepos firehose WebSocket; awaiting an R2 put inside it pins the input gate (R2 latency is independent of object size; even a small link-card OG image can stall), and Cloudflare resets the object with "Durable Object storage operation exceeded timeout", dropping the firehose and desyncing the relay until a manual requestCrawl.

  The Worker now computes the CID and writes to R2 directly, mirroring the existing sync.getBlob download path, and only calls the DO (new rpcTrackBlob) for the small imported_blobs tracking row. rpcUploadBlob is removed.

* docs: rewrite changeset as user-facing
1 parent c30eb0b commit 5e058c8

3 files changed

Lines changed: 47 additions & 38 deletions

.changeset/blob-upload-off-do.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+---
+"@getcirrus/pds": patch
+---
+
+Fix blob uploads intermittently desyncing the PDS from the relay.
+
+Uploading a blob (commonly a link-card thumbnail) could occasionally fail and leave the relay no longer tracking the repo, so new posts stopped federating until a manual crawl request. Blob uploads are now reliable and no longer drop the firehose connection.

packages/pds/src/account-do.ts

Lines changed: 16 additions & 21 deletions
@@ -28,7 +28,7 @@ import {
 	type SeqIdentityEvent,
 	type CommitData,
 } from "./sequencer";
-import { BlobStore, type BlobRef } from "./blobs";
+import { BlobStore } from "./blobs";
 import { jsonToLex } from "@atproto/lex-json";
 import type { PDSEnv } from "./types";
 import { RecordAlreadyExistsError, type ValidationStatus } from "./validation";
@@ -1038,28 +1038,23 @@ export class AccountDurableObject extends DurableObject<PDSEnv> {
 	}
 
 	/**
-	 * RPC method: Upload a blob to R2
+	 * RPC method: Record an already-stored blob's metadata.
+	 *
+	 * The blob bytes are written to R2 by the stateless Worker, not here.
+	 * This DO is single-threaded and also holds the relay's firehose
+	 * WebSocket; awaiting an R2 put inside it (R2 latency is independent of
+	 * object size; even a small image can stall) pins the input gate, and
+	 * Cloudflare resets the object when a storage op can't complete in time,
+	 * dropping the firehose and desyncing the relay. Only the tiny tracking
+	 * row needs the DO's SQLite.
 	 */
-	async rpcUploadBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> {
-		if (!this.blobStore) {
-			throw new Error("Blob storage not configured");
-		}
-
-		// Enforce size limit (60MB)
-		const MAX_BLOB_SIZE = 60 * 1024 * 1024;
-		if (bytes.length > MAX_BLOB_SIZE) {
-			throw new Error(
-				`Blob too large: ${bytes.length} bytes (max ${MAX_BLOB_SIZE})`,
-			);
-		}
-
-		const blobRef = await this.blobStore.putBlob(bytes, mimeType);
-
-		// Track the imported blob for migration progress
+	async rpcTrackBlob(
+		cid: string,
+		size: number,
+		mimeType: string,
+	): Promise<void> {
 		const storage = await this.getStorage();
-		storage.trackImportedBlob(blobRef.ref.$link, bytes.length, mimeType);
-
-		return blobRef;
+		storage.trackImportedBlob(cid, size, mimeType);
 	}
 
 	/**

packages/pds/src/xrpc/repo.ts

Lines changed: 24 additions & 17 deletions
@@ -10,6 +10,7 @@ import {
 	type ValidationStatus,
 } from "../validation.js";
 import { detectContentType } from "../format.js";
+import { BlobStore } from "../blobs.js";
 import { buildScopeChecker, requireScope } from "../middleware/auth.js";
 
 function invalidRecordError(
@@ -680,24 +681,30 @@ export async function uploadBlob(
 		);
 	}
 
-	try {
-		const blobRef = await accountDO.rpcUploadBlob(bytes, contentType);
-		return c.json({ blob: blobRef });
-	} catch (err) {
-		if (
-			err instanceof Error &&
-			err.message.includes("Blob storage not configured")
-		) {
-			return c.json(
-				{
-					error: "ServiceUnavailable",
-					message: "Blob storage is not configured",
-				},
-				503,
-			);
-		}
-		throw err;
+	if (!c.env.BLOBS) {
+		return c.json(
+			{
+				error: "ServiceUnavailable",
+				message: "Blob storage is not configured",
+			},
+			503,
+		);
 	}
+
+	// Store the blob from the stateless Worker, not the DO. The DO is
+	// single-threaded and holds the relay's firehose WebSocket; awaiting an
+	// R2 put inside it (R2 latency is independent of object size) pins the
+	// input gate, and Cloudflare resets the object when a storage op times
+	// out, dropping the firehose and desyncing the relay. The DO only
+	// records the tracking metadata. This mirrors sync.getBlob.
+	const blobStore = new BlobStore(c.env.BLOBS, c.env.DID);
+	const blobRef = await blobStore.putBlob(bytes, contentType);
+	await accountDO.rpcTrackBlob(
+		blobRef.ref.$link,
+		blobRef.size,
+		blobRef.mimeType,
+	);
+	return c.json({ blob: blobRef });
 }
 
 export async function importRepo(
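
The store-then-track ordering introduced by this commit can be illustrated in isolation. The following is a minimal sketch, not the project's code: `BucketLike`, `TrackerLike`, `MemoryBucket`, `MemoryTracker`, `placeholderKey` and `storeThenTrack` are hypothetical names, the in-memory classes stand in for the R2 binding and the Durable Object stub, and the FNV-1a checksum is only a placeholder for the real CID computation. What it shows is that the byte payload goes to the bucket from the caller, and only three small metadata fields reach the tracker.

```typescript
// Hypothetical stand-ins for the R2 bucket binding and the DO stub.
interface BucketLike {
	put(key: string, value: Uint8Array): Promise<void>;
}

interface TrackerLike {
	rpcTrackBlob(cid: string, size: number, mimeType: string): Promise<void>;
}

interface StoredBlob {
	key: string; // placeholder content key, NOT a real CID
	size: number;
	mimeType: string;
}

// In-memory bucket: records every object it is asked to store.
class MemoryBucket implements BucketLike {
	readonly objects = new Map<string, Uint8Array>();

	async put(key: string, value: Uint8Array): Promise<void> {
		this.objects.set(key, value);
	}
}

// In-memory tracker: records only metadata rows, never blob bytes.
class MemoryTracker implements TrackerLike {
	readonly rows: StoredBlob[] = [];

	async rpcTrackBlob(
		cid: string,
		size: number,
		mimeType: string,
	): Promise<void> {
		this.rows.push({ key: cid, size, mimeType });
	}
}

// Placeholder content key (32-bit FNV-1a, hex encoded). The real code
// derives a CID from the bytes; this checksum is an assumption made so
// the sketch has no dependencies.
function placeholderKey(bytes: Uint8Array): string {
	let hash = 0x811c9dc5;
	for (const byte of bytes) {
		hash ^= byte;
		hash = Math.imul(hash, 0x01000193) >>> 0;
	}
	return hash.toString(16).padStart(8, "0");
}

// Caller-side flow: write the bytes to the bucket first, then hand the
// tracker only the key, size and MIME type.
async function storeThenTrack(
	bucket: BucketLike,
	tracker: TrackerLike,
	bytes: Uint8Array,
	mimeType: string,
): Promise<StoredBlob> {
	const key = placeholderKey(bytes);
	await bucket.put(key, bytes);
	await tracker.rpcTrackBlob(key, bytes.length, mimeType);
	return { key, size: bytes.length, mimeType };
}
```

In this arrangement the slow, size-dependent transfer stays on the caller's side, and the tracker call carries a payload whose size does not depend on the blob.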
