Skip to content

Commit 33e8dab

Browse files
committed
make sure it's called _last_syned_at in all places
1 parent e46ce39 commit 33e8dab

4 files changed

Lines changed: 19 additions & 15 deletions

File tree

e2e/stripe-reconcile-cleanup.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ describeWithEnv(
125125

126126
try {
127127
// Backfill-only sync (no websocket, no event polling) — both rows
128-
// land in postgres with `_synced_at ≈ T0`.
128+
// land in postgres with `_last_synced_at ≈ T0`.
129129
await drain(engine.pipeline_sync(pipeline))
130130

131131
const seeded = await pool.query<{ id: string }>(
@@ -139,8 +139,8 @@ describeWithEnv(
139139
await stripe.customers.del(doomed.id)
140140
cleanupIds.delete(doomed.id)
141141

142-
// `_synced_at` is set with millisecond precision by the destination,
143-
// so a small forward skew guarantees `syncRunStartedAt > _synced_at`.
142+
// `_last_synced_at` is set with millisecond precision by the destination,
143+
// so a small forward skew guarantees `syncRunStartedAt > _last_synced_at`.
144144
await new Promise((r) => setTimeout(r, 50))
145145
const syncRunStartedAt = new Date().toISOString()
146146

@@ -268,7 +268,7 @@ describeWithEnv(
268268
const cleanupIds = new Set<string>([survivor.id, doomed.id])
269269

270270
try {
271-
// Backfill seeds both customers with `_synced_at ≈ T0`.
271+
// Backfill seeds both customers with `_last_synced_at ≈ T0`.
272272
await drain(engine.pipeline_sync(pipeline))
273273

274274
const seededRows = await readSheet(sheetsClient, spreadsheetId, STREAM)

packages/destination-google-sheets/__tests__/integration.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ describeWithEnv(
2727
function stripUpdatedAt(rows: unknown[][]): unknown[][] {
2828
const header = rows[0] ?? []
2929
const indexes = new Set(
30-
['_updated_at', '_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
30+
['_updated_at', '_last_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
3131
)
3232
if (indexes.size === 0) return rows
3333
return rows.map((row) => row.filter((_, i) => !indexes.has(i)))

packages/destination-google-sheets/src/index.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ import { createMemorySheets } from '../__tests__/memory-sheets.js'
2020
/**
2121
* Strip metadata timestamp columns from a 2D rows array.
2222
*
23-
* The destination stamps `_synced_at`; the source may stamp `_updated_at`.
23+
* The destination stamps `_last_synced_at`; the source may stamp `_updated_at`.
2424
* Most tests only care about source data, so drop both at assertion sites.
2525
*/
2626
function stripUpdatedAt(rows: unknown[][] | undefined): unknown[][] {
2727
if (!rows || rows.length === 0) return rows ?? []
2828
const header = rows[0] as unknown[]
2929
const indexes = new Set(
30-
['_updated_at', '_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
30+
['_updated_at', '_last_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
3131
)
3232
if (indexes.size === 0) return rows
3333
return rows.map((row) => row.filter((_, i) => !indexes.has(i)))
@@ -1786,9 +1786,9 @@ describe('_updated_at column (source-owned, passthrough)', () => {
17861786
)
17871787

17881788
const rows = getData(getSpreadsheetIds()[0], 'users')!
1789-
expect(rows[0]).toEqual(['id', 'name', '_synced_at'])
1789+
expect(rows[0]).toEqual(['id', 'name', '_last_synced_at'])
17901790
expect(rows[0]).not.toContain('_updated_at')
1791-
const syncedAtIdx = (rows[0] as string[]).indexOf('_synced_at')
1791+
const syncedAtIdx = (rows[0] as string[]).indexOf('_last_synced_at')
17921792
expect(syncedAtIdx).toBeGreaterThanOrEqual(0)
17931793
expect(Date.parse(String(rows[1][syncedAtIdx]))).not.toBeNaN()
17941794
})
@@ -2062,7 +2062,7 @@ describe('getStaleRecords', () => {
20622062
],
20632063
}
20642064

2065-
it('returns ids whose _synced_at predates syncRunStartedAt', async () => {
2065+
it('returns ids whose _last_synced_at predates syncRunStartedAt', async () => {
20662066
const { sheets, getSpreadsheetIds } = createMemorySheets()
20672067
const dest = createDestination(sheets)
20682068

@@ -2091,7 +2091,7 @@ describe('getStaleRecords', () => {
20912091
expect(batches).toEqual([{ stream: 'customer', ids: ['cus_1', 'cus_2'] }])
20922092
})
20932093

2094-
it('does not return rows whose _synced_at is at or after syncRunStartedAt', async () => {
2094+
it('does not return rows whose _last_synced_at is at or after syncRunStartedAt', async () => {
20952095
const { sheets, getSpreadsheetIds } = createMemorySheets()
20962096
const dest = createDestination(sheets)
20972097

packages/destination-google-sheets/src/index.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -846,10 +846,14 @@ export function createDestination(
846846
}
847847
}
848848

849-
const headers = await ensureHeadersForRecord(stream, cleanData)
849+
const syncedData: Record<string, unknown> = {
850+
...cleanData,
851+
_last_synced_at: new Date().toISOString(),
852+
}
853+
const headers = await ensureHeadersForRecord(stream, syncedData)
850854
const tsFields = streamTimestampFields.get(stream)
851855
const row = headers.map((header) => {
852-
const value = cleanData[header]
856+
const value = syncedData[header]
853857
if (tsFields?.has(header) && typeof value === 'number') {
854858
return unixToIso(value)
855859
}
@@ -998,11 +1002,11 @@ export function createDestination(
9981002

9991003
const headers = (rows[0] as unknown[]).map((h) => String(h ?? ''))
10001004
const idIdx = headers.indexOf('id')
1001-
const syncedAtIdx = headers.indexOf('_synced_at')
1005+
const syncedAtIdx = headers.indexOf('_last_synced_at')
10021006
if (idIdx < 0 || syncedAtIdx < 0) {
10031007
log.warn(
10041008
{ stream: streamName, headers },
1005-
'getStaleRecords: missing id or _synced_at column — skipping stream'
1009+
'getStaleRecords: missing id or _last_synced_at column — skipping stream'
10061010
)
10071011
continue
10081012
}

0 commit comments

Comments
 (0)