Architecture

Scope

This repo currently contains:

  • oxia-server/: the Oxia server
  • requirements.txt: the pinned Python client/runtime dependencies, including oxia
  • leaderless-log-protocol/: the protocol specification and examples
  • server/leaderless_log_compactor.py: the Python compactor
  • server/leaderless_log_compactor_server.py: the long-running Python compactor service
  • server/leaderless_log_broker.py: a minimal HTTP broker adapter for produce and consume requests
  • server/leaderless_log_common.py: shared leaderless metadata, path, S3 config, and batch codec helpers

The application-level architecture in this repo consists of a Python leaderless log writer, reader, compactor, and broker adapter, all of which use Oxia as:

  • the coordination store
  • the sparse index store

WAL payload bytes are stored in S3-compatible object storage.

Writer Architecture

Components:

  1. Oxia server
  2. Oxia Python client
  3. Python writer module at server/leaderless_log_writer.py
  4. shared schema/path/S3 config/batch codec module at server/leaderless_log_common.py
  5. one in-process BatchingProducer layer inside the writer module for broker-side size/time batching

The unit of a log is a topic-partition:

  1. topic: str
  2. partition: int

Each topic-partition is its own WAL with its own:

  1. log state
  2. offset sequence
  3. sparse index
  4. compaction cursor

Direct writer API:

  1. append_shared(groups) accepts one or more topic-partition groups
  2. each group is encoded as one inner bytes-batch-v1 body
  3. all group bodies are packed into one shared S3 WAL blob under llog/wal-shared/{uuid}
  4. the writer commits each partition independently by:
    • CASing meta/control with a pending record that includes the shared data_key
    • storing byte_offset and byte_length so the partition points at its slice inside the shared blob
    • materializing the sparse index entry at the batch end offset
    • clearing pending state

Shared WAL blob layout:

  1. 4-byte magic: LLS1
  2. 4-byte big-endian header length
  3. JSON header with:
    • version
    • created_at_ms
    • partitions[], each with:
      • topic
      • partition
      • msg_count
      • encoding
      • body_offset
      • body_length
  4. concatenated per-partition bytes-batch-v1 bodies
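
The layout above can be illustrated with a small pack/unpack sketch. It is not the repo's codec: the function names are hypothetical, the `version` value of 1 is a placeholder, and it assumes `body_offset` is counted from the first body byte rather than from the start of the blob, which the text above does not specify.

```python
import json
import struct

MAGIC = b"LLS1"


def pack_shared_blob(groups, created_at_ms):
    """Pack (topic, partition, msg_count, body_bytes) tuples into one blob."""
    partitions = []
    offset = 0
    for topic, partition, msg_count, body in groups:
        partitions.append({
            "topic": topic,
            "partition": partition,
            "msg_count": msg_count,
            "encoding": "bytes-batch-v1",
            # Assumption: offsets are relative to the start of the body region.
            "body_offset": offset,
            "body_length": len(body),
        })
        offset += len(body)
    header = json.dumps({
        "version": 1,  # placeholder value, not taken from the spec
        "created_at_ms": created_at_ms,
        "partitions": partitions,
    }).encode("utf-8")
    bodies = b"".join(body for _, _, _, body in groups)
    # 4-byte magic, 4-byte big-endian header length, JSON header, bodies.
    return MAGIC + struct.pack(">I", len(header)) + header + bodies


def unpack_shared_blob(blob):
    """Return (header, slices) where slices maps (topic, partition) to bytes."""
    if blob[:4] != MAGIC:
        raise ValueError("bad magic")
    (header_len,) = struct.unpack(">I", blob[4:8])
    bodies_start = 8 + header_len
    header = json.loads(blob[8:bodies_start].decode("utf-8"))
    slices = {}
    for part in header["partitions"]:
        start = bodies_start + part["body_offset"]
        end = start + part["body_length"]
        slices[(part["topic"], part["partition"])] = blob[start:end]
    return header, slices
```

Under this assumption, a reader that only needs one partition adds `8 + header_len` to `body_offset` to get the absolute slice position inside the blob.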

Broker-side batching path:

  1. the broker initializes partitions eagerly
  2. the broker submits all request batches to BatchingProducer.append_shared()
  3. BatchingProducer coalesces groups across threads until either:
    • LLOG_BATCH_MAX_BYTES is reached
    • LLOG_BATCH_MAX_DELAY_MS elapses from the first buffered record
  4. one flusher thread calls the real writer append_shared() with one aggregated group per topic-partition
  5. waiter futures are fanned back out to the original per-request partition results
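
As a rough illustration of steps 3 to 5, the sketch below shows a flush trigger, the coalescing of request groups into one aggregated group per topic-partition, and the fan-out of assigned offsets back to each request. The function names and the result shape are hypothetical, threads and futures are left out, and the real BatchingProducer may track this differently.

```python
from collections import defaultdict


def should_flush(buffered_bytes, first_buffered_at_ms, now_ms,
                 max_bytes, max_delay_ms):
    """True once the size limit or the delay since the first record is hit."""
    if buffered_bytes >= max_bytes:
        return True
    return (first_buffered_at_ms is not None
            and now_ms - first_buffered_at_ms >= max_delay_ms)


def coalesce(requests):
    """Merge per-request groups into one record list per topic-partition.

    requests: list of requests, each a list of (topic, partition, records).
    Returns (aggregated, spans); spans[i] remembers, for request i, where
    its records landed inside each aggregated group.
    """
    aggregated = defaultdict(list)
    spans = []
    for groups in requests:
        request_spans = []
        for topic, partition, records in groups:
            key = (topic, partition)
            request_spans.append((key, len(aggregated[key]), len(records)))
            aggregated[key].extend(records)
        spans.append(request_spans)
    return dict(aggregated), spans


def fan_out(spans, base_offsets):
    """Map the first offset of each aggregated group back to each request."""
    results = []
    for request_spans in spans:
        per_request = []
        for key, start, count in request_spans:
            first = base_offsets[key] + start
            per_request.append({
                "topic": key[0],
                "partition": key[1],
                "start_offset": first,
                "end_offset": first + count - 1,
                "count": count,
            })
        results.append(per_request)
    return results
```

Here `max_bytes` and `max_delay_ms` stand in for LLOG_BATCH_MAX_BYTES and LLOG_BATCH_MAX_DELAY_MS.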

Compactor Architecture

Components:

  1. Oxia server
  2. Oxia Python client
  3. Python compactor module at server/leaderless_log_compactor.py
  4. shared schema/path/S3 config/batch codec module at server/leaderless_log_common.py

Compaction path:

  1. recover meta/control.pending if a writer crash left a logically committed append unmaterialized
  2. read meta/compaction-cursor and scan the sparse index from that offset
  3. select one contiguous WAL-backed range that starts exactly at the cursor
  4. read the source WAL blobs from S3 and rewrite them into one compacted S3 blob
  5. create meta/compaction as the recoverable in-flight compaction record
  6. overwrite the range end index key with one COMPACTED entry
  7. delete_range() the lower sparse index keys in the compacted span
  8. advance meta/compaction-cursor
  9. clear meta/compaction

Crash recovery model:

  1. meta/compaction is authoritative for interrupted compactions
  2. state WRITING_COMPACTED_INDEX means the end-key overwrite may still need to happen
  3. state DELETING_OLD means the end key is already compacted and lower keys still need deleting
  4. state UPDATING_CURSOR means deletes are done and the cursor still needs advancing
  5. any compactor process can safely resume an interrupted compaction because each step is idempotent
  6. the pending meta/compaction record stores the full compacted range and the start offset of the last selected WAL entry, so recovery can verify the end-key overwrite against the correct tail entry
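
The recovery states above form a small forward-only state machine. The sketch below replays it against plain dictionaries instead of Oxia and S3, so the function name and data shapes are illustrative only. It also assumes the cursor holds the next offset to compact (`end_offset + 1`), which is inferred from "starts exactly at the cursor" rather than stated.

```python
def resume_compaction(meta, index):
    """Finish an interrupted compaction; return False if none is pending.

    meta:  dict with "compaction" (record or None) and "compaction-cursor".
    index: dict mapping batch end offset -> index entry.
    """
    record = meta.get("compaction")
    if record is None:
        return False
    start, end = record["start_offset"], record["end_offset"]

    if record["state"] == "WRITING_COMPACTED_INDEX":
        # Overwriting the end key with the same entry twice is harmless.
        index[end] = {
            "type": "COMPACTED",
            "msg_count": record["msg_count"],
            "data_key": record["data_key"],
        }
        record["state"] = "DELETING_OLD"

    if record["state"] == "DELETING_OLD":
        # Deleting keys that are already gone is a no-op.
        for offset in [o for o in index if start <= o < end]:
            del index[offset]
        record["state"] = "UPDATING_CURSOR"

    if record["state"] == "UPDATING_CURSOR":
        # max() keeps the cursor from moving backwards on a repeated run.
        current = meta["compaction-cursor"]["offset"]
        meta["compaction-cursor"] = {"offset": max(current, end + 1)}
        meta["compaction"] = None
    return True
```

Because every branch only moves forward and tolerates being repeated, a second compactor process that picks up the same record at any state ends in the same final index and cursor.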

Compactor Service Architecture

Components:

  1. one-file Python compactor service module at server/leaderless_log_compactor_server.py
  2. one scheduler thread
  3. one bounded worker pool
  4. one or more LeaderlessLogCompactor client instances created lazily per worker thread
  5. Oxia server
  6. S3-compatible object storage
  7. in-process JSON metrics snapshot module at server/leaderless_log_metrics.py

Scheduler path:

  1. start from a configured seed target set
  2. periodically discover newly initialized partitions by scanning Oxia for meta/control
  3. random-shuffle the current known target set on each scheduler pass
  4. submit at most one in-process task per topic-partition at a time
  5. each worker acquires meta/compactor-claim as an Oxia ephemeral record before it calls compact_once(...)
  6. if claim acquisition fails, the worker treats the partition as busy and moves on
  7. if claim acquisition succeeds, the worker runs compact_once(...)
  8. the worker releases meta/compactor-claim when the attempt finishes
  9. if the worker process dies, Oxia session expiry clears the claim automatically
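
A single scheduler pass can be sketched as follows. The claim store here is an in-memory stand-in for the Oxia ephemeral record, not the Oxia client API, and the pass runs synchronously instead of submitting to the worker pool. `InMemoryClaims`, `scheduler_pass`, and the `compact_once` callable signature are hypothetical names used for illustration.

```python
import random


class InMemoryClaims:
    """Stand-in for meta/compactor-claim; one owner per topic-partition."""

    def __init__(self):
        self._owners = {}

    def try_acquire(self, key, owner):
        if key in self._owners:
            return False
        self._owners[key] = owner
        return True

    def release(self, key, owner):
        # Only the current owner may release the claim.
        if self._owners.get(key) == owner:
            del self._owners[key]


def scheduler_pass(targets, in_flight, claims, compactor_id, compact_once, rng):
    """Run one shuffled pass; return the topic-partitions that were compacted."""
    order = list(targets)
    rng.shuffle(order)
    ran = []
    for tp in order:
        if tp in in_flight:
            continue  # at most one in-process task per topic-partition
        in_flight.add(tp)
        try:
            if not claims.try_acquire(tp, compactor_id):
                continue  # another compactor owns it: treat as busy
            try:
                compact_once(*tp)
                ran.append(tp)
            finally:
                claims.release(tp, compactor_id)
        finally:
            in_flight.discard(tp)
    return ran
```

In the real service the release on process death comes from Oxia session expiry, which this stand-in does not model.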

Compactor service HTTP surface:

  1. GET /health
  2. GET /metrics
  3. GET /metrics/prometheus

Operational boundaries:

  1. meta/compactor-claim is the scheduler-level ownership primitive for multi-server compactor workers
  2. meta/compaction remains the authoritative recovery record for interrupted compaction steps
  3. periodic discovery is additive in the current implementation: it adds newly seen partitions and does not eagerly prune old ones
  4. compactor metrics are process-local and exported directly from the same HTTP server process, including native compaction run, lag, pending, recovery, volume, candidate, stage timing, and S3 billing-estimate metrics
  5. the long-running service preserves the one-shot compact_once(...) path for debugging and recovery work

Broker Agent Architecture

Components:

  1. one-file Python broker module at server/leaderless_log_broker.py
  2. one shared BatchingProducer instance wrapping LeaderlessLogWriter when the broker role includes write
  3. one shared BrokerConsumeService instance wrapping LeaderlessLogReader when the broker role includes read
  4. one bounded tail cache for recently written records, in-process for same-process write+read brokers and file-backed when writer/reader processes share LLOG_TAIL_CACHE_DIR
  5. Oxia server
  6. S3-compatible object storage
  7. in-process JSON metrics snapshot module at server/leaderless_log_metrics.py

The consume path checks this tail cache before Oxia/S3. When the cache already knows the partition is empty at its cached tail, the broker can treat that as a broker-local long-poll wait condition.

HTTP surface:

  1. GET /health
  2. GET /metrics
  3. POST /produce
  4. POST /consume
  5. POST /produce request body contains topic_partitions
  6. each produce topic-partition object contains:
    • topic
    • partition
    • records
  7. each produce record is either:
    • a UTF-8 string
    • an object with a single base64 field for arbitrary bytes
  8. POST /consume request body contains topic_partitions
  9. each consume topic-partition object contains:
    • topic
    • partition
    • fetch_offset
    • optional partition_max_bytes
  10. consume request-level controls are:
    • max_wait_ms
    • min_bytes
    • max_bytes

Health contract:

{
  "status": "ok",
  "broker_id": "broker-1",
  "host": "127.0.0.1",
  "port": 8080,
  "started_at_ms": 1760000000000
}

Produce request contract:

{
  "topic_partitions": [
    {
      "topic": "orders",
      "partition": 0,
      "records": [
        "alpha",
        {"base64": "AAE="}
      ]
    }
  ]
}

Record encoding rules:

  1. JSON strings are encoded to bytes with UTF-8 before calling the writer
  2. {"base64":"..."} is decoded to raw bytes before calling the writer
  3. records must be a non-empty list
  4. topic_partitions must be a non-empty list
  5. topic must be a non-empty string
  6. partition must be a non-negative integer
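
The rules above translate into a short validation and decoding step. This is a sketch of one way to apply them, not the broker's handler: the function names and error messages are hypothetical, and a `ValueError` here stands for whatever the broker maps to 400 Bad Request.

```python
import base64


def decode_record(record):
    """Turn one JSON produce record into raw bytes."""
    if isinstance(record, str):
        return record.encode("utf-8")
    if (isinstance(record, dict) and set(record) == {"base64"}
            and isinstance(record["base64"], str)):
        # validate=True rejects characters outside the base64 alphabet.
        return base64.b64decode(record["base64"], validate=True)
    raise ValueError("record must be a string or a single-field base64 object")


def validate_produce_request(body):
    """Return a list of (topic, partition, [bytes, ...]) or raise ValueError."""
    tps = body.get("topic_partitions") if isinstance(body, dict) else None
    if not isinstance(tps, list) or not tps:
        raise ValueError("topic_partitions must be a non-empty list")
    groups = []
    for tp in tps:
        if not isinstance(tp, dict):
            raise ValueError("each topic-partition must be an object")
        topic = tp.get("topic")
        partition = tp.get("partition")
        records = tp.get("records")
        if not isinstance(topic, str) or not topic:
            raise ValueError("topic must be a non-empty string")
        # bool is a subclass of int in Python, so reject it explicitly.
        if (isinstance(partition, bool) or not isinstance(partition, int)
                or partition < 0):
            raise ValueError("partition must be a non-negative integer")
        if not isinstance(records, list) or not records:
            raise ValueError("records must be a non-empty list")
        groups.append((topic, partition, [decode_record(r) for r in records]))
    return groups
```

Applied to the produce request shown earlier, the two records decode to the bytes `alpha` and the two raw bytes `0x00 0x01`.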

Produce response contract:

{
  "results": [
    {
      "topic": "orders",
      "partition": 0,
      "ok": true,
      "start_offset": 1,
      "end_offset": 2,
      "count": 2,
      "index_key": "llog/orders/partitions/0/index/00000000000000000002",
      "wal_uri": "s3://leaderless-log-wal/llog/wal-shared/<uuid>"
    }
  ],
  "success_count": 1,
  "error_count": 0
}

Error semantics:

  1. 400 Bad Request for malformed JSON or invalid request shape
  2. 404 Not Found for unknown paths
  3. 200 OK when every topic-partition batch succeeds
  4. 503 Service Unavailable when every topic-partition batch fails with BackPressureRejected
  5. 409 Conflict for other partial or full failures
  6. in mixed outcomes, every batch still gets its own result object; failed batches carry ok: false, error_type, and error
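
For a request that passed validation, the status-code rules above reduce to a small decision function over the per-batch results. The function name is hypothetical; the `ok` and `error_type` fields and the `BackPressureRejected` value come from the contract described here.

```python
def produce_status(results):
    """Pick the HTTP status for a validated produce request."""
    if all(r["ok"] for r in results):
        return 200
    if all(not r["ok"] and r.get("error_type") == "BackPressureRejected"
           for r in results):
        return 503
    # Any other partial or full failure.
    return 409
```

Note that a request where only some batches hit back pressure still maps to 409, because 503 is reserved for the case where every batch was rejected that way.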

Implementation note:

  1. the contract is implemented directly in server/leaderless_log_broker.py
  2. the human-readable summary lives in server/HTTP_API.md

Broker write path:

  1. accept one produce request with multiple topic-partition batches
  2. lazily initialize meta/control and meta/compaction-cursor on the first write seen for that topic-partition in the broker process
  3. cache successfully initialized topic-partitions in memory so later writes skip the Oxia existence check
  4. convert each topic-partition object into one shared-append group
  5. call append_shared() once for the full request
  6. return per-topic-partition offsets and WAL/index references from the shared flush outcome

Broker consume path:

  1. check the local tail cache first for each requested topic-partition
  2. return cached records directly when the requested offsets are still inside the retained hot tail
  3. when the cache knows the fetch offset is already past its cached high watermark, keep the request in broker-local long-poll instead of re-reading Oxia/S3 at the tail
  4. only on cache miss, gap, or eviction, fall back to LeaderlessLogReader for meta/control, sparse-index or pending resolution, and S3 fetches
  5. de-duplicate shared-WAL blob downloads inside one consume request when multiple partitions resolve to the same data_key

Execution model:

  1. the HTTP server uses Python stdlib ThreadingHTTPServer
  2. request handling is multi-threaded at the HTTP layer
  3. broker handler threads do not serialize produce calls with a process-local lock
  4. consume requests are also handled on those HTTP worker threads and can long-poll independently

Loadtest Harness Architecture

Components:

  1. Python orchestrator module at loadtest/orchestrate.py
  2. Python partition bootstrap module at loadtest/precreate.py
  3. Python metrics collector at loadtest/metrics_collector.py
  4. Python routing helper module at loadtest/routing.py
  5. Go HTTP load generator at loadtest/http_hammer.go
  6. Python quick-observability wrapper at loadtest/quick_observability.py
  7. repo-local Prometheus and Grafana compose stack at loadtest/observability/docker-compose.yml
  8. local MinIO process or container
  9. local Oxia coordinator plus dataserver processes, or one standalone Oxia process
  10. local broker process fanout

Run directory model:

  1. every scenario run writes to loadtest/results/<utc-timestamp>-<scenario-name>/
  2. the run directory stores the resolved scenario, broker layout, ports, stack metadata, git/environment snapshots, logs, routing manifest, and metrics artifacts
  3. the orchestrator treats stack.json plus resolved-config.json as the source of truth for later status, precreate, run, and down commands

Routing model:

  1. owner sets are computed once per run with rendezvous hashing over (topic, partition, broker_id, seed)
  2. the canonical mapping is written to routing-manifest.json
  3. the hammer only routes a partition to brokers inside that partition's owner set
  4. round_robin and least_inflight selection happen inside that owner set
  5. if a broker becomes unavailable, remap stays inside the original owner set; no global rebalance is introduced
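
The routing rules above can be sketched with a plain rendezvous (highest-random-weight) ranking. The hash function, the way the four inputs are joined, and the `owner_count` parameter are assumptions for illustration; the actual helper in loadtest/routing.py may differ.

```python
import hashlib


def score(topic, partition, broker_id, seed):
    """Deterministic weight for one (topic, partition, broker_id, seed) tuple."""
    material = f"{topic}\x00{partition}\x00{broker_id}\x00{seed}".encode("utf-8")
    return int.from_bytes(hashlib.sha256(material).digest()[:8], "big")


def owner_set(topic, partition, broker_ids, seed, owner_count):
    """Return the owner_count highest-scoring brokers for this partition."""
    ranked = sorted(
        broker_ids,
        key=lambda b: (score(topic, partition, b, seed), b),
        reverse=True,
    )
    return ranked[:owner_count]


def pick_round_robin(owners, available, counter):
    """Rotate over the owners that are still available; never leave the set."""
    candidates = [b for b in owners if b in available]
    if not candidates:
        return None
    return candidates[counter % len(candidates)]
```

Because each broker's score depends only on its own identity, removing a broker that is outside a partition's owner set leaves that owner set unchanged, which is what keeps remaps local.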

Execution flow:

  1. orchestrate.py up prepares the run directory, writes stack metadata, starts MinIO, starts Oxia, optionally pre-creates partitions, starts brokers, and waits for health
  2. orchestrate.py run starts the stack, starts the collector, runs the Go hammer, forces one final collector sample, and tears the stack down unless the scenario keeps it alive
  3. quick_run.sh calls quick_observability.py, which starts the local Prometheus and Grafana compose stack, calls start_stack(...), writes the live broker ports into Prometheus file_sd, and waits until Prometheus sees the loadtest metrics before it starts the hammer
  4. metrics_collector.py scrapes broker /metrics, samples tracked local processes plus host counters, writes metrics-30s.jsonl, and rolls those samples into summary.json
  5. http_hammer.go writes client-30s.jsonl, client-summary.json, and remap-events.jsonl
  6. brokers launched by the orchestrator also export static loadtest scenario info and gauge series on /metrics/prometheus so Grafana can show the active run shape live

Reconciliation model:

  1. client-summary.json is the client intent and success record
  2. broker /metrics provide broker accepted records/bytes plus Oxia counters, S3 operation counters, and S3 billing-estimate fields
  3. summary.json joins the latest broker totals, client summary, artifact presence, and reconciliation warnings so another engineer can audit a run without guessing
  4. BatchingProducer owns the shared buffer, waiter futures, and one background flusher thread
  5. BatchingProducer also owns one process-local set of successfully initialized topic-partitions
  6. direct calls into the underlying writer are serialized inside BatchingProducer so Oxia/S3 client use stays single-threaded at the write boundary
  7. the broker exports one process-local JSON metrics snapshot directly from the same HTTP server process
  8. the broker metrics cover broker identity, roles, batch settings, loadtest scenario metadata when present, HTTP counters, consume counters, batcher queue and flush counters, shared WAL blob size totals, Oxia/S3 write-path counters, and S3 billing estimates
  9. the broker keeps no durable state of its own beyond the existing Oxia and S3 writes performed by LeaderlessLogWriter

Operational boundaries:

  1. the broker is a produce adapter over the existing writer path and a consume adapter over the existing reader path
  2. there is no cross-topic-partition atomic commit
  3. one failed topic-partition batch does not roll back successful batches in the same HTTP request
  4. partition initialization is idempotent and retried until one successful initialization is observed by the current broker process
  5. the initialized-partition cache is process-local only and is rebuilt after broker restart
  6. the metrics surface is process-local and adds no new persisted Oxia keys or S3 objects, but it does issue cached list_objects_v2 scans under the configured S3 prefix to estimate current stored bytes and objects
  7. the shared metrics object can also be passed into LeaderlessLogReader and LeaderlessLogCompactor for embedded Oxia/S3 instrumentation outside the broker
  8. the local tail cache is opportunistic: misses, gaps, and evictions fall back to Oxia/S3, but a cached high-watermark-only result lets hot-tail long polls stay local; cross-process hits require writer and reader processes to share the same local LLOG_TAIL_CACHE_DIR
  9. the broker does not currently expose fencing APIs or compaction APIs beyond health and metrics
  10. the quick Grafana flow is local-only: Prometheus scrapes the host brokers through Docker Desktop loopback using host.docker.internal, and the wrapper clears the file_sd targets again on teardown

S3 billing-estimate semantics:

  1. request cost is derived from observed S3 put, list, get, and range_get counters
  2. list operations are generated by the bucket-usage refresher itself and are included in the request-cost estimate
  3. current stored bytes and object count are derived from cached prefix scans against s3://<bucket>/<LLOG_ROOT_PREFIX>/...
  4. pricing is hardcoded to S3 Standard us-east-1 assumptions for visibility only
  5. request-cost counters can be summed across processes, but bucket-size and storage-cost gauges should be aggregated with max
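
The aggregation rule in item 5 can be sketched as below. The snapshot field names (`requests`, `stored_bytes`, `stored_objects`) are hypothetical and do not mirror the real metrics JSON. The reasoning behind `max` is an inference: each process scans the same bucket prefix, so summing the gauges would count the same objects more than once.

```python
REQUEST_COUNTERS = ("put", "list", "get", "range_get")


def aggregate_s3_metrics(snapshots):
    """Combine per-process snapshots: sum counters, take max of gauges."""
    requests = {
        name: sum(s["requests"].get(name, 0) for s in snapshots)
        for name in REQUEST_COUNTERS
    }
    return {
        "requests": requests,
        # Gauges describe the same bucket prefix, so summing would overcount.
        "stored_bytes": max((s["stored_bytes"] for s in snapshots), default=0),
        "stored_objects": max((s["stored_objects"] for s in snapshots), default=0),
    }
```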

Reader Architecture

Components:

  1. Oxia server
  2. Oxia Python client
  3. Python reader module at server/leaderless_log_reader.py
  4. shared schema/path/S3 config/batch codec module at server/leaderless_log_common.py

Read path:

  1. read meta/control to get sequence_counter and pending
  2. for random reads, use Oxia CEILING lookup on index/{offset_20d}
  3. for forward scans, use Oxia range_scan() over the index/ prefix
  4. if the sparse index does not cover the requested tail offset, fall back to meta/control.pending
  5. if the resolved entry has byte_offset and byte_length, issue an S3 range GET for just that slice
  6. decode the resulting bytes-batch-v1 payload and return the requested logical record
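
The resolution logic in steps 2, 4, and 5 can be sketched over an in-memory index. The ceiling lookup is emulated with `bisect` on integer end offsets rather than an Oxia call, and the function names are hypothetical. The range header follows the standard HTTP convention that both byte positions are inclusive.

```python
import bisect


def resolve_offset(offset, index, pending):
    """Find the entry holding `offset`; return (entry, position_in_batch).

    index:   dict mapping batch end offset -> entry with "msg_count".
    pending: meta/control.pending record or None.
    """
    end_offsets = sorted(index)
    # Ceiling lookup: the smallest end offset that is >= the requested offset.
    i = bisect.bisect_left(end_offsets, offset)
    if i < len(end_offsets):
        end = end_offsets[i]
        entry = index[end]
        start = end - entry["msg_count"] + 1
        if start <= offset:
            return entry, offset - start
    # The sparse index does not cover the offset: try the pending tail entry.
    if pending is not None and pending["start_offset"] <= offset <= pending["end_offset"]:
        return pending, offset - pending["start_offset"]
    return None


def s3_range_header(entry):
    """Build a Range header for shared-WAL slices, or None for whole objects."""
    if "byte_offset" not in entry or "byte_length" not in entry:
        return None
    first = entry["byte_offset"]
    last = first + entry["byte_length"] - 1  # HTTP ranges are inclusive
    return f"bytes={first}-{last}"
```

The returned position is the zero-based index of the requested record inside the decoded batch.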

Database Schema

Logical root:

  • llog/{topic}/partitions/{partition}/
  • shared WAL objects also live outside the topic-partition subtree at llog/wal-shared/{uuid}

The broker HTTP adapter adds no new persisted keys. It writes through the same control, index, and S3 WAL layout described below. The compactor service adds one ephemeral scheduler key, meta/compactor-claim.

Metadata Keys

  • llog/{topic}/partitions/{partition}/meta/control

    • JSON:
      • log_state: "OPEN" or "FENCED"
      • sequence_counter: integer
      • pending: object or null
  • llog/{topic}/partitions/{partition}/meta/compaction-cursor

    • JSON:
      • offset: integer
  • llog/{topic}/partitions/{partition}/meta/compaction

    • JSON or absent:
      • compaction_id: string UUID
      • state: "WRITING_COMPACTED_INDEX", "DELETING_OLD", or "UPDATING_CURSOR"
      • start_offset: integer
      • end_offset: integer
      • msg_count: integer
      • data_key: full compacted S3 URI string
      • encoding: "bytes-batch-v1"
      • created_at_ms: integer
  • llog/{topic}/partitions/{partition}/meta/compactor-claim

    • JSON or absent:
      • compactor_id: string
      • claimed_at_ms: integer
    • the key is written as an Oxia ephemeral record by the long-running compactor service
    • the key is used as a scheduler-level claim before compact_once(...)
    • the key is not part of compaction crash recovery and can disappear on client shutdown or session expiry

S3 WAL Object Keys

  • llog/wal-shared/{uuid}
    • S3 object key
    • every new WAL write stores one shared blob containing one or more partition-local bytes-batch-v1 bodies
    • readers and the compactor use byte_offset and byte_length from the index or pending entry to read the correct partition slice

The full S3 URI is:

  • s3://{bucket}/llog/wal-shared/{uuid}

Legacy partition-local WAL keys can still exist from older data and remain readable:

  • s3://{bucket}/llog/{topic}/partitions/{partition}/data/wal/{uuid}

S3 Compacted Object Keys

  • llog/{topic}/partitions/{partition}/data/compacted/{uuid}
    • S3 object key
    • stores one bytes-batch-v1 envelope containing the full compacted logical range

The full S3 URI is:

  • s3://{bucket}/llog/{topic}/partitions/{partition}/data/compacted/{uuid}

Index Keys

  • llog/{topic}/partitions/{partition}/index/{end_offset_20d}
    • JSON:
      • type: "WAL" or "COMPACTED"
      • msg_count: integer
      • data_key: full S3 URI string
      • encoding: "bytes-batch-v1"
      • byte_offset: integer, optional, only for shared WAL entries
      • byte_length: integer, optional, only for shared WAL entries
      • created_at_ms: integer

The index is sparse:

  • one key per append operation
  • the key is stored at the batch end offset
  • a batch of N messages ending at offset E covers offsets [E - N + 1, E]
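
A short sketch of the key format and the coverage arithmetic, with hypothetical helper names. The 20-digit zero padding matters because it makes lexicographic key order match numeric offset order, which is what range scans over the index/ prefix rely on.

```python
def index_key(topic, partition, end_offset):
    """Build the sparse index key for a batch ending at end_offset."""
    return f"llog/{topic}/partitions/{partition}/index/{end_offset:020d}"


def end_offset_of(key):
    """Parse the batch end offset back out of an index key."""
    return int(key.rsplit("/", 1)[1])


def covered_range(end_offset, msg_count):
    """Inclusive (first, last) logical offsets covered by one index entry."""
    return end_offset - msg_count + 1, end_offset
```

For example, a batch of 2 messages ending at offset 2 covers offsets 1 through 2, which matches the `start_offset` and `end_offset` in the produce response shown earlier.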

Pending Recovery State

meta/control.pending is authoritative during crash recovery.

It contains:

  • append_id
  • start_offset
  • end_offset
  • msg_count
  • entry_type
  • data_key as full S3 URI
  • encoding
  • byte_offset as integer when data_key points into a shared WAL blob
  • byte_length as integer when data_key points into a shared WAL blob
  • created_at_ms

The reader also treats pending as a logically readable tail entry.

Pending Compaction Recovery State

meta/compaction is authoritative during compactor crash recovery.

It contains:

  • compaction_id
  • state
  • start_offset
  • end_offset
  • msg_count
  • data_key as full S3 URI
  • encoding
  • created_at_ms

Discovery Anchor

The long-running compactor service uses meta/control as the periodic discovery anchor.

Reason:

  • partition initialization already creates meta/control
  • the service can scan the Oxia keyspace for llog/{topic}/partitions/{partition}/meta/control
  • parsing those keys yields the currently known initialized topic-partitions without introducing a second catalog
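
Parsing the discovered keys can be sketched with one regular expression. The sketch assumes topic names contain no `/`, which the key layout suggests but does not guarantee, and it takes an iterable of key strings rather than calling the Oxia client. The function names are hypothetical.

```python
import re

CONTROL_KEY = re.compile(
    r"^llog/(?P<topic>[^/]+)/partitions/(?P<partition>\d+)/meta/control$"
)


def discover_partitions(keys):
    """Return the sorted (topic, partition) pairs found among the given keys."""
    found = set()
    for key in keys:
        match = CONTROL_KEY.match(key)
        if match:
            found.add((match.group("topic"), int(match.group("partition"))))
    return sorted(found)


def merge_targets(known, discovered):
    """Additive discovery: add new partitions and never prune old ones."""
    return set(known) | set(discovered)
```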

Sharding Model

All Oxia records for a topic-partition use:

  • partition_key = f"{topic}:{partition}"

That keeps one topic-partition as one Oxia-routed coordination stream.

S3 WAL objects are not routed by Oxia. They use the object key layout above.

Future Extensions

Not implemented yet:

  • vacuum of orphan S3 objects