Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 64 additions & 15 deletions src/tidesdb.lua
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ ffi.cdef[[
uint64_t min_disk_space;
int l1_file_count_trigger;
int l0_queue_stall_threshold;
double tombstone_density_trigger;
uint64_t tombstone_density_min_entries;
int use_btree;
tidesdb_commit_hook_fn commit_hook_fn;
void *commit_hook_ctx;
Expand Down Expand Up @@ -126,6 +128,7 @@ ffi.cdef[[
uint64_t unified_memtable_sync_interval_us;
void* object_store;
tidesdb_objstore_config_t* object_store_config;
int max_concurrent_flushes;
} tidesdb_config_t;

typedef struct {
Expand All @@ -145,6 +148,11 @@ ffi.cdef[[
uint64_t btree_total_nodes;
uint32_t btree_max_height;
double btree_avg_height;
uint64_t total_tombstones;
double tombstone_ratio;
uint64_t* level_tombstone_counts;
double max_sst_density;
int max_sst_density_level;
} tidesdb_stats_t;

typedef struct {
Expand Down Expand Up @@ -206,6 +214,8 @@ ffi.cdef[[

// Column family operations
int tidesdb_compact(void* cf);
int tidesdb_compact_range(void* cf, const uint8_t* start_key, size_t start_key_size,
const uint8_t* end_key, size_t end_key_size);
int tidesdb_flush_memtable(void* cf);
int tidesdb_is_flushing(void* cf);
int tidesdb_is_compacting(void* cf);
Expand Down Expand Up @@ -442,22 +452,24 @@ end

-- Default configurations
function tidesdb.default_config()
local c_config = lib.tidesdb_default_config()
return {
db_path = "",
num_flush_threads = 2,
num_compaction_threads = 2,
log_level = tidesdb.LogLevel.LOG_INFO,
block_cache_size = 64 * 1024 * 1024,
max_open_sstables = 256,
log_to_file = false,
log_truncation_at = 24 * 1024 * 1024,
max_memory_usage = 0,
unified_memtable = false,
unified_memtable_write_buffer_size = 64 * 1024 * 1024,
unified_memtable_skip_list_max_level = 12,
unified_memtable_skip_list_probability = 0.25,
unified_memtable_sync_mode = tidesdb.SyncMode.SYNC_INTERVAL,
unified_memtable_sync_interval_us = 128000,
num_flush_threads = c_config.num_flush_threads,
num_compaction_threads = c_config.num_compaction_threads,
log_level = c_config.log_level,
block_cache_size = tonumber(c_config.block_cache_size),
max_open_sstables = tonumber(c_config.max_open_sstables),
log_to_file = c_config.log_to_file ~= 0,
log_truncation_at = tonumber(c_config.log_truncation_at),
max_memory_usage = tonumber(c_config.max_memory_usage),
unified_memtable = c_config.unified_memtable ~= 0,
unified_memtable_write_buffer_size = tonumber(c_config.unified_memtable_write_buffer_size),
unified_memtable_skip_list_max_level = c_config.unified_memtable_skip_list_max_level,
unified_memtable_skip_list_probability = c_config.unified_memtable_skip_list_probability,
unified_memtable_sync_mode = c_config.unified_memtable_sync_mode,
unified_memtable_sync_interval_us = tonumber(c_config.unified_memtable_sync_interval_us),
max_concurrent_flushes = c_config.max_concurrent_flushes,
}
end

Expand All @@ -484,6 +496,8 @@ function tidesdb.default_column_family_config()
min_disk_space = tonumber(c_config.min_disk_space),
l1_file_count_trigger = c_config.l1_file_count_trigger,
l0_queue_stall_threshold = c_config.l0_queue_stall_threshold,
tombstone_density_trigger = c_config.tombstone_density_trigger,
tombstone_density_min_entries = tonumber(c_config.tombstone_density_min_entries),
use_btree = c_config.use_btree ~= 0,
object_lazy_compaction = c_config.object_lazy_compaction ~= 0,
object_prefetch_compaction = c_config.object_prefetch_compaction ~= 0,
Expand Down Expand Up @@ -573,6 +587,8 @@ local function config_to_c_struct(config, cf_name)
c_config.min_disk_space = config.min_disk_space or 100 * 1024 * 1024
c_config.l1_file_count_trigger = config.l1_file_count_trigger or 4
c_config.l0_queue_stall_threshold = config.l0_queue_stall_threshold or 20
c_config.tombstone_density_trigger = config.tombstone_density_trigger or 0.0
c_config.tombstone_density_min_entries = config.tombstone_density_min_entries or 1024
c_config.use_btree = config.use_btree and 1 or 0
c_config.object_lazy_compaction = config.object_lazy_compaction and 1 or 0
c_config.object_prefetch_compaction = config.object_prefetch_compaction and 1 or 0
Expand Down Expand Up @@ -715,6 +731,21 @@ function ColumnFamily:compact()
check_result(result, "failed to compact column family")
end

--- Trigger a manual compaction limited to a key range.
-- Either endpoint may be nil or the empty string, in which case that side
-- of the range is treated as unbounded (a nil pointer / zero length is
-- handed to the C API). NOTE(review): the accompanying test suggests the C
-- layer rejects the case where BOTH endpoints are absent — confirm.
-- @tparam ?string start_key inclusive lower bound, or nil/"" for unbounded
-- @tparam ?string end_key inclusive upper bound, or nil/"" for unbounded
-- @raise via check_result() on a non-zero result code
function ColumnFamily:compact_range(start_key, end_key)
    -- Normalize an optional Lua string into the (pointer, length) pair the
    -- FFI call expects; LuaJIT converts a string to const uint8_t* directly.
    local function as_range_arg(key)
        if key ~= nil and #key > 0 then
            return key, #key
        end
        return nil, 0
    end

    local lo, lo_len = as_range_arg(start_key)
    local hi, hi_len = as_range_arg(end_key)
    local rc = lib.tidesdb_compact_range(self._cf, lo, lo_len, hi, hi_len)
    check_result(rc, "failed to compact range")
end

function ColumnFamily:flush_memtable()
local result = lib.tidesdb_flush_memtable(self._cf)
check_result(result, "failed to flush memtable")
Expand Down Expand Up @@ -810,6 +841,8 @@ function ColumnFamily:get_stats()
min_disk_space = tonumber(c_cfg.min_disk_space),
l1_file_count_trigger = c_cfg.l1_file_count_trigger,
l0_queue_stall_threshold = c_cfg.l0_queue_stall_threshold,
tombstone_density_trigger = c_cfg.tombstone_density_trigger,
tombstone_density_min_entries = tonumber(c_cfg.tombstone_density_min_entries),
use_btree = c_cfg.use_btree ~= 0,
}
end
Expand All @@ -821,6 +854,13 @@ function ColumnFamily:get_stats()
end
end

local level_tombstone_counts = {}
if c_stats.num_levels > 0 and c_stats.level_tombstone_counts ~= nil then
for i = 0, c_stats.num_levels - 1 do
table.insert(level_tombstone_counts, tonumber(c_stats.level_tombstone_counts[i]))
end
end

local stats = {
num_levels = c_stats.num_levels,
memtable_size = tonumber(c_stats.memtable_size),
Expand All @@ -838,6 +878,11 @@ function ColumnFamily:get_stats()
btree_total_nodes = tonumber(c_stats.btree_total_nodes),
btree_max_height = c_stats.btree_max_height,
btree_avg_height = c_stats.btree_avg_height,
total_tombstones = tonumber(c_stats.total_tombstones),
tombstone_ratio = c_stats.tombstone_ratio,
level_tombstone_counts = level_tombstone_counts,
max_sst_density = c_stats.max_sst_density,
max_sst_density_level = c_stats.max_sst_density_level,
}

lib.tidesdb_free_stats(stats_ptr[0])
Expand Down Expand Up @@ -1045,6 +1090,7 @@ function TidesDB.new(config)
c_config.unified_memtable_skip_list_probability = config.unified_memtable_skip_list_probability or 0.25
c_config.unified_memtable_sync_mode = config.unified_memtable_sync_mode or tidesdb.SyncMode.SYNC_INTERVAL
c_config.unified_memtable_sync_interval_us = config.unified_memtable_sync_interval_us or 128000
c_config.max_concurrent_flushes = config.max_concurrent_flushes or 0

-- Object store configuration
if config.object_store then
Expand Down Expand Up @@ -1083,6 +1129,7 @@ function TidesDB.open(path, options)
unified_memtable_skip_list_probability = options.unified_memtable_skip_list_probability,
unified_memtable_sync_mode = options.unified_memtable_sync_mode,
unified_memtable_sync_interval_us = options.unified_memtable_sync_interval_us,
max_concurrent_flushes = options.max_concurrent_flushes,
object_store = options.object_store,
object_store_config = options.object_store_config,
}
Expand Down Expand Up @@ -1365,6 +1412,8 @@ function tidesdb.load_config_from_ini(ini_file, section_name)
min_disk_space = tonumber(c_config.min_disk_space),
l1_file_count_trigger = c_config.l1_file_count_trigger,
l0_queue_stall_threshold = c_config.l0_queue_stall_threshold,
tombstone_density_trigger = c_config.tombstone_density_trigger,
tombstone_density_min_entries = tonumber(c_config.tombstone_density_min_entries),
use_btree = c_config.use_btree ~= 0,
object_lazy_compaction = c_config.object_lazy_compaction ~= 0,
object_prefetch_compaction = c_config.object_prefetch_compaction ~= 0,
Expand All @@ -1378,6 +1427,6 @@ function tidesdb.save_config_to_ini(ini_file, section_name, config)
end

-- Version
tidesdb._VERSION = "0.6.0"
tidesdb._VERSION = "0.7.0"

return tidesdb
159 changes: 159 additions & 0 deletions tests/test_tidesdb.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1390,6 +1390,165 @@ function tests.test_txn_single_delete()
print("PASS: test_txn_single_delete")
end

--- Round-trips the tombstone density settings through a column family.
-- Checks that the C-side defaults expose both tombstone knobs, then creates
-- a CF with custom values and confirms get_stats() reports them unchanged.
function tests.test_tombstone_cf_config_roundtrip()
    local path = "./test_db_tombstone_cfg"
    cleanup_db(path)

    -- The defaults sourced from C should already carry both fields.
    local base = tidesdb.default_column_family_config()
    assert_true(base.tombstone_density_trigger ~= nil, "tombstone_density_trigger should exist in defaults")
    assert_true(base.tombstone_density_min_entries ~= nil, "tombstone_density_min_entries should exist in defaults")
    assert_true(base.tombstone_density_min_entries >= 1, "tombstone_density_min_entries default should be >= 1")

    local db = tidesdb.TidesDB.open(path, { log_level = tidesdb.LogLevel.LOG_WARN })

    -- Create a CF with custom tombstone settings layered on the defaults.
    local custom = tidesdb.default_column_family_config()
    custom.tombstone_density_trigger = 0.5
    custom.tombstone_density_min_entries = 256
    db:create_column_family("ts_cf", custom)

    -- The stats view must echo back exactly what was configured.
    local cf = db:get_column_family("ts_cf")
    local reported = cf:get_stats()
    assert_true(reported.config ~= nil, "stats.config should exist")
    assert_eq(reported.config.tombstone_density_trigger, 0.5, "tombstone_density_trigger round-trip")
    assert_eq(reported.config.tombstone_density_min_entries, 256, "tombstone_density_min_entries round-trip")

    db:drop_column_family("ts_cf")
    db:close()
    cleanup_db(path)
    print("PASS: test_tombstone_cf_config_roundtrip")
end

--- Verifies tombstone statistics are populated once deletions reach disk.
-- Writes 100 keys, flushes, deletes the first half, flushes again, then
-- inspects the stats table for the tombstone-related fields.
function tests.test_tombstone_stats_after_deletes()
    local path = "./test_db_tombstone_stats"
    cleanup_db(path)

    local db = tidesdb.TidesDB.open(path, { log_level = tidesdb.LogLevel.LOG_WARN })
    db:create_column_family("ts_cf")
    local cf = db:get_column_family("ts_cf")

    local total = 100

    -- Batch-write every key in one transaction, then push them to an SSTable.
    local writer = db:begin_txn()
    for idx = 1, total do
        writer:put(cf, string.format("key:%04d", idx), string.format("value:%04d", idx))
    end
    writer:commit()
    writer:free()
    cf:flush_memtable()

    -- Delete the first half so the next flush materializes tombstones.
    local eraser = db:begin_txn()
    for idx = 1, total / 2 do
        eraser:delete(cf, string.format("key:%04d", idx))
    end
    eraser:commit()
    eraser:free()
    cf:flush_memtable()

    -- Poll (up to ~5s) until the background flush settles.
    local stop_at = os.time() + 5
    while cf:is_flushing() and os.time() < stop_at do
        os.execute("sleep 0.1")
    end

    local stats = cf:get_stats()
    assert_true(stats.total_tombstones ~= nil, "total_tombstones should exist")
    assert_true(stats.total_tombstones > 0, "total_tombstones should be > 0 after deletes")
    assert_true(stats.tombstone_ratio ~= nil, "tombstone_ratio should exist")
    assert_true(stats.tombstone_ratio >= 0 and stats.tombstone_ratio <= 1, "tombstone_ratio in [0, 1]")
    assert_true(stats.max_sst_density ~= nil, "max_sst_density should exist")
    assert_true(stats.max_sst_density >= 0 and stats.max_sst_density <= 1, "max_sst_density in [0, 1]")
    assert_true(stats.max_sst_density_level ~= nil, "max_sst_density_level should exist")
    assert_true(stats.level_tombstone_counts ~= nil, "level_tombstone_counts should exist")
    assert_eq(#stats.level_tombstone_counts, stats.num_levels, "level_tombstone_counts length should match num_levels")

    db:drop_column_family("ts_cf")
    db:close()
    cleanup_db(path)
    print("PASS: test_tombstone_stats_after_deletes")
end

--- Exercises ColumnFamily:compact_range() against multiple SSTables.
-- Builds four flushed batches, compacts a sub-range, and checks that:
--   * data outside the compacted range is untouched,
--   * a range with both endpoints missing/empty is rejected,
--   * a half-open range (one nil endpoint) is accepted.
function tests.test_compact_range()
    local path = "./test_db_compact_range"
    cleanup_db(path)

    local db = tidesdb.TidesDB.open(path, { log_level = tidesdb.LogLevel.LOG_WARN })
    db:create_column_family("cr_cf")
    local cf = db:get_column_family("cr_cf")

    -- Insert several batches and flush each to create multiple SSTables
    for batch = 1, 4 do
        local txn = db:begin_txn()
        for i = 1, 50 do
            local k = string.format("key:%02d:%04d", batch, i)
            txn:put(cf, k, string.format("v:%d", i))
        end
        txn:commit()
        txn:free()
        cf:flush_memtable()
    end

    -- Wait briefly for background flushes to settle (poll up to ~5s).
    local deadline = os.time() + 5
    while cf:is_flushing() and os.time() < deadline do
        os.execute("sleep 0.1")
    end

    -- Narrow range compaction succeeds
    cf:compact_range("key:01:0001", "key:02:0050")

    -- A key outside the range should still be readable and unchanged
    local read_txn = db:begin_txn()
    local v = read_txn:get(cf, "key:04:0010")
    assert_eq(v, "v:10", "key outside compacted range should be unchanged")
    read_txn:free()

    -- Both endpoints empty/nil should be rejected. (Fix: the returned error
    -- values were previously captured in an unused `err` local; assert_error
    -- already raises on unexpected success, so the results are discarded.)
    assert_error(function()
        cf:compact_range(nil, nil)
    end, "both nil endpoints should fail")
    assert_error(function()
        cf:compact_range("", "")
    end, "both empty endpoints should fail")

    -- Unbounded one side should be accepted
    cf:compact_range(nil, "key:01:0050")
    cf:compact_range("key:04:0001", nil)

    db:drop_column_family("cr_cf")
    db:close()
    cleanup_db(path)
    print("PASS: test_compact_range")
end

--- Smoke test for the max_concurrent_flushes configuration knob.
-- Confirms the default is sourced from C (non-zero) and that a database
-- opened with the limit pinned to 1 still supports a basic put + flush.
function tests.test_max_concurrent_flushes()
    local path = "./test_db_max_flushes"
    cleanup_db(path)

    -- default_config() is populated from the C side, so the field must be
    -- present and positive.
    local defaults = tidesdb.default_config()
    assert_true(defaults.max_concurrent_flushes ~= nil, "max_concurrent_flushes should exist in default_config")
    assert_true(defaults.max_concurrent_flushes > 0, "default max_concurrent_flushes should be > 0")

    -- Open with the flush limit set to 1 and exercise the write path.
    local db = tidesdb.TidesDB.open(path, {
        log_level = tidesdb.LogLevel.LOG_WARN,
        max_concurrent_flushes = 1,
    })
    db:create_column_family("mcf_cf")
    local cf = db:get_column_family("mcf_cf")

    local writer = db:begin_txn()
    writer:put(cf, "k", "v")
    writer:commit()
    writer:free()
    cf:flush_memtable()

    db:drop_column_family("mcf_cf")
    db:close()
    cleanup_db(path)
    print("PASS: test_max_concurrent_flushes")
end

-- Run all tests
local function run_tests()
print("Running TidesDB Lua tests...")
Expand Down
4 changes: 2 additions & 2 deletions tidesdb-0.6.0-1.rockspec → tidesdb-0.7.0-1.rockspec
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package = "tidesdb"
version = "0.6.0-1"
version = "0.7.0-1"
source = {
url = "git://github.com/tidesdb/tidesdb-lua.git",
tag = "v0.6.0"
tag = "v0.7.0"
}
description = {
summary = "Official Lua bindings for TidesDB - A high-performance embedded key-value storage engine",
Expand Down
Loading