|
| 1 | +/** |
| 2 | + * Sync Stripe → AWS DSQL via the engine API (TypeScript). |
| 3 | + * |
| 4 | + * Usage: |
| 5 | + * npx tsx demo/stripe-to-dsql.ts |
| 6 | + * bun demo/stripe-to-dsql.ts |
| 7 | + * |
| 8 | + * Env: |
| 9 | + * STRIPE_API_KEY — Stripe secret key |
| 10 | + * DSQL_ENDPOINT — DSQL cluster endpoint (e.g. <id>.dsql.us-east-1.on.aws) |
| 11 | + * AWS_REGION — AWS region (default: us-east-1) |
| 12 | + * AWS_ACCESS_KEY_ID — AWS credentials |
| 13 | + * AWS_SECRET_ACCESS_KEY — AWS credentials |
| 14 | + */ |
| 15 | +import { execSync } from 'node:child_process' |
| 16 | +import { createConnectorResolver, createEngine } from '../apps/engine/src/lib/index.js' |
| 17 | +import { defaultConnectors } from '../apps/engine/src/lib/default-connectors.js' |
| 18 | +import { fileStateStore } from '../apps/engine/src/lib/state-store.js' |
| 19 | +import type { PipelineConfig } from '../packages/protocol/src/index.js' |
| 20 | +import { buildPoolConfig, pg } from '../packages/destination-aws-dsql/src/index.js' |
| 21 | + |
| 22 | +const stripeApiKey = process.env.STRIPE_API_KEY |
| 23 | +const region = process.env.AWS_REGION ?? 'us-east-1' |
| 24 | + |
| 25 | +// Auto-read endpoint from terraform output if not set |
| 26 | +const dsqlEndpoint = |
| 27 | + process.env.DSQL_ENDPOINT ?? |
| 28 | + (() => { |
| 29 | + try { |
| 30 | + return execSync('terraform -chdir=terraform output -raw cluster_endpoint', { |
| 31 | + encoding: 'utf8', |
| 32 | + }).trim() |
| 33 | + } catch { |
| 34 | + return undefined |
| 35 | + } |
| 36 | + })() |
| 37 | + |
| 38 | +if (!stripeApiKey) throw new Error('Set STRIPE_API_KEY') |
| 39 | +if (!dsqlEndpoint) |
| 40 | + throw new Error('Set DSQL_ENDPOINT or run `terraform -chdir=terraform apply` first') |
| 41 | + |
| 42 | +const pipeline: PipelineConfig = { |
| 43 | + source: { type: 'stripe', stripe: { api_key: stripeApiKey, backfill_limit: 10 } }, |
| 44 | + destination: { |
| 45 | + type: 'aws_dsql', |
| 46 | + aws_dsql: { endpoint: dsqlEndpoint, region, schema: 'public' }, |
| 47 | + }, |
| 48 | + streams: [{ name: 'products' }, { name: 'prices' }, { name: 'customers' }], |
| 49 | +} |
| 50 | + |
| 51 | +const resolver = await createConnectorResolver(defaultConnectors, { path: true }) |
| 52 | +const engine = await createEngine(resolver) |
| 53 | + |
| 54 | +// Create tables |
| 55 | +for await (const _msg of engine.pipeline_setup(pipeline)) { |
| 56 | +} |
| 57 | + |
| 58 | +// State: file-backed, resumable across runs |
| 59 | +const store = fileStateStore('.sync-state-dsql.json') |
| 60 | +const state = await store.get() |
| 61 | + |
| 62 | +// Sync |
| 63 | +for await (const msg of engine.pipeline_sync(pipeline, { state })) { |
| 64 | + if (msg.type === 'source_state') { |
| 65 | + if (msg.source_state.state_type === 'global') await store.setGlobal(msg.source_state.data) |
| 66 | + else await store.set(msg.source_state.stream, msg.source_state.data) |
| 67 | + } |
| 68 | + console.log(JSON.stringify(msg)) |
| 69 | +} |
| 70 | + |
| 71 | +// Verify: query DSQL to show what was synced |
| 72 | +console.log('\n--- Verifying data in DSQL ---') |
| 73 | +const poolConfig = await buildPoolConfig({ |
| 74 | + endpoint: dsqlEndpoint, |
| 75 | + region, |
| 76 | + schema: 'public', |
| 77 | + batch_size: 100, |
| 78 | +}) |
| 79 | +const pool = new pg.Pool(poolConfig) |
| 80 | + |
| 81 | +for (const table of ['customers', 'prices', 'products']) { |
| 82 | + const { rows } = await pool.query(`SELECT count(*) FROM ${table}`) |
| 83 | + console.log(`${table}: ${rows[0].count} rows`) |
| 84 | +} |
| 85 | + |
| 86 | +console.log('\nSample rows:') |
| 87 | +for (const table of ['customers', 'products']) { |
| 88 | + const { rows } = await pool.query( |
| 89 | + `SELECT id, substring(_raw_data, 1, 100) as data FROM ${table} LIMIT 2` |
| 90 | + ) |
| 91 | + for (const row of rows) console.log(` [${table}] ${row.id}: ${row.data}...`) |
| 92 | +} |
| 93 | + |
| 94 | +await pool.end() |
0 commit comments