| title | Temporal Workflow Architecture |
|---|
When Temporal is enabled, each sync's lifecycle runs in a durable workflow instead of in-process. The workflow orchestrates setup, continuous reconciliation, live event processing, and teardown.
Three servers run independently:
- Webhook Server: public-facing; receives raw Stripe events and signals the matching Temporal workflow(s)
- Service API: internal; config CRUD, credential management, config resolution
- Engine API: stateless sync execution (setup, sync, teardown via X-Sync-Params)
Activities call the service for config resolution, then the engine for execution.
graph TD
subgraph Stripe
StripeAPI["Stripe API"]
StripeWH["Stripe Webhooks"]
end
subgraph WebhookServer["Webhook Server (public)"]
WHRoute["POST /webhooks/{cred_id}"]
end
subgraph Service["Sync Service (internal)"]
CRUD["/syncs CRUD"]
Resolve["GET /syncs/{id}<br/>?include_credentials=true"]
end
subgraph EngineAPI["Sync Engine"]
ESetup["POST /setup"]
ESync["POST /sync"]
ETeardown["POST /teardown"]
end
subgraph Temporal["Temporal Server"]
Workflow["syncWorkflow(syncId)"]
Worker["Worker (activities)"]
end
subgraph Destinations
Postgres["Postgres"]
Sheets["Google Sheets"]
end
%% Stripe → Webhook Server → Temporal
StripeWH --> WHRoute
WHRoute -- "signal: stripe_event" --> Workflow
%% Service → Workflow
CRUD -- "start / signal: delete" --> Workflow
%% Workflow → Activities
Workflow --> Worker
%% Activities → Service (resolve) → Engine (execute)
Worker -- "1. resolve config" --> Resolve
Worker -- "2. X-Sync-Params" --> ESetup
Worker -- "2. X-Sync-Params" --> ESync
Worker -- "2. X-Sync-Params" --> ETeardown
%% Engine → external
ESync -- "read" --> StripeAPI
ESync -- "write" --> Postgres
ESync -- "write" --> Sheets
Each activity makes two HTTP calls — one lightweight (resolve), one heavy (execute):
sequenceDiagram
participant Workflow
participant Activity
participant Service as Sync Service
participant Engine as Sync Engine
participant Dest as Destination
Workflow->>Activity: run(syncId)
rect rgb(240, 248, 255)
Note over Activity,Service: 1. Resolve config
Activity->>Service: GET /syncs/{id}?include_credentials=true
Service-->>Activity: SyncConfig with creds inline
Note over Activity: Build X-Sync-Params<br/>(source: {name, ...config},<br/>destination: {name, ...config},<br/>streams)
end
rect rgb(240, 255, 240)
Note over Activity,Engine: 2. Execute sync
Activity->>Engine: POST /sync<br/>X-Sync-Params: {...}
Engine->>Engine: selectStateStore → load cursors
Engine->>Engine: source.read (Stripe API)
Engine->>Dest: destination.write
Engine->>Engine: persist state checkpoints
Engine-->>Activity: NDJSON stream
Note over Activity: heartbeat every 50 msgs
end
Activity-->>Workflow: RunResult { errors }
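The "Build X-Sync-Params" step in the diagram above could look roughly like the sketch below. It is not the project's actual implementation: the `ResolvedSyncConfig` shape, the `type` to `name` renaming, and the JSON encoding of the header value are assumptions inferred from the diagram note and from the request body used when creating a sync.

```typescript
// Hypothetical shape of the config returned by
// GET /syncs/{id}?include_credentials=true. The real SyncConfig may differ.
interface ConnectorConfig {
  type: string;
  [key: string]: unknown;
}

interface ResolvedSyncConfig {
  source: ConnectorConfig;
  destination: ConnectorConfig;
  streams: { name: string }[];
}

// Shape described in the diagram note: { name, ...config } per connector.
interface SyncParams {
  source: { name: string; [key: string]: unknown };
  destination: { name: string; [key: string]: unknown };
  streams: { name: string }[];
}

// Turn { type, ...config } into { name, ...config } (assumed mapping).
function toConnectorParams(connector: ConnectorConfig): SyncParams["source"] {
  const { type, ...config } = connector;
  return { name: type, ...config };
}

function buildSyncParams(config: ResolvedSyncConfig): SyncParams {
  return {
    source: toConnectorParams(config.source),
    destination: toConnectorParams(config.destination),
    streams: config.streams,
  };
}

// Serialize as the X-Sync-Params header value (JSON encoding is an assumption).
function buildSyncHeaders(config: ResolvedSyncConfig): Record<string, string> {
  return { "X-Sync-Params": JSON.stringify(buildSyncParams(config)) };
}
```

Because the header is rebuilt from a fresh resolve on every call, the engine request always reflects the latest stored config and credentials.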
The webhook path crosses four boundaries (Stripe → Webhook Server → Temporal → Service → Engine):
sequenceDiagram
participant Stripe
participant Webhook as Webhook Server
participant Workflow as syncWorkflow
participant Activity
participant Service as Sync Service
participant Engine as Sync Engine
Stripe->>Webhook: POST /webhooks/{credential_id}
Note over Webhook: Scan syncs.json for<br/>matching credential_id
Webhook->>Workflow: signal('stripe_event', event)
Webhook-->>Stripe: 200 ok (fire-and-forget)
Note over Workflow: Buffer event
Note over Workflow: Next loop iteration
Workflow->>Activity: run(syncId, [event1, event2, ...])
Activity->>Service: GET /syncs/{id}?include_credentials=true
Service-->>Activity: SyncConfig with creds
Activity->>Engine: POST /sync<br/>X-Sync-Params + NDJSON body (events)
Engine->>Engine: source processes events → destination writes
Engine-->>Activity: NDJSON response stream
Activity-->>Workflow: RunResult
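The fan-out step on the webhook server (match `credential_id`, then signal each workflow) can be sketched as follows. The `StoredSync` fields and the synchronous `SignalFn` stand-in are hypothetical; a real handler would presumably send the signal through a Temporal client handle instead. The `sync_<id>` workflow ID pattern is taken from the status query command shown later in this document.

```typescript
// Hypothetical subset of a stored sync config; field names are assumptions.
interface StoredSync {
  id: string;
  credential_id: string;
}

// Minimal stand-in for a Temporal client: signals a workflow by ID.
type SignalFn = (workflowId: string, signalName: string, payload: unknown) => void;

// Workflow IDs follow the sync_<id> pattern.
function workflowIdFor(syncId: string): string {
  return `sync_${syncId}`;
}

// Find every sync that uses the credential named in the webhook URL.
function matchingSyncIds(syncs: StoredSync[], credentialId: string): string[] {
  return syncs.filter((s) => s.credential_id === credentialId).map((s) => s.id);
}

// Fan one Stripe event out to all matching workflows.
// Returns the workflow IDs that were signalled.
function fanOutEvent(
  syncs: StoredSync[],
  credentialId: string,
  stripeEvent: unknown,
  signal: SignalFn,
): string[] {
  const workflowIds = matchingSyncIds(syncs, credentialId).map(workflowIdFor);
  for (const workflowId of workflowIds) {
    signal(workflowId, "stripe_event", stripeEvent);
  }
  return workflowIds;
}
```

Note that nothing here reads credentials or calls a connector, which matches the webhook server's narrow role.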
sequenceDiagram
participant User
participant Service as Sync Service
participant Workflow as syncWorkflow
participant Activity
participant Engine as Sync Engine
participant Dest as Destination
User->>Service: POST /syncs (create)
Service->>Workflow: start syncWorkflow(syncId)
Workflow->>Activity: setup(syncId)
Activity->>Service: GET /syncs/{id}?include_credentials=true
Activity->>Engine: POST /setup (X-Sync-Params)
Engine->>Dest: CREATE TABLE / ensure schema
loop Reconciliation loop
Workflow->>Activity: run(syncId)
Activity->>Service: GET /syncs/{id}?include_credentials=true
Activity->>Engine: POST /sync (X-Sync-Params)
Engine->>Engine: load state → read source → write dest → persist state
Engine-->>Activity: NDJSON stream (heartbeat every 50 msgs)
Activity-->>Workflow: RunResult
Note over Workflow: continueAsNew at 500 iterations
end
User->>Service: DELETE /syncs/{id}
Service->>Workflow: signal('delete')
Workflow->>Activity: teardown(syncId)
Activity->>Service: GET /syncs/{id}?include_credentials=true
Activity->>Engine: POST /teardown (X-Sync-Params)
Engine->>Dest: DROP TABLE / cleanup
stateDiagram-v2
[*] --> Setup: phase != 'running'
[*] --> Loop: phase == 'running'<br/>(after continueAsNew)
Setup --> Loop: setup(syncId)
Setup --> Teardown: delete signal<br/>during setup
state Loop {
[*] --> CheckPause
CheckPause --> Paused: paused == true
Paused --> CheckPause: resume signal
CheckPause --> DrainEvents: events buffered
CheckPause --> Backfill: no events
DrainEvents --> CheckIteration: run(syncId, events)
Backfill --> CheckIteration: run(syncId)
CheckIteration --> CheckPause: iteration < 500
CheckIteration --> ContinueAsNew: iteration >= 500
}
Loop --> Teardown: delete signal
Teardown --> [*]: teardown(syncId)
ContinueAsNew --> [*]: continueAsNew(syncId, 'running')
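One way to read the state diagram above is as a pure decision function evaluated at the top of each loop iteration. This is a sketch under assumptions: the `LoopState` fields and the `NextStep` names are hypothetical, and the priority order (delete first, then the iteration limit, then pause) is one interpretation of the diagram rather than the confirmed workflow logic.

```typescript
// Hypothetical in-memory state of the workflow loop.
interface LoopState {
  paused: boolean;
  deleted: boolean;
  bufferedEvents: unknown[];
  iteration: number;
}

type NextStep =
  | { kind: "teardown" }
  | { kind: "continueAsNew" }
  | { kind: "waitForResume" }
  | { kind: "drainEvents"; events: unknown[] }
  | { kind: "backfill" };

// The diagram hands off to a fresh workflow run at 500 iterations.
const MAX_ITERATIONS = 500;

// Setup is skipped when the workflow was restarted via continueAsNew.
function needsSetup(phase?: string): boolean {
  return phase !== "running";
}

function nextStep(state: LoopState): NextStep {
  if (state.deleted) return { kind: "teardown" };
  if (state.iteration >= MAX_ITERATIONS) return { kind: "continueAsNew" };
  if (state.paused) return { kind: "waitForResume" };
  if (state.bufferedEvents.length > 0) {
    // Hand over a copy so the caller can clear its buffer safely.
    return { kind: "drainEvents", events: [...state.bufferedEvents] };
  }
  return { kind: "backfill" };
}
```

Keeping the decision separate from the side effects makes each transition easy to unit test without a Temporal test environment.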
Each server has a single, clearly scoped responsibility:
| | Webhook Server | Sync Service | Sync Engine |
|---|---|---|---|
| Purpose | Public webhook ingress; fan out signals to Temporal | Config CRUD, credential management, config resolution | Stateless sync execution |
| State | None; reads config store to locate matching syncs | Stores configs, credentials | Manages cursor state via `selectStateStore` |
| Routes | `POST /webhooks/{credential_id}` | `/syncs`, `/credentials` | `/setup`, `/sync`, `/teardown` |
| Exposure | Public (Stripe POSTs here) | Internal | Internal |
The webhook server requires only a Temporal client and the config store (read-only) to fan out signals. It never touches credentials or runs connectors.
Activities call GET /syncs/{id}?include_credentials=true on every invocation rather than carrying config in the workflow. This means:
- Workflow stays `syncId`-only, which keeps `continueAsNew` lightweight
- Config changes via `PATCH /syncs/{id}` are picked up automatically
- No `updateConfigSignal` needed
- Credential refresh (if any) is always fresh
- The resolution call is milliseconds; the sync call is seconds to minutes
The engine handles state internally via selectStateStore:
- Engine auto-detects a compatible state store package (`@stripe/sync-state-postgres` for postgres destinations)
- `setupStateStore()` creates the `_sync_state` table if needed
- Engine loads cursors on each run, persists checkpoints during sync
- Activities and workflows never touch state
- Input: `syncWorkflow(syncId: string, opts?: { phase?: string })`
- Signals: `stripe_event`, `pause`, `resume`, `delete`
- Query: `status` → `{ phase, paused, iteration }`
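To illustrate how the four signals and the `status` query relate to workflow state, here is a minimal reducer-style sketch in plain TypeScript. It is an assumption-laden model, not the real workflow code: the `WorkflowState` fields, the default `"setup"` phase name, and the `applySignal` helper are hypothetical. In an actual Temporal workflow these handlers would typically be registered with `defineSignal`, `defineQuery`, and `setHandler` from `@temporalio/workflow`.

```typescript
type SignalName = "stripe_event" | "pause" | "resume" | "delete";

// Hypothetical internal state; only phase, paused, and iteration are
// exposed through the status query.
interface WorkflowState {
  phase: string;
  paused: boolean;
  iteration: number;
  deleteRequested: boolean;
  eventBuffer: unknown[];
}

interface WorkflowStatus {
  phase: string;
  paused: boolean;
  iteration: number;
}

// "setup" as the default phase name is an assumption.
function initialState(opts?: { phase?: string }): WorkflowState {
  return {
    phase: opts?.phase ?? "setup",
    paused: false,
    iteration: 0,
    deleteRequested: false,
    eventBuffer: [],
  };
}

// Apply one signal and return the updated state (no mutation).
function applySignal(state: WorkflowState, signal: SignalName, payload?: unknown): WorkflowState {
  switch (signal) {
    case "stripe_event":
      // Events are buffered and drained on the next loop iteration.
      return { ...state, eventBuffer: [...state.eventBuffer, payload] };
    case "pause":
      return { ...state, paused: true };
    case "resume":
      return { ...state, paused: false };
    case "delete":
      return { ...state, deleteRequested: true };
  }
}

// The status query exposes a read-only snapshot.
function statusQuery(state: WorkflowState): WorkflowStatus {
  return { phase: state.phase, paused: state.paused, iteration: state.iteration };
}
```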
createActivities({ serviceUrl, engineUrl }) returns three activities:
- `setup(syncId)`: resolve from service → `POST /setup` on engine
- `run(syncId, input?)`: resolve from service → `POST /sync` on engine (with optional NDJSON body for events)
- `teardown(syncId)`: resolve from service → `POST /teardown` on engine
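The `run` activity consumes the engine's NDJSON response and heartbeats every 50 messages. A simplified sketch of that consumption step is shown below. It makes several assumptions: the body is taken as a complete string rather than a stream, messages with `type === "error"` are treated as errors (the engine's real message format is not specified here), and `heartbeat` is an injected callback standing in for the Temporal activity heartbeat.

```typescript
// Matches the RunResult { errors } shape returned to the workflow.
interface RunResult {
  errors: unknown[];
}

const HEARTBEAT_EVERY = 50;

// Parse an NDJSON payload, heartbeating every 50 messages and collecting
// any messages flagged as errors (assumed convention: type === "error").
function consumeNdjson(body: string, heartbeat: (count: number) => void): RunResult {
  const errors: unknown[] = [];
  let count = 0;
  for (const line of body.split("\n")) {
    if (line.trim() === "") continue; // skip blank lines between records
    const message = JSON.parse(line) as { type?: string };
    count += 1;
    if (message.type === "error") errors.push(message);
    if (count % HEARTBEAT_EVERY === 0) heartbeat(count);
  }
  return { errors };
}
```

Heartbeating on a message count rather than a timer ties liveness to actual progress, so a stalled engine stream stops producing heartbeats and Temporal can time the activity out.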
Runs as a separate process via the CLI:
sync-service worker \
--temporal-address localhost:7233 \
--service-url http://localhost:4020 \
--engine-url http://localhost:4010

To run the full stack locally, start each process in its own terminal:

# Terminal 1: Temporal dev server
temporal server start-dev
# Terminal 2: Sync engine (stateless execution)
sync-engine serve --port 4010
# Terminal 3: Sync service (config CRUD + config resolution)
sync-service serve --port 4020 --temporal-address localhost:7233
# Terminal 4: Webhook server (public ingress)
sync-service webhook --port 4030 --temporal-address localhost:7233
# Terminal 5: Worker
sync-service worker --temporal-address localhost:7233

Create a sync. The workflow starts automatically:
# Create sync (via internal service API)
curl -X POST http://localhost:4020/syncs \
-H 'Content-Type: application/json' \
-d '{
"source": { "type": "stripe", "api_key": "sk_test_..." },
"destination": { "type": "postgres", "connection_string": "postgresql://..." },
"streams": [{ "name": "product" }]
}'
# Check workflow status
temporal workflow query --workflow-id sync_<id> --type status
# Pause/resume
curl -X POST http://localhost:4020/syncs/<id>/pause
curl -X POST http://localhost:4020/syncs/<id>/resume
# Delete (triggers teardown)
curl -X DELETE http://localhost:4020/syncs/<id>

Point Stripe's webhook dashboard at the webhook server (http://your-host:4030/webhooks/{credential_id}), not the service API.
apps/service/src/__tests__/temporal-workflow.test.ts — uses @temporalio/testing with stubbed activities:
- Setup → reconciliation → delete lifecycle
- Event processing via `stripe_event` signal
- Pause/resume
- Teardown on delete
- `continueAsNew` phase skip
e2e/temporal.test.ts — starts both service + engine servers, uses @temporalio/testing with real Stripe API:
Stripe → Postgres (requires STRIPE_API_KEY):
- Creates sync via service API
- Backfills product from Stripe into Postgres
- Updates a product via Stripe API, signals the event to the workflow
- Verifies the live update landed in Postgres
- Signals delete, verifies teardown (schema dropped)
Stripe → Google Sheets (requires STRIPE_API_KEY + Google OAuth creds):
- Creates sync via service API
- Backfills product into a Google Sheet tab
- Verifies row count and data shape
- Cleans up the test tab
| File | Role |
|---|---|
| `apps/service/src/api/webhook-app.ts` | `createWebhookApp`: standalone webhook ingress |
| `apps/service/src/temporal/types.ts` | `RunResult`, `SyncActivities`, `WorkflowStatus` |
| `apps/service/src/temporal/activities.ts` | Resolve from service, execute on engine |
| `apps/service/src/temporal/workflows.ts` | Workflow: signals, queries, main loop |
| `apps/service/src/temporal/worker.ts` | Worker factory |
| `apps/service/src/cli/main.ts` | `serve`, `webhook`, `worker` subcommands |
| `apps/service/src/__tests__/temporal-workflow.test.ts` | Unit tests |
| `e2e/temporal.test.ts` | E2E tests |