Concurrent CDK Engine Technical Diagrams #911
Brian Lai (brianjlai) started this conversation in Show and tell.
Concurrent CDK: Testing auto-generating diagrams from Claude summaries
Create a GitHub-flavored markdown document titled "The Life of a Partition: From Generation to Record Reading in the Airbyte CDK". The audience is software engineers reading this on GitHub. The document should heavily feature Mermaid diagrams — especially sequence diagrams, flowcharts, and class/entity diagrams — rendered using GitHub's built-in Mermaid support (fenced `mermaid` code blocks). Every section must include at least one Mermaid diagram. Keep prose concise and let the diagrams do the heavy lifting.

Section 1 — Overview: What Is a Partition?
A partition is the fundamental unit of work in Airbyte's concurrent sync engine. It represents one "slice" of a stream's data — for example, one date range for an incremental stream, or one region for a geographically partitioned API.
The entire concurrent sync can be understood by following a partition's journey through four stages:
Include this Mermaid flowchart (left-to-right):
```mermaid
flowchart LR
    A["1. Generation Worker thread produces partitions from slicer"] --> B["2. Queuing Partition placed onto shared queue"]
    B --> C["3. Scheduling Main thread picks it up, submits to thread pool"]
    C --> D["4. Reading Worker thread calls retriever.read_records()"]
```

This pipeline is powered by three actors: worker threads (that generate partitions and read records), a shared queue (that passes work between threads), and a main thread (that orchestrates everything by consuming queue items).
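To make the "slice" idea concrete, here is a minimal illustrative Python sketch (the function name and slice shape are assumptions, not CDK code) of a slicer yielding one date-range slice per window:

```python
# Hypothetical slicer sketch: each yielded dict is one "slice" of the stream,
# and each slice becomes one partition, i.e. one independent unit of work.
from datetime import date, timedelta

def date_slices(start: date, end: date, step_days: int):
    cur = start
    while cur < end:
        nxt = min(cur + timedelta(days=step_days), end)
        yield {"start_date": cur.isoformat(), "end_date": nxt.isoformat()}
        cur = nxt

slices = list(date_slices(date(2024, 1, 1), date(2024, 3, 1), 30))
assert slices[0] == {"start_date": "2024-01-01", "end_date": "2024-01-31"}
assert slices[-1]["end_date"] == "2024-03-01"
```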
Section 2 — The Architecture: Three Actors and a Queue
Include this Mermaid flowchart showing the central architecture:
```mermaid
flowchart TB
    subgraph Workers["Worker Threads"]
        PE["Partition Generator\n(PartitionEnqueuer)"]
        PR["Partition Reader\n(PartitionReader)"]
    end
    Q["Shared Queue\n(thread-safe, max 10,000 items)"]
    subgraph Main["Main Thread (Orchestrator)"]
        ML["Event Loop\nwhile item = queue.get()"]
        Route["Route by item type"]
        ML --> Route
    end
    PE -- "Partition items\n+ GenerationCompletedSentinel" --> Q
    PR -- "Record items\n+ PartitionCompleteSentinel\n+ StreamThreadException" --> Q
    Q -- "pulls items" --> ML
    Route -- "submits read tasks" --> PR
    Route -- "submits generation tasks" --> PE
```

At the center is a shared Queue (a thread-safe Python Queue with a max size of 10,000 items). It connects three actors:
- The main thread: runs a `while` loop pulling items from the queue. It never generates partitions or reads records itself — it only dispatches work and processes results.
- Partition generator threads: run `PartitionEnqueuer.generate_partitions()`. They iterate over a stream's slices and push each resulting Partition onto the queue.
- Partition reader threads: run `PartitionReader.process_partition()`. They call `partition.read()`, which ultimately calls `retriever.read_records()` to hit the API. Each record is pushed onto the queue.

All three actors communicate exclusively through the queue. There are no direct thread-to-thread calls.
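This queue-only communication pattern can be sketched with plain `queue.Queue` and `threading` (the names `generator` and `main_loop` are illustrative, not the actual CDK classes):

```python
# Three-actor pattern in miniature: a worker produces items and a sentinel;
# the main loop only pulls from the queue and routes. No thread calls another
# thread directly. The bounded queue mirrors the CDK's 10,000-item cap.
import queue
import threading

DONE = object()  # stand-in for a completion sentinel

def generator(q: queue.Queue) -> None:
    for day in ("2024-01-01", "2024-01-02"):
        q.put(("partition", day))  # put() blocks if the queue is full
    q.put(DONE)                    # always signal completion

def main_loop(q: queue.Queue) -> list:
    items = []
    while True:
        item = q.get()             # blocks until an item is available
        if item is DONE:
            break
        items.append(item)         # "route" the item (just collect it here)
    return items

q = queue.Queue(maxsize=10_000)
threading.Thread(target=generator, args=(q,)).start()
assert main_loop(q) == [("partition", "2024-01-01"), ("partition", "2024-01-02")]
```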
Section 3 — The Queue's Vocabulary: Six Item Types
Include this Mermaid flowchart showing producers, the queue, and the main thread's routing:
```mermaid
flowchart LR
    subgraph Producers
        PE["PartitionEnqueuer\n(worker thread)"]
        PR["PartitionReader\n(worker thread)"]
        CMR["ConcurrentMessageRepository"]
    end
    Q[/"Shared Queue"/]
    subgraph MainThread["Main Thread Routing"]
        direction TB
        H1["on_partition()\n→ submit read task"]
        H2["on_record()\n→ yield AirbyteMessage"]
        H3["on_partition_complete_sentinel()\n→ track completion"]
        H4["on_partition_generation_completed()\n→ start next stream"]
        H5["on_exception()\n→ log + flag error"]
        H6["yield directly\n→ pass-through"]
    end
    PE -- "Partition" --> Q
    PE -- "PartitionGenerationCompletedSentinel" --> Q
    PE -- "StreamThreadException" --> Q
    PR -- "Record" --> Q
    PR -- "PartitionCompleteSentinel" --> Q
    PR -- "StreamThreadException" --> Q
    CMR -- "AirbyteMessage" --> Q
    Q --> H1
    Q --> H2
    Q --> H3
    Q --> H4
    Q --> H5
    Q --> H6
```

There are six item types, and together they form the complete coordination protocol between threads:
1. Partition
   - Produced by `PartitionEnqueuer`, which creates it by calling `partition_factory.create(stream_slice)`.
   - The main thread submits `PartitionReader.process_partition(partition, cursor)` to the thread pool.
2. Record
   - Produced by `PartitionReader`; yielded by `partition.read()` → `retriever.read_records()`.
   - The main thread converts it to an `AirbyteMessage` and yields it as output.
3. PartitionCompleteSentinel
   - Produced by `PartitionReader`; emitted after `partition.read()` completes (success or failure).
4. PartitionGenerationCompletedSentinel
   - Produced by `PartitionEnqueuer`; emitted after the stream's slicer yields its last slice.
5. StreamThreadException
   - Produced by `PartitionEnqueuer` or `PartitionReader` on unhandled exceptions. The relevant completion sentinel is always sent after the exception.
6. AirbyteMessage
   - Produced by `ConcurrentMessageRepository` when any thread calls `emit_message()` or `log_message()`.

The two sentinel types are the coordination backbone — without them, the main thread would have no way to know when a worker thread has finished, and the sync would hang forever.
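The "sentinel is always sent" guarantee maps naturally onto a `try/except/finally`. A hedged sketch of the pattern (tuple-tagged items stand in for the real item classes):

```python
import queue

def process_partition(read_fn, q: queue.Queue) -> None:
    # Queue the exception if reading fails, but ALWAYS queue the completion
    # sentinel afterwards -- otherwise the main thread would wait forever.
    try:
        for record in read_fn():                 # stand-in for partition.read()
            q.put(("record", record))
    except Exception as exc:
        q.put(("stream_thread_exception", exc))
    finally:
        q.put(("partition_complete_sentinel", None))

def failing_read():
    yield {"id": 1}
    raise RuntimeError("API down")

q = queue.Queue()
process_partition(failing_read, q)
kinds = [q.get()[0] for _ in range(q.qsize())]
assert kinds == ["record", "stream_thread_exception", "partition_complete_sentinel"]
```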
Section 4 — Stage 1: Partition Generation
Include this Mermaid sequence diagram showing the generation call chain:
```mermaid
sequenceDiagram
    participant MT as Main Thread
    participant TP as Thread Pool
    participant PE as PartitionEnqueuer<br/>(Worker Thread)
    participant DS as DefaultStream
    participant SG as StreamSlicerPartitionGenerator
    participant SS as StreamSlicer
    participant PF as DeclarativePartitionFactory
    participant Q as Queue
    MT->>TP: submit generate_partitions(stream)
    TP->>PE: execute in worker thread
    PE->>DS: stream.generate_partitions()
    DS->>SG: partition_generator.generate()
    SG->>SS: stream_slicer.stream_slices()
    loop For each StreamSlice
        SS-->>SG: yield StreamSlice
        SG->>PF: partition_factory.create(slice)
        PF-->>SG: DeclarativePartition
        Note over SG: Check futures limit<br/>(sleep 100ms if >= 10,000)
        SG->>Q: queue.put(Partition)
    end
    PE->>Q: queue.put(PartitionGenerationCompletedSentinel)
    Note over PE,Q: Always sent, even on error
```

Each `DeclarativePartition` is a self-contained work unit holding its stream slice (e.g. `{"start_date": "2024-01-01", "end_date": "2024-02-01"}`).

Throttling: Before generating each partition, the enqueuer checks whether the thread pool has too many pending tasks (>= 10,000 futures). If so, it sleeps for 100ms and retries.
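The throttling loop can be sketched as follows (the helper name `wait_for_capacity` and the injectable `sleep` parameter are illustrative; the 10,000 and 100 ms figures come from the text above):

```python
import time

MAX_PENDING_FUTURES = 10_000
THROTTLE_SLEEP_S = 0.1  # 100 ms

def wait_for_capacity(pending_count, sleep=time.sleep) -> int:
    # Spin politely until the thread pool has room; return how many
    # sleep cycles were needed (handy for testing).
    sleeps = 0
    while pending_count() >= MAX_PENDING_FUTURES:
        sleep(THROTTLE_SLEEP_S)
        sleeps += 1
    return sleeps

# Simulate a pool that frees a slot after two checks:
counts = iter([10_000, 10_000, 9_999])
assert wait_for_capacity(lambda: next(counts), sleep=lambda _: None) == 2
```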
If an exception occurs, a `StreamThreadException` is queued first, followed by the sentinel (the sentinel must always be sent, or the main thread hangs).

Section 5 — Stage 2: The Main Thread's Event Loop
Include this Mermaid flowchart showing the routing logic:
```mermaid
flowchart TD
    Start["queue.get()\n(blocks until item available)"] --> Check{Item Type?}
    Check -->|Partition| P["on_partition()\n• Register as running\n• Submit PartitionReader\n to thread pool"]
    Check -->|Record| R["on_record()\n• Convert to AirbyteMessage\n• Yield as output\n• Emit RUNNING status on first record"]
    Check -->|PartitionCompleteSentinel| PCS["on_partition_complete_sentinel()\n• Remove from running set\n• If generation done + no running\n partitions → finalize stream"]
    Check -->|PartitionGenerationCompletedSentinel| PGCS["on_partition_generation_completed()\n• Remove from generating list\n• Start next stream's generation\n if any waiting"]
    Check -->|StreamThreadException| E["on_exception()\n• Log exception\n• Flag stream as errored\n• Yield error AirbyteMessage"]
    Check -->|AirbyteMessage| M["yield directly\n(pass-through)"]
    P --> Start
    R --> Start
    PCS --> Start
    PGCS --> Start
    E --> Start
    M --> Start
```

The critical handoff: when a Partition arrives, the main thread submits it for reading and immediately goes back to the queue. It does NOT read the partition itself.
The loop exits when the queue is empty and all streams are done.
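The exit condition can be sketched as a tiny tracker (an illustrative class, not actual CDK code): a stream is finalized only once its generation sentinel has arrived and its set of running partitions is empty.

```python
class StreamTracker:
    """Bookkeeping behind 'generation done + no running partitions'."""

    def __init__(self) -> None:
        self.running: set = set()
        self.generation_done = False

    def on_partition(self, pid: str) -> None:
        self.running.add(pid)            # a Partition item arrived

    def on_partition_complete(self, pid: str) -> bool:
        self.running.discard(pid)        # PartitionCompleteSentinel arrived
        return self._finalize()

    def on_generation_completed(self) -> bool:
        self.generation_done = True      # PartitionGenerationCompletedSentinel
        return self._finalize()

    def _finalize(self) -> bool:
        # Only finalize when BOTH conditions hold.
        return self.generation_done and not self.running

t = StreamTracker()
t.on_partition("p1")
assert t.on_generation_completed() is False   # p1 is still being read
assert t.on_partition_complete("p1") is True  # now safe to finalize the stream
```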
Section 6 — Stage 3: Partition Reading and retriever.read_records()
Include this Mermaid sequence diagram showing the reading call chain:
```mermaid
sequenceDiagram
    participant MT as Main Thread
    participant TP as Thread Pool
    participant PR as PartitionReader<br/>(Worker Thread)
    participant DP as DeclarativePartition
    participant RT as Retriever
    participant API as External API
    participant CU as Cursor
    participant Q as Queue
    MT->>TP: submit process_partition(partition, cursor)
    TP->>PR: execute in worker thread
    PR->>DP: partition.read()
    DP->>RT: retriever.read_records(schema, stream_slice)
    loop For each page of results
        RT->>API: HTTP Request
        API-->>RT: HTTP Response
        loop For each record in response
            RT-->>DP: yield Record
            DP-->>PR: yield Record
            PR->>Q: queue.put(Record)
            PR->>CU: cursor.observe(record)
        end
    end
    PR->>CU: cursor.close_partition(partition)
    Note over CU,Q: Merges partition state into checkpoint<br/>(uses lock for thread safety),<br/>emits state message via ConcurrentMessageRepository
    PR->>Q: queue.put(PartitionCompleteSentinel)
```

`DeclarativePartition.read()` is the bridge between the concurrent framework and the declarative retriever. Calling `self._retriever.read_records(schema, stream_slice)` is the exact moment where the abstract "partition" concept becomes a real HTTP API call.

If an exception occurs, a `StreamThreadException` is queued followed by the sentinel — same safety pattern as the enqueuer.

Section 7 — Putting It All Together: End-to-End Flow
Include this Mermaid sequence diagram — this is the key diagram of the entire document. It shows the complete lifecycle of a single partition across all three actors:
```mermaid
sequenceDiagram
    participant MT as Main Thread
    participant Q as Queue
    participant WA as Worker Thread A<br/>(Generator)
    participant WB as Worker Thread B<br/>(Reader)
    Note over MT: Sync starts
    MT->>WA: Submit generate_partitions(stream)
    WA->>WA: stream_slicer.stream_slices()
    WA->>WA: partition_factory.create(slice)
    WA->>Q: put(Partition)
    Q->>MT: get() → Partition
    MT->>MT: Register partition as running
    MT->>WB: Submit process_partition(partition)
    WB->>WB: partition.read() → retriever.read_records()
    loop For each record from API
        WB->>Q: put(Record)
        Q->>MT: get() → Record
        MT->>MT: Convert to AirbyteMessage, yield as output
    end
    WB->>WB: cursor.close_partition()
    Note over WB,Q: State checkpoint emitted<br/>via ConcurrentMessageRepository
    WB->>Q: put(PartitionCompleteSentinel)
    WA->>Q: put(PartitionGenerationCompletedSentinel)
    Q->>MT: get() → PartitionCompleteSentinel
    MT->>MT: Remove partition from running set
    Q->>MT: get() → PartitionGenerationCompletedSentinel
    MT->>MT: Generation done, no running partitions → finalize stream
    Note over MT: Emit COMPLETE status + final state checkpoint
```

Key insight: The partition crosses the queue boundary twice — once as a `Partition` item (generation → main thread), and once implicitly as `Record` items flowing back (reading → main thread). The queue is the membrane between generation, scheduling, and reading.

Section 8 — Thread Safety and Flow Control
Include these two Mermaid diagrams side by side (they'll render sequentially on GitHub).
Backpressure mechanism:
```mermaid
flowchart TD
    A["PartitionEnqueuer:\nnext slice ready"] --> B{"Thread pool futures\n>= 10,000?"}
    B -->|Yes| C["Sleep 100ms"]
    C --> B
    B -->|No| D{"Queue full?\n(max 10,000 items)"}
    D -->|Yes| E["queue.put() blocks\nuntil main thread drains"]
    D -->|No| F["queue.put(Partition)\nPartition enqueued successfully"]
    E --> F
```

Deadlock prevention:
```mermaid
flowchart TD
    A["System constraint:\ninitial_partition_generators < num_workers"] --> B["Why?"]
    B --> C["Generators and readers\nshare the same thread pool"]
    C --> D{"What if all workers\nare generators?"}
    D --> E["Generators fill queue\nwith Partition items"]
    E --> F["Queue fills up →\ngenerators block on queue.put()"]
    F --> G["Main thread waits for\nRecord items that never come"]
    G --> H["DEADLOCK"]
    D --> I["Reserve at least 1 worker\nfor reading → deadlock impossible"]
```

Two mechanisms keep the system healthy:
- Backpressure: the bounded queue makes `queue.put()` block when full.
- Deadlock prevention: `initial_number_of_partitions_to_generate < num_workers` ensures generators never monopolize the thread pool.

Section 9 — Key Takeaway
Include this final Mermaid flowchart as the summary architecture diagram:
```mermaid
flowchart LR
    subgraph Generate
        PE["Partition\nGenerator\nThreads"]
    end
    subgraph Schedule
        MT["Main Thread\n(Orchestrator)"]
    end
    subgraph Read
        PR["Partition\nReader\nThreads"]
    end
    Q[/"Shared Queue"/]
    PE -- "Partition\nitems" --> Q
    Q -- "pull + route" --> MT
    MT -- "submit\nread tasks" --> PR
    PR -- "Record items\n+ Sentinels" --> Q
    PE -- "GenerationCompletedSentinel" --> Q
    PR -- "PartitionCompleteSentinel" --> Q
    MT -. "submit generation\ntasks" .-> PE
```

The concurrent sync engine is a producer-consumer system with a twist: the main thread is both a consumer (of queue items) and a dispatcher (submitting new work to the thread pool). It never does heavy lifting itself — it only routes and tracks.
Partitions are the unit of parallelism. Each one independently flows through generation, queuing, scheduling, and reading.
The queue is the single point of coordination, and the six item types — especially the two sentinels — are the signaling mechanism that keeps everything in sync.
End of document prompt.