Skip to content

Commit a0f9db7

Browse files
committed
sheet destination
1 parent 16428bc commit a0f9db7

4 files changed

Lines changed: 431 additions & 24 deletions

File tree

apps/service/src/temporal/activities/reconcile-cleanup.ts

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,27 @@ import { createStripeSource, type Config as StripeSourceConfig } from '@stripe/s
33
import destinationPostgres, {
44
type Config as PostgresDestConfig,
55
} from '@stripe/sync-destination-postgres'
6+
import destinationSheets, {
7+
type Config as SheetsDestConfig,
8+
} from '@stripe/sync-destination-google-sheets'
9+
import type { Destination } from '@stripe/sync-protocol'
610
import type { ActivitiesContext } from './_shared.js'
711
import { log } from '../../logger.js'
812

13+
type SupportedDestType = 'postgres' | 'google_sheets'
14+
15+
function resolveDestination(
16+
type: string
17+
): { destination: Destination<Record<string, unknown>>; type: SupportedDestType } | undefined {
18+
if (type === 'postgres') {
19+
return { destination: destinationPostgres as Destination<Record<string, unknown>>, type }
20+
}
21+
if (type === 'google_sheets') {
22+
return { destination: destinationSheets as Destination<Record<string, unknown>>, type }
23+
}
24+
return undefined
25+
}
26+
927
export function createReconcileCleanupActivity(context: ActivitiesContext) {
1028
return async function reconcileCleanup(
1129
pipelineId: string,
@@ -14,15 +32,22 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) {
1432
const pipeline = await context.pipelineStore.get(pipelineId)
1533
const { source, destination, streams } = pipeline
1634

17-
if (destination.type !== 'postgres' || source.type !== 'stripe') {
18-
// Only stripe→postgres is supported today.
35+
if (source.type !== 'stripe') {
36+
// Only stripe sources support verifyRecords today.
37+
return
38+
}
39+
const resolved = resolveDestination(destination.type)
40+
if (!resolved) {
41+
// Destination doesn't implement getStaleRecords yet.
1942
return
2043
}
2144

2245
// Configs were validated against connector schemas at pipeline create time,
23-
// so the runtime shape matches the connector's strict Config type.
46+
// so the runtime shape matches each connector's strict Config type.
2447
const sourceConfig = source[source.type] as unknown as StripeSourceConfig
25-
const destConfig = destination[destination.type] as unknown as PostgresDestConfig
48+
const destConfig = destination[destination.type] as unknown as
49+
| PostgresDestConfig
50+
| SheetsDestConfig
2651

2752
const catalog = {
2853
streams:
@@ -39,20 +64,24 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) {
3964
const filter = sourceConfig.account_id ? { _account_id: sourceConfig.account_id } : undefined
4065
if (!filter) {
4166
log.warn(
42-
{ pipelineId },
67+
{ pipelineId, destinationType: resolved.type },
4368
'reconcile_cleanup: source has no account_id — running unscoped (unsafe in multi-tenant schemas)'
4469
)
4570
}
4671

4772
const stripeSource = createStripeSource()
73+
const dest = resolved.destination
74+
// Guaranteed by `resolveDestination`'s whitelist: every type that resolves
75+
// here is a destination that ships a `getStaleRecords` implementation.
76+
const getStaleRecords = dest.getStaleRecords!
4877

4978
try {
50-
heartbeat({ phase: 'starting', pipelineId })
79+
heartbeat({ phase: 'starting', pipelineId, destinationType: resolved.type })
5180

5281
// Wrap the destination's batches so we heartbeat per stream.
5382
async function* heartbeatedStaleRecords() {
54-
const inner = destinationPostgres.getStaleRecords!({
55-
config: destConfig,
83+
const inner = getStaleRecords({
84+
config: destConfig as Record<string, unknown>,
5685
catalog,
5786
syncRunStartedAt,
5887
filter,
@@ -68,8 +97,8 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) {
6897
heartbeatedStaleRecords()
6998
)
7099

71-
const writeOutput = destinationPostgres.write(
72-
{ config: destConfig, catalog },
100+
const writeOutput = dest.write(
101+
{ config: destConfig as Record<string, unknown>, catalog },
73102
verificationMessages
74103
)
75104

@@ -83,7 +112,10 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) {
83112
}
84113
}
85114

86-
log.info({ pipelineId, deleteCount, syncRunStartedAt }, 'reconcile_cleanup: completed')
115+
log.info(
116+
{ pipelineId, destinationType: resolved.type, deleteCount, syncRunStartedAt },
117+
'reconcile_cleanup: completed'
118+
)
87119
} catch (err) {
88120
// Cleanup is best-effort — log and swallow so the workflow's reconcile
89121
// loop keeps running on the next interval.

e2e/stripe-reconcile-cleanup.test.ts

Lines changed: 141 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
/**
22
* Verifies the Temporal `reconcileCleanup` activity tombstones rows for
3-
* records that were hard-deleted in Stripe without the corresponding
4-
* `*.deleted` event being processed — the "missed delete" path that
5-
* complements stripe-delete.test.ts (the event-driven path).
6-
*
7-
* Seeds destination rows via in-process engine, then runs the production
8-
* activity through `MockActivityEnvironment` so the composition
9-
* (`pg.getStaleRecords` → `stripe.verifyRecords` → `pg.write`) is exercised
10-
* end-to-end with a Temporal Activity Context active (heartbeats become no-ops).
3+
* records hard-deleted in Stripe without a `*.deleted` event — the "missed
4+
* delete" path complementing stripe-delete.test.ts. Two suites (postgres,
5+
* google_sheets) run the production activity via `MockActivityEnvironment`.
116
*/
127
import pg from 'pg'
138
import Stripe from 'stripe'
9+
import { google } from 'googleapis'
1410
import { afterAll, beforeAll, expect, it } from 'vitest'
1511
import { MockActivityEnvironment } from '@temporalio/testing'
1612
import source from '@stripe/sync-source-stripe'
1713
import destinationPostgres from '@stripe/sync-destination-postgres'
14+
import destinationSheets, { readSheet } from '@stripe/sync-destination-google-sheets'
1815
import { createEngine } from '@stripe/sync-engine'
1916
import type { ConnectorResolver } from '@stripe/sync-engine'
2017
import { createActivities } from '@stripe/sync-service'
@@ -129,9 +126,7 @@ describeWithEnv(
129126
try {
130127
// Backfill-only sync (no websocket, no event polling) — both rows
131128
// land in postgres with `_synced_at ≈ T0`.
132-
for await (const _msg of engine.pipeline_sync(pipeline)) {
133-
void _msg
134-
}
129+
await drain(engine.pipeline_sync(pipeline))
135130

136131
const seeded = await pool.query<{ id: string }>(
137132
`SELECT id FROM "${SCHEMA}"."${STREAM}" WHERE id = ANY($1)`,
@@ -177,3 +172,138 @@ describeWithEnv(
177172
}, 180_000)
178173
}
179174
)
175+
176+
// MARK: - Google Sheets
177+
178+
describeWithEnv(
179+
'temporal reconcile-cleanup activity → google sheets (missed delete)',
180+
['STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN'],
181+
({ STRIPE_API_KEY, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_REFRESH_TOKEN }) => {
182+
const PIPELINE_ID = `pipe_recon_sheets_${ts}`
183+
let stripe: Stripe
184+
let sheetsClient: ReturnType<typeof google.sheets>
185+
let driveClient: ReturnType<typeof google.drive>
186+
let spreadsheetId = process.env.GOOGLE_SPREADSHEET_ID ?? ''
187+
let createdSpreadsheetHere = false
188+
189+
const sourceConfig = { api_key: STRIPE_API_KEY, backfill_limit: BACKFILL_LIMIT }
190+
191+
const resolver: ConnectorResolver = {
192+
resolveSource: async (name) => {
193+
if (name !== 'stripe') throw new Error(`Unknown source: ${name}`)
194+
return source
195+
},
196+
resolveDestination: async (name) => {
197+
if (name !== 'google_sheets') throw new Error(`Unknown destination: ${name}`)
198+
return destinationSheets
199+
},
200+
sources: () => new Map(),
201+
destinations: () => new Map(),
202+
}
203+
204+
function makePipeline() {
205+
return {
206+
source: { type: 'stripe', stripe: sourceConfig },
207+
destination: {
208+
type: 'google_sheets',
209+
google_sheets: {
210+
client_id: GOOGLE_CLIENT_ID,
211+
client_secret: GOOGLE_CLIENT_SECRET,
212+
refresh_token: GOOGLE_REFRESH_TOKEN,
213+
...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}),
214+
spreadsheet_title: `e2e-recon-sheets-${ts}`,
215+
batch_size: 50,
216+
},
217+
},
218+
streams: [{ name: STREAM }],
219+
}
220+
}
221+
222+
beforeAll(async () => {
223+
stripe = new Stripe(STRIPE_API_KEY)
224+
const auth = new google.auth.OAuth2(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET)
225+
auth.setCredentials({ refresh_token: GOOGLE_REFRESH_TOKEN })
226+
sheetsClient = google.sheets({ version: 'v4', auth })
227+
driveClient = google.drive({ version: 'v3', auth })
228+
})
229+
230+
afterAll(async () => {
231+
if (createdSpreadsheetHere && spreadsheetId && !process.env.KEEP_TEST_DATA) {
232+
try {
233+
await driveClient.files.delete({ fileId: spreadsheetId })
234+
} catch {}
235+
}
236+
})
237+
238+
it('tombstones customers deleted in stripe without a delete event', async () => {
239+
const engine = await createEngine(resolver)
240+
241+
// pipeline_setup creates the spreadsheet if needed and emits the new
242+
// id via destination_config — capture so the second pipeline run reuses it.
243+
for await (const m of engine.pipeline_setup(makePipeline())) {
244+
if (
245+
m.type === 'control' &&
246+
m.control.control_type === 'destination_config' &&
247+
typeof m.control.destination_config.spreadsheet_id === 'string' &&
248+
m.control.destination_config.spreadsheet_id !== spreadsheetId
249+
) {
250+
spreadsheetId = m.control.destination_config.spreadsheet_id
251+
createdSpreadsheetHere = true
252+
}
253+
}
254+
expect(spreadsheetId, 'no spreadsheet_id available (env or destination)').toBeTruthy()
255+
console.log(`\n Sheets: https://docs.google.com/spreadsheets/d/${spreadsheetId}/`)
256+
console.log(` Pipeline: ${PIPELINE_ID}`)
257+
258+
const pipeline = makePipeline()
259+
const pipelineStore = memoryPipelineStore()
260+
await pipelineStore.set(PIPELINE_ID, { id: PIPELINE_ID, ...pipeline } as Pipeline)
261+
262+
const survivor = await stripe.customers.create({
263+
name: `e2e-recon-sheets-survivor-${Date.now()}`,
264+
})
265+
const doomed = await stripe.customers.create({
266+
name: `e2e-recon-sheets-doomed-${Date.now()}`,
267+
})
268+
const cleanupIds = new Set<string>([survivor.id, doomed.id])
269+
270+
try {
271+
// Backfill seeds both customers with `_synced_at ≈ T0`.
272+
await drain(engine.pipeline_sync(pipeline))
273+
274+
const seededRows = await readSheet(sheetsClient, spreadsheetId, STREAM)
275+
const seededHeader = (seededRows[0] ?? []) as string[]
276+
const idIdx = seededHeader.indexOf('id')
277+
expect(idIdx, 'id column missing in sheet header').toBeGreaterThanOrEqual(0)
278+
const seededIds = new Set(seededRows.slice(1).map((row) => String(row[idIdx] ?? '')))
279+
expect(seededIds.has(survivor.id)).toBe(true)
280+
expect(seededIds.has(doomed.id)).toBe(true)
281+
282+
await stripe.customers.del(doomed.id)
283+
cleanupIds.delete(doomed.id)
284+
285+
await new Promise((r) => setTimeout(r, 50))
286+
const syncRunStartedAt = new Date().toISOString()
287+
288+
const activities = createActivities({ engineUrl: 'http://unused', pipelineStore })
289+
const env = new MockActivityEnvironment()
290+
await env.run(activities.reconcileCleanup, PIPELINE_ID, syncRunStartedAt)
291+
292+
const afterRows = await readSheet(sheetsClient, spreadsheetId, STREAM)
293+
const afterIds = new Set(afterRows.slice(1).map((row) => String(row[idIdx] ?? '')))
294+
expect(afterIds.has(survivor.id), `survivor ${survivor.id} was tombstoned`).toBe(true)
295+
expect(afterIds.has(doomed.id), `doomed ${doomed.id} was not tombstoned`).toBe(false)
296+
console.log(` Survived: ${survivor.id}`)
297+
console.log(` Tombstoned: ${doomed.id}`)
298+
} finally {
299+
if (!process.env.KEEP_TEST_DATA) {
300+
for (const id of cleanupIds) {
301+
try {
302+
await stripe.customers.del(id)
303+
} catch {}
304+
}
305+
}
306+
}
307+
}, 240_000)
308+
}
309+
)

0 commit comments

Comments
 (0)