This repo currently contains:
- `oxia-server/`: the Oxia server
- `requirements.txt`: the pinned Python client/runtime dependencies, including `oxia`
- `leaderless-log-protocol/`: the protocol specification and examples
- `server/leaderless_log_compactor.py`: the Python compactor
- `server/leaderless_log_compactor_server.py`: the long-running Python compactor service
- `server/leaderless_log_broker.py`: a minimal HTTP broker adapter for produce and consume requests
- `server/leaderless_log_common.py`: shared leaderless metadata, path, S3 config, and batch codec helpers
The local application-level architecture in this repo is a Python leaderless log writer, reader, compactor, and broker adapter that use Oxia as:
- the coordination store
- the sparse index store
WAL payload bytes are stored in S3-compatible object storage.
Writer components:
- Oxia server
- Oxia Python client
- Python writer module at `server/leaderless_log_writer.py`
- shared schema/path/S3 config/batch codec module at `server/leaderless_log_common.py`
- one in-process `BatchingProducer` layer inside the writer module for broker-side size/time batching
The unit of a log is a topic-partition:
- `topic: str`
- `partition: int`
Each topic-partition is its own WAL with its own:
- log state
- offset sequence
- sparse index
- compaction cursor
Direct writer API:
- `append_shared(groups)` accepts one or more topic-partition groups
- each group is encoded as one inner `bytes-batch-v1` body
- all group bodies are packed into one shared S3 WAL blob under `llog/wal-shared/{uuid}`
- the writer commits each partition independently by:
- CASing `meta/control` with a `pending` record that includes the shared `data_key`
- storing `byte_offset` and `byte_length` so the partition points at its slice inside the shared blob
- materializing the sparse index entry at the batch end offset
- clearing the `pending` state with a second CAS on `meta/control`
Shared WAL blob layout:
- 4-byte magic: `LLS1`
- 4-byte big-endian header length
- JSON header with `version`, `created_at_ms`, and `partitions[]`, where each partition entry has `topic`, `partition`, `msg_count`, `encoding`, `body_offset`, and `body_length`
- concatenated per-partition `bytes-batch-v1` bodies
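The blob layout above can be sketched as a pack/unpack pair. This is a minimal sketch, not the repo's codec. It assumes three things: `body_offset` is relative to the start of the concatenated body region, `version` is `1`, and each per-partition body is already-encoded opaque bytes. `pack_shared_blob` and `unpack_shared_blob` are hypothetical names. The pack function also returns the absolute `(byte_offset, byte_length)` slice that an index entry would point at.

```python
import json
import struct
import time

MAGIC = b"LLS1"  # 4-byte magic from the layout above


def pack_shared_blob(groups):
    """groups: list of (topic, partition, msg_count, body) tuples.

    body is an already-encoded bytes-batch-v1 payload, treated as opaque here.
    Returns (blob, slices): slices maps (topic, partition) to the absolute
    (byte_offset, byte_length) a reader would fetch with a range GET.
    """
    entries, bodies, cursor = [], [], 0
    for topic, partition, msg_count, body in groups:
        entries.append({
            "topic": topic,
            "partition": partition,
            "msg_count": msg_count,
            "encoding": "bytes-batch-v1",
            "body_offset": cursor,  # ASSUMPTION: relative to the body region
            "body_length": len(body),
        })
        bodies.append(body)
        cursor += len(body)
    header = json.dumps({
        "version": 1,  # hypothetical version value
        "created_at_ms": int(time.time() * 1000),
        "partitions": entries,
    }).encode("utf-8")
    prefix = MAGIC + struct.pack(">I", len(header)) + header  # big-endian length
    slices = {
        (e["topic"], e["partition"]): (len(prefix) + e["body_offset"], e["body_length"])
        for e in entries
    }
    return prefix + b"".join(bodies), slices


def unpack_shared_blob(blob):
    """Parse the header and return {(topic, partition): body_bytes}."""
    if blob[:4] != MAGIC:
        raise ValueError("not a shared WAL blob")
    (header_len,) = struct.unpack(">I", blob[4:8])
    header = json.loads(blob[8:8 + header_len])
    base = 8 + header_len  # start of the concatenated bodies
    out = {}
    for e in header["partitions"]:
        start = base + e["body_offset"]
        out[(e["topic"], e["partition"])] = blob[start:start + e["body_length"]]
    return out
```

Because every partition's slice is recorded as an absolute range, a reader only needs one range GET per partition. It never has to download and parse the whole shared header.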
Broker-side batching path:
- the broker initializes partitions eagerly
- the broker submits all request batches to `BatchingProducer.append_shared()`
- `BatchingProducer` coalesces groups across threads until either `LLOG_BATCH_MAX_BYTES` is reached or `LLOG_BATCH_MAX_DELAY_MS` elapses from the first buffered record
- one flusher thread calls the real writer `append_shared()` with one aggregated group per topic-partition
- waiter futures are fanned back out to the original per-request partition results
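The size/time coalescing and future fan-out described above can be sketched with a condition variable and `concurrent.futures.Future`. This is a hypothetical stand-in, not the repo's `BatchingProducer`. It assumes the wrapped `write_fn` takes `{(topic, partition): [record_bytes, ...]}` and returns each partition's start offset for the aggregated flush. The real writer returns richer results.

```python
import threading
import time
from concurrent.futures import Future


class MiniBatchingProducer:
    """Hypothetical size/time batcher; write_fn stands in for the real writer."""

    def __init__(self, write_fn, max_bytes, max_delay_ms):
        self._write_fn = write_fn
        self._max_bytes = max_bytes
        self._max_delay = max_delay_ms / 1000.0
        self._cond = threading.Condition()
        self._buffer = []        # (groups, future) pairs awaiting a flush
        self._buffered_bytes = 0
        self._first_at = None    # monotonic time of the first buffered record
        self._closed = False
        self._flusher = threading.Thread(target=self._run, daemon=True)
        self._flusher.start()

    def append_shared(self, groups):
        """Buffer one request's groups and return a Future for its offsets."""
        fut = Future()
        size = sum(len(r) for recs in groups.values() for r in recs)
        with self._cond:
            if self._first_at is None:
                self._first_at = time.monotonic()
            self._buffer.append((groups, fut))
            self._buffered_bytes += size
            self._cond.notify()
        return fut

    def close(self):
        with self._cond:
            self._closed = True
            self._cond.notify()
        self._flusher.join()

    def _take_batch(self):
        """Block until the size or delay trigger fires; None once closed and empty."""
        with self._cond:
            while True:
                if not self._buffer:
                    if self._closed:
                        return None
                    self._cond.wait()
                    continue
                remaining = self._first_at + self._max_delay - time.monotonic()
                if self._closed or remaining <= 0 or self._buffered_bytes >= self._max_bytes:
                    batch, self._buffer = self._buffer, []
                    self._buffered_bytes, self._first_at = 0, None
                    return batch
                self._cond.wait(remaining)

    def _run(self):
        # Only this thread calls write_fn, so writer access is serialized.
        while (batch := self._take_batch()) is not None:
            merged, spans = {}, []
            for groups, fut in batch:
                span = {}
                for tp, recs in groups.items():
                    bucket = merged.setdefault(tp, [])
                    span[tp] = (len(bucket), len(recs))  # position inside the merged group
                    bucket.extend(recs)
                spans.append((span, fut))
            try:
                starts = self._write_fn(merged)  # one aggregated group per partition
            except Exception as exc:
                for _, fut in spans:
                    fut.set_exception(exc)
                continue
            for span, fut in spans:
                # Fan out: each waiter gets its own (start_offset, end_offset) per partition.
                fut.set_result({
                    tp: (starts[tp] + pos, starts[tp] + pos + n - 1)
                    for tp, (pos, n) in span.items()
                })
```

With `max_bytes=3`, two requests carrying 2 bytes and 1 byte share a single `write_fn` call. The second request's record lands right after the first request's records in the same partition.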
Compactor components:
- Oxia server
- Oxia Python client
- Python compactor module at `server/leaderless_log_compactor.py`
- shared schema/path/S3 config/batch codec module at `server/leaderless_log_common.py`
Compaction path:
- recover `meta/control.pending` if a writer crash left a logically committed append unmaterialized
- read `meta/compaction-cursor` and scan the sparse index from that offset
- select one contiguous WAL-backed range that starts exactly at the cursor
- read the source WAL blobs from S3 and rewrite them into one compacted S3 blob
- create `meta/compaction` as the recoverable in-flight compaction record
- overwrite the range end index key with one `COMPACTED` entry
- `delete_range()` the lower sparse index keys in the compacted span
- advance `meta/compaction-cursor`
- clear `meta/compaction`
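Range selection can be sketched over the `(end_offset, entry)` pairs that an index range scan from the cursor would return. This is a minimal sketch under two assumptions: the cursor holds the first uncompacted offset, and a hypothetical `max_entries` cap bounds each run. The coverage rule `[E - N + 1, E]` comes from the sparse-index description. The return value also carries the last selected entry's start offset, which `meta/compaction` is described as storing.

```python
def select_compaction_range(index_entries, cursor, max_entries=16):
    """Pick one contiguous WAL-backed range starting exactly at `cursor`.

    index_entries: (end_offset, entry) pairs sorted by end_offset.
    Returns (start_offset, end_offset, last_entry_start, selected) or None.
    """
    selected = []
    expected_start = cursor
    for end_offset, entry in index_entries:
        start = end_offset - entry["msg_count"] + 1  # sparse-index coverage rule
        if entry["type"] != "WAL" or start != expected_start:
            break  # a COMPACTED entry or a gap ends the contiguous run
        selected.append((start, end_offset, entry))
        expected_start = end_offset + 1
        if len(selected) >= max_entries:  # hypothetical cap per compaction
            break
    if not selected:
        return None
    return selected[0][0], selected[-1][1], selected[-1][0], selected
```

If the first entry does not start at the cursor, the function selects nothing. This matches the rule that a compaction range never skips ahead of the cursor.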
Crash recovery model:
- `meta/compaction` is authoritative for interrupted compactions
- state `WRITING_COMPACTED_INDEX` means the end-key overwrite may still need to happen
- state `DELETING_OLD` means the end key is already compacted and lower keys still need deleting
- state `UPDATING_CURSOR` means deletes are done and the cursor still needs advancing
- any compactor process can safely resume an interrupted compaction because each step is idempotent
- the pending `meta/compaction` record stores the full compacted range and the last selected WAL entry start offset so recovery can verify the end-key overwrite against the correct tail entry
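The state machine above can be sketched against in-memory dicts that stand in for the Oxia keys. This is a hypothetical sketch, not the compactor's code. Two assumptions apply: the cursor advances to `end_offset + 1`, and each state is persisted before the next step runs. The tail-entry verification described above is omitted. Each step can run twice with no harm, which is what makes resumption safe.

```python
def resume_compaction(meta, index):
    """Drive an interrupted compaction to completion; safe to call repeatedly.

    meta: dict standing in for the meta/ keys ("compaction", "compaction-cursor").
    index: dict mapping end_offset -> index entry, standing in for index/ keys.
    """
    record = meta.get("compaction")
    if record is None:
        return  # nothing in flight
    lo, hi = record["start_offset"], record["end_offset"]
    if record["state"] == "WRITING_COMPACTED_INDEX":
        # Overwriting the end key with the same COMPACTED entry is idempotent.
        index[hi] = {
            "type": "COMPACTED",
            "msg_count": record["msg_count"],
            "data_key": record["data_key"],
            "encoding": record["encoding"],
            "created_at_ms": record["created_at_ms"],
        }
        record = meta["compaction"] = {**record, "state": "DELETING_OLD"}
    if record["state"] == "DELETING_OLD":
        # Deleting keys that are already gone is a no-op, so a retry is harmless.
        for end_offset in [k for k in index if lo <= k < hi]:
            del index[end_offset]
        record = meta["compaction"] = {**record, "state": "UPDATING_CURSOR"}
    if record["state"] == "UPDATING_CURSOR":
        meta["compaction-cursor"] = {"offset": hi + 1}  # ASSUMPTION: next uncompacted offset
        del meta["compaction"]
```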
Compactor service components:
- one-file Python compactor service module at `server/leaderless_log_compactor_server.py`
- one scheduler thread
- one bounded worker pool
- one or more `LeaderlessLogCompactor` client instances created lazily per worker thread
- Oxia server
- S3-compatible object storage
- in-process JSON metrics snapshot module at `server/leaderless_log_metrics.py`
Scheduler path:
- start from a configured seed target set
- periodically discover newly initialized partitions by scanning Oxia for `meta/control`
- random-shuffle the current known target set on each scheduler pass
- submit at most one in-process task per topic-partition at a time
- each worker acquires `meta/compactor-claim` as an Oxia ephemeral record before it calls `compact_once(...)`
- if claim acquisition fails, the worker treats the partition as busy and moves on
- if claim acquisition succeeds, the worker runs `compact_once(...)`
- the worker releases `meta/compactor-claim` when the attempt finishes
- if the worker process dies, Oxia session expiry clears the claim automatically
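One scheduler pass can be sketched with a `ThreadPoolExecutor` as the bounded pool and an in-flight set that enforces one task per topic-partition. The claim is injected as a pair of hypothetical callables, `try_claim` and `release_claim`. The sketch does not reproduce the Oxia client's ephemeral-record API. A real implementation would back these callables with an Oxia put that fails if the key already exists.

```python
import random
import threading
from concurrent.futures import ThreadPoolExecutor


class SchedulerSketch:
    """Hypothetical scheduler; claim and compact callables are stand-ins.

    try_claim(tp) -> bool models creating meta/compactor-claim as an ephemeral
    record; release_claim(tp) models deleting it.
    """

    def __init__(self, seed_targets, compact_once, try_claim, release_claim, max_workers=4):
        self.targets = set(seed_targets)
        self._compact_once = compact_once
        self._try_claim = try_claim
        self._release_claim = release_claim
        self._pool = ThreadPoolExecutor(max_workers=max_workers)  # bounded worker pool
        self._inflight = set()
        self._lock = threading.Lock()

    def discover(self, initialized):
        self.targets |= set(initialized)  # additive: known targets are never pruned

    def run_pass(self):
        order = list(self.targets)
        random.shuffle(order)
        submitted = {}
        for tp in order:
            with self._lock:
                if tp in self._inflight:
                    continue  # at most one in-process task per topic-partition
                self._inflight.add(tp)
            submitted[tp] = self._pool.submit(self._attempt, tp)
        return submitted

    def _attempt(self, tp):
        try:
            if not self._try_claim(tp):
                return "busy"  # another worker or server holds the claim
            try:
                self._compact_once(tp)
                return "compacted"
            finally:
                self._release_claim(tp)
        finally:
            with self._lock:
                self._inflight.discard(tp)

    def shutdown(self):
        self._pool.shutdown(wait=True)
```

The worker releases the claim only after it has acquired it. So a partition reported as "busy" keeps the other holder's claim intact.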
Compactor service HTTP surface:
- `GET /health`
- `GET /metrics`
- `GET /metrics/prometheus`
Operational boundaries:
- `meta/compactor-claim` is the scheduler-level ownership primitive for multi-server compactor workers
- `meta/compaction` remains the authoritative recovery record for interrupted compaction steps
- periodic discovery is additive in the current implementation: it adds newly seen partitions and does not eagerly prune old ones
- compactor metrics are process-local and exported directly from the same HTTP server process, including native compaction run, lag, pending, recovery, volume, candidate, stage timing, and S3 billing-estimate metrics
- the long-running service preserves the one-shot `compact_once(...)` path for debugging and recovery work
Broker components:
- one-file Python broker module at `server/leaderless_log_broker.py`
- one shared `BatchingProducer` instance wrapping `LeaderlessLogWriter` when the broker role includes `write`
- one shared `BrokerConsumeService` instance wrapping `LeaderlessLogReader` when the broker role includes `read`
- one bounded tail cache for recently written records: in-process for same-process `write+read` brokers, and file-backed when writer and reader processes share `LLOG_TAIL_CACHE_DIR`
- Oxia server
- S3-compatible object storage
- in-process JSON metrics snapshot module at `server/leaderless_log_metrics.py`
The consume path checks that tail cache before Oxia/S3 and can treat "known empty at cached tail" as a broker-local long-poll wait condition.
HTTP surface:
- `GET /health`
- `GET /metrics`
- `POST /produce`
- `POST /consume`

`POST /produce` request body contains `topic_partitions`:
- each produce topic-partition object contains `topic`, `partition`, and `records`
- each produce record is either:
  - a UTF-8 string
  - an object with a single `base64` field for arbitrary bytes

`POST /consume` request body contains `topic_partitions`:
- each consume topic-partition object contains `topic`, `partition`, `fetch_offset`, and optional `partition_max_bytes`
- consume request-level controls are `max_wait_ms`, `min_bytes`, and `max_bytes`
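A client can build both request bodies from the field lists above. This is a stdlib-only sketch: `produce_body`, `consume_body`, and `post_json` are hypothetical helpers, and the default consume controls are illustrative values, not documented defaults. The consume response shape is not documented here, so `post_json` just returns the parsed JSON.

```python
import base64
import json
import urllib.request


def encode_record(record):
    """Strings are sent as-is (UTF-8 on the broker); bytes use the {"base64": ...} form."""
    if isinstance(record, str):
        return record
    return {"base64": base64.b64encode(record).decode("ascii")}


def produce_body(topic, partition, records):
    return {"topic_partitions": [
        {"topic": topic, "partition": partition,
         "records": [encode_record(r) for r in records]},
    ]}


def consume_body(topic, partition, fetch_offset, max_wait_ms=500, min_bytes=1, max_bytes=1 << 20):
    # The default control values are illustrative, not broker defaults.
    return {
        "topic_partitions": [
            {"topic": topic, "partition": partition, "fetch_offset": fetch_offset},
        ],
        "max_wait_ms": max_wait_ms,
        "min_bytes": min_bytes,
        "max_bytes": max_bytes,
    }


def post_json(url, body, timeout=10):
    """POST a JSON body; urllib raises urllib.error.HTTPError for 4xx/5xx (e.g. 409, 503)."""
    req = urllib.request.Request(
        url, data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"}, method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.status, json.loads(resp.read())
```

For example, `post_json("http://127.0.0.1:8080/produce", produce_body("orders", 0, ["alpha", b"\x00\x01"]))` sends the same body as the request contract example. There, `b"\x00\x01"` encodes to `"AAE="`.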
Health contract:

```json
{
  "status": "ok",
  "broker_id": "broker-1",
  "host": "127.0.0.1",
  "port": 8080,
  "started_at_ms": 1760000000000
}
```

Request contract:

```json
{
  "topic_partitions": [
    {
      "topic": "orders",
      "partition": 0,
      "records": [
        "alpha",
        {"base64": "AAE="}
      ]
    }
  ]
}
```

Record encoding rules:
- JSON strings are encoded to bytes with UTF-8 before calling the writer
- `{"base64":"..."}` is decoded to raw bytes before calling the writer
- `records` must be a non-empty list
- `topic_partitions` must be a non-empty list
- `topic` must be a non-empty string
- `partition` must be a non-negative integer
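The encoding rules above translate into a small validator that raises one exception type, which the broker would map to `400 Bad Request`. This is a sketch under assumptions: `BadRequest`, `parse_produce_request`, and `decode_record` are hypothetical names, not the broker's functions. Python's `bool` is a subclass of `int`, so the validator rejects `true`/`false` as partitions explicitly.

```python
import base64
import json


class BadRequest(ValueError):
    """Maps to 400 Bad Request."""


def decode_record(record):
    if isinstance(record, str):
        return record.encode("utf-8")
    if isinstance(record, dict) and set(record) == {"base64"} and isinstance(record["base64"], str):
        try:
            return base64.b64decode(record["base64"], validate=True)
        except ValueError as exc:  # binascii.Error is a ValueError subclass
            raise BadRequest("invalid base64 record") from exc
    raise BadRequest('record must be a string or {"base64": ...}')


def parse_produce_request(raw):
    """Validate a /produce body; return [(topic, partition, [bytes, ...]), ...]."""
    try:
        body = json.loads(raw)
    except ValueError as exc:  # JSONDecodeError and bad UTF-8 are both ValueErrors
        raise BadRequest(f"malformed JSON: {exc}") from exc
    tps = body.get("topic_partitions") if isinstance(body, dict) else None
    if not isinstance(tps, list) or not tps:
        raise BadRequest("topic_partitions must be a non-empty list")
    groups = []
    for tp in tps:
        if not isinstance(tp, dict):
            raise BadRequest("each topic-partition must be an object")
        topic, partition, records = tp.get("topic"), tp.get("partition"), tp.get("records")
        if not isinstance(topic, str) or not topic:
            raise BadRequest("topic must be a non-empty string")
        if isinstance(partition, bool) or not isinstance(partition, int) or partition < 0:
            raise BadRequest("partition must be a non-negative integer")
        if not isinstance(records, list) or not records:
            raise BadRequest("records must be a non-empty list")
        groups.append((topic, partition, [decode_record(r) for r in records]))
    return groups
```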
Produce response contract:

```json
{
  "results": [
    {
      "topic": "orders",
      "partition": 0,
      "ok": true,
      "start_offset": 1,
      "end_offset": 2,
      "count": 2,
      "index_key": "llog/orders/partitions/0/index/00000000000000000002",
      "wal_uri": "s3://leaderless-log-wal/llog/wal-shared/<uuid>"
    }
  ],
  "success_count": 1,
  "error_count": 0
}
```

Error semantics:
- `400 Bad Request` for malformed JSON or invalid request shape
- `404 Not Found` for unknown paths
- `200 OK` when every topic-partition batch succeeds
- `503 Service Unavailable` when every topic-partition batch fails with `BackPressureRejected`
- `409 Conflict` for other partial or full failures
- in mixed outcomes, each failed batch is returned as a per-batch result object with `ok: false`, `error_type`, and `error`
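The status rules for a validated produce request reduce to a few checks over the per-batch results. This sketch covers only requests that passed validation, because 400 and 404 are decided before any batch runs. `produce_status` and `produce_response` are hypothetical helper names.

```python
def produce_status(results):
    """Choose the HTTP status for a /produce response from per-batch results."""
    if all(r["ok"] for r in results):
        return 200
    if all(not r["ok"] and r.get("error_type") == "BackPressureRejected" for r in results):
        return 503  # uniform back-pressure: the client can retry later
    return 409  # any other partial or full failure


def produce_response(results):
    ok = sum(1 for r in results if r["ok"])
    return produce_status(results), {
        "results": results,
        "success_count": ok,
        "error_count": len(results) - ok,
    }
```

Returning 503 only when every batch was back-pressured gives clients a clean signal to retry the whole request. In a partial failure some batches were already committed, so a blind retry would duplicate them.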
Implementation note:
- the contract is implemented directly in `server/leaderless_log_broker.py`
- the human-readable summary lives in `server/HTTP_API.md`
Broker write path:
- accept one produce request with multiple topic-partition batches
- lazily initialize `meta/control` and `meta/compaction-cursor` on the first write seen for that topic-partition in the broker process
- cache successfully initialized topic-partitions in memory so later writes skip the Oxia existence check
- convert each topic-partition object into one shared-append group
- call `append_shared()` once for the full request
- return per-topic-partition offsets and WAL/index references from the shared flush outcome
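The lazy-initialization cache can be sketched as a lock-protected set in front of an idempotent init call. The sketch assumes a hypothetical `init_fn(topic, partition)` that creates `meta/control` and `meta/compaction-cursor` if absent and raises on transient errors. The topic-partition is cached only after success, so a failed initialization is retried on the next write.

```python
import threading


class InitializedPartitions:
    """Hypothetical process-local cache of topic-partitions known to be initialized."""

    def __init__(self, init_fn):
        self._init_fn = init_fn
        self._known = set()
        self._lock = threading.Lock()

    def ensure(self, topic, partition):
        """Return True if init_fn ran, False if the cache already knew the partition."""
        tp = (topic, partition)
        with self._lock:
            if tp in self._known:
                return False  # cached: skip the Oxia existence check
        # Called outside the lock; init_fn is idempotent, so racing threads are safe.
        self._init_fn(topic, partition)
        with self._lock:
            self._known.add(tp)  # only cache after a successful initialization
        return True
```

Since the set lives only in memory, a restarted broker repeats the idempotent initialization once per partition. This matches the rebuild-after-restart behavior described under the broker's operational boundaries.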
Broker consume path:
- check the local tail cache first for each requested topic-partition
- return cached records directly when the requested offsets are still inside the retained hot tail
- when the cache knows the fetch offset is already past its cached high watermark, keep the request in broker-local long-poll instead of re-reading Oxia/S3 at the tail
- only on cache miss, gap, or eviction, fall back to `LeaderlessLogReader` for `meta/control`, sparse-index or `pending` resolution, and S3 fetches
- de-duplicate shared-WAL blob downloads inside one consume request when multiple partitions resolve to the same `data_key`
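The three cache outcomes above (serve, wait, fall back) can be sketched for one topic-partition with a bounded `deque`. This is a hypothetical in-process sketch, not the repo's tail cache, and its details are assumptions. A non-contiguous append clears the cache. Only a fetch exactly one past the cached high watermark counts as "known empty at cached tail". Anything further ahead falls back to Oxia/S3, because another broker may have appended.

```python
from collections import deque


class TailCache:
    """Hypothetical bounded hot-tail cache for one topic-partition."""

    def __init__(self, max_records):
        self._records = deque(maxlen=max_records)  # (offset, bytes); oldest evicted first
        self.high_watermark = 0  # last offset appended through this cache (0 = none)

    def append(self, start_offset, records):
        if self._records and start_offset != self.high_watermark + 1:
            self._records.clear()  # a gap would make cached ranges unsafe to serve
        for i, rec in enumerate(records):
            self._records.append((start_offset + i, rec))
        self.high_watermark = start_offset + len(records) - 1

    def lookup(self, fetch_offset, max_records):
        """Return ("hit", records), ("wait", []) or ("miss", [])."""
        if self.high_watermark and fetch_offset == self.high_watermark + 1:
            return "wait", []  # known empty at cached tail: long-poll locally
        if (not self._records or fetch_offset < self._records[0][0]
                or fetch_offset > self.high_watermark):
            return "miss", []  # evicted, never cached, or beyond the cache: use Oxia/S3
        first = self._records[0][0]
        window = list(self._records)[fetch_offset - first:fetch_offset - first + max_records]
        return "hit", [rec for _, rec in window]
```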
Execution model:
- the HTTP server uses Python stdlib `ThreadingHTTPServer`
- request handling is multi-threaded at the HTTP layer
- broker handler threads do not serialize produce calls with a process-local lock
- consume requests are also handled on those HTTP worker threads and can long-poll independently
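The threading model above can be illustrated with a stdlib `ThreadingHTTPServer` serving only the health contract. This is a minimal sketch, not the broker module. `HealthHandler`, `start`, `fetch_health`, and the `broker-1` identity are hypothetical. Binding to port 0 lets the OS pick a free port.

```python
import json
import threading
import time
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

STARTED_AT_MS = int(time.time() * 1000)


class HealthHandler(BaseHTTPRequestHandler):
    """Under ThreadingHTTPServer each request runs on its own thread."""

    broker_id = "broker-1"  # hypothetical identity

    def do_GET(self):
        if self.path != "/health":
            self.send_error(404, "Not Found")
            return
        host, port = self.server.server_address[:2]
        body = json.dumps({
            "status": "ok",
            "broker_id": self.broker_id,
            "host": host,
            "port": port,
            "started_at_ms": STARTED_AT_MS,
        }).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep the sketch quiet


def start(host="127.0.0.1", port=0):
    server = ThreadingHTTPServer((host, port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def fetch_health(port):
    with urllib.request.urlopen(f"http://127.0.0.1:{port}/health", timeout=5) as resp:
        return json.loads(resp.read())
```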
Load-test components:
- Python orchestrator module at `loadtest/orchestrate.py`
- Python partition bootstrap module at `loadtest/precreate.py`
- Python metrics collector at `loadtest/metrics_collector.py`
- Python routing helper module at `loadtest/routing.py`
- Go HTTP load generator at `loadtest/http_hammer.go`
- Python quick-observability wrapper at `loadtest/quick_observability.py`
- repo-local Prometheus and Grafana compose stack at `loadtest/observability/docker-compose.yml`
- local MinIO process or container
- local Oxia coordinator plus dataserver processes, or one standalone Oxia process
- local broker process fanout
Run directory model:
- every scenario run writes to `loadtest/results/<utc-timestamp>-<scenario-name>/`
- the run directory stores the resolved scenario, broker layout, ports, stack metadata, git/environment snapshots, logs, routing manifest, and metrics artifacts
- the orchestrator treats `stack.json` plus `resolved-config.json` as the source of truth for later `status`, `precreate`, `run`, and `down` commands
Routing model:
- owner sets are computed once per run with rendezvous hashing over `(topic, partition, broker_id, seed)`
- the canonical mapping is written to `routing-manifest.json`
- the hammer only routes a partition to brokers inside that partition's owner set
- `round_robin` and `least_inflight` selection happen inside that owner set
- if a broker becomes unavailable, remap stays inside the original owner set; no global rebalance is introduced
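Rendezvous (highest-random-weight) hashing over `(topic, partition, broker_id, seed)` can be sketched in a few lines. The SHA-256 score function and the owner-set size `k=2` are assumptions, not the contents of `loadtest/routing.py`. Each broker's score depends only on its own tuple. Removing a broker outside a partition's owner set therefore leaves that owner set unchanged.

```python
import hashlib


def _score(topic, partition, broker_id, seed):
    # ASSUMPTION: SHA-256 over a NUL-separated tuple; any stable hash works.
    key = f"{topic}\x00{partition}\x00{broker_id}\x00{seed}".encode("utf-8")
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big")


def owner_set(topic, partition, broker_ids, seed, k=2):
    """Top-k brokers by rendezvous score; stable for a fixed broker list and seed."""
    ranked = sorted(broker_ids, key=lambda b: _score(topic, partition, b, seed), reverse=True)
    return ranked[:k]


def remap(owners, unavailable):
    """Fail over inside the owner set only; never pick a broker outside it."""
    return [b for b in owners if b not in unavailable]
```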
Execution flow:
- `orchestrate.py up` prepares the run directory, writes stack metadata, starts MinIO, starts Oxia, optionally pre-creates partitions, starts brokers, and waits for health
- `orchestrate.py run` starts the stack, starts the collector, runs the Go hammer, forces one final collector sample, and tears the stack down unless the scenario keeps it alive
- `quick_run.sh` calls `quick_observability.py`, which starts the local Prometheus and Grafana compose stack, calls `start_stack(...)`, writes the live broker ports into Prometheus file_sd, and waits until Prometheus sees the loadtest metrics before it starts the hammer
- `metrics_collector.py` scrapes broker `/metrics`, samples tracked local processes plus host counters, writes `metrics-30s.jsonl`, and rolls those samples into `summary.json`
- `http_hammer.go` writes `client-30s.jsonl`, `client-summary.json`, and `remap-events.jsonl`
- brokers launched by the orchestrator also export static loadtest scenario info and gauge series on `/metrics/prometheus` so Grafana can show the active run shape live
Reconciliation model:
- `client-summary.json` is the client intent and success record
- broker `/metrics` endpoints provide broker accepted records/bytes plus Oxia counters, S3 operation counters, and S3 billing-estimate fields
- `summary.json` joins the latest broker totals, client summary, artifact presence, and reconciliation warnings so another engineer can audit a run without guessing

Broker internals:
- `BatchingProducer` owns the shared buffer, waiter futures, and one background flusher thread
- `BatchingProducer` also owns one process-local set of successfully initialized topic-partitions
- direct calls into the underlying writer are serialized inside `BatchingProducer` so Oxia/S3 client use stays single-threaded at the write boundary
- the broker exports one process-local JSON metrics snapshot directly from the same HTTP server process
- the broker metrics cover broker identity, roles, batch settings, loadtest scenario metadata when present, HTTP counters, consume counters, batcher queue and flush counters, shared WAL blob size totals, Oxia/S3 write-path counters, and S3 billing estimates
- the broker keeps no durable state of its own beyond the existing Oxia and S3 writes performed by `LeaderlessLogWriter`
Operational boundaries:
- the broker is a produce adapter over the existing writer path and a consume adapter over the existing reader path
- there is no cross-topic-partition atomic commit
- one failed topic-partition batch does not roll back successful batches in the same HTTP request
- partition initialization is idempotent and retried until one successful initialization is observed by the current broker process
- the initialized-partition cache is process-local only and is rebuilt after broker restart
- the metrics surface is process-local and adds no new persisted Oxia keys or S3 objects, but it does issue cached `list_objects_v2` scans under the configured S3 prefix to estimate current stored bytes and objects
- the shared metrics object can also be passed into `LeaderlessLogReader` and `LeaderlessLogCompactor` for embedded Oxia/S3 instrumentation outside the broker
- the local tail cache is opportunistic: misses, gaps, and evictions fall back to Oxia/S3, but a cached high-watermark-only result lets hot-tail long polls stay local; cross-process hits require writer and reader processes to share the same local `LLOG_TAIL_CACHE_DIR`
- the broker does not currently expose fencing or compaction APIs; beyond produce and consume, it serves only health and metrics
- the quick Grafana flow is local-only: Prometheus scrapes the host brokers through Docker Desktop loopback using `host.docker.internal`, and the wrapper clears the file_sd targets again on teardown
S3 billing-estimate semantics:
- request cost is derived from observed S3 `put`, `list`, `get`, and `range_get` counters
- `list` operations are generated by the bucket-usage refresher itself and are included in the request-cost estimate
- current stored bytes and object count are derived from cached prefix scans against `s3://<bucket>/<LLOG_ROOT_PREFIX>/...`
- pricing is hardcoded to S3 Standard `us-east-1` assumptions for visibility only
- request-cost counters can be summed across processes, but bucket-size and storage-cost gauges should be aggregated with `max`
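The sum-versus-max rule can be made concrete over per-process snapshots. All field names here are hypothetical, not the metrics module's keys. Request counters count each process's own calls, so they add up. Every process scans the same bucket prefix, so summing the size gauges would multiply one bucket's size by the number of processes.

```python
def aggregate_billing(snapshots):
    """Combine per-process billing snapshots (hypothetical field names)."""
    counters = (
        "s3_put_requests", "s3_list_requests", "s3_get_requests",
        "s3_range_get_requests", "s3_request_cost_usd",
    )
    gauges = ("s3_stored_bytes", "s3_stored_objects", "s3_storage_cost_usd_month")
    # Per-process request counters are disjoint, so summing is correct.
    out = {name: sum(s.get(name, 0) for s in snapshots) for name in counters}
    # Every process observes the same bucket, so take the max instead of summing.
    out.update({name: max((s.get(name, 0) for s in snapshots), default=0) for name in gauges})
    return out
```

In PromQL terms this corresponds to `sum(...)` for the request series and `max(...)` for the bucket gauges.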
Reader components:
- Oxia server
- Oxia Python client
- Python reader module at `server/leaderless_log_reader.py`
- shared schema/path/S3 config/batch codec module at `server/leaderless_log_common.py`
Read path:
- read `meta/control` to get `sequence_counter` and `pending`
- for random reads, use Oxia `CEILING` lookup on `index/{offset_20d}`
- for forward scans, use Oxia `range_scan()` over the `index/` prefix
- if the sparse index does not cover the requested tail offset, fall back to `meta/control.pending`
- if the resolved entry has `byte_offset` and `byte_length`, issue an S3 range GET for just that slice
- decode the resulting `bytes-batch-v1` payload and return the requested logical record
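The random-read resolution can be sketched with `bisect` standing in for the Oxia `CEILING` lookup. It finds the smallest end offset at or above the requested offset and checks that the entry's coverage reaches back to it. If not, it falls back to `pending`. The in-memory `index` dict and the helper names are hypothetical. The Range value follows HTTP byte-range syntax, whose end position is inclusive.

```python
import bisect


def resolve_entry(index, pending, offset):
    """Find the entry covering `offset`, mimicking a CEILING lookup on index/.

    index: dict end_offset -> entry (in-memory stand-in for the index/ keys).
    pending: the meta/control.pending object or None.
    Returns (entry, position_within_batch) or None.
    """
    ends = sorted(index)
    i = bisect.bisect_left(ends, offset)  # smallest end_offset >= offset
    if i < len(ends):
        entry = index[ends[i]]
        start = ends[i] - entry["msg_count"] + 1
        if start <= offset:
            return entry, offset - start
    if pending and pending["start_offset"] <= offset <= pending["end_offset"]:
        return pending, offset - pending["start_offset"]  # logically readable tail
    return None


def range_header(entry):
    """HTTP Range value for a shared-WAL slice, or None to GET the whole object."""
    if "byte_offset" not in entry:
        return None
    first = entry["byte_offset"]
    return f"bytes={first}-{first + entry['byte_length'] - 1}"  # inclusive end
```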
Logical root:
- `llog/{topic}/partitions/{partition}/`
- shared WAL objects also live outside the topic-partition subtree at `llog/wal-shared/{uuid}`
The broker HTTP adapter adds no new persisted keys. It writes through the same control, index, and S3 WAL layout described below.
The compactor service adds one ephemeral scheduler key, meta/compactor-claim.
- `llog/{topic}/partitions/{partition}/meta/control`
  - JSON:
    - `log_state`: `"OPEN"` or `"FENCED"`
    - `sequence_counter`: integer
    - `pending`: object or null
- `llog/{topic}/partitions/{partition}/meta/compaction-cursor`
  - JSON:
    - `offset`: integer
- `llog/{topic}/partitions/{partition}/meta/compaction`
  - JSON or absent:
    - `compaction_id`: string UUID
    - `state`: `"WRITING_COMPACTED_INDEX"`, `"DELETING_OLD"`, or `"UPDATING_CURSOR"`
    - `start_offset`: integer
    - `end_offset`: integer
    - `msg_count`: integer
    - `data_key`: full compacted S3 URI string
    - `encoding`: `"bytes-batch-v1"`
    - `created_at_ms`: integer
- `llog/{topic}/partitions/{partition}/meta/compactor-claim`
  - JSON or absent:
    - `compactor_id`: string
    - `claimed_at_ms`: integer
  - the key is written as an Oxia ephemeral record by the long-running compactor service
  - the key is used as a scheduler-level claim before `compact_once(...)`
  - the key is not part of compaction crash recovery and can disappear on client shutdown or session expiry
- `llog/wal-shared/{uuid}`
  - S3 object key
  - every new WAL write stores one shared blob containing one or more partition-local `bytes-batch-v1` bodies
  - readers and the compactor use `byte_offset` and `byte_length` from the index or pending entry to read the correct partition slice

The full S3 URI is:

`s3://{bucket}/llog/wal-shared/{uuid}`

Legacy partition-local WAL keys can still exist from older data and remain readable:

`s3://{bucket}/llog/{topic}/partitions/{partition}/data/wal/{uuid}`
- `llog/{topic}/partitions/{partition}/data/compacted/{uuid}`
  - S3 object key
  - stores one `bytes-batch-v1` envelope containing the full compacted logical range

The full S3 URI is:

`s3://{bucket}/llog/{topic}/partitions/{partition}/data/compacted/{uuid}`
- `llog/{topic}/partitions/{partition}/index/{end_offset_20d}`
  - JSON:
    - `type`: `"WAL"` or `"COMPACTED"`
    - `msg_count`: integer
    - `data_key`: full S3 URI string
    - `encoding`: `"bytes-batch-v1"`
    - `byte_offset`: integer, optional, only for shared WAL entries
    - `byte_length`: integer, optional, only for shared WAL entries
    - `created_at_ms`: integer

The index is sparse:
- one key per append operation
- the key is stored at the batch end offset
- a batch of `N` messages ending at offset `E` covers offsets `[E - N + 1, E]`
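The key layout and coverage rule can be captured as small path helpers. The helper names are hypothetical, though `leaderless_log_common.py` is described as holding equivalents. The `{end_offset_20d}` zero padding makes lexicographic key order match numeric offset order, which `CEILING` lookups and `range_scan()` over `index/` depend on. The Oxia partition key format comes from the routing note in this document.

```python
def partition_root(topic, partition):
    return f"llog/{topic}/partitions/{partition}/"


def index_key(topic, partition, end_offset):
    # 20-digit zero padding keeps lexicographic key order equal to numeric order.
    return f"{partition_root(topic, partition)}index/{end_offset:020d}"


def covered_offsets(end_offset, msg_count):
    """A batch of N messages ending at E covers [E - N + 1, E]."""
    return end_offset - msg_count + 1, end_offset


def partition_key(topic, partition):
    return f"{topic}:{partition}"  # routes all of a topic-partition's Oxia records together
```

For the produce response example, `index_key("orders", 0, 2)` yields `llog/orders/partitions/0/index/00000000000000000002`.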
meta/control.pending is authoritative during crash recovery.
It contains:
- `append_id`
- `start_offset`
- `end_offset`
- `msg_count`
- `entry_type`
- `data_key` as full S3 URI
- `encoding`
- `byte_offset` as integer when `data_key` points into a shared WAL blob
- `byte_length` as integer when `data_key` points into a shared WAL blob
- `created_at_ms`
The reader also treats pending as a logically readable tail entry.
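Recovering from `pending` amounts to writing its index entry at `end_offset` and then clearing it. This sketch uses in-memory dicts, and two details are assumptions: `entry_type` maps onto the index entry's `type`, and each write would presumably be a versioned (CAS) Oxia write. `materialize_pending` is a hypothetical name. Rewriting an identical index entry is harmless, so a crash between the two writes is safe to retry.

```python
def materialize_pending(control, index):
    """Turn meta/control.pending into its index entry, then clear it.

    control: dict standing in for meta/control; index: dict end_offset -> entry.
    Returns True if a pending append was materialized.
    """
    pending = control.get("pending")
    if pending is None:
        return False
    entry = {
        "type": pending["entry_type"],  # ASSUMPTION: entry_type maps to index "type"
        "msg_count": pending["msg_count"],
        "data_key": pending["data_key"],
        "encoding": pending["encoding"],
        "created_at_ms": pending["created_at_ms"],
    }
    for field in ("byte_offset", "byte_length"):  # present only for shared WAL blobs
        if pending.get(field) is not None:
            entry[field] = pending[field]
    index[pending["end_offset"]] = entry  # rewriting the same entry is harmless
    control["pending"] = None
    return True
```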
meta/compaction is authoritative during compactor crash recovery.
It contains:
- `compaction_id`
- `state`
- `start_offset`
- `end_offset`
- `msg_count`
- `data_key` as full S3 URI
- `encoding`
- `created_at_ms`
The long-running compactor service uses meta/control as the periodic discovery anchor.
Reason:
- partition initialization already creates `meta/control`
- the service can scan the Oxia keyspace for `llog/{topic}/partitions/{partition}/meta/control`
- parsing those keys yields the currently known initialized topic-partitions without introducing a second catalog
All Oxia records for a topic-partition use:
partition_key = f"{topic}:{partition}"
That keeps one topic-partition as one Oxia-routed coordination stream.
S3 WAL objects are not routed by Oxia. They use the object key layout above.
Not implemented yet:
- vacuum of orphan S3 objects