Skip to content

Commit 2099316

Browse files
committed
feat(export): stream /export/dump to R2 with DO alarm resumption (#59)
The legacy /export/dump route buffers the entire dump in memory and runs synchronously, so it falls over on databases that exceed the 30s Worker timeout or the Durable Object memory ceiling (currently 1GB, soon 10GB). This change adds a streaming path that lives inside the Durable Object: - POST /export/dump kicks off a job, opens an R2 multipart upload, and returns 202 with a jobId. Supports format=sql|csv|json plus optional callbackUrl/table/chunkSize. - GET /export/dump/status/:jobId returns progress (tables, rows, bytes, parts uploaded) and a downloadUrl once status is 'completed'. - GET /export/dump/download/:jobId streams the finished object back from R2 to the client. - DELETE /export/dump/:jobId aborts an in-flight upload. The engine paginates 1000 rows at a time, buffers up to the R2 multipart 5 MiB minimum, flushes parts as they fill, and budgets each tick at 20s. When a tick yields, the leftover bytes are persisted to a temp R2 object (DO storage values are capped at 128 KiB and cannot hold the buffer directly). The DO alarm() handler dispatches dump work first, then falls through to the existing cron logic, so the two co-exist on the same alarm channel. A new [[r2_buckets]] binding named DATABASE_DUMPS gates the streaming path. The legacy GET /export/dump remains untouched for small databases and existing clients. Tests: 17 new unit tests covering the engine (mid-tick yield/resume, multipart flushing at the 5 MiB threshold, error abort, BLOB literals, empty databases, CSV/JSON formats) and the HTTP routes.
1 parent bb22735 commit 2099316

13 files changed

Lines changed: 1994 additions & 5 deletions

README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,54 @@ curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
243243
</code>
244244
</pre>
245245

246+
<h4>Streaming SQL dumps for large databases</h4>
247+
The synchronous endpoint above buffers the whole dump in memory and is bounded by the 30-second Worker timeout. For databases that exceed that budget, switch to the streaming variant — it paginates rows, writes them straight into R2 over (potentially many) Durable Object alarm ticks, and lets you download the finished artifact when ready.
248+
249+
Add an R2 binding called `DATABASE_DUMPS` to your `wrangler.toml`:
250+
251+
<pre>
252+
<code>
253+
[[r2_buckets]]
254+
binding = "DATABASE_DUMPS"
255+
bucket_name = "starbasedb-dumps"
256+
</code>
257+
</pre>
258+
259+
Kick off the job (returns 202 with a `jobId`):
260+
261+
<pre>
262+
<code>
263+
curl -X POST 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
264+
--header 'Authorization: Bearer ABC123' \
265+
--header 'Content-Type: application/json' \
266+
--data '{ "format": "sql", "callbackUrl": "https://hooks.example.com/dump-done" }'
267+
</code>
268+
</pre>
269+
270+
`format` may be `sql`, `csv`, or `json`. Optional fields: `callbackUrl` (POSTed the status view on completion), `table` (export a single table only), `chunkSize` (rows per SELECT batch; default 1000).
271+
272+
Poll the status, then download once it reads `completed`:
273+
274+
<pre>
275+
<code>
276+
curl 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/status/JOB_ID' \
277+
--header 'Authorization: Bearer ABC123'
278+
279+
curl 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/download/JOB_ID' \
280+
--header 'Authorization: Bearer ABC123' \
281+
--output database_dump.sql
282+
</code>
283+
</pre>
284+
285+
To cancel an in-flight job:
286+
287+
<pre>
288+
<code>
289+
curl -X DELETE 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump/JOB_ID' \
290+
--header 'Authorization: Bearer ABC123'
291+
</code>
292+
</pre>
293+
246294
<h3>JSON Data Export</h3>
247295
<pre>
248296
<code>

src/do.ts

Lines changed: 253 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
11
import { DurableObject } from 'cloudflare:workers'
2+
import { runTick } from './export/dump-engine'
3+
import { createDumpHost, jobStateKey } from './export/do-dump-host'
4+
import {
5+
DumpFormat,
6+
DumpJobOptions,
7+
DumpJobState,
8+
DumpJobStatusView,
9+
newJobState,
10+
toStatusView,
11+
} from './export/streaming-dump'
12+
13+
const ACTIVE_DUMP_KEY = 'dump:active'
14+
const DUMP_RESUME_DELAY_MS = 1_000
215

316
export class StarbaseDBDurableObject extends DurableObject {
417
// Durable storage for the SQL database
@@ -9,6 +22,8 @@ export class StarbaseDBDurableObject extends DurableObject {
922
public connections = new Map<string, WebSocket>()
1023
// Store the client auth token for requests back to our Worker
1124
private clientAuthToken: string
25+
// R2 bucket binding for streaming dump output (optional).
26+
private dumpBucket: R2Bucket | undefined
1227

1328
/**
1429
* The constructor is invoked once upon creation of the Durable Object, i.e. the first call to
@@ -22,6 +37,7 @@ export class StarbaseDBDurableObject extends DurableObject {
2237
this.clientAuthToken = env.CLIENT_AUTHORIZATION_TOKEN
2338
this.sql = ctx.storage.sql
2439
this.storage = ctx.storage
40+
this.dumpBucket = env.DATABASE_DUMPS
2541

2642
// Install default necessary `tmp_` tables for various features here.
2743
const cacheStatement = `
@@ -72,6 +88,10 @@ export class StarbaseDBDurableObject extends DurableObject {
7288
deleteAlarm: this.deleteAlarm.bind(this),
7389
getStatistics: this.getStatistics.bind(this),
7490
executeQuery: this.executeQuery.bind(this),
91+
startDumpJob: this.startDumpJob.bind(this),
92+
getDumpJob: this.getDumpJob.bind(this),
93+
getDumpDownloadBody: this.getDumpDownloadBody.bind(this),
94+
cancelDumpJob: this.cancelDumpJob.bind(this),
7595
}
7696
}
7797

@@ -105,12 +125,28 @@ export class StarbaseDBDurableObject extends DurableObject {
105125
}
106126

107127
async alarm() {
128+
// Dispatch streaming dump work first so a long-running export does not
129+
// get starved by other alarm-driven features. The dump engine sets its
130+
// own continuation alarm if it needs another tick.
131+
try {
132+
await this.continueActiveDumpJob()
133+
} catch (err) {
134+
console.error('Failed to continue dump job:', err)
135+
}
136+
108137
try {
109138
// Fetch all the tasks that are marked to emit an event for this cycle.
110-
const task = (await this.executeQuery({
111-
sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
112-
isRaw: false,
113-
})) as Record<string, SqlStorageValue>[]
139+
// The cron table is created lazily by the CronPlugin; if it does not
140+
// exist yet (fresh DO with no cron plugin installed) we silently skip.
141+
let task: Record<string, SqlStorageValue>[] = []
142+
try {
143+
task = (await this.executeQuery({
144+
sql: 'SELECT * FROM tmp_cron_tasks WHERE is_active = 1;',
145+
isRaw: false,
146+
})) as Record<string, SqlStorageValue>[]
147+
} catch {
148+
return
149+
}
114150

115151
if (!task.length) {
116152
return
@@ -148,6 +184,219 @@ export class StarbaseDBDurableObject extends DurableObject {
148184
}
149185
}
150186

187+
// -- Streaming dump job machinery -------------------------------------
188+
189+
/**
190+
* Begin a new streaming dump job. Creates the R2 multipart upload,
191+
* persists initial state, and schedules an immediate alarm to kick off
192+
* the first tick. Returns a status view the worker can hand back to the
193+
* client right away.
194+
*/
195+
public async startDumpJob(
196+
options: DumpJobOptions
197+
): Promise<DumpJobStatusView> {
198+
if (!this.dumpBucket) {
199+
throw new Error(
200+
'Streaming dump requires the DATABASE_DUMPS R2 binding. ' +
201+
'Add an [[r2_buckets]] entry to wrangler.toml.'
202+
)
203+
}
204+
205+
const tables = await this.listUserTables(options.table)
206+
const jobId = crypto.randomUUID()
207+
const state = newJobState(jobId, options, tables)
208+
209+
// Open the R2 multipart upload up-front so part numbers and uploadId
210+
// are stable across alarm-driven continuations.
211+
const upload = await this.dumpBucket.createMultipartUpload(
212+
state.objectKey,
213+
{
214+
httpMetadata: {
215+
contentType: this.contentTypeFor(options.format),
216+
contentDisposition: `attachment; filename="${state.objectKey.split('/').pop()}"`,
217+
},
218+
}
219+
)
220+
state.uploadId = upload.uploadId
221+
222+
await this.storage.put(jobStateKey(jobId), state)
223+
await this.storage.put(ACTIVE_DUMP_KEY, jobId)
224+
await this.setAlarm(Date.now() + 1000)
225+
return toStatusView(state)
226+
}
227+
228+
/** Read the current state of a dump job for the status endpoint. */
229+
public async getDumpJob(jobId: string): Promise<DumpJobStatusView | null> {
230+
const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
231+
if (!state) return null
232+
return toStatusView(state)
233+
}
234+
235+
/**
236+
* Stream the dump body from R2 back to the worker so it can be relayed to
237+
* the client. Returns null when the job is not finished yet or missing.
238+
*/
239+
public async getDumpDownloadBody(jobId: string): Promise<{
240+
body: ReadableStream
241+
size: number
242+
contentType: string
243+
filename: string
244+
} | null> {
245+
if (!this.dumpBucket) return null
246+
const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
247+
if (!state || state.status !== 'completed') return null
248+
const obj = await this.dumpBucket.get(state.objectKey)
249+
if (!obj) return null
250+
return {
251+
body: obj.body,
252+
size: obj.size,
253+
contentType:
254+
obj.httpMetadata?.contentType ??
255+
this.contentTypeFor(state.format),
256+
filename: state.objectKey.split('/').pop() ?? state.objectKey,
257+
}
258+
}
259+
260+
/** Cancel an in-flight dump job. Aborts the R2 multipart and marks failed. */
261+
public async cancelDumpJob(
262+
jobId: string
263+
): Promise<DumpJobStatusView | null> {
264+
const state = await this.storage.get<DumpJobState>(jobStateKey(jobId))
265+
if (!state) return null
266+
if (state.status === 'completed' || state.status === 'failed')
267+
return toStatusView(state)
268+
269+
if (this.dumpBucket && state.uploadId) {
270+
try {
271+
const upload = this.dumpBucket.resumeMultipartUpload(
272+
state.objectKey,
273+
state.uploadId
274+
)
275+
await upload.abort()
276+
} catch (err) {
277+
console.error('Failed to abort R2 multipart upload:', err)
278+
}
279+
}
280+
if (this.dumpBucket && state.pendingTempKey) {
281+
await this.dumpBucket.delete(state.pendingTempKey).catch(() => {})
282+
}
283+
state.status = 'cancelled'
284+
state.error = 'Cancelled by user'
285+
state.progress.updatedAt = Date.now()
286+
await this.storage.put(jobStateKey(jobId), state)
287+
const active = await this.storage.get<string>(ACTIVE_DUMP_KEY)
288+
if (active === jobId) await this.storage.delete(ACTIVE_DUMP_KEY)
289+
return toStatusView(state)
290+
}
291+
292+
/**
293+
* Run one tick of the active dump job (if any). Called from alarm().
294+
* Persists state after every tick and re-arms the alarm if more work
295+
* remains.
296+
*/
297+
private async continueActiveDumpJob(): Promise<void> {
298+
const activeId = await this.storage.get<string>(ACTIVE_DUMP_KEY)
299+
if (!activeId) return
300+
if (!this.dumpBucket) {
301+
console.error(
302+
'Active dump job exists but DATABASE_DUMPS R2 binding is missing.'
303+
)
304+
await this.storage.delete(ACTIVE_DUMP_KEY)
305+
return
306+
}
307+
const state = await this.storage.get<DumpJobState>(
308+
jobStateKey(activeId)
309+
)
310+
if (!state) {
311+
await this.storage.delete(ACTIVE_DUMP_KEY)
312+
return
313+
}
314+
if (
315+
state.status === 'completed' ||
316+
state.status === 'failed' ||
317+
state.status === 'cancelled'
318+
) {
319+
await this.storage.delete(ACTIVE_DUMP_KEY)
320+
return
321+
}
322+
323+
const host = createDumpHost({
324+
sql: this.sql,
325+
storage: this.storage,
326+
bucket: this.dumpBucket,
327+
})
328+
329+
try {
330+
const { done } = await runTick(state, host)
331+
await this.storage.put(jobStateKey(state.jobId), state)
332+
if (done) {
333+
await this.storage.delete(ACTIVE_DUMP_KEY)
334+
if (state.callbackUrl) {
335+
this.fireDumpCallback(state).catch((err) =>
336+
console.error('Dump callback failed:', err)
337+
)
338+
}
339+
} else {
340+
await this.setAlarm(Date.now() + DUMP_RESUME_DELAY_MS)
341+
}
342+
} catch (err) {
343+
console.error('Dump engine error:', err)
344+
// The engine sets state.status='failed' before throwing. Persist
345+
// the failure so callers see it via the status endpoint.
346+
state.status = 'failed'
347+
state.error =
348+
err instanceof Error ? err.message : String(err ?? 'unknown')
349+
state.progress.updatedAt = Date.now()
350+
await this.storage.put(jobStateKey(state.jobId), state)
351+
await this.storage.delete(ACTIVE_DUMP_KEY)
352+
if (state.callbackUrl) {
353+
this.fireDumpCallback(state).catch((cbErr) =>
354+
console.error('Dump failure callback failed:', cbErr)
355+
)
356+
}
357+
}
358+
}
359+
360+
private async fireDumpCallback(state: DumpJobState): Promise<void> {
361+
if (!state.callbackUrl) return
362+
try {
363+
await fetch(state.callbackUrl, {
364+
method: 'POST',
365+
headers: { 'Content-Type': 'application/json' },
366+
body: JSON.stringify(toStatusView(state)),
367+
})
368+
} catch (err) {
369+
console.error('Failed to fire dump completion callback:', err)
370+
}
371+
}
372+
373+
/**
374+
* List the user-facing tables this DO holds. Excludes SQLite internal
375+
* tables and the tmp_* feature tables which would otherwise leak into
376+
* the export.
377+
*/
378+
private async listUserTables(only?: string): Promise<string[]> {
379+
if (only) {
380+
const exists = (await this.executeQuery({
381+
sql: 'SELECT name FROM sqlite_master WHERE type = ? AND name = ?;',
382+
params: ['table', only],
383+
isRaw: false,
384+
})) as Record<string, SqlStorageValue>[]
385+
return exists.length ? [only] : []
386+
}
387+
const rows = (await this.executeQuery({
388+
sql: "SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%' AND name NOT LIKE 'tmp_%' AND name NOT LIKE '_cf_%' ORDER BY name;",
389+
isRaw: false,
390+
})) as Record<string, SqlStorageValue>[]
391+
return rows.map((r) => String(r.name))
392+
}
393+
394+
private contentTypeFor(format: DumpFormat): string {
395+
if (format === 'csv') return 'text/csv'
396+
if (format === 'json') return 'application/json'
397+
return 'application/sql'
398+
}
399+
151400
public async getStatistics(): Promise<{
152401
databaseSize: number
153402
activeConnections: number

0 commit comments

Comments
 (0)