
stanislavkozlovski/diskless-kafka-in-python


diskless-python-kafka

A bare-bones diskless Kafka, really a leaderless log, built on Oxia and S3-compatible object storage.


| Action | Link |
|---|---|
| What is this? | The What |
| Path Walkthroughs | Data Paths |
| Config & Usage | Config |
| HTTP Surface | HTTP Services |
| Observability | Metrics, Perf Debugging |
| Extra Docs | Miscellaneous |

The What

This repo is a Python implementation of the Leaderless Log Protocol. Concisely, it comes down to a few main files; the full code map lives in ARCHITECTURE.md.

Note

The repo's runtime dependencies live in requirements.txt. The current Oxia baseline is oxia==0.2.1; older 0.1.x clients lacked the session keepalive/recreation behavior the compaction path relies on and could surface SessionNotFound under load.

The whole design outsources most of the hard work to external stores instead of embedding a local replicated log inside the brokers:

| Layer | Why it matters |
|---|---|
| S3 | S3 stores the actual record bytes: shared WAL blobs under llog/wal-shared/{uuid} first, then compacted per-partition blobs under llog/{topic}/partitions/{partition}/data/compacted/{uuid}. |
| Oxia | Oxia is the linearizable coordination layer. It lets multiple HTTP broker and compactor processes reserve offsets, recover interrupted work, and maintain sparse index entries that map offset ranges to S3 locations. |

The index in Oxia is a sparse mapping of offset ranges to S3 objects, so the client knows where to read from S3.

The main write API is the broker HTTP server at POST /produce. One request can carry multiple partitions and multiple records per partition, for example:

{
  "topic_partitions": [
    {"topic": "orders", "partition": 0, "records": ["a", "b"]},
    {"topic": "orders", "partition": 1, "records": ["c"]}
  ]
}

Data Paths

Every topic-partition has its own path of keys in Oxia. The most important are:

  • meta/control stores the next sequence/offset counter and is the contention point for brokers writing to the same topic-partition
  • meta/compaction-cursor points at the first offset not yet compacted
  • sparse index entries keyed by batch end offset

The two important ideas to keep in mind are:

  1. Oxia holds metadata and the sparse lookup structure.
  2. Actual record bytes live in S3.

The rest is really just a question of "how do you update both in an order that's consistent and safe across failures?" Well... here are the answers:

Write Path

At the HTTP layer, the broker accepts one request containing many topic-partitions and aggregates them before flush. The lower-level writer primitive is LeaderlessLogWriter.append_shared(groups). Each group is one topic-partition plus a list of payloads, so the writer API itself is still partition-oriented even when the broker submits many partitions together.


Write TLDR

That write path works like this:

  1. The broker accepts the per-partition batches from POST /produce.
  2. The broker-side batching layer waits until either LLOG_BATCH_MAX_BYTES or LLOG_BATCH_MAX_DELAY_MS is hit, with defaults of 8 MiB and 500 ms respectively.
  3. The writer then packs the accumulated partition batches into one shared WAL object in S3.
  4. Each partition gets its own Oxia control state and sparse-index entry pointing at its range inside that shared WAL object.
  5. Later, the compactor rewrites contiguous WAL ranges for one partition into one partition-local compacted object and updates the sparse index to point at that compacted blob instead.

ASCII sketch:

client
  |
  | POST /produce
  v
+---------------------------+
| HTTP broker               |
| topic_partitions[]        |
+---------------------------+
  |
  | aggregate + batch
  | flush at 8 MiB or 500 ms
  v
+---------------------------+
| LeaderlessLogWriter       |
| per-partition groups      |
+---------------------------+
  |
  | 1) write one shared WAL blob
  +-------------------------------> S3
  |                                 llog/wal-shared/{uuid}
  |
  | 2) for each partition:
  |    reserve offsets + persist sparse-index
  v
+----------------------------------------------+
| Oxia                                         |
| orders[0] offsets 1..2 -> shared WAL object  |
| orders[1] offsets 1..1 -> same WAL object    |
+----------------------------------------------+
  |
  | 3) respond with per-partition offsets/results
  v
client

These are the main Oxia keys related to the write path:

| Oxia key | Purpose |
|---|---|
| meta/control | The main per-partition coordination record. Most importantly, it holds the offset (sequence) counter. |
| index/{end_offset} | The sparse index. Each key says "the batch ending at this offset lives in this S3 object", so readers can map offset ranges to S3 locations. Readers find the right key via a so-called CEILING_GET op in Oxia. |
| meta/compaction-cursor | The first offset in the partition that has not yet been compacted. The writer initializes it when the partition is created. |
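As a hedged illustration of why the key encoding matters: a CEILING lookup over index/{end_offset} only finds the right batch if keys sort in the same order as their numeric end offsets. The exact encoding the repo uses isn't described here; this sketch assumes keys compare as plain strings and shows how a fixed-width, zero-padded offset keeps string order and numeric order aligned. `index_key` and `OFFSET_WIDTH` are hypothetical.

```python
# Hypothetical key helper; assumes index keys compare as plain strings.
OFFSET_WIDTH = 20  # wide enough for any unsigned 64-bit offset


def index_key(end_offset: int) -> str:
    """Build a sparse-index key whose string order matches numeric order."""
    if end_offset < 1:
        raise ValueError("offsets start at 1")
    return f"index/{end_offset:0{OFFSET_WIDTH}d}"


# Unpadded keys sort wrongly as strings: "index/10" sorts before "index/9".
unpadded = sorted(f"index/{n}" for n in (9, 10, 100))
padded = sorted(index_key(n) for n in (9, 10, 100))

print(unpadded)                                  # ['index/10', 'index/100', 'index/9']
print([int(k.split("/")[1]) for k in padded])    # [9, 10, 100]
```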

1. Partition initialization

Before the first append, initialize_partition(topic, partition) creates:

  • meta/control with log_state=OPEN, sequence_counter=1, and pending=null (offsets start at 1 per partition)
  • meta/compaction-cursor with offset=1
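A minimal sketch of initialization, assuming a create-only (put-if-absent) write so that two brokers initializing the same partition can't clobber each other's state. `InMemoryKV`, `put_if_absent`, and the `{topic}/{partition}/` key prefix are hypothetical stand-ins for the real Oxia client and key layout.

```python
from typing import Any, Dict, Optional


class InMemoryKV:
    """Toy stand-in for Oxia: a dict with a create-only put."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    def put_if_absent(self, key: str, value: Any) -> bool:
        if key in self.data:
            return False  # someone already created it; leave it alone
        self.data[key] = value
        return True

    def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)


def initialize_partition(kv: InMemoryKV, topic: str, partition: int) -> None:
    prefix = f"{topic}/{partition}"  # hypothetical key prefix
    # Offsets start at 1 and no append is in flight yet.
    kv.put_if_absent(f"{prefix}/meta/control",
                     {"log_state": "OPEN", "sequence_counter": 1, "pending": None})
    # Nothing has been compacted, so the cursor also starts at offset 1.
    kv.put_if_absent(f"{prefix}/meta/compaction-cursor", {"offset": 1})


kv = InMemoryKV()
initialize_partition(kv, "orders", 0)
initialize_partition(kv, "orders", 0)  # a second call changes nothing
```

Because both writes are create-only, re-running initialization after a crash between the two puts simply fills in whichever key is still missing.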

2. Build one shared WAL blob

append_shared() accepts one or more topic-partition groups and does the following:

  1. Normalizes each payload to bytes.
  2. Encodes each group as one inner bytes-batch-v1 body.
  3. Packs all group bodies into one shared S3 object at llog/wal-shared/{uuid}.
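The packing step can be sketched as follows. The real bytes-batch-v1 layout isn't spelled out in this README, so `encode_batch` uses a hypothetical length-prefixed format; the point is that each partition's body lands at a known byte_offset and byte_length inside one shared blob, which is what later lets readers issue range GETs.

```python
import struct
import uuid
from typing import Dict, List, Tuple, Union

Payload = Union[str, bytes]


def to_bytes(payload: Payload) -> bytes:
    """Normalize a payload to bytes (strings are UTF-8 encoded)."""
    return payload.encode("utf-8") if isinstance(payload, str) else bytes(payload)


def encode_batch(payloads: List[bytes]) -> bytes:
    """Hypothetical body layout: u32 record count, then u32 length + bytes per record."""
    parts = [struct.pack(">I", len(payloads))]
    for p in payloads:
        parts.append(struct.pack(">I", len(p)))
        parts.append(p)
    return b"".join(parts)


def pack_shared_wal(groups: Dict[Tuple[str, int], List[Payload]]):
    """Build one shared WAL blob and record where each partition's body landed."""
    blob = bytearray()
    locations = {}
    for topic_partition, payloads in groups.items():
        body = encode_batch([to_bytes(p) for p in payloads])
        locations[topic_partition] = {
            "byte_offset": len(blob),
            "byte_length": len(body),
            "msg_count": len(payloads),
        }
        blob += body
    key = f"llog/wal-shared/{uuid.uuid4()}"
    return key, bytes(blob), locations


key, blob, locations = pack_shared_wal({("orders", 0): [b"a", b"b"], ("orders", 1): ["c"]})
```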

3. Reserve offsets in Oxia

After the S3 blob is written, the writer commits each partition independently. For each group, the atomic reservation step is one CAS update on that partition's meta/control key. That single record holds both the next sequence_counter and the optional pending append. The writer reads the current meta/control, computes the next range, then CAS-writes a new meta/control that contains:

  • the start_offset and end_offset now reserved for that partition
  • the new sequence_counter = end_offset + 1
  • the data_key pointing at the shared S3 blob

That pending record says "this partition owns this offset range, and its bytes live in that WAL object". meta/control is the key that moves the offset counter forward; no sparse-index key is touched in that atomic step. Because this CAS advances sequence_counter immediately, the write is logically reserved even before the sparse-index entry is persisted. It also means a later writer or compactor can recover the append if the process crashes after reserving offsets but before finishing the sparse-index write.
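The reservation is a classic compare-and-set retry loop. Below is a minimal sketch against a toy versioned store; `VersionedKV`, `VersionMismatch`, `PendingAppendExists`, and the `pending` field shape are assumptions standing in for Oxia's versioned puts. Losing writers re-read meta/control and retry, so concurrent brokers on the same partition always end up with disjoint offset ranges.

```python
import threading
from typing import Any, Dict, Optional, Tuple


class VersionMismatch(Exception):
    """Raised when a CAS write sees a different version than expected."""


class PendingAppendExists(Exception):
    """Raised when an earlier append still needs recovery before reserving."""


class VersionedKV:
    """Toy stand-in for Oxia: each key carries a version number used for CAS."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[Any, int]] = {}
        self._lock = threading.Lock()

    def get(self, key: str) -> Tuple[Any, int]:
        with self._lock:
            return self._data[key]

    def put(self, key: str, value: Any, expected_version: Optional[int] = None) -> int:
        with self._lock:
            _, current_version = self._data.get(key, (None, -1))
            if expected_version is not None and expected_version != current_version:
                raise VersionMismatch(key)
            self._data[key] = (value, current_version + 1)
            return current_version + 1


def reserve_offsets(kv: VersionedKV, control_key: str, msg_count: int,
                    data_key: str, byte_offset: int, byte_length: int) -> Tuple[int, int]:
    """Advance sequence_counter and record the pending append in one CAS write."""
    while True:
        control, version = kv.get(control_key)
        if control["pending"] is not None:
            # Per the crash-recovery step, this pending append would be
            # materialized first; the sketch just stops here.
            raise PendingAppendExists(control["pending"])
        start = control["sequence_counter"]
        end = start + msg_count - 1
        new_control = dict(control, sequence_counter=end + 1, pending={
            "start_offset": start, "end_offset": end, "data_key": data_key,
            "byte_offset": byte_offset, "byte_length": byte_length,
        })
        try:
            kv.put(control_key, new_control, expected_version=version)
            return start, end
        except VersionMismatch:
            continue  # another writer won the race: re-read and retry
```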

4. Write the sparse index

The writer then writes one sparse index entry at:

  • index/{end_offset}

That index entry stores the same S3 location and message-count metadata. Because the index key is the batch end offset, the reader can reconstruct the covered range as:

  • start_offset = end_offset - msg_count + 1

5. Clear pending state

Once the index entry exists, the writer clears meta/control.pending. At that point the append is fully materialized.

6. Crash recovery

If a writer crashes after reserving offsets but before writing the index entry, the append is still recoverable. The next writer, fence/unfence operation, or compactor run will:

  1. read meta/control.pending
  2. write the missing index entry if needed
  3. clear the pending field

That is why the write path is a two-step commit in Oxia rather than a single blind index write.
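The recovery procedure is small enough to sketch directly. This version works on a plain dict and ignores concurrency; `recover_pending` and the key names are hypothetical, and a real implementation would make the final clear conditional (for example via CAS) so it cannot erase a newer pending append. Calling it twice is harmless, which is what allows writers, fence/unfence, and the compactor to all run it.

```python
from typing import Any, Dict


def recover_pending(kv: Dict[str, Any], control_key: str, index_prefix: str) -> bool:
    """Materialize a leftover pending append; safe to call repeatedly."""
    control = kv[control_key]
    pending = control.get("pending")  # 1) read meta/control.pending
    if pending is None:
        return False
    index_key = f"{index_prefix}/{pending['end_offset']}"
    if index_key not in kv:  # 2) write the missing index entry if needed
        kv[index_key] = {
            "data_key": pending["data_key"],
            "msg_count": pending["end_offset"] - pending["start_offset"] + 1,
        }
    kv[control_key] = dict(control, pending=None)  # 3) clear the pending field
    return True
```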

Read Path

The read APIs are LeaderlessLogReader.read_offset(...), read_range(...), and iter_from(...).


Read TLDR

That read path works like this:

  1. The reader checks meta/control to validate the partition and high watermark.
  2. It resolves one sparse index entry via Oxia CEILING, or falls back to meta/control.pending.
  3. It fetches one S3 blob or byte range.
  4. It decodes bytes-batch-v1 and returns the requested logical record.

ASCII sketch:

client
  |
  | read topic=orders partition=0 offset=2
  v
+---------------------------+
| LeaderlessLogReader       |
+---------------------------+
  |
  | 1) load control state
  v
+---------------------------+
| Oxia                      |
| meta/control              |
+---------------------------+
  |
  | 2) ceiling lookup in sparse index
  |    offset 2 -> shared WAL object
  v
+----------------------------------------------+
| Oxia                                         |
| index maps offset ranges -> S3 locations     |
+----------------------------------------------+
  |
  | 3) fetch range-based get
  +-------------------------------> S3
  |                                 shared WAL or compacted blob
  |
  | 4) decode the record for offset 2
  v
+---------------------------+
| payload bytes             |
+---------------------------+
  |
  | return payload
  v
client

Main Oxia keys for reads:

| Oxia key | Purpose |
|---|---|
| meta/control | Tells the reader whether the partition exists, the current high watermark via sequence_counter - 1, and whether there is a recoverable pending append. |
| index/{end_offset} | Sparse index used for a ceiling lookup from requested offset to the S3 object that contains it. |

1. Validate against the control record

The reader first loads meta/control:

  • if the partition does not exist, it raises PartitionNotInitialized
  • if the requested offset is >= sequence_counter, it raises OffsetOutOfRange

The readable high watermark is therefore sequence_counter - 1.

2. Resolve the sparse index entry that covers the offset

The reader performs an Oxia CEILING lookup starting at index/{requested_offset}. Because index keys are batch end offsets, the first key at or above the requested offset is the candidate entry.

From that index record the reader reconstructs:

  • end_offset from the key name
  • start_offset from end_offset - msg_count + 1

If the resolved range covers the requested offset, the reader has found the owning WAL or compacted entry.
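Steps 1 and 2 can be sketched with a sorted list and `bisect` standing in for Oxia's CEILING lookup. The index is modelled as a dict from end offset to entry, and the exception names mirror the ones mentioned above; everything else (`resolve`, the entry fields) is hypothetical. A `None` result is the case where the reader falls back to meta/control.pending.

```python
import bisect
from typing import Dict, Optional, Tuple


class PartitionNotInitialized(Exception):
    pass


class OffsetOutOfRange(Exception):
    pass


def resolve(control: Optional[dict], index: Dict[int, dict],
            offset: int) -> Optional[Tuple[int, int, dict]]:
    """Return (start_offset, end_offset, entry) covering offset, or None."""
    if control is None:
        raise PartitionNotInitialized()
    # The readable high watermark is sequence_counter - 1.
    if offset >= control["sequence_counter"]:
        raise OffsetOutOfRange(offset)
    end_offsets = sorted(index)
    # CEILING: the first end offset at or above the requested offset.
    i = bisect.bisect_left(end_offsets, offset)
    if i == len(end_offsets):
        return None
    end = end_offsets[i]
    entry = index[end]
    start = end - entry["msg_count"] + 1
    # Only a hit if that batch actually starts at or before the requested offset.
    return (start, end, entry) if start <= offset else None
```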

3. Fall back to pending if needed

If no sparse index entry covers the offset, the reader checks meta/control.pending. That means a logically committed append can still be read even if the writer crashed before sparse-index materialization.

4. Fetch only the required bytes from S3

Once the entry is resolved, the reader fetches the data blob from S3:

  • if byte_offset and byte_length are present, it uses an S3 range GET
  • otherwise it downloads the whole object

For shared WAL blobs, this is what avoids reading unrelated partitions from the same S3 object.
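A sketch of the fetch, assuming a boto3-style S3 client (`get_object` accepts a `Range` header and returns a streaming `Body`). HTTP byte ranges are inclusive on both ends, so the last byte is `byte_offset + byte_length - 1`. `fetch_entry` and `range_header` are hypothetical helper names.

```python
from typing import Any, Optional


def range_header(byte_offset: int, byte_length: int) -> str:
    """HTTP Range is inclusive on both ends: bytes=first-last."""
    if byte_length <= 0:
        raise ValueError("byte_length must be positive")
    return f"bytes={byte_offset}-{byte_offset + byte_length - 1}"


def fetch_entry(s3_client: Any, bucket: str, key: str,
                byte_offset: Optional[int] = None,
                byte_length: Optional[int] = None) -> bytes:
    """Fetch only the entry's slice when the index records one, else the whole object."""
    kwargs = {"Bucket": bucket, "Key": key}
    if byte_offset is not None and byte_length is not None:
        kwargs["Range"] = range_header(byte_offset, byte_length)
    return s3_client.get_object(**kwargs)["Body"].read()
```

With boto3 installed, `s3_client` would be `boto3.client("s3")`; any object exposing a compatible `get_object` works for testing.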

5. Decode the requested record

The current readable encoding is bytes-batch-v1 for both WAL and COMPACTED entries. The reader:

  1. decodes the batch layout
  2. verifies the batch record count matches the index metadata
  3. computes record_index = requested_offset - start_offset
  4. slices the right payload out of the batch body
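The decode steps map onto a short function. Since the actual bytes-batch-v1 layout isn't documented in this README, this sketch assumes a hypothetical length-prefixed body (u32 count, then u32 length plus bytes per record) and includes a matching encoder so it runs standalone. `read_record` and `InconsistentBatch` are hypothetical names.

```python
import struct
from typing import List


class InconsistentBatch(Exception):
    pass


def encode_batch(payloads: List[bytes]) -> bytes:
    parts = [struct.pack(">I", len(payloads))]
    for p in payloads:
        parts += [struct.pack(">I", len(p)), p]
    return b"".join(parts)


def read_record(body: bytes, start_offset: int, msg_count: int,
                requested_offset: int) -> bytes:
    (count,) = struct.unpack_from(">I", body, 0)      # 1) decode the batch layout
    if count != msg_count:                              # 2) cross-check index metadata
        raise InconsistentBatch(f"batch has {count} records, index says {msg_count}")
    record_index = requested_offset - start_offset      # 3) position within the batch
    if not 0 <= record_index < count:
        raise IndexError(record_index)
    pos = 4
    for i in range(count):                              # 4) walk to the right payload
        (length,) = struct.unpack_from(">I", body, pos)
        pos += 4
        if i == record_index:
            return body[pos:pos + length]
        pos += length
    raise AssertionError("unreachable")
```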

6. Range reads walk the sparse index in order

iter_from() and read_range() scan the sparse index forward and decode entries in offset order. They will fail with InconsistentReadState if they detect a gap between batches.
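Gap detection during a forward scan only needs to track the next expected offset. A minimal sketch over a dict-based index (the structure and `iter_entries` are hypothetical; `InconsistentReadState` mirrors the exception named above):

```python
from typing import Dict, Iterator, Tuple


class InconsistentReadState(Exception):
    pass


def iter_entries(index: Dict[int, dict], from_offset: int) -> Iterator[Tuple[int, int, dict]]:
    """Yield (start, end, entry) in offset order, failing on any gap."""
    expected = from_offset
    for end in sorted(e for e in index if e >= from_offset):
        entry = index[end]
        start = end - entry["msg_count"] + 1
        # The first batch may start before from_offset; later ones must be adjacent.
        if start > expected:
            raise InconsistentReadState(
                f"gap: expected offset {expected}, next batch starts at {start}")
        yield start, end, entry
        expected = end + 1
```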

Broker /consume fast path

LeaderlessLogReader is the durable fallback path. The HTTP consume path adds a broker-local hot-tail layer in front of it:

  1. BrokerConsumeService checks the local tail cache first for each requested partition.
  2. A cache hit returns recently produced records directly, without touching Oxia or S3.
  3. If the cache knows the requested offset is already past its cached high watermark, it returns a known-empty tail result. Long polls can then sleep and retry locally instead of probing Oxia/S3 on every poll.
  4. Only cache misses, gaps, or evictions fall back to the normal reader path: meta/control -> sparse index or pending -> S3.
  5. During that fallback, one consume request still de-duplicates shared-WAL blob fetches across partitions.
  6. A broker running --role both uses an in-process cache by default. Separate --role write and --role read processes can share the same fast path by pointing both at one LLOG_TAIL_CACHE_DIR.
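A rough sketch of the hit / known-empty / miss decision, assuming a bounded in-process cache that sees every produce for the partitions it caches. `TailCache` and its methods are hypothetical and omit the file-backed shared mode; eviction here is oldest-first by byte size, and `max_bytes=0` disables caching, mirroring LLOG_TAIL_CACHE_MAX_BYTES=0.

```python
from collections import OrderedDict
from typing import Dict, List, Tuple

HIT, KNOWN_EMPTY, MISS = "hit", "known_empty", "miss"


class TailCache:
    """Bounded per-partition cache of recently produced records (hypothetical)."""

    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        self.size = 0
        self.records = OrderedDict()  # (topic, partition, offset) -> payload, oldest first
        self.high_watermark: Dict[Tuple[str, int], int] = {}

    def on_produce(self, topic: str, partition: int, start_offset: int,
                   payloads: List[bytes]) -> None:
        if self.max_bytes == 0:
            return  # 0 disables the cache
        for i, p in enumerate(payloads):
            self.records[(topic, partition, start_offset + i)] = p
            self.size += len(p)
        tp = (topic, partition)
        self.high_watermark[tp] = max(start_offset + len(payloads) - 1,
                                      self.high_watermark.get(tp, 0))
        while self.size > self.max_bytes and self.records:  # evict oldest first
            _, old = self.records.popitem(last=False)
            self.size -= len(old)

    def lookup(self, topic: str, partition: int, fetch_offset: int) -> Tuple[str, List[bytes]]:
        hw = self.high_watermark.get((topic, partition))
        if hw is not None and fetch_offset > hw:
            return KNOWN_EMPTY, []  # long polls can sleep locally
        out = []
        offset = fetch_offset
        while (topic, partition, offset) in self.records:
            out.append(self.records[(topic, partition, offset)])
            offset += 1
        return (HIT, out) if out else (MISS, [])  # MISS falls back to Oxia/S3
```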

Compaction Path

The compaction API is LeaderlessLogCompactor.compact_once(topic, partition). It rewrites a contiguous WAL-backed range into one partition-local compacted blob.


Compaction TLDR

That compaction path works like this:

  1. The compactor recovers meta/control.pending and resumes meta/compaction if present.
  2. It reads meta/compaction-cursor and scans sparse index entries from that offset to find one contiguous WAL-backed range.
  3. It range-reads the selected WAL slices from S3 and writes one partition-local compacted blob.
  4. It stores meta/compaction, rewrites the highest sparse-index key to a COMPACTED entry, deletes lower keys, advances the cursor, and clears meta/compaction.
  5. Future reads resolve through the compacted entry, while shared WAL object deletion still needs separate garbage collection.

ASCII sketch:

scheduler / direct caller
  |
  | 0) choose topic-partition
  v
compactor worker
  |
  | compact_once(topic, partition)
  v
+---------------------------+
| LeaderlessLogCompactor    |
+---------------------------+
  |
  | 1) read cursor + scan sparse index
  v
+----------------------------------------------+
| Oxia                                         |
| next WAL-backed offset range for partition   |
+----------------------------------------------+
  |
  | 2) fetch range-based gets for source WALs
  +-------------------------------> S3
  |                                 shared WAL blobs
  |
  | 3) write one compacted blob
  +-------------------------------> S3
  |                                 compacted/{uuid}
  |
  | 4) persist meta/compaction,
  |    CAS end key, delete old keys,
  |    advance cursor
  v
+----------------------------------------------+
| Oxia                                         |
| one COMPACTED index entry + cursor advanced  |
+----------------------------------------------+
  |
  | future reads now resolve to compacted blob
  v
reader / next compactor run

Main Oxia keys for compaction:

| Oxia key | Purpose |
|---|---|
| meta/compaction-cursor | First offset in the partition that has not yet been compacted. |
| index/{end_offset} | Sparse index scanned to find one contiguous WAL-backed range to rewrite. |
| meta/control | Lets the compactor recover any pending writer append before compacting. |
| meta/compaction | Durable in-flight compaction record used to resume after a crash. |
| meta/compactor-claim | Used by the long-running service so only one worker owns a partition while it runs compact_once(...). |

0. Choose a partition to compact

With the one-shot API, the caller passes the topic and partition directly. With the long-running service, the scheduler chooses from configured or discovered partitions, then acquires meta/compactor-claim before it runs compact_once(...).

1. Recover unfinished writer state first

Before selecting a compaction candidate, the compactor checks meta/control.pending. If a writer left a pending append behind, the compactor materializes the missing sparse index entry first.

This ensures compaction never skips a logically committed WAL batch.

2. Resume unfinished compaction if one exists

The compactor then checks meta/compaction. If present, that record is authoritative and the compactor resumes from its stored state machine instead of starting over.

The states are:

  • WRITING_COMPACTED_INDEX
  • DELETING_OLD
  • UPDATING_CURSOR

3. Select one compactable candidate range

If no pending compaction exists, the compactor:

  1. reads meta/compaction-cursor
  2. scans sparse index entries starting exactly at that cursor
  3. selects one contiguous run of WAL entries

Selection stops when it hits:

  • a gap
  • a COMPACTED entry
  • the configured max_offsets limit

If the first readable entry at the cursor is not a WAL entry or does not start exactly at the cursor, there is no candidate.
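The selection rules translate into one forward scan. In this sketch (hypothetical `select_candidate`, dict-based index with a `kind` field), a batch that would push the run past `max_offsets` ends the run, but the first batch is always taken so an oversized batch can't stall the cursor; how the real compactor treats that edge case is an assumption here.

```python
from typing import Dict, List, Tuple


def select_candidate(index: Dict[int, dict], cursor: int,
                     max_offsets: int) -> List[Tuple[int, int]]:
    """Pick one contiguous run of WAL entries starting exactly at the cursor."""
    selected: List[Tuple[int, int]] = []
    expected = cursor
    for end in sorted(e for e in index if e >= cursor):
        entry = index[end]
        start = end - entry["msg_count"] + 1
        if start != expected or entry["kind"] != "WAL":
            break  # gap, misaligned start, or a COMPACTED entry
        if selected and end - cursor + 1 > max_offsets:
            break  # adding this batch would exceed the per-run limit
        selected.append((start, end))
        expected = end + 1
    return selected
```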

4. Rebuild the bytes into one compacted blob

For each selected WAL entry, the compactor:

  1. uses the stored byte range to issue range GETs against S3
  2. rebuilds the selected records into one new compacted blob

That compacted blob is written to:

  • llog/{topic}/partitions/{partition}/data/compacted/{uuid}

5. Create the recoverable compaction record

Before mutating the sparse index, the compactor writes meta/compaction. That record stores:

  • the compacted start_offset and end_offset
  • the compacted S3 URI

At a high level, that record is the durable breadcrumb that lets another compactor resume the same work after a crash.

6. Replace many WAL index entries with one compacted entry

Think of compaction as taking, say, 5 sparse index entries for one partition range. It atomically replaces only the highest-offset sparse index key with one COMPACTED entry spanning the whole range. It does not atomically replace all 5 keys at once.

After that succeeds, it deletes the lower 4 sparse index keys in a follow-up delete_range() step. During that brief intermediate state, reads may still resolve through the older lower keys, which is still correct because they point at the same logical records.

Once the lower keys are gone, the single compacted end key represents the whole compacted range.

7. Advance the compaction cursor

Once old keys are deleted, the compactor advances:

  • meta/compaction-cursor = end_offset + 1

Then it clears meta/compaction.

At that point the compacted range is fully committed and future compactor runs start after it. For this partition range, future reads will now resolve through the compacted entry instead of the old per-batch WAL keys.

That does not automatically make the old shared WAL S3 object safe to delete. Those WAL blobs can still be shared with other partitions or other uncompacted ranges, so actual object cleanup would need separate garbage collection and reference tracking.

8. Why the compaction path is recoverable

Every compaction stage is idempotent:

  • rewriting the end key is guarded by CAS
  • deleting old keys can be retried
  • advancing the cursor only moves it forward
  • clearing meta/compaction happens last

So any compactor process can safely resume interrupted work.
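The resumable state machine from steps 5 to 8 can be sketched as a loop that reads meta/compaction, performs the step for its current state, and persists the next state before moving on. A crash between any two writes leaves a record that the next run resumes from. The sketch uses a plain dict and hypothetical key names; per the text above, the real code guards the end-key rewrite with CAS and uses delete_range() for the old keys.

```python
from typing import Any, Dict


def run_compaction(kv: Dict[str, Any], prefix: str) -> None:
    """Drive (or resume) the compaction described by meta/compaction to completion."""
    record_key = f"{prefix}/meta/compaction"
    while (record := kv.get(record_key)) is not None:
        start, end = record["start_offset"], record["end_offset"]
        state = record["state"]
        if state == "WRITING_COMPACTED_INDEX":
            # Point the highest index key at the compacted blob for the whole range.
            kv[f"{prefix}/index/{end}"] = {
                "kind": "COMPACTED", "uri": record["uri"], "msg_count": end - start + 1,
            }
            next_state = "DELETING_OLD"
        elif state == "DELETING_OLD":
            # Remove the lower keys; popping an already-deleted key is a no-op.
            for offset in range(start, end):
                kv.pop(f"{prefix}/index/{offset}", None)
            next_state = "UPDATING_CURSOR"
        elif state == "UPDATING_CURSOR":
            # The cursor only ever moves forward, so repeating this step is safe.
            cursor_key = f"{prefix}/meta/compaction-cursor"
            current = kv.get(cursor_key, {"offset": 1})["offset"]
            kv[cursor_key] = {"offset": max(current, end + 1)}
            next_state = None
        else:
            raise ValueError(f"unknown compaction state: {state}")
        if next_state is None:
            del kv[record_key]  # clearing meta/compaction happens last
        else:
            kv[record_key] = dict(record, state=next_state)
```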

9. Long-running service behavior

The HTTP compactor service adds one more coordination layer:

  • it periodically discovers partitions by scanning Oxia for meta/control
  • it acquires an ephemeral meta/compactor-claim before calling compact_once(...)

meta/compactor-claim prevents two service workers from compacting the same partition at the same time. meta/compaction remains the durable recovery record for the compaction itself.
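A toy model of the claim semantics: a claim belongs to a session, is released after compact_once(...), and disappears if the owning session expires. `ClaimTable` and `compact_targets` are hypothetical; in the real service, Oxia's ephemeral keys provide the session-expiry behavior.

```python
from typing import Callable, Dict, Iterable, List


class ClaimTable:
    """Toy stand-in for ephemeral keys: a claim lives as long as its owner's session."""

    def __init__(self) -> None:
        self.owners: Dict[str, str] = {}

    def try_claim(self, key: str, session_id: str) -> bool:
        if key in self.owners:
            return False  # another worker owns this partition right now
        self.owners[key] = session_id
        return True

    def release(self, key: str, session_id: str) -> None:
        if self.owners.get(key) == session_id:
            del self.owners[key]

    def expire_session(self, session_id: str) -> None:
        # Session expiry drops every claim held by the dead session.
        for key in [k for k, s in self.owners.items() if s == session_id]:
            del self.owners[key]


def compact_targets(claims: ClaimTable, session_id: str, targets: Iterable[str],
                    compact_once: Callable[[str], None]) -> List[str]:
    done = []
    for tp in targets:
        key = f"{tp}/meta/compactor-claim"
        if not claims.try_claim(key, session_id):
            continue  # skip partitions another worker is compacting
        try:
            compact_once(tp)
            done.append(tp)
        finally:
            claims.release(key, session_id)
    return done
```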

Quick Example

from server.leaderless_log_compactor import LeaderlessLogCompactor
from server.leaderless_log_reader import LeaderlessLogReader
from server.leaderless_log_writer import LeaderlessLogWriter, SharedAppendGroup

writer = LeaderlessLogWriter.from_env()
writer.initialize_partition("orders", 0)
writer.append_shared(
    [SharedAppendGroup(topic="orders", partition=0, payloads=[b"a", b"b", b"c"])]
)

reader = LeaderlessLogReader.from_env()
print(reader.read_offset("orders", 0, 1).payload)

compactor = LeaderlessLogCompactor.from_env()
print(compactor.compact_once("orders", 0))

writer.close()
reader.close()
compactor.close()

Config

The writer, reader, and compactor accept direct constructor arguments or can be built from environment variables with from_env().

Environment Setup

  1. Copy .env.example to .env.
  2. Adjust the values.
  3. Load the file into your shell.
  4. Start the writer, reader, broker, or compactor.

Example:

cp .env.example .env
set -a && source .env && set +a

Required for all components:

  • LLOG_OXIA_SERVICE_ADDRESS

Required for the writer, broker, and compactor:

  • LLOG_S3_BUCKET

The reader does not need LLOG_S3_BUCKET because sparse index entries already contain full S3 URIs.

Optional env vars:

  • LLOG_OXIA_NAMESPACE
  • LLOG_ROOT_PREFIX
  • LLOG_OXIA_SESSION_TIMEOUT_MS
  • LLOG_CLIENT_IDENTIFIER
  • LLOG_S3_ENDPOINT_URL
  • LLOG_S3_REGION_NAME
  • LLOG_S3_ACCESS_KEY_ID
  • LLOG_S3_SECRET_ACCESS_KEY
  • LLOG_S3_SESSION_TOKEN
  • LLOG_BATCH_MAX_BYTES
  • LLOG_BATCH_MAX_DELAY_MS
  • LLOG_BATCH_MAX_BUFFER_BYTES
  • LLOG_TAIL_CACHE_DIR
  • LLOG_TAIL_CACHE_MAX_BYTES
  • LLOG_COMPACTOR_BIND_HOST
  • LLOG_COMPACTOR_PORT
  • LLOG_COMPACTOR_ID
  • LLOG_COMPACTOR_TARGETS
  • LLOG_COMPACTOR_WORKERS
  • LLOG_COMPACTOR_DISCOVERY_INTERVAL_MS
  • LLOG_COMPACTOR_IDLE_SLEEP_MS
  • LLOG_COMPACTOR_ERROR_SLEEP_MS
  • LLOG_COMPACTOR_IDLE_JITTER_MS
  • LLOG_COMPACTOR_ERROR_JITTER_MS
  • LLOG_COMPACTOR_MAX_OFFSETS_PER_RUN

AWS fallbacks also work for credentials and region:

  • AWS_REGION
  • AWS_DEFAULT_REGION
  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_SESSION_TOKEN
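A minimal sketch of a from_env()-style loader for a few of these variables. The required-variable rules and the 8 MiB, 500 ms, and 536870912-byte defaults come from this README; the precedence of LLOG_S3_REGION_NAME over the AWS fallbacks and the `load_config` name are assumptions.

```python
from typing import Mapping, Optional


class ConfigError(Exception):
    pass


def _first(env: Mapping[str, str], *names: str) -> Optional[str]:
    """Return the first non-empty value among the given variable names."""
    for name in names:
        value = env.get(name)
        if value:
            return value
    return None


def load_config(env: Mapping[str, str], role: str) -> dict:
    """Hypothetical loader; role is 'writer', 'reader', 'broker', or 'compactor'."""
    oxia = env.get("LLOG_OXIA_SERVICE_ADDRESS")
    if not oxia:
        raise ConfigError("LLOG_OXIA_SERVICE_ADDRESS is required")
    bucket = env.get("LLOG_S3_BUCKET")
    # Readers get full S3 URIs from the index, so only they may omit the bucket.
    if role != "reader" and not bucket:
        raise ConfigError(f"LLOG_S3_BUCKET is required for the {role}")
    return {
        "oxia_service_address": oxia,
        "s3_bucket": bucket,
        # Assumed precedence: the LLOG_ variable first, then the AWS fallbacks.
        "s3_region": _first(env, "LLOG_S3_REGION_NAME", "AWS_REGION", "AWS_DEFAULT_REGION"),
        "batch_max_bytes": int(env.get("LLOG_BATCH_MAX_BYTES", 8 * 1024 * 1024)),
        "batch_max_delay_ms": int(env.get("LLOG_BATCH_MAX_DELAY_MS", 500)),
        "tail_cache_max_bytes": int(env.get("LLOG_TAIL_CACHE_MAX_BYTES", 536870912)),
    }
```

Pass `os.environ` as `env` in real use; accepting any mapping keeps the function easy to test.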

HTTP Services

Broker

The broker is a thin HTTP adapter over the writer and reader. It supports --role write, --role read, or --role both (the default). It exposes:

  • GET /health
  • GET /metrics
  • GET /metrics/prometheus
  • POST /produce
  • POST /consume

POST /produce accepts a topic_partitions array where each item contains:

  • topic
  • partition
  • records

Each record may be:

  • a UTF-8 string
  • an object of the form {"base64":"..."} for arbitrary bytes

Example:

curl -X POST http://127.0.0.1:8080/produce \
  -H 'content-type: application/json' \
  -d '{
    "topic_partitions": [
      {
        "topic": "orders",
        "partition": 0,
        "records": ["alpha", "beta"]
      },
      {
        "topic": "orders",
        "partition": 1,
        "records": [{"base64": "AAE="}]
      }
    ]
  }'
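The same request can be built from Python with only the standard library. `build_produce_request` is a hypothetical helper: strings are sent as-is and bytes are wrapped as {"base64": ...}, matching the record forms listed above.

```python
import base64
import json
import urllib.request
from typing import Dict, List, Tuple, Union

Record = Union[str, bytes]


def encode_record(record: Record) -> Union[str, Dict[str, str]]:
    if isinstance(record, str):
        return record  # sent as a UTF-8 string
    return {"base64": base64.b64encode(record).decode("ascii")}  # arbitrary bytes


def build_produce_request(base_url: str,
                          batches: Dict[Tuple[str, int], List[Record]]) -> urllib.request.Request:
    body = {"topic_partitions": [
        {"topic": topic, "partition": partition,
         "records": [encode_record(r) for r in records]}
        for (topic, partition), records in batches.items()
    ]}
    return urllib.request.Request(
        f"{base_url}/produce",
        data=json.dumps(body).encode("utf-8"),
        headers={"content-type": "application/json"},
        method="POST",
    )


req = build_produce_request("http://127.0.0.1:8080", {
    ("orders", 0): ["alpha", "beta"],
    ("orders", 1): [b"\x00\x01"],
})
```

Calling `urllib.request.urlopen(req)` would send it to a running broker at that address.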

The broker lazily initializes partitions on first write and keeps a process-local cache of successful initializations. It also wraps the writer with an in-process BatchingProducer that flushes on either:

  • LLOG_BATCH_MAX_BYTES
  • LLOG_BATCH_MAX_DELAY_MS

POST /consume accepts a topic_partitions array where each item contains:

  • topic
  • partition
  • fetch_offset
  • optional partition_max_bytes

Request-level consume controls are:

  • max_wait_ms
  • min_bytes
  • max_bytes

Example:

curl -X POST http://127.0.0.1:8080/consume \
  -H 'content-type: application/json' \
  -d '{
    "topic_partitions": [
      {
        "topic": "orders",
        "partition": 0,
        "fetch_offset": 1,
        "partition_max_bytes": 1048576
      },
      {
        "topic": "orders",
        "partition": 1,
        "fetch_offset": 1
      }
    ],
    "max_wait_ms": 250,
    "min_bytes": 1,
    "max_bytes": 4194304
  }'

Consume responses are per partition. Each partition result returns high_watermark and next_fetch_offset, and encodes each record as either a UTF-8 payload string or a base64 object for arbitrary bytes.

POST /consume checks the local tail cache before it falls back to Oxia/S3, and a cached known-empty tail result keeps long polls local at the hot tail. When multiple requested partitions share one WAL blob, the broker fetches that blob once per request and reuses it across those partitions.

When one broker runs both roles, it keeps a bounded in-process tail cache of recently produced records. Set LLOG_TAIL_CACHE_DIR to switch that cache to a shared file-backed mode across writer and reader processes; consume loadtests set this to <run-dir>/tail-cache automatically. Set LLOG_TAIL_CACHE_MAX_BYTES=0 to disable either cache mode, or set it to another byte cap; the default is 536870912 (512 MiB).
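On the client side, decoding a partition result means undoing the two record encodings and carrying next_fetch_offset into the next request. The `records` field name is an assumption here (the exact response schema is in server/HTTP_API.md), as are the helper names `decode_record` and `consume_step`.

```python
import base64
from typing import Dict, List, Tuple, Union


def decode_record(record: Union[str, Dict[str, str]]) -> bytes:
    """Turn a UTF-8 string or a {"base64": ...} object back into raw bytes."""
    if isinstance(record, str):
        return record.encode("utf-8")
    return base64.b64decode(record["base64"], validate=True)


def consume_step(partition_result: dict) -> Tuple[List[bytes], int]:
    """Return decoded payloads and the offset to send as fetch_offset next time."""
    payloads = [decode_record(r) for r in partition_result["records"]]
    return payloads, partition_result["next_fetch_offset"]
```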

The HTTP contract is in server/HTTP_API.md.

Compactor Service

The long-running compactor service exposes:

  • GET /health
  • GET /metrics
  • GET /metrics/prometheus

Its target set is:

  • the configured LLOG_COMPACTOR_TARGETS
  • plus partitions discovered by scanning Oxia for meta/control

Each worker acquires an ephemeral meta/compactor-claim, runs compact_once(...), then releases the claim. If the process dies, Oxia session expiry clears the claim automatically.

Metrics

The broker and compactor service both expose:

  • JSON metrics at GET /metrics
  • Prometheus text at GET /metrics/prometheus

Exported metrics include:

  • broker metadata and batch settings
  • loadtest scenario metadata and static run-shape gauges during orchestrated loadtest runs
  • compactor metadata, worker counts, discovery state, and claim counters
  • HTTP request counters and request validation failures
  • batching depth, flush counts, queue wait, and shared WAL blob sizes
  • compaction run outcomes, lag, pending state, recovery events, candidate shape, and stage timings
  • Oxia per-operation counters and latency totals
  • S3 per-operation counters, bytes, and latency totals
  • S3 billing estimates under s3.billing, including:
    • request-cost estimate derived from observed put, list, get, and range_get counts
    • current object count and stored bytes under the configured LLOG_ROOT_PREFIX
    • current stored GB and estimated monthly storage cost for that footprint

Current S3 billing assumptions are hardcoded and intentionally simple:

  • S3 Standard pricing in us-east-1
  • $0.023 per GB-month for storage
  • $0.005 per 1,000 PUT/COPY/POST/LIST requests
  • $0.004 per 10,000 GET and other requests

The bucket-size metrics come from cached list_objects_v2 scans against the configured S3 prefix. Those scans are counted as operation="list" in the exported S3 operation metrics.

The estimate does not include:

  • data transfer in or out
  • replication
  • lifecycle transitions
  • KMS
  • taxes, discounts, support, or account-level billing adjustments
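The pricing assumptions above reduce to simple arithmetic. This sketch is hypothetical and assumes GB means 2**30 bytes and that range_get requests bill at the GET rate; both are assumptions about the repo's estimator, not confirmed behavior.

```python
GB = 1024 ** 3  # assumption: 1 GB = 2**30 bytes

STORAGE_PER_GB_MONTH = 0.023  # S3 Standard, us-east-1
PUT_LIST_PER_1000 = 0.005     # PUT/COPY/POST/LIST
GET_OTHER_PER_10000 = 0.004   # GET and other requests


def estimate(stored_bytes: int, put: int, list_: int, get: int, range_get: int) -> dict:
    """Request cost from observed counts plus monthly cost of the stored footprint."""
    request_cost = ((put + list_) / 1000 * PUT_LIST_PER_1000
                    + (get + range_get) / 10000 * GET_OTHER_PER_10000)
    stored_gb = stored_bytes / GB
    return {
        "request_cost_usd": request_cost,
        "stored_gb": stored_gb,
        "monthly_storage_usd": stored_gb * STORAGE_PER_GB_MONTH,
    }
```

For example, 100 GB stored with 1,000,000 PUTs, 10,000 LISTs, and 7,000,000 GETs plus range GETs works out to about $7.85 in requests and $2.30 per month in storage.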

Metrics are process-local only. Request-cost counters can be summed across processes. Bucket-size and monthly-storage-cost gauges describe the shared bucket prefix and should be aggregated with max, not sum, across brokers or compactors.

A ready-to-import Grafana dashboard is in grafana/leaderless-log-observability.json. For a one-command local review flow, run quick_run.sh: it starts repo-local Prometheus and Grafana with provisioning from loadtest/observability/docker-compose.yml, wires the active broker ports into Prometheus file_sd, and serves the provisioned dashboard at http://127.0.0.1:3000.

The load-test metrics, logs, traces, and debugging flow are cataloged in PERF_DEBUGGING.md.

Miscellaneous

Docs

Primary docs in this repo:

Notes

  • The S3 bucket must already exist.
  • The writer, reader, and compactor do not auto-load .env; load it in the shell first.
  • append_shared(groups) always stores bytes-batch-v1 bodies inside a shared WAL blob.
  • A one-partition write is represented as append_shared([SharedAppendGroup(...)]).
  • The sparse index is keyed by batch end offset, not batch start offset.
  • The compactor only rewrites contiguous WAL ranges starting exactly at meta/compaction-cursor.
