Skip to content

Commit 2c8fd2e

Browse files
tonyxiaoclaude
andcommitted
Add AWS DSQL destination connector with Terraform and demo
- New `packages/destination-aws-dsql` connector using IAM auth via `@aws-sdk/dsql-signer`, adapted for DSQL limitations (no jsonb, no triggers, no generated columns, sequential DDL) - Terraform config in `terraform/` to provision a DSQL cluster - `demo/stripe-to-dsql.ts` syncs Stripe data to DSQL with built-in verification queries, auto-reads endpoint from terraform output - Registered `aws_dsql` in engine default connectors Verified: 1000 customers, 720 prices, 702 products synced successfully. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude
1 parent d77349c commit 2c8fd2e

13 files changed

Lines changed: 1179 additions & 11 deletions

File tree

apps/engine/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
"@hono/node-server": "^1",
5151
"@scalar/hono-api-reference": "^0.6",
5252
"@stripe/sync-destination-google-sheets": "workspace:*",
53+
"@stripe/sync-destination-aws-dsql": "workspace:*",
5354
"@stripe/sync-destination-postgres": "workspace:*",
5455
"@stripe/sync-hono-zod-openapi": "workspace:*",
5556
"@stripe/sync-logger": "workspace:*",

apps/engine/src/lib/default-connectors.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import sourceStripe from '@stripe/sync-source-stripe'
2+
import destinationAwsDsql from '@stripe/sync-destination-aws-dsql'
23
import destinationPostgres from '@stripe/sync-destination-postgres'
34
import destinationGoogleSheets from '@stripe/sync-destination-google-sheets'
45
import type { RegisteredConnectors } from './resolver.js'
@@ -7,6 +8,7 @@ import type { RegisteredConnectors } from './resolver.js'
78
export const defaultConnectors: RegisteredConnectors = {
89
sources: { stripe: sourceStripe },
910
destinations: {
11+
aws_dsql: destinationAwsDsql,
1012
postgres: destinationPostgres,
1113
google_sheets: destinationGoogleSheets,
1214
},

demo/README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ outbound WebSocket connection.
141141
| `stripe-to-postgres.sh` | Stripe → Postgres via the engine | `STRIPE_API_KEY`, `DATABASE_URL` |
142142
| `stripe-to-google-sheets.sh` | Stripe → Google Sheets via the engine | `STRIPE_API_KEY`, `GOOGLE_*` |
143143
| `stripe-to-postgres-live.sh` | Stripe → Postgres with live WebSocket streaming | `STRIPE_API_KEY`, `DATABASE_URL` |
144+
| `stripe-to-dsql.ts` | Stripe → AWS DSQL via the engine | `STRIPE_API_KEY`, `AWS_*` |
145+
146+
### Stripe → AWS DSQL
147+
148+
Sync Stripe data to [Aurora DSQL](https://aws.amazon.com/rds/aurora/dsql/) (serverless distributed SQL):
149+
150+
```sh
151+
# One-time: provision the DSQL cluster
152+
cd terraform && terraform init && terraform apply && cd ..
153+
154+
# Sync (auto-reads endpoint from terraform output)
155+
node --import tsx demo/stripe-to-dsql.ts
156+
```
157+
158+
Or with explicit env vars: `DSQL_ENDPOINT=<id>.dsql.<region>.on.aws node --import tsx demo/stripe-to-dsql.ts`
144159

145160
### TypeScript API
146161

demo/stripe-to-dsql.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Sync Stripe → AWS DSQL via the engine API (TypeScript).
3+
*
4+
* Usage:
5+
* npx tsx demo/stripe-to-dsql.ts
6+
* bun demo/stripe-to-dsql.ts
7+
*
8+
* Env:
9+
* STRIPE_API_KEY — Stripe secret key
10+
* DSQL_ENDPOINT — DSQL cluster endpoint (e.g. <id>.dsql.us-east-1.on.aws)
11+
* AWS_REGION — AWS region (default: us-east-1)
12+
* AWS_ACCESS_KEY_ID — AWS credentials
13+
* AWS_SECRET_ACCESS_KEY — AWS credentials
14+
*/
15+
import { execSync } from 'node:child_process'
16+
import { createConnectorResolver, createEngine } from '../apps/engine/src/lib/index.js'
17+
import { defaultConnectors } from '../apps/engine/src/lib/default-connectors.js'
18+
import { fileStateStore } from '../apps/engine/src/lib/state-store.js'
19+
import type { PipelineConfig } from '../packages/protocol/src/index.js'
20+
import { buildPoolConfig, pg } from '../packages/destination-aws-dsql/src/index.js'
21+
22+
const stripeApiKey = process.env.STRIPE_API_KEY
23+
const region = process.env.AWS_REGION ?? 'us-east-1'
24+
25+
// Auto-read endpoint from terraform output if not set
26+
const dsqlEndpoint =
27+
process.env.DSQL_ENDPOINT ??
28+
(() => {
29+
try {
30+
return execSync('terraform -chdir=terraform output -raw cluster_endpoint', {
31+
encoding: 'utf8',
32+
}).trim()
33+
} catch {
34+
return undefined
35+
}
36+
})()
37+
38+
if (!stripeApiKey) throw new Error('Set STRIPE_API_KEY')
39+
if (!dsqlEndpoint)
40+
throw new Error('Set DSQL_ENDPOINT or run `terraform -chdir=terraform apply` first')
41+
42+
const pipeline: PipelineConfig = {
43+
source: { type: 'stripe', stripe: { api_key: stripeApiKey, backfill_limit: 10 } },
44+
destination: {
45+
type: 'aws_dsql',
46+
aws_dsql: { endpoint: dsqlEndpoint, region, schema: 'public' },
47+
},
48+
streams: [{ name: 'products' }, { name: 'prices' }, { name: 'customers' }],
49+
}
50+
51+
const resolver = await createConnectorResolver(defaultConnectors, { path: true })
52+
const engine = await createEngine(resolver)
53+
54+
// Create tables
55+
for await (const _msg of engine.pipeline_setup(pipeline)) {
56+
}
57+
58+
// State: file-backed, resumable across runs
59+
const store = fileStateStore('.sync-state-dsql.json')
60+
const state = await store.get()
61+
62+
// Sync
63+
for await (const msg of engine.pipeline_sync(pipeline, { state })) {
64+
if (msg.type === 'source_state') {
65+
if (msg.source_state.state_type === 'global') await store.setGlobal(msg.source_state.data)
66+
else await store.set(msg.source_state.stream, msg.source_state.data)
67+
}
68+
console.log(JSON.stringify(msg))
69+
}
70+
71+
// Verify: query DSQL to show what was synced
72+
console.log('\n--- Verifying data in DSQL ---')
73+
const poolConfig = await buildPoolConfig({
74+
endpoint: dsqlEndpoint,
75+
region,
76+
schema: 'public',
77+
batch_size: 100,
78+
})
79+
const pool = new pg.Pool(poolConfig)
80+
81+
for (const table of ['customers', 'prices', 'products']) {
82+
const { rows } = await pool.query(`SELECT count(*) FROM ${table}`)
83+
console.log(`${table}: ${rows[0].count} rows`)
84+
}
85+
86+
console.log('\nSample rows:')
87+
for (const table of ['customers', 'products']) {
88+
const { rows } = await pool.query(
89+
`SELECT id, substring(_raw_data, 1, 100) as data FROM ${table} LIMIT 2`
90+
)
91+
for (const row of rows) console.log(` [${table}] ${row.id}: ${row.data}...`)
92+
}
93+
94+
await pool.end()
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"name": "@stripe/sync-destination-aws-dsql",
3+
"version": "0.1.0",
4+
"private": false,
5+
"type": "module",
6+
"exports": {
7+
".": {
8+
"bun": "./src/index.ts",
9+
"types": "./dist/index.d.ts",
10+
"import": "./dist/index.js"
11+
}
12+
},
13+
"scripts": {
14+
"build": "tsc",
15+
"test": "vitest"
16+
},
17+
"files": [
18+
"dist",
19+
"src"
20+
],
21+
"dependencies": {
22+
"@aws-sdk/dsql-signer": "^3.1013.0",
23+
"@stripe/sync-protocol": "workspace:*",
24+
"@stripe/sync-util-postgres": "workspace:*",
25+
"pg": "^8.16.3",
26+
"zod": "^4.3.6"
27+
},
28+
"devDependencies": {
29+
"@types/pg": "^8.15.5",
30+
"vitest": "^3.2.4"
31+
}
32+
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
import pg from 'pg'
2+
import type { PoolConfig } from 'pg'
3+
import { DsqlSigner } from '@aws-sdk/dsql-signer'
4+
import type { Destination, DestinationInput, LogMessage } from '@stripe/sync-protocol'
5+
import { sql, upsert } from '@stripe/sync-util-postgres'
6+
import defaultSpec from './spec.js'
7+
import type { Config } from './spec.js'
8+
9+
export { configSchema, type Config } from './spec.js'
10+
export { default as pg } from 'pg'
11+
12+
function logMsg(message: string, level: LogMessage['log']['level'] = 'info'): LogMessage {
13+
return { type: 'log', log: { level, message } }
14+
}
15+
16+
/** Generate a fresh DSQL IAM auth token. */
17+
async function generateToken(endpoint: string, region: string): Promise<string> {
18+
const signer = new DsqlSigner({ hostname: endpoint, region })
19+
return signer.getDbConnectAdminAuthToken()
20+
}
21+
22+
/** Build a pg PoolConfig for DSQL with rotating IAM auth tokens. */
23+
export async function buildPoolConfig(config: Config): Promise<PoolConfig> {
24+
const token = await generateToken(config.endpoint, config.region)
25+
return {
26+
host: config.endpoint,
27+
port: 5432,
28+
database: 'postgres',
29+
user: 'admin',
30+
password: token,
31+
ssl: true,
32+
}
33+
}
34+
35+
function createPool(poolConfig: PoolConfig): pg.Pool {
36+
const pool = new pg.Pool(poolConfig)
37+
pool.on('error', (err) => {
38+
console.error('DSQL destination pool error:', err)
39+
})
40+
return pool
41+
}
42+
43+
/**
44+
* Build a CREATE TABLE IF NOT EXISTS statement for DSQL.
45+
*
46+
* DSQL does not support: triggers, generated columns, PL/pgSQL DO blocks, jsonb.
47+
* We store _raw_data as text (JSON-serialized) with id as primary key.
48+
*/
49+
function buildCreateTableSQL(schema: string, tableName: string): string {
50+
const q = (s: string) => `"${s.replace(/"/g, '""')}"`
51+
return sql`
52+
CREATE TABLE IF NOT EXISTS ${q(schema)}.${q(tableName)} (
53+
"id" text NOT NULL,
54+
"_raw_data" text NOT NULL,
55+
"_last_synced_at" timestamptz,
56+
"_updated_at" timestamptz NOT NULL DEFAULT now(),
57+
PRIMARY KEY ("id")
58+
)
59+
`
60+
}
61+
62+
/**
63+
* Upsert records into a DSQL table.
64+
* Explicitly sets _updated_at = now() since DSQL has no trigger support.
65+
*/
66+
async function upsertMany(
67+
pool: pg.Pool,
68+
schema: string,
69+
table: string,
70+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
71+
entries: Record<string, any>[]
72+
): Promise<void> {
73+
if (!entries.length) return
74+
await upsert(
75+
pool,
76+
entries.map((e) => ({
77+
id: String(e.id ?? ''),
78+
_raw_data: JSON.stringify(e),
79+
_updated_at: new Date().toISOString(),
80+
})),
81+
{
82+
schema,
83+
table,
84+
keyColumns: ['id'],
85+
noDiffColumns: ['_updated_at'],
86+
}
87+
)
88+
}
89+
90+
/** Check if an error looks transient. */
91+
function isTransient(err: unknown): boolean {
92+
if (!(err instanceof Error)) return false
93+
const msg = err.message.toLowerCase()
94+
return msg.includes('econnrefused') || msg.includes('timeout') || msg.includes('connection')
95+
}
96+
97+
const destination = {
98+
async *spec() {
99+
yield { type: 'spec' as const, spec: defaultSpec }
100+
},
101+
102+
async *check({ config }) {
103+
const pool = createPool(await buildPoolConfig(config))
104+
try {
105+
await pool.query('SELECT 1')
106+
yield {
107+
type: 'connection_status' as const,
108+
connection_status: { status: 'succeeded' as const },
109+
}
110+
} catch (err) {
111+
yield {
112+
type: 'connection_status' as const,
113+
connection_status: {
114+
status: 'failed' as const,
115+
message: err instanceof Error ? err.message : String(err),
116+
},
117+
}
118+
} finally {
119+
await pool.end()
120+
}
121+
},
122+
123+
async *setup({ config, catalog }) {
124+
const pool = createPool(await buildPoolConfig(config))
125+
try {
126+
yield logMsg(`Creating schema "${config.schema}" (${catalog.streams.length} streams)`)
127+
await pool.query(sql`CREATE SCHEMA IF NOT EXISTS "${config.schema}"`)
128+
// DSQL requires sequential DDL — concurrent CREATE TABLE causes OC001 conflicts
129+
for (const cs of catalog.streams) {
130+
await pool.query(buildCreateTableSQL(config.schema, cs.stream.name))
131+
}
132+
} finally {
133+
await pool.end()
134+
}
135+
},
136+
137+
async *teardown({ config }) {
138+
const PROTECTED = new Set(['public', 'information_schema', 'pg_catalog', 'pg_toast'])
139+
if (PROTECTED.has(config.schema)) {
140+
throw new Error(`Refusing to drop protected schema "${config.schema}"`)
141+
}
142+
const pool = createPool(await buildPoolConfig(config))
143+
try {
144+
await pool.query(sql`DROP SCHEMA IF EXISTS "${config.schema}" CASCADE`)
145+
} finally {
146+
await pool.end()
147+
}
148+
},
149+
150+
async *write({ config }, $stdin) {
151+
const pool = createPool(await buildPoolConfig(config))
152+
const batchSize = config.batch_size
153+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
154+
const streamBuffers = new Map<string, Record<string, any>[]>()
155+
156+
const flushStream = async (streamName: string) => {
157+
const buffer = streamBuffers.get(streamName)
158+
if (!buffer || buffer.length === 0) return
159+
await upsertMany(pool, config.schema, streamName, buffer)
160+
streamBuffers.set(streamName, [])
161+
}
162+
163+
const flushAll = async () => {
164+
for (const streamName of streamBuffers.keys()) {
165+
await flushStream(streamName)
166+
}
167+
}
168+
169+
try {
170+
for await (const msg of $stdin as AsyncIterable<DestinationInput>) {
171+
if (msg.type === 'record') {
172+
const { stream, data } = msg.record
173+
if (!streamBuffers.has(stream)) streamBuffers.set(stream, [])
174+
const buffer = streamBuffers.get(stream)!
175+
buffer.push(data as Record<string, unknown>)
176+
if (buffer.length >= batchSize) await flushStream(stream)
177+
} else if (msg.type === 'source_state') {
178+
if (msg.source_state.state_type !== 'global') {
179+
await flushStream(msg.source_state.stream)
180+
}
181+
yield msg
182+
}
183+
}
184+
await flushAll()
185+
} catch (err: unknown) {
186+
try {
187+
await flushAll()
188+
} catch {
189+
// ignore flush errors during error handling
190+
}
191+
yield {
192+
type: 'trace' as const,
193+
trace: {
194+
trace_type: 'error' as const,
195+
error: {
196+
failure_type: isTransient(err)
197+
? ('transient_error' as const)
198+
: ('system_error' as const),
199+
message: err instanceof Error ? err.message : String(err),
200+
stack_trace: err instanceof Error ? err.stack : undefined,
201+
},
202+
},
203+
}
204+
} finally {
205+
await pool.end()
206+
}
207+
208+
yield logMsg(`DSQL destination: wrote to schema "${config.schema}"`)
209+
},
210+
} satisfies Destination<Config>
211+
212+
export default destination

0 commit comments

Comments
 (0)