diff --git a/e2e/connector-loading.test.sh b/e2e/connector-loading.test.sh index 88b0c9286..da5c44816 100755 --- a/e2e/connector-loading.test.sh +++ b/e2e/connector-loading.test.sh @@ -24,6 +24,7 @@ TMPDIR_BASE=$(mktemp -d) cleanup() { rm -rf "$TMPDIR_BASE" rm -f "$REPO_ROOT"/stripe-sync-protocol-*.tgz + rm -f "$REPO_ROOT"/stripe-sync-openapi-*.tgz rm -f "$REPO_ROOT"/stripe-sync-engine-*.tgz rm -f "$REPO_ROOT"/stripe-sync-source-stripe-*.tgz rm -f "$REPO_ROOT"/stripe-sync-destination-postgres-*.tgz @@ -44,6 +45,7 @@ echo "" echo "--- Step 1: Packing packages ---" PROTOCOL_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-protocol pack 2>/dev/null | tail -1) +OPENAPI_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-openapi pack 2>/dev/null | tail -1) ENGINE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-engine pack 2>/dev/null | tail -1) SOURCE_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-source-stripe pack 2>/dev/null | tail -1) DEST_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-destination-postgres pack 2>/dev/null | tail -1) @@ -52,7 +54,7 @@ STATE_PG_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-state-postgres pack UTIL_PG_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-util-postgres pack 2>/dev/null | tail -1) TSCLI_TGZ=$(cd "$REPO_ROOT" && pnpm --filter @stripe/sync-ts-cli pack 2>/dev/null | tail -1) -for tgz in "$PROTOCOL_TGZ" "$ENGINE_TGZ" "$SOURCE_TGZ" "$DEST_TGZ" "$DEST_SHEETS_TGZ" \ +for tgz in "$PROTOCOL_TGZ" "$OPENAPI_TGZ" "$ENGINE_TGZ" "$SOURCE_TGZ" "$DEST_TGZ" "$DEST_SHEETS_TGZ" \ "$STATE_PG_TGZ" "$UTIL_PG_TGZ" "$TSCLI_TGZ"; do if [ ! -f "$tgz" ]; then echo "FAIL: tarball not found: $tgz" @@ -84,6 +86,7 @@ cat > package.json < package.json <&1 | tail -5 echo "" @@ -189,14 +192,14 @@ echo "" # --------------------------------------------------------------------------- echo "--- Step 8: unknown connector name → not found ---" UNKNOWN_PARAMS='{"source_name":"nonexistent-xyz","source_config":{},"destination_name":"nonexistent-xyz","destination_config":{},"streams":[{"name":"x"}]}' -STEP8_OUTPUT=$(npx sync-engine check \ +unknown_output=$(npx sync-engine check \ --x-sync-params "$UNKNOWN_PARAMS" \ 2>&1 || true) -if echo "$STEP8_OUTPUT" | grep -qi "not found"; then +if echo "$unknown_output" | grep -qi "not found"; then echo " PASS: unknown connector correctly reports 'not found'" else echo " FAIL: unknown connector did not report 'not found'" - echo " Output: $STEP8_OUTPUT" + echo " Output: $unknown_output" exit 1 fi echo "" diff --git a/packages/openapi/__tests__/fixtures/minimalSpec.ts b/packages/openapi/__tests__/fixtures/minimalSpec.ts new file mode 100644 index 000000000..6a8ab5ff3 --- /dev/null +++ b/packages/openapi/__tests__/fixtures/minimalSpec.ts @@ -0,0 +1,196 @@ +import type { OpenApiSpec, OpenApiPathItem } from '../../types' + +function listPath( + schemaRef: string, + opts: { supportsCreatedFilter?: boolean; supportsLimit?: boolean } = {} +): OpenApiPathItem { + const parameters: { name: string; in: string }[] = [] + if (opts.supportsCreatedFilter) { + parameters.push({ name: 'created', in: 'query' }) + } + if (opts.supportsLimit !== false) { + parameters.push({ name: 'limit', in: 'query' }) + } + return { + get: { + parameters, + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object', + properties: { + object: { type: 'string', enum: ['list'] }, + data: { type: 'array', items: { $ref: `#/components/schemas/${schemaRef}` } }, + has_more: { type: 'boolean' }, + url: { type: 'string' }, + }, + }, + }, + }, + }, + }, + }, + } +} + +function v2ListPath(schemaRef: string): OpenApiPathItem { + return { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object', + properties: { + data: { type: 'array', items: { $ref: `#/components/schemas/${schemaRef}` } }, + next_page_url: { type: 'string', nullable: true }, + previous_page_url: { type: 'string', nullable: true }, + }, + }, + }, + }, + }, + }, + }, + } +} + +export const minimalStripeOpenApiSpec: OpenApiSpec = { + openapi: '3.0.0', + info: { + version: '2020-08-27', + }, + paths: { + '/v1/customers': listPath('customer', { supportsCreatedFilter: true }), + '/v1/plans': listPath('plan', { supportsCreatedFilter: true }), + '/v1/prices': listPath('price', { supportsCreatedFilter: true }), + '/v1/products': listPath('product', { supportsCreatedFilter: true }), + '/v1/subscription_items': listPath('subscription_item'), + '/v1/checkout/sessions': listPath('checkout_session', { supportsCreatedFilter: true }), + '/v1/radar/early_fraud_warnings': listPath('early_fraud_warning', { + supportsCreatedFilter: true, + }), + '/v1/entitlements/active_entitlements': listPath('active_entitlement'), + '/v1/entitlements/features': listPath('entitlements_feature'), + '/v2/core/accounts': v2ListPath('v2_core_account'), + '/v2/core/event_destinations': v2ListPath('v2_core_event_destination'), + }, + components: { + schemas: { + customer: { + 'x-resourceId': 'customer', + oneOf: [ + { + type: 'object', + properties: { + id: { type: 'string' }, + object: { type: 'string' }, + created: { type: 'integer' }, + }, + }, + { + type: 'object', + properties: { + id: { type: 'string' }, + deleted: { type: 'boolean' }, + }, + }, + ], + }, + plan: { + 'x-resourceId': 'plan', + type: 'object', + properties: { + id: { type: 'string' }, + active: { type: 'boolean' }, + amount: { type: 'integer' }, + }, + }, + price: { + 'x-resourceId': 'price', + type: 'object', + properties: { + id: { type: 'string' }, + product: { type: 'string' }, + unit_amount: { type: 'integer' }, + metadata: { type: 'object', additionalProperties: true }, + }, + }, + product: { + 'x-resourceId': 'product', + type: 'object', + properties: { + id: { type: 'string' }, + name: { type: 'string' }, + }, + }, + subscription_item: { + 'x-resourceId': 'subscription_item', + type: 'object', + properties: { + id: { type: 'string' }, + deleted: { type: 'boolean' }, + subscription: { type: 'string' }, + quantity: { type: 'integer' }, + }, + }, + checkout_session: { + 'x-resourceId': 'checkout.session', + type: 'object', + properties: { + id: { type: 'string' }, + amount_total: { type: 'integer' }, + customer: { type: 'string', nullable: true }, + }, + }, + early_fraud_warning: { + 'x-resourceId': 'radar.early_fraud_warning', + type: 'object', + properties: { + id: { type: 'string' }, + charge: { type: 'string' }, + }, + }, + active_entitlement: { + 'x-resourceId': 'entitlements.active_entitlement', + type: 'object', + properties: { + id: { type: 'string' }, + customer: { type: 'string' }, + feature: { type: 'string' }, + }, + }, + entitlements_feature: { + 'x-resourceId': 'entitlements.feature', + type: 'object', + properties: { + id: { type: 'string' }, + lookup_key: { type: 'string' }, + }, + }, + v2_core_account: { + 'x-resourceId': 'v2.core.account', + type: 'object', + properties: { + id: { type: 'string' }, + display_name: { type: 'string' }, + contact_email: { type: 'string', nullable: true }, + created: { type: 'string', format: 'date-time' }, + }, + }, + v2_core_event_destination: { + 'x-resourceId': 'v2.core.event_destination', + type: 'object', + properties: { + id: { type: 'string' }, + name: { type: 'string' }, + enabled_events: { type: 'array', items: { type: 'string' } }, + livemode: { type: 'boolean' }, + }, + }, + }, + }, +} diff --git a/packages/openapi/__tests__/listFnResolver.test.ts b/packages/openapi/__tests__/listFnResolver.test.ts new file mode 100644 index 000000000..dd3cd6361 --- /dev/null +++ b/packages/openapi/__tests__/listFnResolver.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it } from 'vitest' +import { discoverListEndpoints } from '../listFnResolver' +import { minimalStripeOpenApiSpec } from './fixtures/minimalSpec' + +describe('discoverListEndpoints', () => { + it('maps table names to their API paths', () => { + const endpoints = discoverListEndpoints(minimalStripeOpenApiSpec) + + expect(endpoints.get('customers')).toEqual({ + tableName: 'customers', + resourceId: 'customer', + apiPath: '/v1/customers', + supportsCreatedFilter: true, + supportsLimit: true, + }) + expect(endpoints.get('checkout_sessions')).toEqual({ + tableName: 'checkout_sessions', + resourceId: 'checkout.session', + apiPath: '/v1/checkout/sessions', + supportsCreatedFilter: true, + supportsLimit: true, + }) + expect(endpoints.get('early_fraud_warnings')).toEqual({ + tableName: 'early_fraud_warnings', + resourceId: 'radar.early_fraud_warning', + apiPath: '/v1/radar/early_fraud_warnings', + supportsCreatedFilter: true, + supportsLimit: true, + }) + }) + + it('discovers v2 list endpoints using next_page_url format', () => { + const endpoints = discoverListEndpoints(minimalStripeOpenApiSpec) + + expect(endpoints.get('v2_core_accounts')).toEqual({ + tableName: 'v2_core_accounts', + resourceId: 'v2.core.account', + apiPath: '/v2/core/accounts', + supportsCreatedFilter: false, + supportsLimit: false, + }) + expect(endpoints.get('v2_core_event_destinations')).toEqual({ + tableName: 'v2_core_event_destinations', + resourceId: 'v2.core.event_destination', + apiPath: '/v2/core/event_destinations', + supportsCreatedFilter: false, + supportsLimit: false, + }) + }) + + it('skips paths with path parameters', () => { + const spec = { + ...minimalStripeOpenApiSpec, + paths: { + ...minimalStripeOpenApiSpec.paths, + '/v1/customers/{customer}/sources': { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object' as const, + properties: { + object: { type: 'string' as const, enum: ['list'] }, + data: { + type: 'array' as const, + items: { $ref: '#/components/schemas/customer' }, + }, + has_more: { type: 'boolean' as const }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + const endpoints = discoverListEndpoints(spec) + const paths = Array.from(endpoints.values()).map((e) => e.apiPath) + expect(paths).not.toContain('/v1/customers/{customer}/sources') + }) + + it('returns empty map when spec has no paths', () => { + const endpoints = discoverListEndpoints({ openapi: '3.0.0' }) + expect(endpoints.size).toBe(0) + }) +}) diff --git a/packages/openapi/__tests__/specFetchHelper.test.ts b/packages/openapi/__tests__/specFetchHelper.test.ts new file mode 100644 index 000000000..109b10852 --- /dev/null +++ b/packages/openapi/__tests__/specFetchHelper.test.ts @@ -0,0 +1,106 @@ +import fs from 'node:fs/promises' +import os from 'node:os' +import path from 'node:path' +import { afterEach, describe, expect, it, vi } from 'vitest' +import { resolveOpenApiSpec } from '../specFetchHelper' +import { minimalStripeOpenApiSpec } from './fixtures/minimalSpec' + +async function createTempDir(prefix: string): Promise { + return fs.mkdtemp(path.join(os.tmpdir(), `${prefix}-`)) +} + +describe('resolveOpenApiSpec', () => { + afterEach(() => { + vi.unstubAllGlobals() + vi.restoreAllMocks() + }) + + it('prefers explicit local spec path over cache and network', async () => { + const tempDir = await createTempDir('openapi-explicit') + const specPath = path.join(tempDir, 'spec3.json') + await fs.writeFile(specPath, JSON.stringify(minimalStripeOpenApiSpec), 'utf8') + const fetchMock = vi.fn() + vi.stubGlobal('fetch', fetchMock) + + const result = await resolveOpenApiSpec({ + apiVersion: '2020-08-27', + openApiSpecPath: specPath, + cacheDir: tempDir, + }) + + expect(result.source).toBe('explicit_path') + expect(fetchMock).not.toHaveBeenCalled() + await fs.rm(tempDir, { recursive: true, force: true }) + }) + + it('uses cache by api version when available', async () => { + const tempDir = await createTempDir('openapi-cache') + const cachePath = path.join(tempDir, '2020-08-27.spec3.sdk.json') + await fs.writeFile(cachePath, JSON.stringify(minimalStripeOpenApiSpec), 'utf8') + const fetchMock = vi.fn() + vi.stubGlobal('fetch', fetchMock) + + const result = await resolveOpenApiSpec({ + apiVersion: '2020-08-27', + cacheDir: tempDir, + }) + + expect(result.source).toBe('cache') + expect(result.cachePath).toBe(cachePath) + expect(fetchMock).not.toHaveBeenCalled() + await fs.rm(tempDir, { recursive: true, force: true }) + }) + + it('fetches from GitHub when cache misses and persists cache', async () => { + const tempDir = await createTempDir('openapi-fetch') + const fetchMock = vi.fn(async (input: URL | string) => { + const url = String(input) + if (url.includes('/commits')) { + return new Response(JSON.stringify([{ sha: 'abc123def456' }]), { status: 200 }) + } + return new Response(JSON.stringify(minimalStripeOpenApiSpec), { status: 200 }) + }) + vi.stubGlobal('fetch', fetchMock) + + const result = await resolveOpenApiSpec({ + apiVersion: '2020-08-27', + cacheDir: tempDir, + }) + + expect(result.source).toBe('github') + expect(result.commitSha).toBe('abc123def456') + + const cached = await fs.readFile(path.join(tempDir, '2020-08-27.spec3.sdk.json'), 'utf8') + expect(JSON.parse(cached)).toMatchObject({ openapi: '3.0.0' }) + expect(fetchMock).toHaveBeenCalledTimes(2) + await fs.rm(tempDir, { recursive: true, force: true }) + }) + + it('throws for malformed explicit spec files', async () => { + const tempDir = await createTempDir('openapi-malformed') + const specPath = path.join(tempDir, 'spec3.json') + await fs.writeFile(specPath, JSON.stringify({ openapi: '3.0.0' }), 'utf8') + + await expect( + resolveOpenApiSpec({ + apiVersion: '2020-08-27', + openApiSpecPath: specPath, + }) + ).rejects.toThrow(/components|schemas/i) + await fs.rm(tempDir, { recursive: true, force: true }) + }) + + it('fails fast when GitHub resolution fails and no explicit spec path is set', async () => { + const tempDir = await createTempDir('openapi-fail-fast') + const fetchMock = vi.fn(async () => new Response('boom', { status: 500 })) + vi.stubGlobal('fetch', fetchMock) + + await expect( + resolveOpenApiSpec({ + apiVersion: '2020-08-27', + cacheDir: tempDir, + }) + ).rejects.toThrow(/Failed to resolve Stripe OpenAPI commit/) + await fs.rm(tempDir, { recursive: true, force: true }) + }) +}) diff --git a/packages/openapi/__tests__/specParser.test.ts b/packages/openapi/__tests__/specParser.test.ts new file mode 100644 index 000000000..e9cab1b47 --- /dev/null +++ b/packages/openapi/__tests__/specParser.test.ts @@ -0,0 +1,365 @@ +import { describe, expect, it } from 'vitest' +import { SpecParser } from '../specParser' +import { minimalStripeOpenApiSpec } from './fixtures/minimalSpec' +import type { OpenApiSpec } from '../../types' + +describe('SpecParser', () => { + it('parses aliased resources into deterministic tables and column types', () => { + const parser = new SpecParser() + const parsed = parser.parse(minimalStripeOpenApiSpec, { + allowedTables: ['checkout_sessions', 'customers', 'early_fraud_warnings'], + }) + + expect(parsed.tables.map((table) => table.tableName)).toEqual([ + 'checkout_sessions', + 'customers', + 'early_fraud_warnings', + ]) + + const customers = parsed.tables.find((table) => table.tableName === 'customers') + expect(customers?.columns).toEqual([ + { name: 'created', type: 'bigint', nullable: false }, + { name: 'deleted', type: 'boolean', nullable: false }, + { name: 'object', type: 'text', nullable: false }, + ]) + + const checkoutSessions = parsed.tables.find((table) => table.tableName === 'checkout_sessions') + expect(checkoutSessions?.columns).toContainEqual({ + name: 'amount_total', + type: 'bigint', + nullable: false, + }) + }) + + it('injects compatibility columns for runtime-critical tables', () => { + const parser = new SpecParser() + const parsed = parser.parse( + { + ...minimalStripeOpenApiSpec, + components: { schemas: {} }, + }, + { allowedTables: ['active_entitlements', 'subscription_items'] } + ) + + const activeEntitlements = parsed.tables.find( + (table) => table.tableName === 'active_entitlements' + ) + expect(activeEntitlements?.columns).toContainEqual({ + name: 'customer', + type: 'text', + nullable: true, + }) + + const subscriptionItems = parsed.tables.find( + (table) => table.tableName === 'subscription_items' + ) + expect(subscriptionItems?.columns).toContainEqual({ + name: 'deleted', + type: 'boolean', + nullable: true, + }) + expect(subscriptionItems?.columns).toContainEqual({ + name: 'subscription', + type: 'text', + nullable: true, + }) + }) + + it('is deterministic regardless of schema key order', () => { + const parser = new SpecParser() + const normal = parser.parse(minimalStripeOpenApiSpec, { + allowedTables: ['customers', 'plans', 'prices'], + }) + + const reversedSchemas = Object.fromEntries( + Object.entries(minimalStripeOpenApiSpec.components?.schemas ?? {}).reverse() + ) + const reversed = parser.parse( + { + ...minimalStripeOpenApiSpec, + components: { + schemas: reversedSchemas, + }, + }, + { allowedTables: ['customers', 'plans', 'prices'] } + ) + + expect(reversed).toEqual(normal) + }) + + it('marks expandable references from x-expansionResources metadata', () => { + const parser = new SpecParser() + const parsed = parser.parse( + { + ...minimalStripeOpenApiSpec, + components: { + schemas: { + charge: { + 'x-resourceId': 'charge', + type: 'object', + properties: { + id: { type: 'string' }, + customer: { + anyOf: [{ type: 'string' }, { $ref: '#/components/schemas/customer' }], + 'x-expansionResources': { + oneOf: [{ $ref: '#/components/schemas/customer' }], + }, + }, + }, + }, + customer: { + 'x-resourceId': 'customer', + type: 'object', + properties: { + id: { type: 'string' }, + }, + }, + }, + }, + }, + { allowedTables: ['charges'] } + ) + + const charges = parsed.tables.find((table) => table.tableName === 'charges') + expect(charges?.columns).toContainEqual({ + name: 'customer', + type: 'json', + nullable: false, + expandableReference: true, + }) + }) + + describe('discoverListableResourceIds', () => { + it('discovers resource ids from list endpoints in paths', () => { + const parser = new SpecParser() + const ids = parser.discoverListableResourceIds(minimalStripeOpenApiSpec) + + expect(ids).toEqual( + new Set([ + 'customer', + 'plan', + 'price', + 'product', + 'subscription_item', + 'checkout.session', + 'radar.early_fraud_warning', + 'entitlements.active_entitlement', + 'entitlements.feature', + 'v2.core.account', + 'v2.core.event_destination', + ]) + ) + }) + + it('optionally includes nested list endpoints', () => { + const parser = new SpecParser() + const spec: OpenApiSpec = { + ...minimalStripeOpenApiSpec, + paths: { + ...minimalStripeOpenApiSpec.paths, + '/v1/accounts/{account}/persons': { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object' as const, + properties: { + object: { + type: 'string' as const, + enum: ['list'], + }, + data: { + type: 'array' as const, + items: { + $ref: '#/components/schemas/person', + }, + }, + has_more: { + type: 'boolean' as const, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + components: { + ...minimalStripeOpenApiSpec.components, + schemas: { + ...minimalStripeOpenApiSpec.components.schemas, + person: { + 'x-resourceId': 'person', + type: 'object', + properties: { + id: { type: 'string' }, + }, + }, + }, + }, + } + + const ids = parser.discoverListableResourceIds(spec, { includeNested: true }) + expect(ids).toContain('person') + }) + + it('returns empty set when spec has no paths', () => { + const parser = new SpecParser() + const specWithoutPaths: OpenApiSpec = { + ...minimalStripeOpenApiSpec, + paths: undefined, + } + expect(parser.discoverListableResourceIds(specWithoutPaths)).toEqual(new Set()) + }) + + it('ignores non-list GET endpoints', () => { + const parser = new SpecParser() + const spec: OpenApiSpec = { + openapi: '3.0.0', + paths: { + '/v1/customers/{customer}': { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object', + properties: { + id: { type: 'string' }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + components: { + schemas: { + customer: { + 'x-resourceId': 'customer', + type: 'object', + properties: { id: { type: 'string' } }, + }, + }, + }, + } + expect(parser.discoverListableResourceIds(spec)).toEqual(new Set()) + }) + }) + + describe('auto-discovery via paths (no allowedTables)', () => { + it('creates tables only for resources with list endpoints', () => { + const parser = new SpecParser() + const parsed = parser.parse(minimalStripeOpenApiSpec) + + const tableNames = parsed.tables.map((t) => t.tableName) + expect(tableNames).toEqual([ + 'active_entitlements', + 'checkout_sessions', + 'customers', + 'early_fraud_warnings', + 'features', + 'plans', + 'prices', + 'products', + 'subscription_items', + 'v2_core_accounts', + 'v2_core_event_destinations', + ]) + }) + + it('includes nested listables when they appear in the OpenAPI paths', () => { + const parser = new SpecParser() + const spec: OpenApiSpec = { + ...minimalStripeOpenApiSpec, + paths: { + ...minimalStripeOpenApiSpec.paths, + '/v1/accounts/{account}/persons': { + get: { + responses: { + '200': { + content: { + 'application/json': { + schema: { + type: 'object' as const, + properties: { + object: { + type: 'string' as const, + enum: ['list'], + }, + data: { + type: 'array' as const, + items: { + $ref: '#/components/schemas/person', + }, + }, + has_more: { + type: 'boolean' as const, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + components: { + ...minimalStripeOpenApiSpec.components, + schemas: { + ...minimalStripeOpenApiSpec.components.schemas, + person: { + 'x-resourceId': 'person', + type: 'object', + properties: { + id: { type: 'string' }, + }, + }, + }, + }, + } + + const parsed = parser.parse(spec) + const tableNames = parsed.tables.map((table) => table.tableName) + expect(tableNames).toContain('persons') + }) + + it('excludes schemas that have no list endpoint', () => { + const parser = new SpecParser() + const specWithLimitedPaths: OpenApiSpec = { + ...minimalStripeOpenApiSpec, + paths: { + '/v1/customers': minimalStripeOpenApiSpec.paths!['/v1/customers'], + '/v1/products': minimalStripeOpenApiSpec.paths!['/v1/products'], + }, + } + const parsed = parser.parse(specWithLimitedPaths) + + const tableNames = parsed.tables.map((t) => t.tableName) + expect(tableNames).toEqual(['customers', 'products']) + expect(tableNames).not.toContain('plans') + expect(tableNames).not.toContain('subscription_items') + }) + + it('resolves table name aliases from x-resourceId during discovery', () => { + const parser = new SpecParser() + const parsed = parser.parse(minimalStripeOpenApiSpec) + + const earlyFraud = parsed.tables.find((t) => t.tableName === 'early_fraud_warnings') + expect(earlyFraud).toBeDefined() + expect(earlyFraud?.resourceId).toBe('radar.early_fraud_warning') + + const checkout = parsed.tables.find((t) => t.tableName === 'checkout_sessions') + expect(checkout).toBeDefined() + expect(checkout?.resourceId).toBe('checkout.session') + }) + }) +}) diff --git a/packages/openapi/index.ts b/packages/openapi/index.ts new file mode 100644 index 000000000..8976e3589 --- /dev/null +++ b/packages/openapi/index.ts @@ -0,0 +1,21 @@ +export type * from './types.js' +export { SpecParser, OPENAPI_RESOURCE_TABLE_ALIASES } from './specParser.js' +export { OPENAPI_COMPATIBILITY_COLUMNS } from './runtimeMappings.js' + +export { resolveOpenApiSpec } from './specFetchHelper.js' +export { + discoverListEndpoints, + discoverNestedEndpoints, + isV2Path, + buildListFn, + buildRetrieveFn, + resolveTableName, +} from './listFnResolver.js' +export type { + ListEndpoint, + NestedEndpoint, + ListFn, + RetrieveFn, + ListParams, +} from './listFnResolver.js' +export { parsedTableToJsonSchema } from './jsonSchemaConverter.js' diff --git a/packages/openapi/jsonSchemaConverter.ts b/packages/openapi/jsonSchemaConverter.ts new file mode 100644 index 000000000..1409d9a87 --- /dev/null +++ b/packages/openapi/jsonSchemaConverter.ts @@ -0,0 +1,33 @@ +import type { ParsedResourceTable, ScalarType } from './types.js' + +const SCALAR_TYPE_TO_JSON_SCHEMA: Record = { + text: { type: 'string' }, + boolean: { type: 'boolean' }, + bigint: { type: 'integer' }, + numeric: { type: 'number' }, + json: { type: 'object' }, + timestamptz: { type: 'string', format: 'date-time' }, +} + +export function parsedTableToJsonSchema(table: ParsedResourceTable): Record { + const properties: Record = { + id: { type: 'string' }, + } + const required: string[] = ['id'] + + for (const col of table.columns) { + const mapped = SCALAR_TYPE_TO_JSON_SCHEMA[col.type] ?? { type: 'string' } + if (col.nullable) { + properties[col.name] = { oneOf: [mapped, { type: 'null' }] } + } else { + properties[col.name] = mapped + required.push(col.name) + } + } + + return { + type: 'object', + properties, + required, + } +} diff --git a/packages/openapi/listFnResolver.ts b/packages/openapi/listFnResolver.ts new file mode 100644 index 000000000..b407e87ce --- /dev/null +++ b/packages/openapi/listFnResolver.ts @@ -0,0 +1,301 @@ +import type { OpenApiSchemaObject, OpenApiSpec } from './types.js' +import { OPENAPI_RESOURCE_TABLE_ALIASES } from './runtimeMappings.js' + +const SCHEMA_REF_PREFIX = '#/components/schemas/' + +export type ListParams = { + limit?: number + starting_after?: string + ending_before?: string + created?: { gt?: number; gte?: number; lt?: number; lte?: number } +} + +export type ListResult = { data: unknown[]; has_more: boolean; pageCursor?: string } + +export type ListFn = (params: ListParams) => Promise + +export type RetrieveFn = (id: string) => Promise + +export type ListEndpoint = { + tableName: string + resourceId: string + apiPath: string + supportsCreatedFilter: boolean + supportsLimit: boolean +} + +export type NestedEndpoint = { + tableName: string + resourceId: string + apiPath: string + parentTableName: string + parentParamName: string + supportsPagination: boolean +} + +export function resolveTableName(resourceId: string, aliases: Record): string { + const alias = aliases[resourceId] + if (alias) return alias + const normalized = resourceId.toLowerCase().replace(/[.]/g, '_') + return normalized.endsWith('s') ? normalized : `${normalized}s` +} + +/** + * Detect whether a response schema describes a list endpoint. + * v1 lists have `object: enum ["list"]` with a `data` array. + * v2 lists have a `data` array with `next_page_url`. + */ +function isListResponseSchema(schema: OpenApiSchemaObject): boolean { + const dataProp = schema.properties?.data + if (!dataProp || !('type' in dataProp) || dataProp.type !== 'array') return false + + const objectProp = schema.properties?.object + if (objectProp && 'enum' in objectProp && objectProp.enum?.includes('list')) return true + + if (schema.properties?.next_page_url) return true + + return false +} + +/** + * Scan the spec for list endpoints (GET paths that return a Stripe list object) + * and return one entry per table. Prefers top-level paths over nested ones. + * Supports both v1 (object: "list") and v2 (next_page_url) response formats. + */ +export function discoverListEndpoints( + spec: OpenApiSpec, + aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES +): Map { + const endpoints = new Map() + const paths = spec.paths + if (!paths) return endpoints + + for (const [apiPath, pathItem] of Object.entries(paths)) { + if (apiPath.includes('{')) continue + + const getOp = pathItem.get + if (!getOp?.responses) continue + + const responseSchema = getOp.responses['200']?.content?.['application/json']?.schema + if (!responseSchema) continue + + if (!isListResponseSchema(responseSchema)) continue + + const dataProp = responseSchema.properties?.data + if (!dataProp || !('type' in dataProp) || dataProp.type !== 'array') continue + + const itemsRef = dataProp.items + if (!itemsRef || !('$ref' in itemsRef) || typeof itemsRef.$ref !== 'string') continue + if (!itemsRef.$ref.startsWith(SCHEMA_REF_PREFIX)) continue + + const schemaName = itemsRef.$ref.slice(SCHEMA_REF_PREFIX.length) + const schema = spec.components?.schemas?.[schemaName] + if (!schema || '$ref' in schema) continue + + const resourceId = schema['x-resourceId'] + if (!resourceId || typeof resourceId !== 'string') continue + + const tableName = resolveTableName(resourceId, aliases) + if (!endpoints.has(tableName)) { + const params = getOp.parameters ?? [] + const PAGINATION_PARAMS = new Set([ + 'limit', + 'starting_after', + 'ending_before', + 'created', + 'expand', + ]) + const hasRequiredQueryParams = params.some( + (p: { name?: string; in?: string; required?: boolean }) => + p.required === true && p.in === 'query' && !PAGINATION_PARAMS.has(p.name ?? '') + ) + if (hasRequiredQueryParams) continue + + const supportsCreatedFilter = params.some( + (p: { name?: string; in?: string }) => p.name === 'created' && p.in === 'query' + ) + const supportsLimit = params.some( + (p: { name?: string; in?: string }) => p.name === 'limit' && p.in === 'query' + ) + endpoints.set(tableName, { + tableName, + resourceId, + apiPath, + supportsCreatedFilter, + supportsLimit, + }) + } + } + + return endpoints +} + +/** + * Scan the spec for nested list endpoints (paths with `{param}` segments that + * return a Stripe list object) and map each to its parent resource. + */ +export function discoverNestedEndpoints( + spec: OpenApiSpec, + topLevelEndpoints: Map, + aliases: Record = OPENAPI_RESOURCE_TABLE_ALIASES +): NestedEndpoint[] { + const nested: NestedEndpoint[] = [] + const paths = spec.paths + if (!paths) return nested + + const topLevelByPath = new Map() + for (const endpoint of topLevelEndpoints.values()) { + topLevelByPath.set(endpoint.apiPath, endpoint) + } + + for (const [apiPath, pathItem] of Object.entries(paths)) { + if (!apiPath.includes('{')) continue + + const getOp = pathItem.get + if (!getOp?.responses) continue + + const responseSchema = getOp.responses['200']?.content?.['application/json']?.schema + if (!responseSchema) continue + + if (!isListResponseSchema(responseSchema)) continue + + const dataProp = responseSchema.properties?.data + if (!dataProp || !('type' in dataProp) || dataProp.type !== 'array') continue + + const itemsRef = dataProp.items + if (!itemsRef || !('$ref' in itemsRef) || typeof itemsRef.$ref !== 'string') continue + if (!itemsRef.$ref.startsWith(SCHEMA_REF_PREFIX)) continue + + const schemaName = itemsRef.$ref.slice(SCHEMA_REF_PREFIX.length) + const schema = spec.components?.schemas?.[schemaName] + if (!schema || '$ref' in schema) continue + + const resourceId = schema['x-resourceId'] + if (!resourceId || typeof resourceId !== 'string') continue + + const paramMatch = apiPath.match(/\{([^}]+)\}/) + if (!paramMatch) continue + const parentParamName = paramMatch[1] + + const parentPath = apiPath.slice(0, apiPath.indexOf('/{')) + const parentEndpoint = topLevelByPath.get(parentPath) + if (!parentEndpoint) continue + + const params = getOp.parameters ?? [] + const supportsPagination = params.some((p: { name?: string }) => p.name === 'limit') + + nested.push({ + tableName: resolveTableName(resourceId, aliases), + resourceId, + apiPath, + parentTableName: parentEndpoint.tableName, + parentParamName, + supportsPagination, + }) + } + + return nested +} + +export function isV2Path(apiPath: string): boolean { + return apiPath.startsWith('/v2/') +} + +// --------------------------------------------------------------------------- +// HTTP-based list / retrieve builders (no Stripe SDK dependency) +// --------------------------------------------------------------------------- + +const DEFAULT_STRIPE_API_BASE = 'https://api.stripe.com' + +function authHeaders(apiKey: string): Record { + return { Authorization: `Bearer ${apiKey}` } +} + +/** + * Build a callable list function that hits the Stripe HTTP API directly. + * Supports both v1 (has_more pagination) and v2 (next_page_url pagination). + */ +export function buildListFn( + apiKey: string, + apiPath: string, + apiVersion?: string, + baseUrl?: string +): ListFn { + const base = baseUrl ?? DEFAULT_STRIPE_API_BASE + + if (isV2Path(apiPath)) { + return async (params) => { + const qs = new URLSearchParams() + qs.set('limit', String(Math.min(params.limit ?? 20, 20))) + if (params.starting_after) qs.set('page', params.starting_after) + + const headers = authHeaders(apiKey) + if (apiVersion) headers['Stripe-Version'] = apiVersion + + const response = await fetch(`${base}${apiPath}?${qs}`, { headers }) + const body = (await response.json()) as { + data: unknown[] + next_page_url?: string | null + } + const pageCursor = extractPageToken(body.next_page_url) + return { data: body.data ?? [], has_more: !!body.next_page_url, pageCursor } + } + } + + return async (params) => { + const qs = new URLSearchParams() + if (params.limit != null) qs.set('limit', String(params.limit)) + if (params.starting_after) qs.set('starting_after', params.starting_after) + if (params.ending_before) qs.set('ending_before', params.ending_before) + if (params.created) { + for (const [op, val] of Object.entries(params.created)) { + if (val != null) qs.set(`created[${op}]`, String(val)) + } + } + + const response = await fetch(`${base}${apiPath}?${qs}`, { + headers: authHeaders(apiKey), + }) + const body = (await response.json()) as { data: unknown[]; has_more: boolean } + return { data: body.data ?? [], has_more: body.has_more } + } +} + +/** + * Build a callable retrieve function that hits the Stripe HTTP API directly. + */ +export function buildRetrieveFn( + apiKey: string, + apiPath: string, + apiVersion?: string, + baseUrl?: string +): RetrieveFn { + const base = baseUrl ?? DEFAULT_STRIPE_API_BASE + + if (isV2Path(apiPath)) { + return async (id) => { + const headers = authHeaders(apiKey) + if (apiVersion) headers['Stripe-Version'] = apiVersion + + const response = await fetch(`${base}${apiPath}/${id}`, { headers }) + return await response.json() + } + } + + return async (id) => { + const response = await fetch(`${base}${apiPath}/${id}`, { + headers: authHeaders(apiKey), + }) + return await response.json() + } +} + +function extractPageToken(nextPageUrl: string | null | undefined): string | undefined { + if (!nextPageUrl) return undefined + try { + const url = new URL(nextPageUrl, DEFAULT_STRIPE_API_BASE) + return url.searchParams.get('page') ?? undefined + } catch { + return undefined + } +} diff --git a/packages/openapi/package.json b/packages/openapi/package.json new file mode 100644 index 000000000..b0c0fd040 --- /dev/null +++ b/packages/openapi/package.json @@ -0,0 +1,25 @@ +{ + "name": "@stripe/sync-openapi", + "version": "0.1.0", + "private": true, + "type": "module", + "exports": { + ".": { + "bun": "./index.ts", + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "scripts": { + "build": "tsc", + "test": "vitest --passWithNoTests" + }, + "files": [ + "dist" + ], + "dependencies": {}, + "devDependencies": { + "@types/node": "^24.5.0", + "vitest": "^3.2.4" + } +} diff --git a/packages/openapi/runtimeMappings.ts b/packages/openapi/runtimeMappings.ts new file mode 100644 index 000000000..423986662 --- /dev/null +++ b/packages/openapi/runtimeMappings.ts @@ -0,0 +1,46 @@ +import type { ParsedColumn } from './types.js' + +/** + * Overrides for x-resourceId values whose table name cannot be inferred by the + * default pluralisation / dot-to-underscore rule in SpecParser.resolveTableName. + */ +export const OPENAPI_RESOURCE_TABLE_ALIASES: Record = { + 'radar.early_fraud_warning': 'early_fraud_warnings', + 'entitlements.active_entitlement': 'active_entitlements', + 'entitlements.feature': 'features', + item: 'checkout_session_line_items', +} + +/** + * Compatibility columns that should exist even if not present in the current OpenAPI shape. + * This preserves backwards compatibility for existing queries and write paths. + * todo: Remove this + */ +export const OPENAPI_COMPATIBILITY_COLUMNS: Record = { + active_entitlements: [ + { name: 'customer', type: 'text', nullable: true }, + { name: 'object', type: 'text', nullable: true }, + { name: 'feature', type: 'text', nullable: true }, + { name: 'livemode', type: 'boolean', nullable: true }, + { name: 'lookup_key', type: 'text', nullable: true }, + ], + checkout_session_line_items: [ + { name: 'checkout_session', type: 'text', nullable: true }, + { name: 'amount_discount', type: 'bigint', nullable: true }, + { name: 'amount_tax', type: 'bigint', nullable: true }, + ], + customers: [{ name: 'deleted', type: 'boolean', nullable: true }], + early_fraud_warnings: [{ name: 'payment_intent', type: 'text', nullable: true }], + features: [ + { name: 'object', type: 'text', nullable: true }, + { name: 'name', type: 'text', nullable: true }, + { name: 'lookup_key', type: 'text', nullable: true }, + { name: 'active', type: 'boolean', nullable: true }, + { name: 'livemode', type: 'boolean', nullable: true }, + { name: 'metadata', type: 'json', nullable: true }, + ], + subscription_items: [ + { name: 'deleted', type: 'boolean', nullable: true }, + { name: 'subscription', type: 'text', nullable: true }, + ], +} diff --git a/packages/openapi/specFetchHelper.ts b/packages/openapi/specFetchHelper.ts new file mode 100644 index 000000000..2b0dcec4b --- /dev/null +++ b/packages/openapi/specFetchHelper.ts @@ -0,0 +1,181 @@ +import os from 'node:os' +import fs from 'node:fs/promises' +import path from 'node:path' +import type { OpenApiSpec, ResolveSpecConfig, ResolvedOpenApiSpec } from './types.js' + +const DEFAULT_CACHE_DIR = path.join(os.tmpdir(), 'stripe-sync-openapi-cache') + +export async function resolveOpenApiSpec(config: ResolveSpecConfig): Promise { + const apiVersion = config.apiVersion + if (!apiVersion || !/^\d{4}-\d{2}-\d{2}(\.\w+)?$/.test(apiVersion)) { + throw new Error( + `Invalid Stripe API version "${apiVersion}". Expected YYYY-MM-DD or YYYY-MM-DD.codename.` + ) + } + + if (config.openApiSpecPath) { + const explicitSpec = await readSpecFromPath(config.openApiSpecPath) + return { + apiVersion, + spec: explicitSpec, + source: 'explicit_path', + cachePath: config.openApiSpecPath, + } + } + + const cacheDir = config.cacheDir ?? DEFAULT_CACHE_DIR + const cachePath = getCachePath(cacheDir, apiVersion) + const cachedSpec = await tryReadCachedSpec(cachePath) + if (cachedSpec) { + return { + apiVersion, + spec: cachedSpec, + source: 'cache', + cachePath, + } + } + + let commitSha = await resolveCommitShaForApiVersion(apiVersion) + if (!commitSha) { + commitSha = await resolveLatestCommitSha() + } + if (!commitSha) { + throw new Error( + `Could not resolve Stripe OpenAPI commit for API version ${apiVersion} and no local spec path was provided.` + ) + } + + const spec = await fetchSpecForCommit(commitSha) + validateOpenApiSpec(spec) + await tryWriteCache(cachePath, spec) + + return { + apiVersion, + spec, + source: 'github', + cachePath, + commitSha, + } +} + +async function readSpecFromPath(openApiSpecPath: string): Promise { + const raw = await fs.readFile(openApiSpecPath, 'utf8') + let parsed: unknown + try { + parsed = JSON.parse(raw) + } catch (error) { + throw new Error( + `Failed to parse OpenAPI spec at ${openApiSpecPath}: ${error instanceof Error ? error.message : String(error)}` + ) + } + validateOpenApiSpec(parsed) + return parsed +} + +async function tryReadCachedSpec(cachePath: string): Promise { + try { + const raw = await fs.readFile(cachePath, 'utf8') + const parsed = JSON.parse(raw) as unknown + validateOpenApiSpec(parsed) + return parsed + } catch { + return null + } +} + +async function tryWriteCache(cachePath: string, spec: OpenApiSpec): Promise { + try { + await fs.mkdir(path.dirname(cachePath), { recursive: true }) + await fs.writeFile(cachePath, JSON.stringify(spec), 'utf8') + } catch { + // Best effort only. Cache writes should never block migration flow. + } +} + +function getCachePath(cacheDir: string, apiVersion: string): string { + const safeVersion = apiVersion.replace(/[^0-9a-zA-Z_-]/g, '_') + return path.join(cacheDir, `${safeVersion}.spec3.sdk.json`) +} + +function extractDatePart(apiVersion: string): string { + const match = apiVersion.match(/^(\d{4}-\d{2}-\d{2})/) + return match ? match[1] : apiVersion +} + +async function resolveLatestCommitSha(): Promise { + const url = new URL('https://api.github.com/repos/stripe/openapi/commits') + url.searchParams.set('path', 'latest/openapi.spec3.sdk.json') + url.searchParams.set('per_page', '1') + + const response = await fetch(url, { headers: githubHeaders() }) + if (!response.ok) { + throw new Error( + `Failed to resolve latest Stripe OpenAPI commit (${response.status} ${response.statusText})` + ) + } + + const json = (await response.json()) as Array<{ sha?: string }> + const commitSha = json[0]?.sha + return typeof commitSha === 'string' && commitSha.length > 0 ? commitSha : null +} + +async function resolveCommitShaForApiVersion(apiVersion: string): Promise { + const until = `${extractDatePart(apiVersion)}T23:59:59Z` + const url = new URL('https://api.github.com/repos/stripe/openapi/commits') + url.searchParams.set('path', 'latest/openapi.spec3.sdk.json') + url.searchParams.set('until', until) + url.searchParams.set('per_page', '1') + + const response = await fetch(url, { headers: githubHeaders() }) + if (!response.ok) { + throw new Error( + `Failed to resolve Stripe OpenAPI commit (${response.status} ${response.statusText})` + ) + } + + const json = (await response.json()) as Array<{ sha?: string }> + const commitSha = json[0]?.sha + return typeof commitSha === 'string' && commitSha.length > 0 ? commitSha : null +} + +async function fetchSpecForCommit(commitSha: string): Promise { + const url = `https://raw.githubusercontent.com/stripe/openapi/${commitSha}/latest/openapi.spec3.sdk.json` + const response = await fetch(url, { headers: githubHeaders() }) + if (!response.ok) { + throw new Error( + `Failed to download Stripe OpenAPI spec for commit ${commitSha} (${response.status} ${response.statusText})` + ) + } + + const spec = (await response.json()) as unknown + validateOpenApiSpec(spec) + return spec +} + +function validateOpenApiSpec(spec: unknown): asserts spec is OpenApiSpec { + if (!spec || typeof spec !== 'object') { + throw new Error('OpenAPI spec is not an object') + } + const candidate = spec as Partial + if (typeof candidate.openapi !== 'string' || candidate.openapi.trim().length === 0) { + throw new Error('OpenAPI spec is missing the "openapi" field') + } + if (!candidate.components || typeof candidate.components !== 'object') { + throw new Error('OpenAPI spec is missing "components"') + } + if (!candidate.components.schemas || typeof candidate.components.schemas !== 'object') { + throw new Error('OpenAPI spec is missing "components.schemas"') + } +} + +function githubHeaders(): HeadersInit { + const headers: Record = { + Accept: 'application/vnd.github+json', + 'User-Agent': 'stripe-sync-engine-openapi', + } + const token = process.env.GITHUB_TOKEN || process.env.GH_TOKEN + if (token) { + headers.Authorization = `Bearer ${token}` + } + return headers +} diff --git a/packages/openapi/specParser.ts b/packages/openapi/specParser.ts new file mode 100644 index 000000000..e670b4218 --- /dev/null +++ b/packages/openapi/specParser.ts @@ -0,0 +1,416 @@ +import type { + OpenApiSchemaObject, + OpenApiSchemaOrReference, + OpenApiSpec, + ParseSpecOptions, + ParsedColumn, + ParsedOpenApiSpec, + ScalarType, +} from './types.js' +import { OPENAPI_COMPATIBILITY_COLUMNS, OPENAPI_RESOURCE_TABLE_ALIASES } from './runtimeMappings.js' + +const SCHEMA_REF_PREFIX = '#/components/schemas/' + +const RESERVED_COLUMNS = new Set([ + 'id', + '_raw_data', + '_last_synced_at', + '_updated_at', + '_account_id', +]) + +export { OPENAPI_RESOURCE_TABLE_ALIASES } + +type ColumnAccumulator = { + type: ScalarType + nullable: boolean + expandableReference: boolean +} + +export class SpecParser { + parse(spec: OpenApiSpec, options: ParseSpecOptions = {}): ParsedOpenApiSpec { + const schemas = spec.components?.schemas + if (!schemas || typeof schemas !== 'object') { + throw new Error('OpenAPI spec is missing components.schemas') + } + + const aliases = { ...OPENAPI_RESOURCE_TABLE_ALIASES, ...(options.resourceAliases ?? {}) } + const excluded = new Set(options.excludedTables ?? []) + const allowedTables = options.allowedTables + ? new Set(options.allowedTables.filter((t) => !excluded.has(t))) + : this.discoverAllowedTables(spec, aliases, excluded) + const tableMap = new Map< + string, + { + resourceId: string + sourceSchemaName: string + columns: Map + } + >() + + for (const schemaName of Object.keys(schemas).sort((a, b) => a.localeCompare(b))) { + const schema = this.resolveSchema({ $ref: `#/components/schemas/${schemaName}` }, spec) + const resourceId = schema['x-resourceId'] + if (!resourceId || typeof resourceId !== 'string') { + continue + } + + const tableName = this.resolveTableName(resourceId, aliases) + if (!allowedTables.has(tableName)) { + continue + } + + const propCandidates = this.collectPropertyCandidates( + { $ref: `#/components/schemas/${schemaName}` }, + spec + ) + const parsedColumns = this.parseColumns(propCandidates, spec) + + const existing = + tableMap.get(tableName) ?? + ({ + resourceId, + sourceSchemaName: schemaName, + columns: new Map(), + } as const) + + for (const column of parsedColumns) { + const current = existing.columns.get(column.name) + if (!current) { + existing.columns.set(column.name, { + type: column.type, + nullable: column.nullable, + expandableReference: column.expandableReference ?? false, + }) + continue + } + existing.columns.set(column.name, { + type: this.mergeTypes(current.type, column.type), + nullable: current.nullable || column.nullable, + expandableReference: current.expandableReference || (column.expandableReference ?? false), + }) + } + + tableMap.set(tableName, existing) + } + + for (const tableName of Array.from(allowedTables).sort((a, b) => a.localeCompare(b))) { + const current = + tableMap.get(tableName) ?? + ({ + resourceId: tableName, + sourceSchemaName: 'compatibility_fallback', + columns: new Map(), + } as const) + for (const compatibilityColumn of OPENAPI_COMPATIBILITY_COLUMNS[tableName] ?? []) { + const existing = current.columns.get(compatibilityColumn.name) + if (!existing) { + current.columns.set(compatibilityColumn.name, { + type: compatibilityColumn.type, + nullable: compatibilityColumn.nullable, + expandableReference: compatibilityColumn.expandableReference ?? false, + }) + } + } + tableMap.set(tableName, current) + } + + const tables = Array.from(tableMap.entries()) + .sort(([a], [b]) => a.localeCompare(b)) + .map(([tableName, table]) => ({ + tableName, + resourceId: table.resourceId, + sourceSchemaName: table.sourceSchemaName, + columns: Array.from(table.columns.entries()) + .map(([name, value]) => ({ + name, + type: value.type, + nullable: value.nullable, + ...(value.expandableReference ? { expandableReference: true } : {}), + })) + .sort((a, b) => a.name.localeCompare(b.name)), + })) + + return { + apiVersion: spec.info?.version ?? spec.openapi ?? 'unknown', + tables, + } + } + + /** + * Scan the spec's `paths` for GET endpoints that return Stripe list objects, + * and resolve each listed resource's x-resourceId into a table name. + */ + private discoverAllowedTables( + spec: OpenApiSpec, + aliases: Record, + excluded: Set + ): Set { + const resourceIds = this.discoverListableResourceIds(spec, { + includeNested: true, + }) + const tables = new Set() + for (const resourceId of resourceIds) { + const tableName = this.resolveTableName(resourceId, aliases) + if (!excluded.has(tableName)) { + tables.add(tableName) + } + } + return tables + } + + /** + * Extract x-resourceId values for every schema that is returned by a list + * endpoint. Supports both v1 (object: "list") and v2 (next_page_url) formats. + */ + discoverListableResourceIds( + spec: OpenApiSpec, + options: { includeNested: boolean } = { includeNested: false } + ): Set { + const resourceIds = new Set() + const paths = spec.paths + if (!paths) { + return resourceIds + } + + for (const [apiPath, pathItem] of Object.entries(paths)) { + if (!options.includeNested && apiPath.includes('{')) continue + + const getOp = pathItem.get + if (!getOp?.responses) continue + + const responseSchema = getOp.responses['200']?.content?.['application/json']?.schema + if (!responseSchema) continue + + if (!this.isListResponseSchema(responseSchema)) continue + + const dataProp = responseSchema.properties?.data + if (!dataProp || !('type' in dataProp) || dataProp.type !== 'array') continue + + const itemsRef = dataProp.items + if (!itemsRef || !this.isReference(itemsRef)) continue + if (!itemsRef.$ref.startsWith(SCHEMA_REF_PREFIX)) continue + + const schemaName = itemsRef.$ref.slice(SCHEMA_REF_PREFIX.length) + const schema = spec.components?.schemas?.[schemaName] + if (!schema || '$ref' in schema) continue + + const resourceId = schema['x-resourceId'] + if (resourceId && typeof resourceId === 'string') { + resourceIds.add(resourceId) + } + } + + return resourceIds + } + + /** + * Detect whether a response schema describes a list endpoint. + * v1 lists have `object: enum ["list"]` with a `data` array. + * v2 lists have a `data` array with `next_page_url`. + */ + private isListResponseSchema(schema: OpenApiSchemaObject): boolean { + const dataProp = schema.properties?.data + if (!dataProp || !('type' in dataProp) || dataProp.type !== 'array') return false + + const objectProp = schema.properties?.object + if (objectProp && 'enum' in objectProp && objectProp.enum?.includes('list')) return true + + if (schema.properties?.next_page_url) return true + + return false + } + + private resolveTableName(resourceId: string, aliases: Record): string { + const alias = aliases[resourceId] + if (alias) { + return alias + } + + const normalized = resourceId.toLowerCase().replace(/[.]/g, '_') + return normalized.endsWith('s') ? normalized : `${normalized}s` + } + + private parseColumns( + propCandidates: Map, + spec: OpenApiSpec + ): ParsedColumn[] { + const columns: ParsedColumn[] = [] + for (const [propertyName, candidates] of Array.from(propCandidates.entries()).sort(([a], [b]) => + a.localeCompare(b) + )) { + if (RESERVED_COLUMNS.has(propertyName)) { + continue + } + const inferred = this.inferFromCandidates(candidates, spec) + columns.push({ + name: propertyName, + type: inferred.type, + nullable: inferred.nullable, + ...(inferred.expandableReference ? { expandableReference: true } : {}), + }) + } + return columns + } + + private inferFromCandidates( + candidates: OpenApiSchemaOrReference[], + spec: OpenApiSpec + ): { type: ScalarType; nullable: boolean; expandableReference: boolean } { + if (candidates.length === 0) { + return { type: 'text', nullable: true, expandableReference: false } + } + + let mergedType: ScalarType | null = null + let nullable = false + let expandableReference = false + for (const candidate of candidates) { + const inferred = this.inferType(candidate, spec) + mergedType = mergedType ? this.mergeTypes(mergedType, inferred.type) : inferred.type + nullable = nullable || inferred.nullable + expandableReference = + expandableReference || this.isExpandableReferenceCandidate(candidate, spec) + } + + return { type: mergedType ?? 'text', nullable, expandableReference } + } + + private mergeTypes(left: ScalarType, right: ScalarType): ScalarType { + if (left === right) return left + if (left === 'json' || right === 'json') return 'json' + if ((left === 'numeric' && right === 'bigint') || (left === 'bigint' && right === 'numeric')) { + return 'numeric' + } + if (left === 'timestamptz' && right === 'text') return 'text' + if (left === 'text' && right === 'timestamptz') return 'text' + return 'text' + } + + private inferType( + schemaOrRef: OpenApiSchemaOrReference, + spec: OpenApiSpec + ): { type: ScalarType; nullable: boolean } { + const schema = this.resolveSchema(schemaOrRef, spec) + const nullable = Boolean(schema.nullable) + + if (schema.oneOf?.length) { + const merged = this.inferFromCandidates(schema.oneOf, spec) + return { type: merged.type, nullable: nullable || merged.nullable } + } + if (schema.anyOf?.length) { + const merged = this.inferFromCandidates(schema.anyOf, spec) + return { type: merged.type, nullable: nullable || merged.nullable } + } + if (schema.allOf?.length) { + const merged = this.inferFromCandidates(schema.allOf, spec) + return { type: merged.type, nullable: nullable || merged.nullable } + } + + if (schema.type === 'boolean') return { type: 'boolean', nullable } + if (schema.type === 'integer') return { type: 'bigint', nullable } + if (schema.type === 'number') return { type: 'numeric', nullable } + if (schema.type === 'string') { + if (schema.format === 'date-time') { + return { type: 'timestamptz', nullable } + } + return { type: 'text', nullable } + } + if (schema.type === 'array') return { type: 'json', nullable } + if (schema.type === 'object') return { type: 'json', nullable } + if (schema.properties || schema.additionalProperties) return { type: 'json', nullable } + + if (schema.enum && schema.enum.length > 0) { + const values = schema.enum + if (values.every((value) => typeof value === 'boolean')) { + return { type: 'boolean', nullable } + } + if (values.every((value) => typeof value === 'number' && Number.isInteger(value))) { + return { type: 'bigint', nullable } + } + if (values.every((value) => typeof value === 'number')) { + return { type: 'numeric', nullable } + } + } + + return { type: 'text', nullable: true } + } + + private isExpandableReferenceCandidate( + schemaOrRef: OpenApiSchemaOrReference, + spec: OpenApiSpec + ): boolean { + const schema = this.resolveSchema(schemaOrRef, spec) + return Boolean(schema['x-expansionResources']) + } + + private collectPropertyCandidates( + schemaOrRef: OpenApiSchemaOrReference, + spec: OpenApiSpec, + seenRefs = new Set(), + seenSchemas = new Set() + ): Map { + if (this.isReference(schemaOrRef)) { + if (seenRefs.has(schemaOrRef.$ref)) { + return new Map() + } + seenRefs.add(schemaOrRef.$ref) + } + + const schema = this.resolveSchema(schemaOrRef, spec) + if (seenSchemas.has(schema)) { + return new Map() + } + seenSchemas.add(schema) + + const merged = new Map() + const pushProp = (name: string, value: OpenApiSchemaOrReference) => { + const existing = merged.get(name) ?? [] + existing.push(value) + merged.set(name, existing) + } + + for (const [name, value] of Object.entries(schema.properties ?? {})) { + pushProp(name, value) + } + + for (const composed of [schema.allOf, schema.oneOf, schema.anyOf]) { + if (!composed) continue + for (const subSchema of composed) { + const subProps = this.collectPropertyCandidates(subSchema, spec, seenRefs, seenSchemas) + for (const [name, candidates] of subProps.entries()) { + for (const candidate of candidates) { + pushProp(name, candidate) + } + } + } + } + + return merged + } + + private resolveSchema( + schemaOrRef: OpenApiSchemaOrReference, + spec: OpenApiSpec + ): OpenApiSchemaObject { + if (!this.isReference(schemaOrRef)) { + return schemaOrRef + } + + if (!schemaOrRef.$ref.startsWith(SCHEMA_REF_PREFIX)) { + throw new Error(`Unsupported OpenAPI reference: ${schemaOrRef.$ref}`) + } + const schemaName = schemaOrRef.$ref.slice(SCHEMA_REF_PREFIX.length) + const resolved = spec.components?.schemas?.[schemaName] + if (!resolved) { + throw new Error(`Failed to resolve OpenAPI schema reference: ${schemaOrRef.$ref}`) + } + if (this.isReference(resolved)) { + return this.resolveSchema(resolved, spec) + } + return resolved + } + + private isReference(schemaOrRef: OpenApiSchemaOrReference): schemaOrRef is { $ref: string } { + return typeof (schemaOrRef as { $ref?: string }).$ref === 'string' + } +} diff --git a/packages/openapi/tsconfig.json b/packages/openapi/tsconfig.json new file mode 100644 index 000000000..05a09c37d --- /dev/null +++ b/packages/openapi/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "target": "esnext", + "module": "nodenext", + "moduleResolution": "nodenext", + "declaration": true, + "declarationMap": true, + "outDir": "dist", + "esModuleInterop": true, + "skipLibCheck": true, + "strict": true, + "resolveJsonModule": true + }, + "include": ["*.ts"], + "exclude": ["__tests__"] +} diff --git a/packages/openapi/types.ts b/packages/openapi/types.ts new file mode 100644 index 000000000..b4ccbebf9 --- /dev/null +++ b/packages/openapi/types.ts @@ -0,0 +1,120 @@ +export type OpenApiSchemaObject = { + type?: string + format?: string + nullable?: boolean + properties?: Record + items?: OpenApiSchemaOrReference + oneOf?: OpenApiSchemaOrReference[] + anyOf?: OpenApiSchemaOrReference[] + allOf?: OpenApiSchemaOrReference[] + enum?: unknown[] + additionalProperties?: boolean | OpenApiSchemaOrReference + 'x-resourceId'?: string + 'x-expandableFields'?: string[] + 'x-expansionResources'?: { + oneOf?: OpenApiSchemaOrReference[] + } +} + +export type OpenApiReferenceObject = { + $ref: string +} + +export type OpenApiSchemaOrReference = OpenApiSchemaObject | OpenApiReferenceObject + +export type OpenApiResponseContent = { + schema?: OpenApiSchemaObject +} + +export type OpenApiResponse = { + content?: { + 'application/json'?: OpenApiResponseContent + } +} + +export type OpenApiOperationObject = { + operationId?: string + parameters?: { + name?: string + in?: string + required?: boolean + schema?: OpenApiSchemaOrReference + }[] + responses?: Record +} + +export type OpenApiPathItem = { + get?: OpenApiOperationObject + put?: OpenApiOperationObject + post?: OpenApiOperationObject + delete?: OpenApiOperationObject + options?: OpenApiOperationObject + head?: OpenApiOperationObject + patch?: OpenApiOperationObject + trace?: OpenApiOperationObject +} + +export type OpenApiSpec = { + openapi: string + info?: { + version?: string + } + paths?: Record + components?: { + schemas?: Record + } +} + +export type ScalarType = 'text' | 'boolean' | 'bigint' | 'numeric' | 'json' | 'timestamptz' + +export type ParsedColumn = { + name: string + type: ScalarType + nullable: boolean + expandableReference?: boolean +} + +export type ParsedResourceTable = { + tableName: string + resourceId: string + sourceSchemaName: string + columns: ParsedColumn[] +} + +export type ParsedOpenApiSpec = { + apiVersion: string + tables: ParsedResourceTable[] +} + +export type ParseSpecOptions = { + /** + * Map Stripe x-resourceId values to concrete Postgres table names. + * Entries are matched case-sensitively. + */ + resourceAliases?: Record + /** + * Restrict parsing to these table names. + * If omitted, listable resources are discovered from the spec's paths. + */ + allowedTables?: string[] + /** + * Table names to exclude from parsing, even if discovered or allowed. + * Used to avoid collisions with tables managed outside of the OpenAPI adapter + * (e.g. the bootstrap `_accounts` table). + */ + excludedTables?: string[] +} + +export type ResolveSpecConfig = { + apiVersion: string + openApiSpecPath?: string + cacheDir?: string +} + +export type ResolvedOpenApiSpec = { + apiVersion: string + spec: OpenApiSpec + source: 'explicit_path' | 'cache' | 'github' + cachePath?: string + commitSha?: string +} diff --git a/packages/source-stripe/package.json b/packages/source-stripe/package.json index 25dd61ea8..330e740a5 100644 --- a/packages/source-stripe/package.json +++ b/packages/source-stripe/package.json @@ -27,6 +27,7 @@ "src" ], "dependencies": { + "@stripe/sync-openapi": "workspace:*", "@stripe/sync-protocol": "workspace:*", "stripe": "^17.7.0", "ws": "^8.18.0", diff --git a/packages/source-stripe/src/catalog.ts b/packages/source-stripe/src/catalog.ts index d754968c9..855c55aaf 100644 --- a/packages/source-stripe/src/catalog.ts +++ b/packages/source-stripe/src/catalog.ts @@ -1,7 +1,7 @@ import type { CatalogMessage, Stream } from '@stripe/sync-protocol' import type { ResourceConfig } from './types.js' -import type { ParsedResourceTable } from './openapi/types.js' -import { parsedTableToJsonSchema } from './openapi/jsonSchemaConverter.js' +import type { ParsedResourceTable } from '@stripe/sync-openapi' +import { parsedTableToJsonSchema } from '@stripe/sync-openapi' /** Derive a CatalogMessage from the existing resource registry (no json_schema). */ export function catalogFromRegistry(registry: Record): CatalogMessage { diff --git a/packages/source-stripe/src/index.test.ts b/packages/source-stripe/src/index.test.ts index ba3224316..acbb7ce62 100644 --- a/packages/source-stripe/src/index.test.ts +++ b/packages/source-stripe/src/index.test.ts @@ -116,8 +116,8 @@ describe('StripeSource', () => { describe('discover()', () => { it('returns a CatalogMessage with known streams', async () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), - invoice: makeConfig({ order: 2, tableName: 'invoices' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), + invoices: makeConfig({ order: 2, tableName: 'invoices' }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -130,7 +130,7 @@ describe('StripeSource', () => { it('excludes resources with sync: false', async () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), internal: makeConfig({ order: 2, tableName: 'internal', sync: false }), } @@ -169,7 +169,7 @@ describe('StripeSource', () => { }) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -236,12 +236,12 @@ describe('StripeSource', () => { }) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: custListFn as ResourceConfig['listFn'], }), - invoice: makeConfig({ + invoices: makeConfig({ order: 2, tableName: 'invoices', listFn: invListFn as ResourceConfig['listFn'], @@ -298,7 +298,7 @@ describe('StripeSource', () => { }) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -334,7 +334,7 @@ describe('StripeSource', () => { }) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -372,7 +372,7 @@ describe('StripeSource', () => { describe('fromWebhookEvent() — live mode scenarios', () => { it('webhook mode emits one RecordMessage + one StateMessage per event', () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), } const event = makeEvent({ @@ -404,7 +404,7 @@ describe('StripeSource', () => { it('returns null for unsupported object type', () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), } const event = makeEvent({ @@ -417,7 +417,7 @@ describe('StripeSource', () => { it('returns null for objects without id (preview/draft)', () => { const registry: Record = { - invoice: makeConfig({ order: 1, tableName: 'invoices' }), + invoices: makeConfig({ order: 1, tableName: 'invoices' }), } const event = makeEvent({ @@ -431,7 +431,7 @@ describe('StripeSource', () => { it('passes through deleted flag from event data', () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), } const event = makeEvent({ @@ -451,7 +451,7 @@ describe('StripeSource', () => { it('returns null when event data.object has no object field', () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), } const event = makeEvent({ @@ -467,7 +467,7 @@ describe('StripeSource', () => { // The same Stripe.Event structure is received regardless of transport. // This test verifies fromWebhookEvent works for any Stripe.Event input. const registry: Record = { - invoice: makeConfig({ order: 1, tableName: 'invoices' }), + invoices: makeConfig({ order: 1, tableName: 'invoices' }), } const event = makeEvent({ @@ -491,7 +491,7 @@ describe('StripeSource', () => { const listFn = vi.fn().mockRejectedValueOnce(new Error('Rate limit exceeded')) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -541,7 +541,7 @@ describe('StripeSource', () => { const listFn = vi.fn().mockRejectedValueOnce(new Error('Connection refused')) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -568,12 +568,12 @@ describe('StripeSource', () => { }) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: failingListFn as ResourceConfig['listFn'], }), - invoice: makeConfig({ + invoices: makeConfig({ order: 2, tableName: 'invoices', listFn: successListFn as ResourceConfig['listFn'], @@ -621,7 +621,7 @@ describe('StripeSource', () => { // Shared registry for these tests const listFn = vi.fn() const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -810,7 +810,7 @@ describe('StripeSource', () => { describe('read(input) — enriched webhook processing', () => { it('delete event yields record with deleted: true', async () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -840,7 +840,7 @@ describe('StripeSource', () => { it('delete event detected by event type (not just deleted flag)', async () => { const registry: Record = { - product: makeConfig({ order: 1, tableName: 'products' }), + products: makeConfig({ order: 1, tableName: 'products' }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -866,7 +866,7 @@ describe('StripeSource', () => { it('subscription event yields subscription_items from nested items.data', async () => { const registry: Record = { - subscription: makeConfig({ order: 1, tableName: 'subscriptions' }), + subscriptions: makeConfig({ order: 1, tableName: 'subscriptions' }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -992,7 +992,7 @@ describe('StripeSource', () => { }) const registry: Record = { - subscription: makeConfig({ + subscriptions: makeConfig({ order: 1, tableName: 'subscriptions', retrieveFn: retrieveFn as ResourceConfig['retrieveFn'], @@ -1027,7 +1027,7 @@ describe('StripeSource', () => { const retrieveFn = vi.fn() const registry: Record = { - subscription: makeConfig({ + subscriptions: makeConfig({ order: 1, tableName: 'subscriptions', retrieveFn: retrieveFn as ResourceConfig['retrieveFn'], @@ -1061,7 +1061,7 @@ describe('StripeSource', () => { it('preview objects (no id) produce no output', async () => { const registry: Record = { - invoice: makeConfig({ order: 1, tableName: 'invoices' }), + invoices: makeConfig({ order: 1, tableName: 'invoices' }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -1105,7 +1105,7 @@ describe('StripeSource', () => { it('throws when raw webhook input is provided without webhook_secret', async () => { const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers' }), + customers: makeConfig({ order: 1, tableName: 'customers' }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) @@ -1124,12 +1124,12 @@ describe('StripeSource', () => { describe('read() — WebSocket streaming', () => { const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: (() => Promise.resolve({ data: [], has_more: false })) as ResourceConfig['listFn'], }), - invoice: makeConfig({ + invoices: makeConfig({ order: 2, tableName: 'invoices', listFn: (() => Promise.resolve({ data: [], has_more: false })) as ResourceConfig['listFn'], @@ -1256,7 +1256,7 @@ describe('StripeSource', () => { }) const wsRegistry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -1418,7 +1418,7 @@ describe('StripeSource', () => { it('starts an HTTP server on webhook_port and processes POSTed webhooks', async () => { const listFn = vi.fn().mockResolvedValue({ data: [], has_more: false }) const registry: Record = { - customer: makeConfig({ order: 1, tableName: 'customers', listFn }), + customers: makeConfig({ order: 1, tableName: 'customers', listFn }), } vi.mocked(buildResourceRegistry).mockReturnValue(registry as any) const cat = catalog({ name: 'customers' }) @@ -1452,7 +1452,7 @@ describe('StripeSource', () => { it('skips backfill when all streams are already complete', async () => { const listFn = vi.fn() const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -1481,7 +1481,7 @@ describe('StripeSource', () => { it('stamps initial events_cursor after first backfill completes', async () => { const listFn = vi.fn() const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -1511,7 +1511,7 @@ describe('StripeSource', () => { it('does not run events polling when poll_events is false/absent', async () => { const listFn = vi.fn() const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: listFn as ResourceConfig['listFn'], @@ -1542,12 +1542,12 @@ describe('StripeSource', () => { }) const registry: Record = { - customer: makeConfig({ + customers: makeConfig({ order: 1, tableName: 'customers', listFn: custListFn as ResourceConfig['listFn'], }), - invoice: makeConfig({ + invoices: makeConfig({ order: 2, tableName: 'invoices', listFn: (() => diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 065e3fb6e..8f4dd3b0a 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -8,12 +8,11 @@ import Stripe from 'stripe' import { z } from 'zod' import { buildResourceRegistry } from './resourceRegistry.js' import { catalogFromRegistry, catalogFromOpenApi } from './catalog.js' -import { resolveOpenApiSpec } from './openapi/specFetchHelper.js' import { + resolveOpenApiSpec, SpecParser, - RUNTIME_REQUIRED_TABLES, OPENAPI_RESOURCE_TABLE_ALIASES, -} from './openapi/specParser.js' +} from '@stripe/sync-openapi' import { processStripeEvent } from './process-event.js' import { processWebhookInput, createInputQueue, startWebhookServer } from './src-webhook.js' import { listApiBackfill } from './src-list-api.js' @@ -119,13 +118,19 @@ const source = { }, async discover({ config }) { - const registry = buildResourceRegistry(makeClient(config)) + const resolved = await resolveOpenApiSpec({ + apiVersion: config.api_version ?? '2020-08-27', + }) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url + ) try { - const resolved = await resolveOpenApiSpec({ apiVersion: '2020-08-27' }) const parser = new SpecParser() const parsed = parser.parse(resolved.spec, { resourceAliases: OPENAPI_RESOURCE_TABLE_ALIASES, - allowedTables: [...RUNTIME_REQUIRED_TABLES], }) return catalogFromOpenApi(parsed.tables, registry) } catch { @@ -172,8 +177,16 @@ const source = { }, async *read({ config, catalog, state }, $stdin?) { - const registry = buildResourceRegistry(makeClient(config)) const stripe = makeClient(config) + const resolved = await resolveOpenApiSpec({ + apiVersion: config.api_version ?? '2020-08-27', + }) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url + ) const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) // Event-driven mode: iterate over incoming webhook inputs diff --git a/packages/source-stripe/src/process-event.ts b/packages/source-stripe/src/process-event.ts index ed3433147..9ce197bcb 100644 --- a/packages/source-stripe/src/process-event.ts +++ b/packages/source-stripe/src/process-event.ts @@ -47,8 +47,7 @@ export function fromWebhookEvent( | undefined if (!dataObject?.object) return null - // Find config by matching registry keys to the Stripe object type - const objectType = dataObject.object + const objectType = normalizeStripeObjectName(dataObject.object) const config = registry[objectType] if (!config) return null @@ -147,18 +146,18 @@ export async function* processStripeEvent( // 5. Revalidation — re-fetch from Stripe API if configured let data: Record = dataObject if ( - config.revalidate_objects?.includes(objectType) && + config.revalidate_objects?.some((r) => normalizeStripeObjectName(r) === objectType) && resourceConfig.isFinalState && !resourceConfig.isFinalState(dataObject) ) { - data = (await resourceConfig.retrieveFn(dataObject.id)) as Record + data = (await resourceConfig.retrieveFn!(dataObject.id)) as Record } // 6. Yield main record yield toRecordMessage(resourceConfig.tableName, data) // 7. Yield subscription items if applicable - if (objectType === 'subscription' && (data as { items?: { data?: unknown[] } }).items?.data) { + if (objectType === 'subscriptions' && (data as { items?: { data?: unknown[] } }).items?.data) { for (const item of (data as { items: { data: Record[] } }).items.data) { yield toRecordMessage('subscription_items', item) } diff --git a/packages/source-stripe/src/resourceRegistry.ts b/packages/source-stripe/src/resourceRegistry.ts index f16bd898b..84a33fb51 100644 --- a/packages/source-stripe/src/resourceRegistry.ts +++ b/packages/source-stripe/src/resourceRegistry.ts @@ -1,367 +1,162 @@ -import Stripe from 'stripe' -import type { ResourceConfig, StripeListResourceConfig } from './types.js' +import type { ResourceConfig } from './types.js' +import type { OpenApiSpec, NestedEndpoint } from '@stripe/sync-openapi' +import { + discoverListEndpoints, + discoverNestedEndpoints, + isV2Path, + buildListFn, + buildRetrieveFn, + resolveTableName, + OPENAPI_RESOURCE_TABLE_ALIASES, +} from '@stripe/sync-openapi' + +/** + * The default set of table names synced when no explicit selection is made. + * These correspond to the resources that were previously hardcoded with sync: true. + */ +export const DEFAULT_SYNC_OBJECTS: readonly string[] = [ + 'products', + 'coupons', + 'prices', + 'plans', + 'customers', + 'subscriptions', + 'subscription_schedules', + 'invoices', + 'charges', + 'setup_intents', + 'payment_methods', + 'payment_intents', + 'tax_ids', + 'credit_notes', + 'disputes', + 'early_fraud_warnings', + 'refunds', + 'checkout_sessions', +] -interface ResourceDef { - /** Backfill sequencing order. Lower numbers sync first, ensuring dependencies are populated before dependents. */ - readonly order: number - /** Destination table name (e.g. 'payment_intents'). */ - readonly tableName: string - /** Resource keys that must be backfilled before this one (e.g. ['customer', 'invoice']). */ - readonly dependencies?: readonly string[] - /** Curried list endpoint: `list(stripe)(params) → page`. */ - readonly list: ( - s: Stripe - ) => ( - p: Stripe.PaginationParams & { created?: Stripe.RangeQueryParam } - ) => Promise<{ data: unknown[]; has_more: boolean }> - /** Curried retrieve endpoint: `retrieve(stripe)(id) → object`. */ - readonly retrieve: (s: Stripe) => (id: string) => Promise> - /** Whether the list API accepts a `created` range filter for incremental backfill. */ - readonly supportsCreatedFilter: boolean - /** Whether this resource is included in a default full sync. */ - readonly sync: boolean - /** Returns true when the object has reached a terminal state and won't change again. */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - readonly isFinalState?: (entity: any) => boolean - /** Tables created from nested data (like line items) that don't have their own top-level list API. */ - readonly childTables?: readonly string[] - /** Sub-list expansions fetched per parent object (e.g. subscription items, invoice line items). */ - readonly listExpands?: readonly Record< - string, - (s: Stripe) => (id: string) => Promise> - >[] -} - -const RESOURCE_MAP: Record = { - product: { - order: 1, - tableName: 'products', - list: (s) => (p) => s.products.list(p), - retrieve: (s) => (id) => s.products.retrieve(id), - supportsCreatedFilter: true, - sync: true, - }, - coupon: { - order: 1, - tableName: 'coupons', - list: (s) => (p) => s.coupons.list(p), - retrieve: (s) => (id) => s.coupons.retrieve(id), - supportsCreatedFilter: true, - sync: true, - isFinalState: (c: Stripe.Coupon | Stripe.DeletedCoupon) => 'deleted' in c && c.deleted === true, - }, - price: { - order: 2, - tableName: 'prices', - dependencies: ['product'], - list: (s) => (p) => s.prices.list(p), - retrieve: (s) => (id) => s.prices.retrieve(id), - supportsCreatedFilter: true, - sync: true, - }, - plan: { - order: 3, - tableName: 'plans', - dependencies: ['product'], - list: (s) => (p) => s.plans.list(p), - retrieve: (s) => (id) => s.plans.retrieve(id), - supportsCreatedFilter: true, - sync: true, - }, - customer: { - order: 4, - tableName: 'customers', - list: (s) => (p) => s.customers.list(p), - retrieve: (s) => (id) => s.customers.retrieve(id), - supportsCreatedFilter: true, - sync: true, - isFinalState: (c: Stripe.Customer | Stripe.DeletedCustomer) => - 'deleted' in c && c.deleted === true, - }, - subscription: { - order: 5, - tableName: 'subscriptions', - dependencies: ['customer', 'price'], - list: (s) => (p) => s.subscriptions.list(p), - retrieve: (s) => (id) => s.subscriptions.retrieve(id), - listExpands: [ - { items: (s) => (id) => s.subscriptionItems.list({ subscription: id, limit: 100 }) }, - ], - supportsCreatedFilter: true, - sync: true, - childTables: ['subscription_items'], - isFinalState: (s: Stripe.Subscription) => - s.status === 'canceled' || s.status === 'incomplete_expired', - }, - subscription_schedules: { - order: 6, - tableName: 'subscription_schedules', - dependencies: ['customer'], - list: (s) => (p) => s.subscriptionSchedules.list(p), - retrieve: (s) => (id) => s.subscriptionSchedules.retrieve(id), - supportsCreatedFilter: true, - sync: true, - isFinalState: (s: Stripe.SubscriptionSchedule) => - s.status === 'canceled' || s.status === 'completed', - }, - invoice: { - order: 7, - tableName: 'invoices', - dependencies: ['customer', 'subscription'], - list: (s) => (p) => s.invoices.list(p), - retrieve: (s) => (id) => s.invoices.retrieve(id), - listExpands: [{ lines: (s) => (id) => s.invoices.listLineItems(id, { limit: 100 }) }], - supportsCreatedFilter: true, - sync: true, - isFinalState: (i: Stripe.Invoice) => i.status === 'void', - }, - charge: { - order: 8, - tableName: 'charges', - dependencies: ['customer', 'invoice'], - list: (s) => (p) => s.charges.list(p), - retrieve: (s) => (id) => s.charges.retrieve(id), - listExpands: [{ refunds: (s) => (id) => s.refunds.list({ charge: id, limit: 100 }) }], - supportsCreatedFilter: true, - sync: true, - isFinalState: (c: Stripe.Charge) => c.status === 'failed' || c.status === 'succeeded', - }, - setup_intent: { - order: 9, - tableName: 'setup_intents', - dependencies: ['customer'], - list: (s) => (p) => s.setupIntents.list(p), - retrieve: (s) => (id) => s.setupIntents.retrieve(id), - supportsCreatedFilter: true, - sync: true, - isFinalState: (s: Stripe.SetupIntent) => s.status === 'canceled' || s.status === 'succeeded', - }, - payment_method: { - order: 10, - tableName: 'payment_methods', - dependencies: ['customer'], - list: (s) => (p) => s.paymentMethods.list(p), - retrieve: (s) => (id) => s.paymentMethods.retrieve(id), - supportsCreatedFilter: false, - sync: true, - }, - payment_intent: { - order: 11, - tableName: 'payment_intents', - dependencies: ['customer', 'invoice'], - list: (s) => (p) => s.paymentIntents.list(p), - retrieve: (s) => (id) => s.paymentIntents.retrieve(id), - supportsCreatedFilter: true, - sync: true, - isFinalState: (p: Stripe.PaymentIntent) => p.status === 'canceled' || p.status === 'succeeded', - }, - tax_id: { - order: 12, - tableName: 'tax_ids', - dependencies: ['customer'], - list: (s) => (p) => s.taxIds.list(p), - retrieve: (s) => (id) => s.taxIds.retrieve(id), - supportsCreatedFilter: false, - sync: true, - }, - credit_note: { - order: 13, - tableName: 'credit_notes', - dependencies: ['customer', 'invoice'], - list: (s) => (p) => s.creditNotes.list(p), - retrieve: (s) => (id) => s.creditNotes.retrieve(id), - listExpands: [{ lines: (s) => (id) => s.creditNotes.listLineItems(id, { limit: 100 }) }], - supportsCreatedFilter: true, - sync: true, - isFinalState: (c: Stripe.CreditNote) => c.status === 'void', - }, - dispute: { - order: 14, - tableName: 'disputes', - dependencies: ['charge'], - list: (s) => (p) => s.disputes.list(p), - retrieve: (s) => (id) => s.disputes.retrieve(id), - supportsCreatedFilter: true, - sync: true, - isFinalState: (d: Stripe.Dispute) => d.status === 'won' || d.status === 'lost', - }, - early_fraud_warning: { - order: 15, - tableName: 'early_fraud_warnings', - dependencies: ['payment_intent', 'charge'], - list: (s) => (p) => s.radar.earlyFraudWarnings.list(p), - retrieve: (s) => (id) => s.radar.earlyFraudWarnings.retrieve(id), - supportsCreatedFilter: true, - sync: true, - }, - refund: { - order: 16, - tableName: 'refunds', - dependencies: ['payment_intent', 'charge'], - list: (s) => (p) => s.refunds.list(p), - retrieve: (s) => (id) => s.refunds.retrieve(id), - supportsCreatedFilter: true, - sync: true, - }, - checkout_sessions: { - order: 17, - tableName: 'checkout_sessions', - dependencies: ['customer', 'subscription', 'payment_intent', 'invoice'], - list: (s) => (p) => s.checkout.sessions.list(p), - retrieve: (s) => (id) => s.checkout.sessions.retrieve(id), - listExpands: [{ lines: (s) => (id) => s.checkout.sessions.listLineItems(id, { limit: 100 }) }], - supportsCreatedFilter: true, - sync: true, - childTables: ['checkout_session_line_items'], - }, - active_entitlements: { - order: 18, - tableName: 'active_entitlements', - dependencies: ['customer'], - list: (s) => (p) => - s.entitlements.activeEntitlements.list(p as Stripe.Entitlements.ActiveEntitlementListParams), - retrieve: (s) => (id) => s.entitlements.activeEntitlements.retrieve(id), - supportsCreatedFilter: true, - sync: false, - }, - review: { - order: 19, - tableName: 'reviews', - dependencies: ['payment_intent', 'charge'], - list: (s) => (p) => s.reviews.list(p), - retrieve: (s) => (id) => s.reviews.retrieve(id), - supportsCreatedFilter: true, - sync: false, - }, -} satisfies Record - -// Union of all object keys defined in RESOURCE_MAP. Used as the canonical object-name type across sync and registry helpers. -export type StripeObject = keyof typeof RESOURCE_MAP - -// Sync-enabled objects derived from RESOURCE_MAP metadata. -// Used for default full-sync selection and SyncObjectName composition. -export const CORE_SYNC_OBJECTS = Object.keys(RESOURCE_MAP).filter( - (k) => RESOURCE_MAP[k].sync -) as StripeObject[] - -// Type for one sync-enabled object key (excludes pseudo objects). -// Used where callers must operate on concrete sync resources only. -export type CoreSyncObject = (typeof CORE_SYNC_OBJECTS)[number] - -// Public sync object options including pseudo entries like "all". -// Used by sync input typing/validation for object selection. -export const SYNC_OBJECTS = ['all', 'customer_with_entitlements', ...CORE_SYNC_OBJECTS] as const - -// Type of valid sync object input values. -// Used by exported config/types and CLI/object selection paths. -export type SyncObjectName = (typeof SYNC_OBJECTS)[number] - -// Entity names accepted for webhook revalidation overrides. -// Used by StripeSyncConfig.revalidateObjectsViaStripeApi typing. export const REVALIDATE_ENTITIES = [ - ...Object.keys(RESOURCE_MAP), + ...DEFAULT_SYNC_OBJECTS, 'radar.early_fraud_warning', 'subscription_schedule', 'entitlements', ] as const -// Type for a single revalidation entity name. -// Used by RevalidateEntity in shared sync config types. export type RevalidateEntityName = (typeof REVALIDATE_ENTITIES)[number] -// Tables that must exist for runtime sync and webhook processing. -// Used by migration/spec filtering to assert required schema coverage. -export const RUNTIME_REQUIRED_TABLES: ReadonlyArray = Array.from( - new Set([ - ...Object.values(RESOURCE_MAP).map((r) => r.tableName), - ...Object.values(RESOURCE_MAP).flatMap((r) => r.childTables ?? []), - 'features', // from customer_with_entitlements - ]) -) - -// Canonical table names for each RESOURCE_MAP object key. -// Used by OpenAPI/runtime adapters to avoid duplicating table mappings. -export const RESOURCE_TABLE_NAME_MAP = Object.fromEntries( - Object.entries(RESOURCE_MAP).map(([objectName, def]) => [objectName, def.tableName]) -) as Record - -// Builds runtime ResourceConfig objects from RESOURCE_MAP + Stripe client. -// Used by StripeSync constructor to initialize this.resourceRegistry. -export function buildResourceRegistry(stripe: Stripe): Record { - return Object.fromEntries( - Object.entries(RESOURCE_MAP).map(([key, def]) => { - const config: StripeListResourceConfig = { - order: def.order, - tableName: def.tableName, - supportsCreatedFilter: def.supportsCreatedFilter, - sync: def.sync, - dependencies: def.dependencies ? [...def.dependencies] : [], - isFinalState: def.isFinalState, - listFn: def.list(stripe), - retrieveFn: def.retrieve(stripe), - listExpands: def.listExpands?.map((expand) => - Object.fromEntries(Object.entries(expand).map(([prop, fn]) => [prop, fn(stripe)])) - ), - } - return [key, config] - }) - ) as Record +/** + * Build a ResourceConfig for every listable resource discovered in the OpenAPI spec. + * All resources get list + retrieve functions derived dynamically from the spec paths. + */ +export function buildResourceRegistry( + spec: OpenApiSpec, + apiKey: string, + apiVersion?: string, + baseUrl?: string +): Record { + const endpoints = discoverListEndpoints(spec) + const nestedEndpoints = discoverNestedEndpoints(spec, endpoints) + const registry: Record = {} + const seenNested = new Set() + + for (const [tableName, endpoint] of endpoints) { + const v2 = isV2Path(endpoint.apiPath) + + const children = nestedEndpoints + .filter((n: NestedEndpoint) => n.parentTableName === tableName) + .map((n: NestedEndpoint) => ({ + tableName: n.tableName, + resourceId: n.resourceId, + apiPath: n.apiPath, + parentParamName: n.parentParamName, + supportsPagination: n.supportsPagination, + })) + + const config: ResourceConfig = { + order: 1, + tableName, + supportsCreatedFilter: !v2 && endpoint.supportsCreatedFilter, + supportsLimit: endpoint.supportsLimit, + sync: true, + dependencies: [], + listFn: buildListFn(apiKey, endpoint.apiPath, apiVersion, baseUrl), + retrieveFn: buildRetrieveFn(apiKey, endpoint.apiPath, apiVersion, baseUrl), + nestedResources: children.length > 0 ? children : undefined, + } + registry[tableName] = config + } + + for (const nested of nestedEndpoints) { + if (!nested.parentTableName || registry[nested.tableName]) { + continue + } + if (seenNested.has(nested.tableName)) { + continue + } + seenNested.add(nested.tableName) + + const config: ResourceConfig = { + order: 2, + tableName: nested.tableName, + supportsCreatedFilter: false, + supportsLimit: nested.supportsPagination, + sync: false, + dependencies: [], + listFn: undefined, + retrieveFn: undefined, + nestedResources: undefined, + parentParamName: nested.parentParamName, + } + + registry[nested.tableName] = config + } + + return registry } -// Alias map from Stripe event object names to internal registry keys. -// Used by normalizeStripeObjectName during webhook/upsert ingestion. -export const STRIPE_OBJECT_TO_SYNC_OBJECT_ALIASES: Record = { +export const STRIPE_OBJECT_TO_SYNC_OBJECT_ALIASES: Record = { 'checkout.session': 'checkout_sessions', - 'radar.early_fraud_warning': 'early_fraud_warning', + 'radar.early_fraud_warning': 'early_fraud_warnings', 'entitlements.active_entitlement': 'active_entitlements', 'entitlements.feature': 'active_entitlements', subscription_schedule: 'subscription_schedules', } -// Converts Stripe object names into canonical RESOURCE_MAP keys. -// Used before config/table lookups in webhook and sync flows. -export function normalizeStripeObjectName(stripeObjectName: string): StripeObject { - const normalizedObjectName = - STRIPE_OBJECT_TO_SYNC_OBJECT_ALIASES[stripeObjectName] ?? stripeObjectName - return normalizedObjectName as StripeObject +export function normalizeStripeObjectName(stripeObjectName: string): string { + return resolveTableName(stripeObjectName, { + ...OPENAPI_RESOURCE_TABLE_ALIASES, + ...STRIPE_OBJECT_TO_SYNC_OBJECT_ALIASES, + }) } -// Maps Stripe ID prefixes (e.g. cus_) to registry object names. -// Used when we only have an ID and need to resolve resource type. -export const PREFIX_RESOURCE_MAP: Record = { - cus_: 'customer', - gcus_: 'customer', - in_: 'invoice', - price_: 'price', - prod_: 'product', - sub_: 'subscription', - seti_: 'setup_intent', - pm_: 'payment_method', - dp_: 'dispute', - du_: 'dispute', - ch_: 'charge', - pi_: 'payment_intent', - txi_: 'tax_id', - cn_: 'credit_note', - issfr_: 'early_fraud_warning', - prv_: 'review', - re_: 'refund', +export const PREFIX_RESOURCE_MAP: Record = { + cus_: 'customers', + gcus_: 'customers', + in_: 'invoices', + price_: 'prices', + prod_: 'products', + sub_: 'subscriptions', + seti_: 'setup_intents', + pm_: 'payment_methods', + dp_: 'disputes', + du_: 'disputes', + ch_: 'charges', + pi_: 'payment_intents', + txi_: 'tax_ids', + cn_: 'credit_notes', + issfr_: 'early_fraud_warnings', + prv_: 'reviews', + re_: 'refunds', feat_: 'active_entitlements', cs_: 'checkout_sessions', } -// Prefixes sorted longest-first to avoid partial-prefix collisions. -// Used by getResourceFromPrefix for deterministic prefix matching. const SORTED_PREFIXES = Object.keys(PREFIX_RESOURCE_MAP).sort((a, b) => b.length - a.length) -// Resolves a Stripe ID string to a registry object key by prefix. -// Used by getResourceConfigFromId and single-entity sync routing. export function getResourceFromPrefix(stripeId: string): string | undefined { const prefix = SORTED_PREFIXES.find((p) => stripeId.startsWith(p)) - return prefix ? (PREFIX_RESOURCE_MAP[prefix] as string) : undefined + return prefix ? PREFIX_RESOURCE_MAP[prefix] : undefined } -// Gets ResourceConfig for a raw Stripe ID like cus_/ch_/pi_. -// Used by StripeSync.syncSingleEntity to pick retrieve/upsert behavior. export function getResourceConfigFromId( stripeId: string, registry: Record @@ -370,8 +165,6 @@ export function getResourceConfigFromId( return resourceName ? registry[resourceName] : undefined } -// Resolves table name for a canonical object key in a registry. -// Used by webhook and worker paths before writing to Postgres. export function getTableName(object: string, registry: Record): string { const config = registry[object] if (!config) throw new Error(`No resource config found for object type: ${object}`) diff --git a/packages/source-stripe/src/src-list-api.ts b/packages/source-stripe/src/src-list-api.ts index 96f75990c..0aca0dafe 100644 --- a/packages/source-stripe/src/src-list-api.ts +++ b/packages/source-stripe/src/src-list-api.ts @@ -7,6 +7,19 @@ import type { import { toRecordMessage } from '@stripe/sync-protocol' import type { ResourceConfig } from './types.js' +const SKIPPABLE_ERROR_PATTERNS = [ + 'only available in testmode', + 'not in live mode', + 'Must provide customer', + 'Must provide ', + 'Missing required param', +] + +function isSkippableError(err: unknown): boolean { + const msg = err instanceof Error ? err.message : String(err) + return SKIPPABLE_ERROR_PATTERNS.some((p) => msg.includes(p)) +} + function findConfigByTableName( registry: Record, tableName: string @@ -36,6 +49,8 @@ export async function* listApiBackfill(opts: { continue } + if (!resourceConfig.listFn) continue + // Skip already-complete streams (e.g. resuming after full backfill for events polling) const streamState = state?.[stream.name] if (streamState?.status === 'complete') continue @@ -56,12 +71,17 @@ export async function* listApiBackfill(opts: { // Drain any queued events before each page if (drainQueue) yield* drainQueue() - const params: { limit: number; starting_after?: string } = { limit: 100 } + const params: Record = {} + if (resourceConfig.supportsLimit !== false) { + params.limit = 100 + } if (pageCursor) { params.starting_after = pageCursor } - const response = await resourceConfig.listFn(params) + const response = await resourceConfig.listFn( + params as Parameters[0] + ) for (const item of response.data) { yield toRecordMessage(stream.name, item as Record) @@ -69,7 +89,9 @@ export async function* listApiBackfill(opts: { } hasMore = response.has_more - if (response.data.length > 0) { + if (response.pageCursor) { + pageCursor = response.pageCursor + } else if (response.data.length > 0) { pageCursor = (response.data[response.data.length - 1] as { id: string }).id } @@ -95,6 +117,14 @@ export async function* listApiBackfill(opts: { status: 'complete', } satisfies StreamStatusMessage } catch (err) { + if (isSkippableError(err)) { + yield { + type: 'stream_status', + stream: stream.name, + status: 'complete', + } satisfies StreamStatusMessage + continue + } const isRateLimit = err instanceof Error && err.message.includes('Rate limit') yield { type: 'error', diff --git a/packages/source-stripe/src/types.ts b/packages/source-stripe/src/types.ts index 56106c9b7..91b1a7815 100644 --- a/packages/source-stripe/src/types.ts +++ b/packages/source-stripe/src/types.ts @@ -1,4 +1,5 @@ -import Stripe from 'stripe' +import type Stripe from 'stripe' +import type { ListFn, RetrieveFn } from '@stripe/sync-openapi' import type { RevalidateEntityName } from './resourceRegistry.js' /** @@ -29,22 +30,23 @@ export type BaseResourceConfig = { isFinalState?: (entity: any) => boolean } -export type StripeListResourceConfig = BaseResourceConfig & { - /** Function to list items from Stripe API */ - listFn: (params: Stripe.PaginationParams & { created?: Stripe.RangeQueryParam }) => Promise<{ - data: unknown[] - has_more: boolean - }> - /** Function to retrieve a single item by ID from Stripe API */ - // eslint-disable-next-line @typescript-eslint/no-explicit-any - retrieveFn: (id: string) => Promise> - /** Optional list of sub-resources to expand during upsert/fetching (e.g. 'refunds', 'listLineItems') */ - listExpands?: Record Promise>>[] +export type ResourceConfig = BaseResourceConfig & { + listFn?: ListFn + retrieveFn?: RetrieveFn + /** Whether the list API supports the `limit` parameter */ + supportsLimit?: boolean + /** Nested child resources discovered from the spec (e.g. subscription items under subscriptions) */ + nestedResources?: { + tableName: string + resourceId: string + apiPath: string + parentParamName: string + supportsPagination: boolean + }[] + /** For nested resources, the parent path parameter name */ + parentParamName?: string } -/** Union of all resource configuration types */ -export type ResourceConfig = StripeListResourceConfig - export type RevalidateEntity = RevalidateEntityName export const SUPPORTED_WEBHOOK_EVENTS: Stripe.WebhookEndpointCreateParams.EnabledEvent[] = [ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ec7da7796..fd53d0085 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -345,6 +345,15 @@ importers: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/openapi: + devDependencies: + '@types/node': + specifier: ^24.5.0 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/protocol: dependencies: citty: @@ -363,6 +372,9 @@ importers: packages/source-stripe: dependencies: + '@stripe/sync-openapi': + specifier: workspace:* + version: link:../openapi '@stripe/sync-protocol': specifier: workspace:* version: link:../protocol