This document describes the protocol specification for IndexTables4Spark, a high-performance Spark DataSource implementing full-text search using Tantivy via tantivy4java.
IndexTables4Spark stores table metadata in a transaction log directory (_transaction_log/) alongside the data files. The transaction log provides:
- Atomicity: All changes are either fully committed or not visible
- Consistency: Readers always see a consistent snapshot
- Durability: Once committed, changes are persisted
- Time Travel: Historical versions can be accessed (when retained)
<table_path>/
├── splits/ # Data directory (tantivy4java split files)
│ ├── split-<uuid>.split
│ └── ...
├── <partition_column>=<value>/ # Partitioned data (if applicable)
│ └── splits/
│ └── split-<uuid>.split
└── _transaction_log/
├── 00000000000000000000.json # Version 0 (initial protocol + metadata)
├── 00000000000000000001.json # Version 1 (incremental changes)
├── ...
├── _last_checkpoint # Pointer to latest checkpoint
├── manifests/ # Shared manifest directory (Protocol V4+)
│ ├── manifest-<hash>.avro # Reusable Avro manifest files
│ └── ...
└── state-v00000000000000000042/ # Avro state directory (Protocol V4+)
└── _manifest.avro # State manifest (references shared manifests)
| Version | Features |
|---|---|
| 1 | Basic transaction log, AddAction, RemoveAction, MetadataAction |
| 2 | Extended metadata, footer offsets, SkipAction, split metadata |
| 3 | Multi-part checkpoint, schema deduplication, feature flags |
| 4 | Avro state format, partition pruning, 10x faster reads |
- minReaderVersion: Minimum protocol version required to read the table
- minWriterVersion: Minimum protocol version required to write to the table
- Readers/writers must support all features required by the protocol version
Each transaction is recorded in a version file with a 20-digit zero-padded filename:
00000000000000000000.json # Version 0
00000000000000000001.json # Version 1
...
Version files are immutable once written. Concurrent writers use atomic ifNotExists semantics.
Each version file contains one or more newline-delimited JSON objects. Each object represents an Action with a single top-level key indicating the action type:
{"protocol": {...}}
{"metaData": {...}}
{"add": {...}}
{"add": {...}}
{"remove": {...}}Version files may be GZIP compressed (default: enabled). Readers must detect compression by examining file headers.
Specifies the minimum reader and writer versions required for the table.
| Field | Type | Description |
|---|---|---|
minReaderVersion |
Int | Minimum version for readers |
minWriterVersion |
Int | Minimum version for writers |
readerFeatures |
Set[String] | Optional feature flags for readers (V3+) |
writerFeatures |
Set[String] | Optional feature flags for writers (V3+) |
Example:
{
"protocol": {
"minReaderVersion": 4,
"minWriterVersion": 4,
"readerFeatures": ["avroState", "schemaDeduplication"],
"writerFeatures": ["avroState", "schemaDeduplication"]
}
}Feature Flags (V3+):
avroState: Avro state checkpoint formatmultiPartCheckpoint: Multi-part JSON checkpoint supportschemaDeduplication: Schema registry deduplication
Stores table metadata including schema, partitioning, and configuration.
| Field | Type | Description |
|---|---|---|
id |
String | Unique table identifier (UUID) |
name |
String? | Optional table name |
description |
String? | Optional description |
format |
Object | Format specification ({"provider": "indextables"}) |
schemaString |
String | JSON-encoded Apache Spark schema |
partitionColumns |
Array[String] | Column names used for partitioning |
configuration |
Map[String, String] | Key-value configuration |
createdTime |
Long? | Creation timestamp (epoch milliseconds) |
Example:
{
"metaData": {
"id": "550e8400-e29b-41d4-a716-446655440000",
"format": {"provider": "indextables", "options": {}},
"schemaString": "{\"type\":\"struct\",\"fields\":[...]}",
"partitionColumns": ["date"],
"configuration": {
"docMappingSchema.ab12cd34ef56gh78": "[{\"name\":\"title\",...}]"
},
"createdTime": 1704067200000
}
}Configuration Keys:
docMappingSchema.<hash>: Schema registry entries (V3+ schema deduplication)- Custom indexing configuration
Represents a file (split) added to the table.
| Field | Type | Description |
|---|---|---|
path |
String | Relative path to split file |
partitionValues |
Map[String, String] | Partition column values |
size |
Long | File size in bytes |
modificationTime |
Long | Modification timestamp (epoch ms) |
dataChange |
Boolean | Whether this is a data change |
stats |
String? | JSON-encoded statistics |
minValues |
Map[String, String]? | Minimum values per column |
maxValues |
Map[String, String]? | Maximum values per column |
numRecords |
Long? | Number of records in file |
hasFooterOffsets |
Boolean | Whether footer offsets are populated |
footerStartOffset |
Long? | Byte offset where footer begins |
footerEndOffset |
Long? | Byte offset where footer ends |
splitTags |
Set[String]? | Tags associated with this split |
numMergeOps |
Int? | Number of merge operations applied |
docMappingRef |
String? | Schema hash reference (V3+) |
docMappingJson |
String? | Inline schema JSON (legacy, pre-V3) |
uncompressedSizeBytes |
Long? | Uncompressed data size |
Example:
{
"add": {
"path": "date=2024-01-01/splits/split-abc123.split",
"partitionValues": {"date": "2024-01-01"},
"size": 1048576,
"modificationTime": 1704067200000,
"dataChange": true,
"minValues": {"score": "0.1"},
"maxValues": {"score": "0.9"},
"numRecords": 1000,
"hasFooterOffsets": true,
"footerStartOffset": 900000,
"footerEndOffset": 1048500,
"docMappingRef": "ab12cd34ef56gh78"
}
}Marks a file as logically deleted.
| Field | Type | Description |
|---|---|---|
path |
String | Path to file being removed |
deletionTimestamp |
Long? | When file was deleted (epoch ms) |
dataChange |
Boolean | Whether this affects data |
partitionValues |
Map[String, String]? | Partition values |
size |
Long? | File size |
Example:
{
"remove": {
"path": "splits/old-split.split",
"deletionTimestamp": 1704153600000,
"dataChange": true
}
}Records files that should be temporarily skipped during operations.
| Field | Type | Description |
|---|---|---|
path |
String | Path to file |
skipTimestamp |
Long | When skip was recorded |
reason |
String | Why file was skipped |
operation |
String | Operation that failed |
retryAfter |
Long? | Timestamp after which retry is allowed |
skipCount |
Int | Number of consecutive skips |
Example:
{
"mergeskip": {
"path": "splits/corrupted.split",
"skipTimestamp": 1704067200000,
"reason": "Corrupted index footer",
"operation": "merge",
"retryAfter": 1704153600000,
"skipCount": 1
}
}Checkpoints consolidate the cumulative state of the table at a specific version, enabling faster reads by avoiding scanning all version files.
Points to the current checkpoint. JSON format:
{
"version": 42,
"size": 15000,
"sizeInBytes": 104857600,
"numFiles": 15000,
"createdTime": 1704067200000,
"format": "avro-state",
"stateDir": "state-v00000000000000000042"
}| Field | Type | Description |
|---|---|---|
version |
Long | Transaction version |
size |
Long | Number of actions |
sizeInBytes |
Long | Total bytes |
numFiles |
Long | Number of AddActions |
createdTime |
Long | Creation timestamp |
parts |
Int? | Number of parts (multi-part JSON) |
checkpointId |
String? | UUID for multi-part (V3) |
format |
String? | "json", "json-multipart", or "avro-state" |
stateDir |
String? | State directory name (V4 Avro) |
Single-File Format:
- Filename:
<version>.checkpoint.json - Contains all actions from version 0 to checkpoint version
- GZIP compressed
Multi-Part Format (V3):
- Manifest:
<version>.checkpoint.json(containsMultiPartCheckpointManifest) - Parts:
<version>.checkpoint.<uuid>.<part>.json
The Avro state format provides 10x faster reads compared to JSON checkpoints.
_transaction_log/
├── manifests/ # Shared manifest directory
│ ├── manifest-<hash>.avro # Reusable across state versions
│ └── manifest-<hash2>.avro
└── state-v<version>/
└── _manifest.avro # References manifests/ directory
Incremental Writes (Iceberg-style):
New transactions only write a new manifest for new files. Existing manifests are referenced by path, avoiding O(n) rewrites:
BEFORE (full rewrite):
state-v100/manifest-001.avro (100K entries)
state-v101/manifest-002.avro (100K entries - ALL rewritten!)
AFTER (incremental):
manifests/manifest-001.avro (100K entries - written once, shared)
manifests/manifest-002.avro (10 entries - only NEW files)
state-v100/_manifest.avro → [manifests/manifest-001.avro]
state-v101/_manifest.avro → [manifests/manifest-001.avro, manifests/manifest-002.avro]
{
"formatVersion": 1,
"stateVersion": 42,
"createdAt": 1704067200000,
"numFiles": 15000,
"totalBytes": 1099511627776,
"protocolVersion": 4,
"manifests": [
{
"path": "manifests/manifest-a1b2c3d4.avro",
"numEntries": 50000,
"minAddedAtVersion": 0,
"maxAddedAtVersion": 42,
"partitionBounds": {
"date": {"min": "2024-01-01", "max": "2024-01-31"}
}
}
],
"tombstones": [],
"schemaRegistry": {
"ab12cd34ef56gh78": "[{\"name\":\"title\",...}]"
},
"metadata": "{\"metaData\": {...}}"
}| Field | Type | Description |
|---|---|---|
formatVersion |
Int | State format version (currently 1) |
stateVersion |
Long | Transaction version this state represents |
createdAt |
Long | Creation timestamp (epoch ms) |
numFiles |
Long | Total live files (after tombstones) |
totalBytes |
Long | Total size of live files |
protocolVersion |
Int | Protocol version (4 for Avro state) |
manifests |
Array[ManifestInfo] | List of Avro manifest files |
tombstones |
Array[String] | Paths of removed files |
schemaRegistry |
Map[String, String] | Schema hash -> schema JSON |
metadata |
String? | JSON-encoded MetadataAction |
| Field | Type | Description |
|---|---|---|
path |
String | Path to manifest file (see path formats below) |
numEntries |
Long | Number of FileEntry records |
minAddedAtVersion |
Long | Earliest version in manifest |
maxAddedAtVersion |
Long | Latest version in manifest |
partitionBounds |
Map[String, PartitionBounds]? | Bounds for partition pruning |
Manifest Path Formats:
Manifest paths are relative to the transaction log root and support three formats:
| Format | Example | Description |
|---|---|---|
| Shared | manifests/manifest-abc123.avro |
New format: shared across state versions |
| Normalized legacy | state-v00000000000000000042/manifest-abc123.avro |
Legacy path normalized with state dir prefix |
| Legacy | manifest-abc123.avro |
Legacy format: relative to current state dir |
Path Resolution:
def resolveManifestPath(path: String, txLogRoot: String, stateDir: String): String = {
if (path.startsWith("manifests/")) // Shared format
s"$txLogRoot/$path"
else if (path.startsWith("state-v")) // Normalized legacy
s"$txLogRoot/$path"
else // Legacy format
s"$stateDir/$path"
}| Field | Type | Description |
|---|---|---|
min |
String? | Minimum partition value |
max |
String? | Maximum partition value |
Binary Avro records with field IDs for schema evolution:
| Field ID Range | Category |
|---|---|
| 100-109 | Basic file info |
| 110-119 | Statistics |
| 120-129 | Footer offsets |
| 130-139 | Split metadata |
| 140-149 | Streaming fields |
Fields mirror AddAction with additions:
addedAtVersion: Transaction version when file was addedaddedAtTimestamp: Timestamp when file was added
Avro manifests support configurable compression:
zstd(default, level 3)snappynone
Large schemas (400+ columns) can bloat the transaction log significantly. Schema deduplication reduces this by storing each unique schema once.
-
Hash Computation:
- Normalize JSON to canonical form (sort object keys alphabetically)
- Sort arrays of named objects by their
"name"field - Compute SHA-256 hash of canonical JSON
- Base64 encode and truncate to 16 characters (96 bits entropy)
-
Write Path:
- Compute hash of
docMappingJson - Store hash in
docMappingReffield of AddAction - Store schema in
MetadataAction.configurationwith keydocMappingSchema.<hash> - Remove inline
docMappingJsonfrom AddAction
- Compute hash of
-
Read Path:
- Load schema registry from MetadataAction configuration
- Restore
docMappingJsonfrom registry usingdocMappingRef
docMappingSchema.<16-char-base64-hash>
Example:
docMappingSchema.ab12cd34ef56gh78
When unique docMappingRef count exceeds a threshold (default: 5), checkpoint operations recalculate all hashes using canonical normalization. This consolidates schemas that may have been hashed with different JSON orderings.
AddActions may include minValues and maxValues maps for data skipping:
{
"minValues": {"score": "0.1", "date": "2024-01-01"},
"maxValues": {"score": "0.9", "date": "2024-01-31"}
}Statistics are stored as strings and compared lexicographically for string columns or parsed for numeric columns.
Long text values are truncated to reduce checkpoint size:
- Default: 32 characters
- Partition columns: Never truncated
- Configurable via
spark.indextables.stats.truncation.maxLength
With Avro state format, partition bounds in ManifestInfo enable manifest-level pruning:
- Reader evaluates partition predicates against ManifestInfo.partitionBounds
- Entire manifests with non-matching bounds are skipped
- Reduces I/O for large partitioned tables
Writers use optimistic concurrency with atomic ifNotExists writes:
- Read current version from transaction log
- Prepare actions for new version
- Attempt to write version file with
ifNotExists - If file exists (conflict), retry with higher version
| Config | Default | Description |
|---|---|---|
spark.indextables.transaction.retry.maxAttempts |
10 | Maximum retry attempts |
spark.indextables.transaction.retry.baseDelayMs |
100 | Base delay for exponential backoff |
spark.indextables.transaction.retry.maxDelayMs |
5000 | Maximum backoff delay |
Avro state writes also use conditional writes:
- State directory existence check before writing
_manifest.avrowritten withifNotExists- On conflict, increment version and retry
- Re-read base state on every retry: Prevents stale manifest lists from concurrent writers
| Config | Default | Description |
|---|---|---|
spark.indextables.state.retry.maxAttempts |
10 | Maximum retry attempts for state writes |
spark.indextables.state.retry.baseDelayMs |
100 | Base delay for exponential backoff |
spark.indextables.state.retry.maxDelayMs |
5000 | Maximum backoff delay |
Incremental Write with Retry:
def writeIncrementalWithRetry(newFiles, removedPaths, ...): StateWriteResult = {
while (attempt <= maxAttempts) {
// CRITICAL: Re-read base state on EVERY retry to pick up concurrent changes
val baseState = findLatestState()
if (needsCompaction(baseState, removedPaths.size, config)) {
tryWriteCompactedState(...)
} else {
// Reference existing manifests, only write new manifest for new files
tryWriteIncrementalState(baseState, newFiles, removedPaths, ...)
}
// On conflict, retry with higher version
}
}Old version files are cleaned up after checkpoint creation:
- Default retention: 30 days
- Configurable via
spark.indextables.purge.txLogRetentionHours
Multiple checkpoints may exist temporarily:
- Only the latest checkpoint is used for reads
- Old checkpoints cleaned up by PURGE operations
State directories accumulate tombstones over time. Compaction triggers when:
- Tombstone ratio > 10% (
spark.indextables.state.compaction.tombstoneThreshold) - Manifest count > 20 (
spark.indextables.state.compaction.maxManifests) - Force compaction requested (
CompactionConfig.forceCompaction = true)
Compaction rewrites state with tombstones applied and optimized manifest layout.
With shared manifests, orphaned manifest files can accumulate when state directories
are deleted but their manifests are no longer referenced. The ManifestGarbageCollector
tracks reachable manifests and cleans up orphans:
Collection Process:
- Scan retained state versions to build reachable manifest set
- List all manifests in
manifests/directory - Delete unreferenced manifests older than
minManifestAgeHours
Age-Based Protection:
- Manifests younger than
minManifestAgeHours(default: 1 hour) are never deleted - Prevents race conditions with in-flight writes that reference new manifests
Integration:
PURGE INDEXTABLEcommand invokes manifest GC after state directory cleanup- Automatic GC during
PurgeOrphanedSplitsExecutorexecution
| Config | Default | Description |
|---|---|---|
spark.indextables.checkpoint.enabled |
true | Enable checkpointing |
spark.indextables.checkpoint.interval |
10 | Versions between checkpoints |
spark.indextables.transaction.compression.enabled |
true | GZIP compress version files |
| Config | Default | Description |
|---|---|---|
spark.indextables.state.format |
avro | "avro" or "json" |
spark.indextables.state.compression |
zstd | "zstd", "snappy", "none" |
spark.indextables.state.compressionLevel |
3 | Compression level (1-22 for zstd) |
spark.indextables.state.entriesPerManifest |
50000 | Max entries per Avro manifest |
spark.indextables.state.read.parallelism |
8 | Parallel manifest reads |
| Config | Default | Description |
|---|---|---|
spark.indextables.state.schema.renormalizeThreshold |
5 | Trigger renormalization above this unique ref count |
| Config | Default | Description |
|---|---|---|
spark.indextables.state.compaction.tombstoneThreshold |
0.10 | Tombstone ratio to trigger compaction |
spark.indextables.state.compaction.maxManifests |
20 | Manifest count to trigger compaction |
spark.indextables.state.compaction.largeRemoveThreshold |
MAX_INT | Large remove count (disabled by default) |
spark.indextables.state.compaction.afterMerge |
true | Compact after MERGE SPLITS |
| Config | Default | Description |
|---|---|---|
spark.indextables.state.retention.versions |
2 | State versions to retain |
spark.indextables.state.retention.hours |
168 | Hours to retain old states |
| Config | Default | Description |
|---|---|---|
spark.indextables.state.gc.minManifestAgeHours |
1 | Don't delete manifests younger than this |
{
"type": "object",
"properties": {
"minReaderVersion": {"type": "integer"},
"minWriterVersion": {"type": "integer"},
"readerFeatures": {"type": "array", "items": {"type": "string"}},
"writerFeatures": {"type": "array", "items": {"type": "string"}}
},
"required": ["minReaderVersion", "minWriterVersion"]
}{
"type": "object",
"properties": {
"id": {"type": "string"},
"name": {"type": "string"},
"description": {"type": "string"},
"format": {
"type": "object",
"properties": {
"provider": {"type": "string"},
"options": {"type": "object"}
}
},
"schemaString": {"type": "string"},
"partitionColumns": {"type": "array", "items": {"type": "string"}},
"configuration": {"type": "object"},
"createdTime": {"type": "integer"}
},
"required": ["id", "format", "schemaString", "partitionColumns", "configuration"]
}{
"type": "object",
"properties": {
"path": {"type": "string"},
"partitionValues": {"type": "object"},
"size": {"type": "integer"},
"modificationTime": {"type": "integer"},
"dataChange": {"type": "boolean"},
"stats": {"type": "string"},
"minValues": {"type": "object"},
"maxValues": {"type": "object"},
"numRecords": {"type": "integer"},
"hasFooterOffsets": {"type": "boolean"},
"footerStartOffset": {"type": "integer"},
"footerEndOffset": {"type": "integer"},
"splitTags": {"type": "array", "items": {"type": "string"}},
"numMergeOps": {"type": "integer"},
"docMappingRef": {"type": "string"},
"docMappingJson": {"type": "string"},
"uncompressedSizeBytes": {"type": "integer"}
},
"required": ["path", "partitionValues", "size", "modificationTime", "dataChange"]
}{
"type": "record",
"name": "FileEntry",
"namespace": "io.indextables.spark.transaction.avro",
"fields": [
{"name": "path", "type": "string", "field-id": 100},
{"name": "partitionValues", "type": {"type": "map", "values": "string"}, "field-id": 101},
{"name": "size", "type": "long", "field-id": 102},
{"name": "modificationTime", "type": "long", "field-id": 103},
{"name": "dataChange", "type": "boolean", "field-id": 104},
{"name": "stats", "type": ["null", "string"], "default": null, "field-id": 110},
{"name": "minValues", "type": ["null", {"type": "map", "values": "string"}], "default": null, "field-id": 111},
{"name": "maxValues", "type": ["null", {"type": "map", "values": "string"}], "default": null, "field-id": 112},
{"name": "numRecords", "type": ["null", "long"], "default": null, "field-id": 113},
{"name": "footerStartOffset", "type": ["null", "long"], "default": null, "field-id": 120},
{"name": "footerEndOffset", "type": ["null", "long"], "default": null, "field-id": 121},
{"name": "hasFooterOffsets", "type": "boolean", "default": false, "field-id": 122},
{"name": "splitTags", "type": ["null", {"type": "array", "items": "string"}], "default": null, "field-id": 130},
{"name": "numMergeOps", "type": ["null", "int"], "default": null, "field-id": 131},
{"name": "docMappingRef", "type": ["null", "string"], "default": null, "field-id": 132},
{"name": "uncompressedSizeBytes", "type": ["null", "long"], "default": null, "field-id": 133},
{"name": "addedAtVersion", "type": "long", "field-id": 140},
{"name": "addedAtTimestamp", "type": "long", "field-id": 141}
]
}
- Run
CHECKPOINT INDEXTABLES '<table_path>' - The checkpoint command automatically:
- Creates Avro state directory
- Migrates schema registry
- Updates
_last_checkpointwithformat: "avro-state"
- Verify with
DESCRIBE INDEXTABLES STATE '<table_path>'
To force JSON checkpoint (not recommended):
spark.conf.set("spark.indextables.state.format", "json")
spark.sql("CHECKPOINT INDEXTABLES '<table_path>'")Note: JSON format is deprecated and will be removed in a future release.