Skip to content

Commit f31d76b

Browse files
tonyxiaoclaudecodex
authored
feat: add pipeline_sync_batch, takeThroughStates, sync_batch CLI improvements, and observability (#349)
* Add source_discover script * feat: log event IDs at all webhook ingress points Adds structured info-level logs for every Stripe event received across all ingress paths (HTTP webhook, WebSocket, service endpoint) for observability. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude * fix(progress): increment per-stream state_count on source_state messages state_count was never incremented — the old code only created a new stream entry when missing, but createInitialProgress pre-populates all streams so the branch never ran. Bug existed since the reducer was introduced in d6ee08e. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude * Add IxJS iterator behavior test Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com> * feat: add pipeline_sync_batch, --raw flag, and derived total counts - Add pipeline_sync_batch to Engine interface, implementation, remote engine, engine API (/pipeline_sync_batch), and service API (/pipelines/{id}/sync_batch) with CLI subcommand. Returns EofPayload as JSON instead of streaming NDJSON. - Add --raw flag to pipelines sync CLI to dump raw NDJSON to stdout. - Add total_record_count and total_state_count to ProgressPayload.derived. - Fix states_per_second to include per-stream state counts. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude * Add takeThroughStates helper Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com> * Add limit callback to takeThroughStates Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com> * Refactor pipeline_sync_batch around state limits Committed-By-Agent: codex Co-authored-by: codex <noreply@openai.com> * feat(cli): add --loop, --raw, --state-limit to sync_batch command - --loop: repeat sync_batch until has_more is false - --raw: output JSONL to stdout instead of progress to stderr - --state-limit: stop after N source_state messages per request - --time-limit: stop after N seconds per request - Progress rendered via formatProgress after each iteration - JSONL output (one line per eof) for piping into jq Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude * feat(cli): make --loop a count (default 1, 0 = unlimited) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude * feat(source-stripe): log Stripe-Rate-Limited-Reason on 429 retries Extracts the stripe-rate-limited-reason header (global-rate, global-concurrency, endpoint-rate, etc.) from 429 responses and includes it in retry log messages. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Committed-By-Agent: claude * fix: add total_record_count/total_state_count to derived progress in tests and update OpenAPI - Add missing required `total_record_count` and `total_state_count` fields to `derived` objects in test fixtures (engine, service, e2e, state-reducer tests) - Regenerate engine and service OpenAPI specs (adds pipeline_sync_batch to table) - Format-only refactors (no logic changes) in protocol.ts, helpers.ts, reducer.ts, app.ts, src-websocket.ts Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Committed-By-Agent: claude --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: codex <noreply@openai.com>
1 parent d77349c commit f31d76b

37 files changed

Lines changed: 1582 additions & 61 deletions

apps/engine/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
"googleapis": "^148.0.0",
6363
"hono": "^4",
6464
"ink": "^7.0.1",
65+
"ix": "^7.0.0",
6566
"openapi-fetch": "^0.17.0",
6667
"pg": "^8.16.3",
6768
"react": "19.2.5",

apps/engine/src/__generated__/openapi.d.ts

Lines changed: 73 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/engine/src/__generated__/openapi.json

Lines changed: 89 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/engine/src/__tests__/openapi.test.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,19 @@ describe('Engine OpenAPI spec', () => {
9898
// SyncState should still be a named component
9999
expect(spec.components.schemas).toHaveProperty('SyncState')
100100
})
101+
102+
it('pipeline_sync_batch uses the narrowed batch request schema', async () => {
103+
const spec = await getSpec()
104+
const op = (spec.paths as Record<string, any>)['/pipeline_sync_batch']?.post
105+
const bodySchema = op?.requestBody?.content?.['application/json']?.schema
106+
107+
expect(bodySchema?.properties?.state).toMatchObject({
108+
$ref: '#/components/schemas/SyncState',
109+
})
110+
expect(bodySchema?.properties).toHaveProperty('run_id')
111+
expect(bodySchema?.properties).toHaveProperty('state_limit')
112+
expect(bodySchema?.properties).not.toHaveProperty('stdin')
113+
expect(bodySchema?.properties).not.toHaveProperty('time_limit')
114+
expect(bodySchema?.properties).not.toHaveProperty('soft_time_limit')
115+
})
101116
})

apps/engine/src/api/app.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
TeardownOutput as TeardownOutputSchema,
2323
SyncState,
2424
emptySyncState,
25+
EofPayload as EofPayloadSchema,
2526
} from '@stripe/sync-protocol'
2627

2728
// Raw $refs for NDJSON content schemas — avoids zod-openapi generating *Output
@@ -162,6 +163,22 @@ export async function createApp(resolver: ConnectorResolver) {
162163
}),
163164
})
164165

166+
const syncBatchRequestBody = z.object({
167+
pipeline: TypedPipelineConfig,
168+
run_id: z.string().optional().meta({
169+
description: 'Optional sync run identifier used to track bounded sync progress.',
170+
example: 'run_demo',
171+
}),
172+
state_limit: z.number().int().positive().optional().meta({
173+
description: 'Stop after yielding N source_state messages, inclusive.',
174+
example: 100,
175+
}),
176+
state: SyncState.optional().meta({
177+
description:
178+
'SyncState ({ source, destination, sync_run }). Falls back to empty state if invalid.',
179+
}),
180+
})
181+
165182
const writeRequestBody = z.object({
166183
pipeline: TypedPipelineConfig,
167184
stdin: z.array(MessageSchema).meta({
@@ -502,6 +519,43 @@ export async function createApp(resolver: ConnectorResolver) {
502519
})
503520
})
504521

522+
const pipelineSyncBatchRoute = createRoute({
523+
operationId: 'pipeline_sync_batch',
524+
method: 'post',
525+
path: '/pipeline_sync_batch',
526+
tags: ['Stateless Sync API'],
527+
summary: 'Run sync pipeline (batch, returns JSON)',
528+
description:
529+
'Runs the full read → write pipeline and returns the final EofPayload as a single JSON response.',
530+
requestBody: {
531+
required: true,
532+
content: { 'application/json': { schema: syncBatchRequestBody } },
533+
},
534+
responses: {
535+
200: {
536+
description: 'Sync result',
537+
content: { 'application/json': { schema: EofPayloadSchema } },
538+
},
539+
400: errorResponse,
540+
},
541+
})
542+
app.openapi(pipelineSyncBatchRoute, async (c) => {
543+
const { pipeline, state, run_id, state_limit } = c.req.valid('json')
544+
545+
const context = { path: '/pipeline_sync_batch', ...syncRequestContext(pipeline) }
546+
const startedAt = Date.now()
547+
log.info(context, 'Engine API /pipeline_sync_batch started')
548+
549+
const result = await engine.pipeline_sync_batch(pipeline, { state, run_id, state_limit })
550+
551+
log.info(
552+
{ ...context, durationMs: Date.now() - startedAt, status: result.status },
553+
'Engine API /pipeline_sync_batch completed'
554+
)
555+
556+
return c.json(result, 200)
557+
})
558+
505559
app.openapi(
506560
createRoute({
507561
operationId: 'meta_sources_list',

0 commit comments

Comments
 (0)