A bare-bones diskless Kafka, really a leaderless log, built on Oxia and S3-compatible object storage.
| Action | Link |
|---|---|
| What is this? | The What |
| Path Walkthroughs | Data Paths |
| Config & Usage | Config |
| HTTP Surface | HTTP Services |
| Observability | Metrics, Perf Debugging |
| Extra Docs | Miscellaneous |
This repo is a Python implementation of the Leaderless Log Protocol. Concisely, it's a few main files:
- server/leaderless_log_broker.py is the minimal HTTP broker exposing POST /produce, backed by the Python writer path
- server/leaderless_log_writer.py implements the append path
- server/leaderless_log_reader.py implements the read path
- server/leaderless_log_consume.py implements the broker consume fast path and long-poll behavior
- server/leaderless_log_compactor_server.py runs the long-lived compaction workers over the same metadata and blobs
The full code map lives in ARCHITECTURE.md.
Note: The repo runtime dependencies live in requirements.txt. The current Oxia baseline is oxia==0.2.1; older 0.1.x clients lacked the session keepalive/recreation behavior the compaction path relies on and could surface SessionNotFound under load.
The whole design outsources most of the hard work to external stores instead of embedding a local replicated log inside the brokers:
| Layer | Why it matters |
|---|---|
| S3 | S3 stores the actual record bytes: shared WAL blobs under llog/wal-shared/{uuid} first, then compacted per-partition blobs under llog/{topic}/partitions/{partition}/data/compacted/{uuid}. |
| Oxia | Oxia is the linearizable coordination layer. It lets multiple HTTP broker and compactor processes reserve offsets, recover interrupted work, and maintain sparse index entries that map offset ranges to S3 locations. |
The index in Oxia is a sparse mapping of offset ranges to S3 objects, so the client knows where to read from S3.
The main write API is the broker HTTP server at POST /produce.
One request can carry multiple partitions and multiple records per partition, for example:
```json
{
  "topic_partitions": [
    {"topic": "orders", "partition": 0, "records": ["a", "b"]},
    {"topic": "orders", "partition": 1, "records": ["c"]}
  ]
}
```

Every topic-partition has its own path of keys in Oxia. The most important are:

- meta/control stores the next sequence/offset counter and is the contention point for brokers writing to the same topic-partition
- meta/compaction-cursor points at the first offset not yet compacted
- sparse index entries keyed by batch end offset
The two important ideas to keep in mind are:
- Oxia holds metadata and the sparse lookup structure.
- Actual record bytes live in S3.
The rest is really just a question of "how do you update both in an order that's consistent and safe across failures?". Well... here are the answers:
At the HTTP layer, the broker accepts one request containing many topic-partitions and aggregates them before flush.
The lower-level writer primitive is LeaderlessLogWriter.append_shared(groups).
Each group is one topic-partition plus a list of payloads, so the writer API itself is still partition-oriented even when the broker submits many partitions together.
That write path works like this:
- The broker accepts the per-partition batches from POST /produce.
- The broker-side batching layer waits until either LLOG_BATCH_MAX_BYTES or LLOG_BATCH_MAX_DELAY_MS is hit, with defaults of 8 MiB and 500 ms respectively.
- The writer then packs the accumulated partition batches into one shared WAL object in S3.
- Each partition gets its own Oxia control state and sparse-index entry pointing at its range inside that shared WAL object.
- Later, the compactor rewrites contiguous WAL ranges for one partition into one partition-local compacted object and updates the sparse index to point at that compacted blob instead.
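The flush rule in the second step above can be sketched as a small accumulator. This is a minimal illustration, not the repo's BatchingProducer: the class name PendingBatch and its methods are hypothetical, and the caller is assumed to pass a monotonic clock reading (for example time.monotonic()) as now.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

DEFAULT_MAX_BYTES = 8 * 1024 * 1024  # LLOG_BATCH_MAX_BYTES default (8 MiB)
DEFAULT_MAX_DELAY_MS = 500           # LLOG_BATCH_MAX_DELAY_MS default


@dataclass
class PendingBatch:
    """Hypothetical accumulator: per-partition payloads waiting for one shared WAL flush."""
    max_bytes: int = DEFAULT_MAX_BYTES
    max_delay_ms: int = DEFAULT_MAX_DELAY_MS
    groups: Dict[Tuple[str, int], List[bytes]] = field(default_factory=dict)
    buffered_bytes: int = 0
    first_enqueue: Optional[float] = None  # monotonic time (seconds) of the oldest payload

    def add(self, topic: str, partition: int, payloads: List[bytes], now: float) -> None:
        self.groups.setdefault((topic, partition), []).extend(payloads)
        self.buffered_bytes += sum(len(p) for p in payloads)
        if self.first_enqueue is None:
            self.first_enqueue = now

    def should_flush(self, now: float) -> bool:
        # Flush on whichever threshold is hit first: total size, or age of the oldest payload.
        if not self.groups:
            return False
        if self.buffered_bytes >= self.max_bytes:
            return True
        return (now - self.first_enqueue) * 1000 >= self.max_delay_ms

    def drain(self) -> Dict[Tuple[str, int], List[bytes]]:
        # Hand every buffered group to the writer (one shared WAL blob) and reset.
        groups, self.groups = self.groups, {}
        self.buffered_bytes, self.first_enqueue = 0, None
        return groups
```

Keying groups by (topic, partition) keeps the writer input partition-oriented even though one flush covers many partitions.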
ASCII sketch:
```text
            client
              |
              | POST /produce
              v
+---------------------------+
|        HTTP broker        |
|    topic_partitions[]     |
+---------------------------+
              |
              | aggregate + batch
              | flush at 8 MiB or 500 ms
              v
+---------------------------+
|    LeaderlessLogWriter    |
|   per-partition groups    |
+---------------------------+
              |
              | 1) write one shared WAL blob
              +-------------------------------> S3
              |                                 llog/wal-shared/{uuid}
              |
              | 2) for each partition:
              |    reserve offsets + persist sparse-index
              v
+----------------------------------------------+
|                     Oxia                     |
| orders[0] offsets 1..2 -> shared WAL object  |
| orders[1] offsets 1..1 -> same WAL object    |
+----------------------------------------------+
              |
              | 3) respond with per-partition offsets/results
              v
            client
```
These are the main Oxia keys related to the write path:
| Oxia key | Purpose |
|---|---|
| meta/control | The main per-partition coordination record. Most importantly, it holds the offset (sequence) counter. |
| index/{end_offset} | The sparse index. Each key says "the batch ending at this offset lives in this S3 object", so readers can map offset ranges to S3 locations. Readers find the right key with a CEILING_GET operation in Oxia. |
| meta/compaction-cursor | The first offset in the partition that has not yet been compacted. The writer initializes it when the partition is created. |
Before the first append, initialize_partition(topic, partition) creates:
- meta/control with log_state=OPEN, sequence_counter=1, and pending=null (offsets start at 1 per partition)
- meta/compaction-cursor with offset=1
append_shared() accepts one or more topic-partition groups and does the following:
- Normalizes each payload to bytes.
- Encodes each group as one inner bytes-batch-v1 body.
- Packs all group bodies into one shared S3 object at llog/wal-shared/{uuid}.
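As a rough sketch of how several groups can share one WAL object while each partition keeps its own byte range: the real bytes-batch-v1 layout is not documented here, so encode_batch below uses an assumed stand-in layout (a big-endian u32 record count, then a u32 length and the bytes for each record). pack_shared and GroupSlice are hypothetical names.

```python
import struct
from typing import List, NamedTuple, Tuple, Union


class GroupSlice(NamedTuple):
    topic: str
    partition: int
    msg_count: int
    byte_offset: int  # where this group's body starts inside the shared WAL blob
    byte_length: int  # length of this group's body


def encode_batch(payloads: List[bytes]) -> bytes:
    """Assumed stand-in for bytes-batch-v1: u32 count, then (u32 length, bytes) per record."""
    parts = [struct.pack(">I", len(payloads))]
    for p in payloads:
        parts.append(struct.pack(">I", len(p)))
        parts.append(p)
    return b"".join(parts)


def pack_shared(
    groups: List[Tuple[str, int, List[Union[str, bytes]]]],
) -> Tuple[bytes, List[GroupSlice]]:
    """Normalize payloads to bytes, encode one body per group, concatenate into one blob."""
    blob = bytearray()
    slices: List[GroupSlice] = []
    for topic, partition, payloads in groups:
        normalized = [p.encode("utf-8") if isinstance(p, str) else bytes(p) for p in payloads]
        body = encode_batch(normalized)
        slices.append(GroupSlice(topic, partition, len(normalized), len(blob), len(body)))
        blob.extend(body)
    return bytes(blob), slices
```

The byte_offset/byte_length pairs are the kind of per-partition range that sparse-index entries can point at, which is what lets a reader fetch only its own slice of a shared blob.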
After the S3 blob is written, the writer commits each partition independently.
For each group, the atomic reservation step is one CAS update on that partition's meta/control key.
That single record holds both the next sequence_counter and the optional pending append.
The writer reads the current meta/control, computes the next range, then CAS-writes a new meta/control that contains:
- the start_offset and end_offset now reserved for that partition
- the new sequence_counter = end_offset + 1
- the data_key pointing at the shared S3 blob

That record says "this partition owns this offset range, and its bytes live in that WAL object".
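The single-CAS reservation can be illustrated against an in-memory store. FakeKV, reserve_offsets, and the key string in the usage are hypothetical stand-ins for the Oxia client and the real key layout; the point is that the counter bump and the pending record are written together in one versioned put, and a losing writer simply re-reads and retries.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


class VersionMismatch(Exception):
    """The expected version no longer matches: another writer won the race."""


@dataclass
class Versioned:
    value: dict
    version: int


class FakeKV:
    """Hypothetical in-memory stand-in for a linearizable KV store with compare-and-set puts."""

    def __init__(self) -> None:
        self._data: Dict[str, Versioned] = {}

    def get(self, key: str) -> Optional[Versioned]:
        return self._data.get(key)

    def put(self, key: str, value: dict, expected_version: Optional[int]) -> int:
        # expected_version=None means "the key must not exist yet".
        current = self._data.get(key)
        if (None if current is None else current.version) != expected_version:
            raise VersionMismatch(key)
        new_version = 0 if current is None else current.version + 1
        self._data[key] = Versioned(value, new_version)
        return new_version


def reserve_offsets(kv: FakeKV, control_key: str, msg_count: int, data_key: str,
                    max_attempts: int = 10) -> Tuple[int, int]:
    """Reserve [start, end] and record the pending append in ONE CAS on meta/control."""
    for _ in range(max_attempts):
        current = kv.get(control_key)
        if current is None:
            raise KeyError(f"{control_key} missing: partition not initialized")
        state = current.value
        if state.get("pending") is not None:
            # The real writer recovers the pending append first; this sketch just stops.
            raise RuntimeError("pending append must be materialized first")
        start = state["sequence_counter"]
        end = start + msg_count - 1
        new_state = dict(state, sequence_counter=end + 1,
                         pending={"start_offset": start, "end_offset": end, "data_key": data_key})
        try:
            kv.put(control_key, new_state, expected_version=current.version)
            return start, end
        except VersionMismatch:
            continue  # another writer advanced the counter; re-read and retry
    raise RuntimeError("gave up after repeated CAS conflicts")
```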
meta/control is the key that moves the offset counter forward.
No sparse-index key is touched in that atomic step.
This CAS advances sequence_counter immediately, so the write is logically reserved even before the sparse-index entry is persisted.
It also means a later writer or compactor can recover the append if the process crashes after reserving offsets but before finishing the sparse-index write.
The writer then writes one sparse index entry at:
index/{end_offset}
That index entry stores the same S3 location and message-count metadata. Because the index key is the batch end offset, the reader can reconstruct the covered range as:
start_offset = end_offset - msg_count + 1
Once the index entry exists, the writer clears meta/control.pending.
At that point the append is fully materialized.
If a writer crashes after reserving offsets but before writing the index entry, the append is still recoverable. The next writer, fence/unfence operation, or compactor run will:
- read meta/control.pending
- write the missing index entry if needed
- clear the pending field
That is why the write path is a two-step commit in Oxia rather than a single blind index write.
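Recovery is safe to repeat because each step checks before it writes. A minimal sketch with plain dicts standing in for meta/control and the index/{end_offset} keys (recover_pending is a hypothetical name; the real code performs these as separate Oxia writes):

```python
from typing import Dict, Optional


def recover_pending(control: dict, index: Dict[int, dict]) -> bool:
    """Materialize control["pending"] into the sparse index, then clear it.

    Returns True if a pending append was found. Calling it again is a no-op.
    """
    pending: Optional[dict] = control.get("pending")
    if pending is None:
        return False
    end = pending["end_offset"]
    if end not in index:  # write the missing index entry only if needed
        index[end] = {
            "data_key": pending["data_key"],
            "msg_count": end - pending["start_offset"] + 1,
        }
    control["pending"] = None  # cleared last, after the index entry exists
    return True
```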
The read API is LeaderlessLogReader.read_offset(...), read_range(...), or iter_from(...).
That read path works like this:
- The reader checks meta/control to validate the partition and high watermark.
- It resolves one sparse index entry via an Oxia CEILING lookup, or falls back to meta/control.pending.
- It fetches one S3 blob or byte range.
- It decodes bytes-batch-v1 and returns the requested logical record.
ASCII sketch:
```text
            client
              |
              | read topic=orders partition=0 offset=2
              v
+---------------------------+
|    LeaderlessLogReader    |
+---------------------------+
              |
              | 1) load control state
              v
+---------------------------+
|           Oxia            |
|       meta/control        |
+---------------------------+
              |
              | 2) ceiling lookup in sparse index
              |    offset 2 -> shared WAL object
              v
+----------------------------------------------+
|                     Oxia                     |
| index maps offset ranges -> S3 locations     |
+----------------------------------------------+
              |
              | 3) fetch range-based get
              +-------------------------------> S3
              |                                 shared WAL or compacted blob
              |
              | 4) decode the record for offset 2
              v
+---------------------------+
|       payload bytes       |
+---------------------------+
              |
              | return payload
              v
            client
```
Main Oxia keys for reads:
| Oxia key | Purpose |
|---|---|
| meta/control | Tells the reader whether the partition exists, the current high watermark (sequence_counter - 1), and whether there is a recoverable pending append. |
| index/{end_offset} | Sparse index used for a ceiling lookup from the requested offset to the S3 object that contains it. |
The reader first loads meta/control:
- if the partition does not exist, it raises PartitionNotInitialized
- if the requested offset is >= sequence_counter, it raises OffsetOutOfRange
The readable high watermark is therefore sequence_counter - 1.
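The two control checks amount to a few lines. The exception classes below are local stand-ins named after the reader's errors, and check_readable is a hypothetical helper:

```python
from typing import Optional


class PartitionNotInitialized(Exception):
    """Local stand-in for the reader error of the same name."""


class OffsetOutOfRange(Exception):
    """Local stand-in for the reader error of the same name."""


def check_readable(control: Optional[dict], offset: int) -> int:
    """Return the high watermark (sequence_counter - 1) if `offset` is readable."""
    if control is None:
        raise PartitionNotInitialized()
    if offset >= control["sequence_counter"]:
        raise OffsetOutOfRange(offset)
    return control["sequence_counter"] - 1
```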
The reader performs an Oxia CEILING lookup starting at index/{requested_offset}.
Because index keys are batch end offsets, the first key at or above the requested offset is the candidate entry.
From that index record the reader reconstructs:
- end_offset from the key name
- start_offset from end_offset - msg_count + 1
If the resolved range covers the requested offset, the reader has found the owning WAL or compacted entry.
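A sorted list of integer end offsets can stand in for Oxia's ordered index keys to show why a ceiling lookup finds the owning batch. This sketch uses Python's bisect module rather than Oxia, and ceiling_resolve is a hypothetical name:

```python
import bisect
from typing import Dict, Optional, Tuple


def ceiling_resolve(index: Dict[int, dict], offset: int) -> Optional[Tuple[int, int, dict]]:
    """Find the entry whose key is the smallest end_offset >= offset.

    Returns (start_offset, end_offset, entry) if that entry covers `offset`, else None.
    """
    ends = sorted(index)
    pos = bisect.bisect_left(ends, offset)  # first end_offset >= offset: the CEILING key
    if pos == len(ends):
        return None
    end = ends[pos]
    entry = index[end]
    start = end - entry["msg_count"] + 1
    return (start, end, entry) if start <= offset <= end else None
```

With entries ending at 2, 5, and 9 covering 1..2, 3..5, and 8..9, offset 4 resolves to the 3..5 batch, while offset 6 lands on key 9 but is not covered, which is the case where the reader falls back to meta/control.pending.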
If no sparse index entry covers the offset, the reader checks meta/control.pending.
That means a logically committed append can still be read even if the writer crashed before sparse-index materialization.
Once the entry is resolved, the reader fetches the data blob from S3:
- if byte_offset and byte_length are present, it uses an S3 range GET
- otherwise it downloads the whole object
For shared WAL blobs, this is what avoids reading unrelated partitions from the same S3 object.
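HTTP byte ranges are inclusive on both ends, so the last byte to fetch is byte_offset + byte_length - 1. A sketch that turns an index entry into keyword arguments for boto3's S3 client get_object(), which accepts Bucket, Key, and an optional Range; it assumes index entries store URIs of the form s3://bucket/key, and both helper names are hypothetical:

```python
from typing import Optional, Tuple
from urllib.parse import urlparse


def parse_s3_uri(uri: str) -> Tuple[str, str]:
    """Split s3://bucket/key into (bucket, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def get_object_kwargs(uri: str, byte_offset: Optional[int] = None,
                      byte_length: Optional[int] = None) -> dict:
    """Build get_object() kwargs: a range GET when the entry has a byte range, else the whole object."""
    bucket, key = parse_s3_uri(uri)
    kwargs = {"Bucket": bucket, "Key": key}
    if byte_offset is not None and byte_length is not None:
        kwargs["Range"] = f"bytes={byte_offset}-{byte_offset + byte_length - 1}"  # inclusive end
    return kwargs
```

Usage would look like boto3.client("s3").get_object(**get_object_kwargs(uri, 14, 9))["Body"].read().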
The current readable encoding is bytes-batch-v1 for both WAL and COMPACTED entries.
The reader:
- decodes the batch layout
- verifies the batch record count matches the index metadata
- computes record_index = requested_offset - start_offset
- slices the right payload out of the batch body
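The decode-and-slice step, sketched against an assumed stand-in layout (u32 count, then u32 length plus bytes per record), since the real bytes-batch-v1 format is not spelled out here. decode_batch and read_record are hypothetical:

```python
import struct
from typing import List


def decode_batch(body: bytes) -> List[bytes]:
    """Decode the assumed layout: u32 count, then (u32 length, bytes) per record."""
    (count,) = struct.unpack_from(">I", body, 0)
    pos, records = 4, []
    for _ in range(count):
        (length,) = struct.unpack_from(">I", body, pos)
        pos += 4
        records.append(body[pos:pos + length])
        pos += length
    return records


def read_record(body: bytes, start_offset: int, msg_count: int, requested_offset: int) -> bytes:
    records = decode_batch(body)
    if len(records) != msg_count:  # the batch must agree with the index metadata
        raise ValueError(f"batch has {len(records)} records, index says {msg_count}")
    return records[requested_offset - start_offset]  # record_index
```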
iter_from() and read_range() scan the sparse index forward and decode entries in offset order.
They will fail with InconsistentReadState if they detect a gap between batches.
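The gap check reduces to comparing each batch's start with the previous batch's end. A sketch over a dict of end_offset -> {"msg_count": ...}, with a local InconsistentReadState class and a hypothetical iter_ranges generator:

```python
from typing import Dict, Iterator, Optional, Tuple


class InconsistentReadState(Exception):
    """Local stand-in for the reader error of the same name."""


def iter_ranges(index: Dict[int, dict], from_offset: int) -> Iterator[Tuple[int, int]]:
    """Yield (start, end) per index entry at or after from_offset, in order, failing on gaps."""
    expected: Optional[int] = None
    for end in sorted(index):
        if end < from_offset:
            continue
        start = end - index[end]["msg_count"] + 1
        if expected is not None and start != expected:
            raise InconsistentReadState(f"gap: expected batch starting at {expected}, got {start}")
        yield start, end
        expected = end + 1
```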
LeaderlessLogReader is the durable fallback path.
The HTTP consume path adds a broker-local hot-tail layer in front of it:
- BrokerConsumeService checks the local tail cache first for each requested partition.
- A cache hit returns recently produced records directly, without touching Oxia or S3.
- If the cache knows the requested offset is already past its cached high watermark, it returns a known-empty tail result. Long polls can then sleep and retry locally instead of probing Oxia/S3 on every poll.
- Only cache misses, gaps, or evictions fall back to the normal reader path: meta/control -> sparse index or pending -> S3.
- During that fallback, one consume request still de-duplicates shared-WAL blob fetches across partitions.
- A broker running --role both uses an in-process cache by default. Separate --role write and --role read processes can share the same fast path by pointing both at one LLOG_TAIL_CACHE_DIR.
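The three cache outcomes described above (hit, known-empty, miss) can be sketched as a small bounded cache. This is an illustration only: TailCache and its methods are hypothetical, eviction is plain oldest-first, and it assumes the cache sees every append for the partitions it tracks, which is what makes a known-empty answer safe.

```python
from collections import OrderedDict
from typing import Dict, Optional, Tuple

HIT, KNOWN_EMPTY, MISS = "hit", "known_empty", "miss"


class TailCache:
    """Hypothetical bounded cache of recently produced records, keyed per partition."""

    def __init__(self, max_bytes: int = 536870912) -> None:
        self.max_bytes = max_bytes  # 0 disables the cache entirely
        self.size = 0
        self.records: "OrderedDict[Tuple[str, int, int], bytes]" = OrderedDict()  # oldest first
        self.high: Dict[Tuple[str, int], int] = {}  # highest cached offset per partition

    def append(self, topic: str, partition: int, offset: int, payload: bytes) -> None:
        if self.max_bytes == 0:
            return
        tp = (topic, partition)
        self.high[tp] = max(self.high.get(tp, 0), offset)
        self.records[(topic, partition, offset)] = payload
        self.size += len(payload)
        while self.size > self.max_bytes:  # evict oldest records until under the cap
            _, evicted = self.records.popitem(last=False)
            self.size -= len(evicted)

    def lookup(self, topic: str, partition: int, offset: int) -> Tuple[str, Optional[bytes]]:
        tp = (topic, partition)
        if tp in self.high and offset > self.high[tp]:
            return KNOWN_EMPTY, None  # nothing newer yet: a long poll can wait locally
        payload = self.records.get((topic, partition, offset))
        return (HIT, payload) if payload is not None else (MISS, None)  # miss -> Oxia/S3
```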
The compaction API is LeaderlessLogCompactor.compact_once(topic, partition).
It rewrites a contiguous WAL-backed range into one partition-local compacted blob.
That compaction path works like this:
- The compactor recovers meta/control.pending and resumes meta/compaction if present.
- It reads meta/compaction-cursor and scans sparse index entries from that offset to find one contiguous WAL-backed range.
- It range-reads the selected WAL slices from S3 and writes one partition-local compacted blob.
- It stores meta/compaction, rewrites the highest sparse-index key to a COMPACTED entry, deletes lower keys, advances the cursor, and clears meta/compaction.
- Future reads resolve through the compacted entry, while shared WAL object deletion still needs separate garbage collection.
ASCII sketch:
```text
  scheduler / direct caller
              |
              | 0) choose topic-partition
              v
      compactor worker
              |
              | compact_once(topic, partition)
              v
+---------------------------+
|  LeaderlessLogCompactor   |
+---------------------------+
              |
              | 1) read cursor + scan sparse index
              v
+----------------------------------------------+
|                     Oxia                     |
| next WAL-backed offset range for partition   |
+----------------------------------------------+
              |
              | 2) fetch range-based gets for source WALs
              +-------------------------------> S3
              |                                 shared WAL blobs
              |
              | 3) write one compacted blob
              +-------------------------------> S3
              |                                 compacted/{uuid}
              |
              | 4) persist meta/compaction,
              |    CAS end key, delete old keys,
              |    advance cursor
              v
+----------------------------------------------+
|                     Oxia                     |
| one COMPACTED index entry + cursor advanced  |
+----------------------------------------------+
              |
              | future reads now resolve to compacted blob
              v
 reader / next compactor run
```
Main Oxia keys for compaction:
| Oxia key | Purpose |
|---|---|
| meta/compaction-cursor | First offset in the partition that has not yet been compacted. |
| index/{end_offset} | Sparse index scanned to find one contiguous WAL-backed range to rewrite. |
| meta/control | Lets the compactor recover any pending writer append before compacting. |
| meta/compaction | Durable in-flight compaction record used to resume after a crash. |
| meta/compactor-claim | Used by the long-running service so only one worker owns a partition while it runs compact_once(...). |
With the one-shot API, the caller passes the topic and partition directly.
With the long-running service, the scheduler chooses from configured or discovered partitions, then acquires meta/compactor-claim before it runs compact_once(...).
Before selecting a compaction candidate, the compactor checks meta/control.pending.
If a writer left a pending append behind, the compactor materializes the missing sparse index entry first.
This ensures compaction never skips a logically committed WAL batch.
The compactor then checks meta/compaction.
If present, that record is authoritative and the compactor resumes from its stored state machine instead of starting over.
The states are:
- WRITING_COMPACTED_INDEX
- DELETING_OLD
- UPDATING_CURSOR
If no pending compaction exists, the compactor:
- reads meta/compaction-cursor
- scans sparse index entries starting exactly at that cursor
- selects one contiguous run of WAL entries
Selection stops when it hits:
- a gap
- a COMPACTED entry
- the configured max_offsets limit
If the first readable entry at the cursor is not a WAL entry or does not start exactly at the cursor, there is no candidate.
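Candidate selection can be sketched over a dict of end_offset -> {"kind", "msg_count"}. select_candidate is a hypothetical name, and one detail is an assumption: this version always admits the first batch even if it alone exceeds max_offsets, so a very large batch cannot stall compaction.

```python
from typing import Dict, List


def select_candidate(index: Dict[int, dict], cursor: int, max_offsets: int) -> List[int]:
    """Return the end offsets of one contiguous WAL run starting exactly at `cursor`.

    An empty list means there is no candidate.
    """
    selected: List[int] = []
    expected = cursor
    for end in sorted(e for e in index if e >= cursor):
        entry = index[end]
        start = end - entry["msg_count"] + 1
        if start != expected or entry["kind"] != "WAL":
            break  # gap, first entry not starting at the cursor, or a COMPACTED entry
        if selected and end - cursor + 1 > max_offsets:
            break  # adding this batch would exceed max_offsets
        selected.append(end)
        expected = end + 1
    return selected
```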
For each selected WAL entry, the compactor:
- uses the stored byte range to issue range GETs against S3
- rebuilds the selected records into one new compacted blob
That compacted blob is written to:
llog/{topic}/partitions/{partition}/data/compacted/{uuid}
Before mutating the sparse index, the compactor writes meta/compaction.
That record stores:
- the compacted start_offset and end_offset
- the compacted S3 URI
At a high level, that record is the durable breadcrumb that lets another compactor resume the same work after a crash.
Think of compaction as taking, say, 5 sparse index entries for one partition range.
It atomically replaces only the highest-offset sparse index key with one COMPACTED entry spanning the whole range.
It does not atomically replace all 5 keys at once.
After that succeeds, it deletes the lower 4 sparse index keys in a follow-up delete_range() step.
During that brief intermediate state, reads may still resolve through the older lower keys, which is still correct because they point at the same logical records.
Once the lower keys are gone, the single compacted end key represents the whole compacted range.
Once old keys are deleted, the compactor advances:
meta/compaction-cursor = end_offset + 1
Then it clears meta/compaction.
At that point the compacted range is fully committed and future compactor runs start after it. For this partition range, future reads will now resolve through the compacted entry instead of the old per-batch WAL keys.
That does not automatically make the old shared WAL S3 object safe to delete. Those WAL blobs can still be shared with other partitions or other uncompacted ranges, so actual object cleanup would need separate garbage collection and reference tracking.
Every compaction stage is idempotent:
- rewriting the end key is guarded by CAS
- deleting old keys can be retried
- advancing the cursor only moves it forward
- clearing meta/compaction happens last
So any compactor process can safely resume interrupted work.
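Resuming from meta/compaction can be sketched as falling through the stages in order, starting from whichever stage was recorded. This is an in-memory illustration under assumptions: resume_compaction and the store dict are hypothetical, and a real implementation would persist each stage transition and guard the end-key rewrite with CAS.

```python
def resume_compaction(store: dict) -> None:
    """Finish the compaction described by store["compaction"], from its recorded stage.

    `store` mirrors one partition's keys: "index" (end_offset -> entry),
    "cursor" (meta/compaction-cursor), and "compaction" (meta/compaction or None).
    """
    rec = store["compaction"]
    if rec is None:
        return  # nothing in flight
    start, end = rec["start_offset"], rec["end_offset"]
    if rec["state"] == "WRITING_COMPACTED_INDEX":
        # Replace only the highest key with one COMPACTED entry spanning [start, end].
        store["index"][end] = {"kind": "COMPACTED", "msg_count": end - start + 1, "data_key": rec["uri"]}
        rec["state"] = "DELETING_OLD"
    if rec["state"] == "DELETING_OLD":
        # Delete the lower keys; re-running after a partial delete is harmless.
        for key in [k for k in store["index"] if start <= k < end]:
            del store["index"][key]
        rec["state"] = "UPDATING_CURSOR"
    if rec["state"] == "UPDATING_CURSOR":
        store["cursor"] = max(store["cursor"], end + 1)  # the cursor only moves forward
    store["compaction"] = None  # cleared last
```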
The HTTP compactor service adds one more coordination layer:
- it periodically discovers partitions by scanning Oxia for meta/control
- it acquires an ephemeral meta/compactor-claim before calling compact_once(...)
meta/compactor-claim prevents two service workers from compacting the same partition at the same time. meta/compaction remains the durable recovery record for the compaction itself.
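The claim-then-compact loop can be illustrated with an in-memory claim table. ClaimTable, compact_round, and expire_session are hypothetical; expire_session only mimics what Oxia session expiry does to ephemeral keys.

```python
import threading
from typing import Callable, Dict, List, Tuple

TopicPartition = Tuple[str, int]


class ClaimTable:
    """Hypothetical in-memory stand-in for ephemeral meta/compactor-claim keys."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._owners: Dict[TopicPartition, str] = {}

    def try_claim(self, tp: TopicPartition, worker_id: str) -> bool:
        with self._lock:
            if tp in self._owners:
                return False
            self._owners[tp] = worker_id
            return True

    def release(self, tp: TopicPartition, worker_id: str) -> None:
        with self._lock:
            if self._owners.get(tp) == worker_id:
                del self._owners[tp]

    def expire_session(self, worker_id: str) -> None:
        """Mimic session expiry: drop every claim held by a dead worker."""
        with self._lock:
            for tp in [tp for tp, w in self._owners.items() if w == worker_id]:
                del self._owners[tp]


def compact_round(claims: ClaimTable, targets: List[TopicPartition], worker_id: str,
                  compact_once: Callable[[str, int], object]) -> list:
    """One scheduler pass: claim each target, compact it, always release the claim."""
    done = []
    for tp in targets:
        if not claims.try_claim(tp, worker_id):
            continue  # another worker owns this partition right now
        try:
            done.append((tp, compact_once(*tp)))
        finally:
            claims.release(tp, worker_id)
    return done
```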
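```python
from server.leaderless_log_compactor import LeaderlessLogCompactor
from server.leaderless_log_reader import LeaderlessLogReader
from server.leaderless_log_writer import LeaderlessLogWriter, SharedAppendGroup

# Build the writer from the LLOG_* environment variables.
writer = LeaderlessLogWriter.from_env()
# Create meta/control and meta/compaction-cursor before the first append.
writer.initialize_partition("orders", 0)
# One group per topic-partition; all groups in a call share one WAL blob in S3.
writer.append_shared(
    [SharedAppendGroup(topic="orders", partition=0, payloads=[b"a", b"b", b"c"])]
)

reader = LeaderlessLogReader.from_env()
# Offsets start at 1 per partition, so on a fresh partition this prints b'a'.
print(reader.read_offset("orders", 0, 1).payload)

compactor = LeaderlessLogCompactor.from_env()
# Rewrite the contiguous WAL-backed range at the cursor into one compacted blob.
print(compactor.compact_once("orders", 0))

writer.close()
reader.close()
compactor.close()
```

The writer, reader, and compactor accept direct constructor arguments or can be built from environment variables with from_env().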
- Copy .env.example to .env.
- Adjust the values.
- Load the file into your shell.
- Start the writer, reader, broker, or compactor.

Example:

```bash
cp .env.example .env
set -a && source .env && set +a
```

Required for all components:
LLOG_OXIA_SERVICE_ADDRESS
Required for the writer, broker, and compactor:
LLOG_S3_BUCKET
The reader does not need LLOG_S3_BUCKET because sparse index entries already contain full S3 URIs.
Optional env vars:
- LLOG_OXIA_NAMESPACE
- LLOG_ROOT_PREFIX
- LLOG_OXIA_SESSION_TIMEOUT_MS
- LLOG_CLIENT_IDENTIFIER
- LLOG_S3_ENDPOINT_URL
- LLOG_S3_REGION_NAME
- LLOG_S3_ACCESS_KEY_ID
- LLOG_S3_SECRET_ACCESS_KEY
- LLOG_S3_SESSION_TOKEN
- LLOG_BATCH_MAX_BYTES
- LLOG_BATCH_MAX_DELAY_MS
- LLOG_BATCH_MAX_BUFFER_BYTES
- LLOG_TAIL_CACHE_DIR
- LLOG_TAIL_CACHE_MAX_BYTES
- LLOG_COMPACTOR_BIND_HOST
- LLOG_COMPACTOR_PORT
- LLOG_COMPACTOR_ID
- LLOG_COMPACTOR_TARGETS
- LLOG_COMPACTOR_WORKERS
- LLOG_COMPACTOR_DISCOVERY_INTERVAL_MS
- LLOG_COMPACTOR_IDLE_SLEEP_MS
- LLOG_COMPACTOR_ERROR_SLEEP_MS
- LLOG_COMPACTOR_IDLE_JITTER_MS
- LLOG_COMPACTOR_ERROR_JITTER_MS
- LLOG_COMPACTOR_MAX_OFFSETS_PER_RUN
AWS fallbacks also work for credentials and region:
- AWS_REGION
- AWS_DEFAULT_REGION
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_SESSION_TOKEN
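A sketch of reading these variables with the defaults stated elsewhere in this README (8 MiB and 500 ms for batching, 536870912 bytes for the tail cache). load_config and LogConfig are hypothetical, and the region precedence (LLOG_S3_REGION_NAME, then AWS_REGION, then AWS_DEFAULT_REGION) is an assumption:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class LogConfig:
    oxia_service_address: str
    s3_bucket: Optional[str]
    batch_max_bytes: int
    batch_max_delay_ms: int
    tail_cache_max_bytes: int
    region: Optional[str]


def load_config(env: Mapping[str, str] = os.environ, needs_bucket: bool = True) -> LogConfig:
    """needs_bucket=False models the reader, which gets full S3 URIs from the index."""
    def required(name: str) -> str:
        value = env.get(name)
        if not value:
            raise KeyError(f"{name} is required")
        return value

    return LogConfig(
        oxia_service_address=required("LLOG_OXIA_SERVICE_ADDRESS"),
        s3_bucket=required("LLOG_S3_BUCKET") if needs_bucket else env.get("LLOG_S3_BUCKET"),
        batch_max_bytes=int(env.get("LLOG_BATCH_MAX_BYTES", 8 * 1024 * 1024)),
        batch_max_delay_ms=int(env.get("LLOG_BATCH_MAX_DELAY_MS", 500)),
        tail_cache_max_bytes=int(env.get("LLOG_TAIL_CACHE_MAX_BYTES", 536870912)),
        # Assumed precedence: the LLOG_* setting first, then the AWS fallbacks.
        region=env.get("LLOG_S3_REGION_NAME") or env.get("AWS_REGION") or env.get("AWS_DEFAULT_REGION"),
    )
```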
The broker is a thin HTTP adapter over the writer and reader.
It supports --role write, --role read, or --role both (the default).
It exposes:
- GET /health
- GET /metrics
- GET /metrics/prometheus
- POST /produce
- POST /consume
POST /produce accepts a topic_partitions array where each item contains:
- topic
- partition
- records
Each record may be:
- a UTF-8 string
- an object of the form {"base64":"..."} for arbitrary bytes
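Decoding one record from the request body is a two-case check. decode_record is a hypothetical helper, and rejecting objects with keys other than base64 is an assumption about how strict the broker is:

```python
import base64
import binascii
from typing import Any


def decode_record(item: Any) -> bytes:
    """Turn one JSON record from POST /produce into payload bytes."""
    if isinstance(item, str):
        return item.encode("utf-8")
    if isinstance(item, dict) and set(item) == {"base64"} and isinstance(item["base64"], str):
        try:
            return base64.b64decode(item["base64"], validate=True)
        except binascii.Error as exc:
            raise ValueError(f"invalid base64 record: {exc}") from None
    raise ValueError('record must be a UTF-8 string or {"base64": "..."}')
```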
Example:
```bash
curl -X POST http://127.0.0.1:8080/produce \
  -H 'content-type: application/json' \
  -d '{
    "topic_partitions": [
      {
        "topic": "orders",
        "partition": 0,
        "records": ["alpha", "beta"]
      },
      {
        "topic": "orders",
        "partition": 1,
        "records": [{"base64": "AAE="}]
      }
    ]
  }'
```

The broker lazily initializes partitions on first write and keeps a process-local cache of successful initializations.
It also wraps the writer with an in-process BatchingProducer that flushes on either:
- LLOG_BATCH_MAX_BYTES
- LLOG_BATCH_MAX_DELAY_MS
POST /consume accepts a topic_partitions array where each item contains:
- topic
- partition
- fetch_offset
- optional partition_max_bytes
Request-level consume controls are:
- max_wait_ms
- min_bytes
- max_bytes
Example:
```bash
curl -X POST http://127.0.0.1:8080/consume \
  -H 'content-type: application/json' \
  -d '{
    "topic_partitions": [
      {
        "topic": "orders",
        "partition": 0,
        "fetch_offset": 1,
        "partition_max_bytes": 1048576
      },
      {
        "topic": "orders",
        "partition": 1,
        "fetch_offset": 1
      }
    ],
    "max_wait_ms": 250,
    "min_bytes": 1,
    "max_bytes": 4194304
  }'
```

Consume responses are per partition, return high_watermark and next_fetch_offset, and encode each record as either a UTF-8 payload string or a base64 object for arbitrary bytes.
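On the response side, a sketch of the per-partition shape: encode_record prefers a UTF-8 string and falls back to a base64 object, and next_fetch_offset is one past the last returned offset (or the fetch offset when nothing came back). Both helpers are hypothetical, and the topic/partition/records field names are assumptions; only high_watermark and next_fetch_offset are named above.

```python
import base64
from typing import List, Tuple, Union


def encode_record(payload: bytes) -> Union[str, dict]:
    """Render one consumed record as a UTF-8 string when possible, else {"base64": ...}."""
    try:
        return payload.decode("utf-8")
    except UnicodeDecodeError:
        return {"base64": base64.b64encode(payload).decode("ascii")}


def partition_response(topic: str, partition: int, fetch_offset: int, high_watermark: int,
                       records: List[Tuple[int, bytes]]) -> dict:
    """Build one per-partition result; `records` is a list of (offset, payload) pairs."""
    next_fetch_offset = records[-1][0] + 1 if records else fetch_offset
    return {
        "topic": topic,
        "partition": partition,
        "high_watermark": high_watermark,
        "next_fetch_offset": next_fetch_offset,
        "records": [encode_record(p) for _, p in records],
    }
```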
POST /consume checks the local tail cache before it falls back to Oxia/S3, and a cached known-empty tail result keeps long polls local at the hot tail.
When multiple requested partitions share one WAL blob, the broker fetches that blob once per request and reuses it across those partitions.
When one broker runs both roles, it keeps a bounded in-process tail cache of recently produced records.
Set LLOG_TAIL_CACHE_DIR to switch that cache to a shared file-backed mode across writer and reader processes; consume loadtests set this to <run-dir>/tail-cache automatically.
Set LLOG_TAIL_CACHE_MAX_BYTES=0 to disable either cache mode, or set it to another byte cap; the default is 536870912.
The HTTP contract is in server/HTTP_API.md.
The long-running compactor service exposes:
- GET /health
- GET /metrics
- GET /metrics/prometheus
Its target set is:
- the configured LLOG_COMPACTOR_TARGETS
- plus partitions discovered by scanning Oxia for meta/control
Each worker acquires an ephemeral meta/compactor-claim, runs compact_once(...), then releases the claim.
If the process dies, Oxia session expiry clears the claim automatically.
The broker and compactor service both expose:
- JSON metrics at GET /metrics
- Prometheus text at GET /metrics/prometheus
Exported metrics include:
- broker metadata and batch settings
- loadtest scenario metadata and static run-shape gauges during orchestrated loadtest runs
- compactor metadata, worker counts, discovery state, and claim counters
- HTTP request counters and request validation failures
- batching depth, flush counts, queue wait, and shared WAL blob sizes
- compaction run outcomes, lag, pending state, recovery events, candidate shape, and stage timings
- Oxia per-operation counters and latency totals
- S3 per-operation counters, bytes, and latency totals
- S3 billing estimates under s3.billing, including:
  - request-cost estimate derived from observed put, list, get, and range_get counts
  - current object count and stored bytes under the configured LLOG_ROOT_PREFIX
  - current stored GB and estimated monthly storage cost for that footprint
Current S3 billing assumptions are hardcoded and intentionally simple:
- S3 Standard pricing in us-east-1:
  - $0.023 per GB-month for storage
  - $0.005 per 1,000 PUT/COPY/POST/LIST requests
  - $0.004 per 10,000 GET and other requests
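Applying those three rates to observed counts is simple arithmetic. In this sketch, estimate_s3_costs is a hypothetical name and treating a GB as 2**30 bytes is an assumption:

```python
STORAGE_PER_GB_MONTH = 0.023  # S3 Standard storage, us-east-1
PUT_CLASS_PER_1000 = 0.005    # PUT/COPY/POST/LIST requests
GET_CLASS_PER_10000 = 0.004   # GET and other requests


def estimate_s3_costs(stored_bytes: int, put: int, list_: int, get: int, range_get: int) -> dict:
    """Rough monthly storage and request cost from byte and request counts."""
    stored_gb = stored_bytes / 2**30  # assumption: GB means 2**30 bytes
    request_usd = ((put + list_) / 1000 * PUT_CLASS_PER_1000
                   + (get + range_get) / 10000 * GET_CLASS_PER_10000)  # range GETs bill as GETs
    return {
        "stored_gb": stored_gb,
        "monthly_storage_usd": stored_gb * STORAGE_PER_GB_MONTH,
        "request_usd": request_usd,
    }
```

For example, 100 GB stored plus 2,000 PUT/LIST-class and 10,000 GET-class requests comes to about $2.30 per month of storage and $0.014 of request cost.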
The bucket-size metrics come from cached list_objects_v2 scans against the configured S3 prefix.
Those scans are counted as operation="list" in the exported S3 operation metrics.
The estimate does not include:
- data transfer in or out
- replication
- lifecycle transitions
- KMS
- taxes, discounts, support, or account-level billing adjustments
Metrics are process-local only.
Request-cost counters can be summed across processes.
Bucket-size and monthly-storage-cost gauges describe the shared bucket prefix and should be aggregated with max, not sum, across brokers or compactors.
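A sketch of that aggregation rule over per-process snapshots; aggregate and the metric names in the test are hypothetical:

```python
from typing import Dict, Iterable, Set


def aggregate(snapshots: Iterable[Dict[str, float]], shared_gauges: Set[str]) -> Dict[str, float]:
    """Sum per-process counters; take the max of gauges that describe the shared bucket prefix."""
    out: Dict[str, float] = {}
    for snap in snapshots:
        for name, value in snap.items():
            if name in shared_gauges:
                out[name] = max(out.get(name, value), value)  # same prefix seen by every process
            else:
                out[name] = out.get(name, 0) + value          # process-local counts add up
    return out
```

In PromQL the same rule corresponds to the sum aggregation for request counters and max for the bucket-size and storage-cost gauges.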
A ready-to-import Grafana dashboard is in grafana/leaderless-log-observability.json.
For a one-command local review flow, run quick_run.sh; it starts repo-local Prometheus and Grafana with provisioning from loadtest/observability/docker-compose.yml, wires the active broker ports into Prometheus file_sd, and serves the provisioned dashboard at http://127.0.0.1:3000.
The load-test metrics, logs, traces, and debugging flow are cataloged in PERF_DEBUGGING.md.
Primary docs in this repo:
- The S3 bucket must already exist.
- The writer, reader, and compactor do not auto-load .env; load it in the shell first.
- append_shared(groups) always stores bytes-batch-v1 bodies inside a shared WAL blob.
- A one-partition write is represented as append_shared([SharedAppendGroup(...)]).
- The sparse index is keyed by batch end offset, not batch start offset.
- The compactor only rewrites contiguous WAL ranges starting exactly at meta/compaction-cursor.