Skip to content

Commit bf57b3c

Browse files
tsafinclaude
andcommitted
Add TPC-DS Phase 2: store_sales and inventory generators
Adds a complete tpcds_benchmark executable (built with -DTPCDS_ENABLE=ON) that generates TPC-DS store_sales and inventory tables into Parquet, CSV, ORC, Lance, Paimon, or Iceberg via the existing tpch::WriterInterface. Build infrastructure (third_party/dsdgen/): - CMakeLists.txt: mkheader → tables.h/streams.h/columns.h → distcomp → tpcds.idx → gen_dsts.py → dsts_generated.c → dsdgen_objs object library - dsdgen_stubs.c: thin file-I/O stubs (not needed in embedded mode) - tpcds_dsdgen.h: C++-safe forward declarations that bypass the fragile config.h/porting.h LINUX/HUGE_TYPE dependency chain; defines ds_key_t, decimal_t, ds_pricing_t, W_STORE_SALES_TBL, W_INVENTORY_TBL, and the EMBEDDED_DSDGEN callback globals - cmake/gen_dsts.py: embeds tpcds.idx as a C byte array (dsts_generated.c) C++ wrappers (include/tpch/, src/dsdgen/): - dsdgen_wrapper.hpp/.cpp: DSDGenWrapper class — initialises dsdgen from the embedded tpcds.idx (via mkstemp temp file), exposes Arrow schemas for STORE_SALES and INVENTORY, and drives generation via the EMBEDDED_DSDGEN callback trampoline (store_sales is master-detail: 8-16 line items per ticket; callback fires once per line item) - dsdgen_converter.hpp/.cpp: append_*_to_builders() helpers mapping the dsdgen C structs to Arrow array builders; dec_to_double() correctly converts decimal_t scaled integers (avoids buggy upstream dectoflt) Executable (src/tpcds_main.cpp): - CLI mirrors tpch_benchmark: --format, --table, --scale-factor, --output-dir, --max-rows, --verbose - Batched Arrow generation loop (10 000 rows/batch) → writer->write_batch third_party/tpcds submodule updated to branch tpcds_cpp_embedded which adds EMBEDDED_DSDGEN guards to w_store_sales.c (callback + suppressed file I/O for both store_sales and store_returns output files). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 2a8d431 commit bf57b3c

12 files changed

Lines changed: 1550 additions & 0 deletions

File tree

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,6 @@
1616
[submodule "third_party/lance"]
1717
path = third_party/lance
1818
url = https://github.com/tsafin/lance.git
19+
[submodule "third_party/tpcds"]
20+
path = third_party/tpcds
21+
url = https://github.com/tsafin/tpchds-tools.git

CMakeLists.txt

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ option(TPCH_ENABLE_ICEBERG "Enable Apache Iceberg table format support" OFF)
2828
option(TPCH_ENABLE_LANCE "Enable Lance columnar format support (requires Rust)" OFF)
2929
option(TPCH_ENABLE_PERF_COUNTERS "Enable performance counters instrumentation" OFF)
3030
option(TPCH_ENABLE_MOLD "Enable mold linker if available (incompatible with GTest in this project)" ON)
31+
option(TPCDS_ENABLE "Enable TPC-DS data generation (tpcds_benchmark executable)" OFF)
3132

3233
# Compiler configuration
3334
include(cmake/CompilerWarnings.cmake)
@@ -258,6 +259,11 @@ set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS
258259
add_subdirectory(third_party/dbgen EXCLUDE_FROM_ALL)
259260
include_directories(${DBGEN_INCLUDE_DIRS})
260261

262+
# TPC-DS dsdgen objects (built only when TPCDS_ENABLE=ON)
263+
if(TPCDS_ENABLE)
264+
add_subdirectory(third_party/dsdgen EXCLUDE_FROM_ALL)
265+
endif()
266+
261267
# Copy TPC-H distribution file to build directory
262268
# Required by dbgen for loading nations, regions, and other lookup tables
263269
configure_file(
@@ -559,6 +565,46 @@ if(TPCH_ENABLE_PERF_COUNTERS)
559565
target_compile_definitions(tpch_benchmark PRIVATE TPCH_ENABLE_PERF_COUNTERS)
560566
endif()
561567

568+
# TPC-DS benchmark executable
569+
if(TPCDS_ENABLE)
570+
add_executable(tpcds_benchmark
571+
src/tpcds_main.cpp
572+
src/dsdgen/dsdgen_wrapper.cpp
573+
src/dsdgen/dsdgen_converter.cpp
574+
${DSDGEN_OBJECTS}
575+
)
576+
target_link_libraries(tpcds_benchmark PRIVATE tpch_core)
577+
target_include_directories(tpcds_benchmark PRIVATE ${DSDGEN_INCLUDE_DIRS})
578+
# dsdgen upstream has several globals defined in multiple source files
579+
# (pCurrentFile in driver.c and grammar_support.c, etc.). Allow duplicates
580+
# at link time — the old GCC linker accepted these by default via -fcommon.
581+
target_link_options(tpcds_benchmark PRIVATE -Wl,--allow-multiple-definition)
582+
# dsdgen headers require LINUX=1 and TPCDS=1 to define ds_key_t and enable
583+
# 64-bit support (config.h/#ifdef LINUX). Also needed by dsdgen_wrapper.cpp
584+
# and dsdgen_converter.cpp which include dsdgen C headers.
585+
target_compile_definitions(tpcds_benchmark PRIVATE TPCDS_ENABLE LINUX=1 TPCDS=1 EMBEDDED_DSDGEN=1)
586+
if(TPCH_ENABLE_ORC)
587+
target_compile_definitions(tpcds_benchmark PRIVATE TPCH_ENABLE_ORC)
588+
endif()
589+
if(TPCH_ENABLE_PAIMON)
590+
target_compile_definitions(tpcds_benchmark PRIVATE TPCH_ENABLE_PAIMON)
591+
endif()
592+
if(TPCH_ENABLE_ICEBERG)
593+
target_compile_definitions(tpcds_benchmark PRIVATE TPCH_ENABLE_ICEBERG)
594+
endif()
595+
if(TPCH_ENABLE_LANCE)
596+
target_compile_definitions(tpcds_benchmark PRIVATE TPCH_ENABLE_LANCE)
597+
endif()
598+
if(TPCH_ENABLE_ASYNC_IO AND Uring_FOUND)
599+
target_compile_definitions(tpcds_benchmark PRIVATE TPCH_ENABLE_ASYNC_IO)
600+
endif()
601+
if(TPCH_ENABLE_PERF_COUNTERS)
602+
target_compile_definitions(tpcds_benchmark PRIVATE TPCH_ENABLE_PERF_COUNTERS)
603+
endif()
604+
message(STATUS "TPC-DS support enabled: tpcds_benchmark target added")
605+
install(TARGETS tpcds_benchmark RUNTIME DESTINATION bin)
606+
endif()
607+
562608
# Examples
563609
if(TPCH_BUILD_EXAMPLES)
564610
add_subdirectory(examples)

cmake/gen_dsts.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#!/usr/bin/env python3
2+
"""
3+
gen_dsts.py - Embed the TPC-DS binary distribution index (tpcds.idx) into a
4+
C source file as a static byte array.
5+
6+
Usage: gen_dsts.py <tpcds.idx> <output.c>
7+
8+
Background
9+
----------
10+
TPC-DS's dsdgen reads distribution data from a compiled binary file (tpcds.idx)
11+
produced by the 'distcomp' tool. At runtime, dist.c opens this file via:
12+
13+
fopen(get_str("DISTRIBUTIONS"), "rb")
14+
15+
To avoid shipping tpcds.idx as a separate runtime file, we embed its bytes here
16+
as a C uint8_t array. DSDGenWrapper writes the embedded bytes to a tmpfile on
17+
first use and points the DISTRIBUTIONS param at that tmpfile.
18+
19+
This mirrors the approach used by cmake/gen_dists.py for TPC-H's dists.dss.
20+
"""
21+
22+
import sys
23+
import os
24+
25+
26+
def embed_binary(input_path: str, output_path: str) -> None:
27+
with open(input_path, "rb") as f:
28+
data = f.read()
29+
30+
size = len(data)
31+
filename = os.path.basename(input_path)
32+
33+
lines = []
34+
lines.append(
35+
"/* Auto-generated from {} by cmake/gen_dsts.py -- do not edit */".format(filename)
36+
)
37+
lines.append("")
38+
lines.append("#include <stddef.h>")
39+
lines.append("#include <stdint.h>")
40+
lines.append("")
41+
lines.append("/* Embedded binary content of {} ({} bytes) */".format(filename, size))
42+
lines.append("const uint8_t tpcds_idx_data[] = {")
43+
44+
# 16 bytes per row for readability
45+
for i in range(0, size, 16):
46+
chunk = data[i : i + 16]
47+
hex_vals = ", ".join("0x{:02x}".format(b) for b in chunk)
48+
comma = "," if i + 16 < size else ""
49+
lines.append(" {}{}".format(hex_vals, comma))
50+
51+
lines.append("};")
52+
lines.append("")
53+
lines.append(
54+
"const size_t tpcds_idx_size = {};".format(size)
55+
)
56+
lines.append("")
57+
58+
out_dir = os.path.dirname(output_path)
59+
if out_dir:
60+
os.makedirs(out_dir, exist_ok=True)
61+
62+
with open(output_path, "w") as f:
63+
f.write("\n".join(lines) + "\n")
64+
65+
print(
66+
"Embedded {} ({} bytes) -> {}".format(filename, size, os.path.basename(output_path))
67+
)
68+
69+
70+
def main() -> None:
71+
if len(sys.argv) != 3:
72+
print("Usage: gen_dsts.py <tpcds.idx> <output.c>", file=sys.stderr)
73+
sys.exit(1)
74+
75+
input_path = sys.argv[1]
76+
output_path = sys.argv[2]
77+
78+
if not os.path.exists(input_path):
79+
print("Error: input file not found: {}".format(input_path), file=sys.stderr)
80+
sys.exit(1)
81+
82+
embed_binary(input_path, output_path)
83+
84+
85+
if __name__ == "__main__":
86+
main()

include/tpch/dsdgen_converter.hpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#pragma once
2+
3+
#include <memory>
4+
#include <map>
5+
#include <string>
6+
#include <arrow/builder.h>
7+
8+
namespace tpcds {
9+
10+
/**
11+
* Convert dsdgen C struct rows to Arrow array builders.
12+
*
13+
* Each function casts void* to the appropriate dsdgen struct, extracts
14+
* fields, and appends to the matching Arrow builders.
15+
*/
16+
17+
/**
18+
* Append a store_sales row (W_STORE_SALES_TBL*) to Arrow builders.
19+
* Schema matches DSDGenWrapper::get_schema(TableType::STORE_SALES).
20+
*/
21+
void append_store_sales_to_builders(
22+
const void* row,
23+
std::map<std::string, std::shared_ptr<arrow::ArrayBuilder>>& builders);
24+
25+
/**
26+
* Append an inventory row (W_INVENTORY_TBL*) to Arrow builders.
27+
* Schema matches DSDGenWrapper::get_schema(TableType::INVENTORY).
28+
*/
29+
void append_inventory_to_builders(
30+
const void* row,
31+
std::map<std::string, std::shared_ptr<arrow::ArrayBuilder>>& builders);
32+
33+
/**
34+
* Generic dispatcher by table name.
35+
*/
36+
void append_dsdgen_row_to_builders(
37+
const std::string& table_name,
38+
const void* row,
39+
std::map<std::string, std::shared_ptr<arrow::ArrayBuilder>>& builders);
40+
41+
} // namespace tpcds

include/tpch/dsdgen_wrapper.hpp

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#pragma once
2+
3+
#include <memory>
4+
#include <functional>
5+
#include <string>
6+
#include <cstdint>
7+
8+
#include <arrow/api.h>
9+
10+
namespace tpcds {
11+
12+
/**
13+
* TPC-DS table identifiers for the 24 standard W_ (warehouse) tables.
14+
* Numeric values match the generated tables.h constants (STORE_SALES=17, etc.).
15+
*/
16+
enum class TableType {
17+
CALL_CENTER = 0,
18+
CATALOG_PAGE = 1,
19+
CATALOG_RETURNS = 2,
20+
CATALOG_SALES = 3,
21+
CUSTOMER = 4,
22+
CUSTOMER_ADDRESS = 5,
23+
CUSTOMER_DEMOGRAPHICS = 6,
24+
DATE_DIM = 7,
25+
HOUSEHOLD_DEMOGRAPHICS = 8,
26+
INCOME_BAND = 9,
27+
INVENTORY = 10,
28+
ITEM = 11,
29+
PROMOTION = 12,
30+
REASON = 13,
31+
SHIP_MODE = 14,
32+
STORE = 15,
33+
STORE_RETURNS = 16,
34+
STORE_SALES = 17,
35+
TIME_DIM = 18,
36+
WAREHOUSE = 19,
37+
WEB_PAGE = 20,
38+
WEB_RETURNS = 21,
39+
WEB_SALES = 22,
40+
WEB_SITE = 23,
41+
COUNT_
42+
};
43+
44+
/**
45+
* C++ wrapper around the TPC-DS dsdgen reference implementation.
46+
*
47+
* Initializes dsdgen global state (embedded distribution data, scale factor,
48+
* RNG seeds) and provides per-table generation methods with callback API.
49+
*
50+
* THREAD-SAFETY: NOT thread-safe. dsdgen uses global mutable state.
51+
* Use one DSDGenWrapper per process, generate tables sequentially.
52+
*/
53+
class DSDGenWrapper {
54+
public:
55+
/**
56+
* Construct wrapper for the given scale factor.
57+
* @param scale_factor TPC-DS scale factor (1 = ~1GB baseline)
58+
* @param verbose Print verbose diagnostic messages
59+
*/
60+
explicit DSDGenWrapper(long scale_factor, bool verbose = false);
61+
~DSDGenWrapper();
62+
63+
DSDGenWrapper(const DSDGenWrapper&) = delete;
64+
DSDGenWrapper& operator=(const DSDGenWrapper&) = delete;
65+
66+
/**
67+
* Generate store_sales rows.
68+
* Calls callback once per row with a const W_STORE_SALES_TBL*.
69+
* @param callback Invoked for each generated row.
70+
* @param max_rows Limit; -1 or 0 means generate all rows.
71+
*/
72+
void generate_store_sales(
73+
std::function<void(const void* row)> callback,
74+
long max_rows = -1);
75+
76+
/**
77+
* Generate inventory rows.
78+
* Calls callback once per row with a const W_INVENTORY_TBL*.
79+
*/
80+
void generate_inventory(
81+
std::function<void(const void* row)> callback,
82+
long max_rows = -1);
83+
84+
long scale_factor() const { return scale_factor_; }
85+
86+
/**
87+
* Return the Arrow schema for a table type.
88+
*/
89+
static std::shared_ptr<arrow::Schema> get_schema(TableType table);
90+
91+
/**
92+
* Return expected row count for a table at the given scale factor.
93+
* Uses dsdgen's get_rowcount() after initialization.
94+
*/
95+
long get_row_count(TableType table) const;
96+
97+
/**
98+
* Return the dsdgen integer table ID for a TableType.
99+
*/
100+
static int table_id(TableType table);
101+
102+
/**
103+
* Return the canonical lower-case table name string.
104+
*/
105+
static std::string table_name(TableType table);
106+
107+
private:
108+
long scale_factor_;
109+
bool verbose_;
110+
bool initialized_;
111+
std::string tmp_dist_path_; // path to temporary tpcds.idx file
112+
113+
void init_dsdgen();
114+
};
115+
116+
} // namespace tpcds

0 commit comments

Comments
 (0)