A simplified version of the Airbyte Protocol, mostly compatible, optimized for real-time streams.
The core ideas are the same:
- Message types: `record`, `state`, `catalog`, `log`, `error`. Discriminated by `type`, serialized as NDJSON.
- Source / Destination: connectors implement `spec`, `check`, `discover`, `read` (source) or `write` (destination).
- Configured catalog: user selects streams from the discovered catalog, sets `sync_mode` (`full_refresh` / `incremental`) and `destination_sync_mode` (`append` / `overwrite` / `append_dedup`).
- State checkpoints: source emits `StateMessage` between records. Destination re-emits it after committing the records before it. Orchestrator persists it for resume.
- NDJSON wire format: one JSON object per line. Works for both in-process async iterators and subprocess stdin/stdout pipes.
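As a rough illustration of the message model, here is a minimal sketch of the five message types as a TypeScript discriminated union. Only the `type` values come from the list above; every other field name (`stream`, `data`, `level`, `failure_type`, `streams`) is an assumption, as are the helper names `isMessage` and `summarize`.

```typescript
// Illustrative shapes: only the `type` values come from the protocol description.
type RecordMessage = { type: "record"; stream: string; data: Record<string, unknown> };
type StateMessage = { type: "state"; stream: string; data: unknown };
type CatalogMessage = { type: "catalog"; streams: { name: string }[] };
type LogMessage = { type: "log"; level: "debug" | "info" | "warn" | "error"; message: string };
type ErrorMessage = { type: "error"; failure_type: string; message: string };

type Message = RecordMessage | StateMessage | CatalogMessage | LogMessage | ErrorMessage;

const MESSAGE_TYPES = ["record", "state", "catalog", "log", "error"] as const;

// Minimal runtime check for a parsed NDJSON line: validates the discriminator
// only; a fuller implementation would also validate each variant's fields.
function isMessage(value: unknown): value is Message {
  return (
    typeof value === "object" &&
    value !== null &&
    (MESSAGE_TYPES as readonly string[]).includes((value as { type?: unknown }).type as string)
  );
}

// Switching on `type` narrows to the matching variant, so each branch
// gets typed access to that variant's fields.
function summarize(msg: Message): string {
  switch (msg.type) {
    case "record":
      return `record for ${msg.stream}`;
    case "state":
      return `checkpoint for ${msg.stream}`;
    case "catalog":
      return `catalog with ${msg.streams.length} streams`;
    case "log":
      return `${msg.level}: ${msg.message}`;
    case "error":
      return `${msg.failure_type}: ${msg.message}`;
  }
}
```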
If you've built an Airbyte connector, the mental model transfers directly.
Airbyte has three state modes: global, per-stream, and per-stream-per-partition. We only have per-stream.
State is a flat `Record<string, TStreamState>` map keyed by stream name. No mode negotiation, no partition keys.
```ts
// Airbyte: array of state messages with type/stream_descriptor/global discriminators
[
  { type: "STREAM", stream_descriptor: { name: "customer", namespace: "public" }, stream_state: { cursor: "..." } },
  { type: "GLOBAL", global: { shared_state: { cdc_lsn: "..." }, stream_states: [...] } }
]

// Ours: flat map
{ customer: { cursor: "cus_999" }, invoice: { cursor: "inv_500" } }
```

Airbyte uses `namespace` (typically a database schema) alongside the stream name. We use `Stream.metadata` instead: a generic bag where sources put whatever context applies (`schema`, `database`, `api_version`, `account_id`, etc.).
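The Airbyte side of that comparison is abbreviated. The sketch below converts per-stream Airbyte state into the flat map, assuming the Airbyte protocol's full shape, where each `STREAM` entry nests `stream_descriptor` and `stream_state` under a `stream` key. `toStateMap` is a hypothetical helper; rejecting `GLOBAL` state and duplicate stream names is a design choice for the sketch, since the flat map has no slot for shared state or namespaces.

```typescript
// Airbyte per-stream state (AirbyteStateMessage with type "STREAM"); other fields omitted.
type AirbyteStreamState = {
  type: "STREAM";
  stream: {
    stream_descriptor: { name: string; namespace?: string };
    stream_state?: unknown;
  };
};
type AirbyteGlobalState = { type: "GLOBAL"; global: unknown };
type AirbyteState = AirbyteStreamState | AirbyteGlobalState;

// Our representation: one entry per stream name, no modes or partitions.
type StateMap = Record<string, unknown>;

function toStateMap(states: AirbyteState[]): StateMap {
  // Null-prototype object so stream names like "toString" cannot collide with Object.prototype.
  const map = Object.create(null) as StateMap;
  for (const s of states) {
    if (s.type !== "STREAM") {
      // GLOBAL (shared, e.g. CDC) state has no per-stream equivalent in the flat map.
      throw new Error(`unsupported Airbyte state type: ${s.type}`);
    }
    const { name, namespace } = s.stream.stream_descriptor;
    if (name in map) {
      // Namespace is dropped, so the same stream name in two namespaces would collide.
      throw new Error(`duplicate stream name "${name}" (namespace: ${namespace ?? "none"})`);
    }
    map[name] = s.stream.stream_state;
  }
  return map;
}
```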
Airbyte has `AirbyteControlMessage` (e.g. connector config updates) and `AirbyteTraceMessage` (structured errors, sync estimates, stream status). We fold both into `LogMessage` and `ErrorMessage` with `level` and `failure_type` discriminators. Fewer message types, same information.
Airbyte connectors declare a protocol version and the platform negotiates compatibility. We skip this — connectors are in-process TypeScript modules with compile-time type checking. Subprocess connectors use the same NDJSON format without version headers.
Airbyte's orchestrator is a platform service (Temporal workflows, Kubernetes pods, connection manager). Our engine is a pure function:
```ts
async function* runSync(config, source, destination): AsyncIterable<StateMessage>
```

No database, no filesystem, no module discovery. The caller imports `source` and `destination` explicitly. Platform concerns (scheduling, state persistence, retries) live in the orchestrator layer above.
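A minimal sketch of what such an engine could look like, assuming a destination that re-emits each state message after committing the records before it. The interfaces (`Source`, `Destination`, `SyncConfig`) are hypothetical, and a plain `streams` list stands in for the configured catalog.

```typescript
// Hypothetical message and connector shapes for this sketch.
type RecordMessage = { type: "record"; stream: string; data: Record<string, unknown> };
type StateMessage = { type: "state"; stream: string; data: unknown };
type Message = RecordMessage | StateMessage;

type StateMap = Record<string, unknown>;

interface SyncConfig {
  sourceConfig: unknown;
  destinationConfig: unknown;
  streams: string[]; // stand-in for a configured catalog
  state: StateMap; // last persisted checkpoints, used to resume
}

interface Source {
  read(params: { config: unknown; streams: string[]; state: StateMap }): AsyncIterable<Message>;
}

interface Destination {
  // Consumes messages and re-emits each state message once the records before it are committed.
  write(params: { config: unknown; streams: string[] }, messages: AsyncIterable<Message>): AsyncIterable<Message>;
}

// No I/O of its own: it wires source output into the destination and
// surfaces the checkpoints the destination has confirmed.
async function* runSync(config: SyncConfig, source: Source, destination: Destination): AsyncIterable<StateMessage> {
  const messages = source.read({ config: config.sourceConfig, streams: config.streams, state: config.state });
  const output = destination.write({ config: config.destinationConfig, streams: config.streams }, messages);
  for await (const msg of output) {
    if (msg.type === "state") yield msg;
  }
}
```

Because `runSync` only yields checkpoints the destination has already committed, the caller can persist each one as it arrives and pass the latest map back in as `state` on the next run.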
Airbyte's `read` is always finite: it runs, emits records, and exits. Our `read()` returns `AsyncIterable<Message>`, which can be:
- Finite — backfill, same as Airbyte. Read all records, emit state, done.
- Infinite — live/streaming. Webhooks, CDC, WebSocket, event bridge. The iterator never returns.
Same interface, same message types. The source decides the duration. A source can even transition from finite to infinite mid-stream (backfill then live).
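The backfill-then-live transition can be sketched as a single async generator. `fetchPage` and `liveEvents` are hypothetical stand-ins for a paginated list API and a live event feed; the checkpoint shapes (`phase`, `cursor`, `last_event_id`) are assumptions too.

```typescript
type RecordMessage = { type: "record"; stream: string; data: Record<string, unknown> };
type StateMessage = { type: "state"; stream: string; data: unknown };
type Message = RecordMessage | StateMessage;

// Hypothetical page shape returned by a paginated list API.
type Page = { items: { id: string }[]; next: string | null };

async function* backfillThenLive(
  stream: string,
  fetchPage: (cursor: string | null) => Promise<Page>,
  liveEvents: AsyncIterable<{ id: string }>,
): AsyncGenerator<Message> {
  // Phase 1 (finite): page through historical data.
  let cursor: string | null = null;
  do {
    const page: Page = await fetchPage(cursor);
    for (const item of page.items) yield { type: "record", stream, data: item };
    cursor = page.next;
    // Checkpoint after each page; a null cursor marks the backfill as complete.
    yield { type: "state", stream, data: { phase: "backfill", cursor } };
  } while (cursor !== null);

  // Phase 2 (normally infinite): follow the live feed on the same iterator.
  for await (const event of liveEvents) {
    yield { type: "record", stream, data: event };
    yield { type: "state", stream, data: { phase: "live", last_event_id: event.id } };
  }
}
```

Against a real feed the second loop never exits, so the iterator stays infinite; passing a finite `liveEvents` (as in a test) lets it complete.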
Real-time sources come in two flavors:
| Pattern | Example | Who manages the connection | `read()` behavior |
|---|---|---|---|
| Encapsulated | WebSocket, CDC, polling | Source opens and manages it | Infinite iterator, no input |
| Inversion of control | Webhooks | External system pushes events in | Called per-event with input |
Airbyte only supports the encapsulated pattern. We support both through a single `read()` method:
```ts
// Encapsulated: source manages its own WebSocket connection
source.read({ config, catalog, state }) // infinite iterator

// Inversion of control: orchestrator receives webhook, passes payload in
source.read({ config, catalog, state, input: webhookEvent }) // finite, one event
```

`ConnectorSpecification.input` declares the JSON Schema for the event payload, so the orchestrator can validate webhook bodies before passing them to the source.
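One way a source could serve both patterns from the same `read()`, sketched under assumptions: `WebhookEvent`, `ReadParams`, `makeSource`, and the `subscribe` callback (standing in for whatever connection the source opens itself) are hypothetical names, and the event-to-record mapping is illustrative.

```typescript
type RecordMessage = { type: "record"; stream: string; data: Record<string, unknown> };

// Hypothetical webhook payload shape for this sketch.
type WebhookEvent = { id: string; object: string; payload: Record<string, unknown> };

interface ReadParams {
  config: unknown;
  catalog: unknown;
  state: Record<string, unknown>;
  input?: WebhookEvent; // set only when the orchestrator pushes an event in
}

function makeSource(subscribe: () => AsyncIterable<WebhookEvent>) {
  const toRecord = (event: WebhookEvent): RecordMessage => ({
    type: "record",
    stream: event.object,
    data: { id: event.id, ...event.payload },
  });
  return {
    async *read(params: ReadParams): AsyncGenerator<RecordMessage> {
      if (params.input !== undefined) {
        // Inversion of control: handle exactly one pushed event, then finish.
        yield toRecord(params.input);
        return;
      }
      // Encapsulated: the source owns the connection; this loop normally never ends.
      for await (const event of subscribe()) yield toRecord(event);
    },
  };
}
```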
Live sources often need external resources provisioned before `read()` can work:
| Source type | `setup()` provisions | `teardown()` cleans up |
|---|---|---|
| Stripe webhooks | Creates webhook endpoint via API | Deletes webhook endpoint |
| Postgres CDC | Creates replication slot | Drops replication slot |
| File watcher | Registers inotify watch | Removes watch |
Airbyte has no lifecycle hooks: everything happens inside `read()`. We make provisioning explicit because:
- `setup()` runs once on sync creation, not on every read
- `teardown()` runs on sync deletion, not on pause (so resume is instant)
- Multiple syncs can share a resource (e.g. one webhook endpoint per Stripe account), so `teardown()` checks for other active consumers before deleting
Both methods are optional. Pull-based sources (REST API polling) don't need them.
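The shared-resource rule for `teardown()` amounts to reference counting per resource. A minimal sketch, assuming a hypothetical `WebhookApi` client and keeping consumer sets in memory; a real orchestrator would persist them, and this version does not guard against concurrent `setup` calls for the same account.

```typescript
// Hypothetical API client standing in for the real webhook-endpoint calls.
interface WebhookApi {
  createEndpoint(accountId: string): Promise<string>; // returns the endpoint id
  deleteEndpoint(endpointId: string): Promise<void>;
}

// Tracks which syncs use each account's endpoint (in memory, for the sketch only).
class SharedWebhooks {
  private readonly api: WebhookApi;
  private readonly endpoints = new Map<string, { endpointId: string; consumers: Set<string> }>();

  constructor(api: WebhookApi) {
    this.api = api;
  }

  // Runs once on sync creation: reuse the account's endpoint if one exists.
  async setup(accountId: string, syncId: string): Promise<string> {
    let entry = this.endpoints.get(accountId);
    if (!entry) {
      entry = { endpointId: await this.api.createEndpoint(accountId), consumers: new Set() };
      this.endpoints.set(accountId, entry);
    }
    entry.consumers.add(syncId);
    return entry.endpointId;
  }

  // Runs on sync deletion (not on pause): delete only when no other sync still consumes it.
  async teardown(accountId: string, syncId: string): Promise<void> {
    const entry = this.endpoints.get(accountId);
    if (!entry) return;
    entry.consumers.delete(syncId);
    if (entry.consumers.size === 0) {
      this.endpoints.delete(accountId);
      await this.api.deleteEndpoint(entry.endpointId);
    }
  }
}
```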
Airbyte uses `namespace` for one piece of source context (typically a schema name). We use `Stream.metadata` for arbitrary source-specific fields:

```ts
// Stripe source
{ api_version: "2025-04-30.basil", account_id: "acct_123", live_mode: true }

// Postgres source
{ schema: "public", database: "mydb" }
```

Destinations can use `metadata` for schema naming, partitioning, routing, etc.
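As an example of routing on metadata, a SQL destination might pick its target schema from whichever fields the source provided. The `Stream` shape, `destinationTable`, and the fallback order below are assumptions for illustration, not part of the protocol.

```typescript
// Hypothetical stream shape: metadata is an open bag of source-specific fields.
interface Stream {
  name: string;
  metadata?: Record<string, unknown>;
}

// Example naming policy (an assumption): prefer the source's schema, else use the
// account id as the schema, else fall back to a default schema.
function destinationTable(stream: Stream, defaultSchema = "public"): string {
  const meta = stream.metadata ?? {};
  let schema = defaultSchema;
  if (typeof meta.schema === "string") schema = meta.schema;
  else if (typeof meta.account_id === "string") schema = meta.account_id;
  return `${schema}.${stream.name}`;
}
```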
A `Transform` is `(messages: AsyncIterable<Message>) => AsyncIterable<Message>`. Transforms compose with `compose(a, b, c)` (left-to-right piping) and can filter, map, buffer, or aggregate messages between source and destination.
Airbyte has no transform concept — all transformation happens inside the destination or in a separate dbt step.
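A minimal sketch of the `Transform` type and a left-to-right `compose`, with one filtering and one mapping transform. The message shapes and the helpers `onlyStreams` and `dropField` are hypothetical.

```typescript
type RecordMessage = { type: "record"; stream: string; data: Record<string, unknown> };
type StateMessage = { type: "state"; stream: string; data: unknown };
type Message = RecordMessage | StateMessage;

type Transform = (messages: AsyncIterable<Message>) => AsyncIterable<Message>;

// Left-to-right: compose(a, b, c)(input) is c(b(a(input))). No transforms means identity.
function compose(...transforms: Transform[]): Transform {
  return (messages) => transforms.reduce((acc, t) => t(acc), messages);
}

// Filter: drop records from streams outside the allow-list; all state messages
// pass through so checkpoints still reach the destination.
function onlyStreams(allowed: string[]): Transform {
  return async function* (messages) {
    for await (const msg of messages) {
      if (msg.type !== "record" || allowed.includes(msg.stream)) yield msg;
    }
  };
}

// Map: remove one field from every record (e.g. to keep PII out of the destination).
function dropField(field: string): Transform {
  return async function* (messages) {
    for await (const msg of messages) {
      if (msg.type === "record") {
        const data = { ...msg.data };
        delete data[field];
        yield { ...msg, data };
      } else {
        yield msg;
      }
    }
  };
}
```

Because a transform only sees an async iterable, it can also hold messages back (buffering) or emit fewer than it receives (aggregation) without the source or destination knowing.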
Airbyte connectors are Docker containers communicating via stdin/stdout. Our connectors are TypeScript modules with typed interfaces:
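```ts
import source from '@stripe/sync-source-stripe'
import destination from '@stripe/sync-destination-postgres'

// `runSync`, `config` (the sync configuration), and `persist` (your state store) come from the caller
for await (const state of runSync(config, source, destination)) {
  persist(state) // save the checkpoint so a restarted sync can resume from it
}
```

Subprocess mode is an adapter layer over the same NDJSON protocol; the primary path is in-process async iterators with full type safety.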
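For the subprocess path, the adapter mainly needs NDJSON framing over a byte stream, where chunk boundaries do not line up with message boundaries. A minimal sketch, assuming text chunks as input; `parseNdjson` and `toNdjson` are hypothetical names, and parsed values are left as `unknown` for the caller to validate.

```typescript
// Turns arbitrary text chunks (e.g. a child process's stdout) into parsed JSON
// values, one per line. A line split across chunks stays buffered until its newline arrives.
async function* parseNdjson(chunks: AsyncIterable<string>): AsyncGenerator<unknown> {
  let buffer = "";
  for await (const chunk of chunks) {
    buffer += chunk;
    let newline: number;
    while ((newline = buffer.indexOf("\n")) !== -1) {
      const line = buffer.slice(0, newline).trim();
      buffer = buffer.slice(newline + 1);
      if (line !== "") yield JSON.parse(line);
    }
  }
  // A final line without a trailing newline is still a complete message.
  if (buffer.trim() !== "") yield JSON.parse(buffer);
}

// The reverse direction: serialize messages for a child process's stdin.
// JSON.stringify without indentation never emits raw newlines, so one message = one line.
async function* toNdjson(messages: AsyncIterable<unknown>): AsyncGenerator<string> {
  for await (const msg of messages) yield JSON.stringify(msg) + "\n";
}
```

With Node's `child_process.spawn`, calling `child.stdout.setEncoding("utf8")` should make `child.stdout` usable as the `chunks` argument, since Node readable streams are async iterable and the string decoder keeps multi-byte characters intact across chunks.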
The protocol is a subset of Airbyte's message vocabulary with extensions for real-time. Wrapping an Airbyte connector requires:
- Messages: rename `type` values (`RECORD` → `record`, etc.) and flatten the envelope (Airbyte wraps every message in `{ type: "...", record: { ... } }`; we use flat discriminated unions).
- State: convert from Airbyte's `AirbyteStateMessage` array to our `Record<string, unknown>` map.
- Catalog: field names are nearly identical (`stream_name` → `name`, add `primary_key`).
- Spec/Check: trivial mapping.
The reverse direction (wrapping our connector for Airbyte) is equally straightforward since we're a subset of their message types.
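The message mapping in the first bullet might look like the following sketch. The Airbyte side follows the Airbyte protocol's `AirbyteMessage` envelope (most fields omitted); the flat target shapes, the log-level mapping, and the `system_error` default for a missing `failure_type` are assumptions.

```typescript
// Subset of Airbyte's AirbyteMessage envelope (other fields omitted).
type AirbyteMessage =
  | { type: "RECORD"; record: { stream: string; namespace?: string; data: Record<string, unknown>; emitted_at: number } }
  | { type: "LOG"; log: { level: "FATAL" | "ERROR" | "WARN" | "INFO" | "DEBUG" | "TRACE"; message: string } }
  | { type: "TRACE"; trace: { type: string; error?: { message: string; failure_type?: string } } }
  | { type: "STATE" | "SPEC" | "CONNECTION_STATUS" | "CATALOG" | "CONTROL" };

// Hypothetical flat target shapes.
type Message =
  | { type: "record"; stream: string; data: Record<string, unknown> }
  | { type: "log"; level: "debug" | "info" | "warn" | "error"; message: string }
  | { type: "error"; failure_type: string; message: string };

// Lossy level mapping (an assumption): FATAL folds into error, TRACE into debug.
const LEVELS = { FATAL: "error", ERROR: "error", WARN: "warn", INFO: "info", DEBUG: "debug", TRACE: "debug" } as const;

// Returns null for kinds this sketch does not translate (state, spec, catalog, etc.).
function fromAirbyte(msg: AirbyteMessage): Message | null {
  switch (msg.type) {
    case "RECORD":
      // Flatten the envelope; namespace is dropped (that context belongs in Stream.metadata).
      return { type: "record", stream: msg.record.stream, data: msg.record.data };
    case "LOG":
      return { type: "log", level: LEVELS[msg.log.level], message: msg.log.message };
    case "TRACE":
      if (msg.trace.type === "ERROR" && msg.trace.error) {
        const { message, failure_type } = msg.trace.error;
        return { type: "error", failure_type: failure_type ?? "system_error", message };
      }
      return null;
    default:
      return null;
  }
}
```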