Skip to content

Commit 81ec182

Browse files
Add merge support for Parquet data format plugin (#21079)
* add parquet merge support through a K-way streaming merge sort Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * add tests Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * add ColumnMapping optimization Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * fix sync_to_disk test Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * refactor change, add ParquetSortConfig class Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * add InvokeIO in RustBridge Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * address comments and refactor changes Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * run spotlessApply Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * do spotlessApply Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> * add IntegTests, CRC in merge and address comments Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com> --------- Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
1 parent 417185a commit 81ec182

44 files changed

Lines changed: 4948 additions & 386 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

sandbox/libs/dataformat-native/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ once_cell = "1.21.3"
5858
crc32fast = "1.4"
5959
parking_lot = "0.12.5"
6060
lazy_static = "1.4.0"
61+
rayon = "1.10"
6162
thiserror = "1.0"
6263
async-trait = "0.1"
6364
bytes = "1"

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.java

Lines changed: 211 additions & 67 deletions
Large diffs are not rendered by default.

sandbox/plugins/parquet-data-format/benchmarks/src/main/java/org/opensearch/parquet/benchmark/VSRRotationBenchmark.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111
import org.apache.arrow.vector.types.pojo.Field;
1212
import org.apache.arrow.vector.types.pojo.Schema;
13+
import org.opensearch.Version;
14+
import org.opensearch.cluster.metadata.IndexMetadata;
1315
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.index.IndexSettings;
1417
import org.opensearch.index.mapper.KeywordFieldMapper;
1518
import org.opensearch.index.mapper.MappedFieldType;
1619
import org.opensearch.index.mapper.NumberFieldMapper;
@@ -80,6 +83,7 @@ public class VSRRotationBenchmark {
8083
private List<MappedFieldType> fieldTypes;
8184
private VSRManager vsrManager;
8285
private String filePath;
86+
private IndexSettings indexSettings;
8387

8488
@Setup(Level.Trial)
8589
public void setupTrial() {
@@ -123,7 +127,10 @@ public void setupTrial() {
123127
public void setup() throws IOException {
124128
bufferPool = new ArrowBufferPool(Settings.EMPTY);
125129
filePath = Path.of(System.getProperty("java.io.tmpdir"), "benchmark_vsr_" + System.nanoTime() + ".parquet").toString();
126-
vsrManager = new VSRManager(filePath, schema, bufferPool, maxRowsPerVSR, threadPool, runAsync);
130+
Settings idxSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build();
131+
IndexMetadata indexMetadata = IndexMetadata.builder("benchmark-index").settings(idxSettings).build();
132+
indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
133+
vsrManager = new VSRManager(filePath, indexSettings, schema, bufferPool, maxRowsPerVSR, threadPool, runAsync);
127134
}
128135

129136
@Benchmark

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java

Lines changed: 137 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,49 +9,169 @@
99
package org.opensearch.parquet;
1010

1111
import org.opensearch.common.settings.Setting;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.core.common.unit.ByteSizeUnit;
14+
import org.opensearch.core.common.unit.ByteSizeValue;
1215

1316
import java.util.List;
1417

1518
/**
16-
* Node-scoped settings for the Parquet data format plugin.
17-
*
18-
* <p>All settings are registered with OpenSearch via
19-
* {@link ParquetDataFormatPlugin#getSettings()} and can be configured in
20-
* {@code opensearch.yml} or via cluster settings API.
21-
*
22-
* <ul>
23-
* <li>{@link #MAX_NATIVE_ALLOCATION} — Maximum native memory allocation for Arrow buffers,
24-
* expressed as a percentage of available non-heap system memory (default {@code "10%"}).</li>
25-
* <li>{@link #MAX_ROWS_PER_VSR} — Row count threshold that triggers VectorSchemaRoot rotation
26-
* during document ingestion (default {@code 50000}).</li>
27-
* </ul>
19+
* Index-scoped ({@code index.parquet.*}) and node-scoped ({@code parquet.*}) settings for the Parquet data format plugin.
2820
*/
2921
public final class ParquetSettings {
3022

3123
private ParquetSettings() {}
3224

33-
/** Default maximum native memory allocation as a percentage of available non-heap memory. */
3425
public static final String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";
35-
/** Default maximum number of rows per VectorSchemaRoot before rotation. */
3626
public static final int DEFAULT_MAX_ROWS_PER_VSR = 50000;
3727

38-
/** Maximum native memory allocation for Arrow buffers, as a percentage of non-heap memory. */
28+
/** Group setting covering every index-scoped Parquet setting under the {@code index.parquet.} prefix. */
29+
public static final Setting<Settings> PARQUET_SETTINGS = Setting.groupSetting("index.parquet.", Setting.Property.IndexScope);
30+
31+
/** Data page size limit in bytes (default 1MB). */
32+
public static final Setting<ByteSizeValue> PAGE_SIZE_BYTES = Setting.byteSizeSetting(
33+
"index.parquet.page_size_bytes",
34+
new ByteSizeValue(1, ByteSizeUnit.MB),
35+
Setting.Property.IndexScope
36+
);
37+
38+
/** Maximum number of rows per data page (default 20000). */
39+
public static final Setting<Integer> PAGE_ROW_LIMIT = Setting.intSetting(
40+
"index.parquet.page_row_limit",
41+
20000,
42+
1,
43+
Setting.Property.IndexScope
44+
);
45+
46+
/** Dictionary page size limit in bytes (default 2MB). */
47+
public static final Setting<ByteSizeValue> DICT_SIZE_BYTES = Setting.byteSizeSetting(
48+
"index.parquet.dict_size_bytes",
49+
new ByteSizeValue(2, ByteSizeUnit.MB),
50+
Setting.Property.IndexScope
51+
);
52+
53+
/** Compression codec for Parquet files, e.g. ZSTD, SNAPPY, LZ4_RAW (default LZ4_RAW). */
54+
public static final Setting<String> COMPRESSION_TYPE = Setting.simpleString(
55+
"index.parquet.compression_type",
56+
"LZ4_RAW",
57+
Setting.Property.IndexScope
58+
);
59+
60+
/** Compression level for the chosen codec (default 2, range 1–9). */
61+
public static final Setting<Integer> COMPRESSION_LEVEL = Setting.intSetting(
62+
"index.parquet.compression_level",
63+
2,
64+
1,
65+
9,
66+
Setting.Property.IndexScope
67+
);
68+
69+
/** Whether bloom filters are enabled for Parquet columns (default true). */
70+
public static final Setting<Boolean> BLOOM_FILTER_ENABLED = Setting.boolSetting(
71+
"index.parquet.bloom_filter_enabled",
72+
true,
73+
Setting.Property.IndexScope
74+
);
75+
76+
/** Bloom filter false positive probability (default 0.1, allowed range 0.0–1.0). */
77+
public static final Setting<Double> BLOOM_FILTER_FPP = Setting.doubleSetting(
78+
"index.parquet.bloom_filter_fpp",
79+
0.1,
80+
0.0,
81+
1.0,
82+
Setting.Property.IndexScope
83+
);
84+
85+
/** Bloom filter number of distinct values hint (default 100000). */
86+
public static final Setting<Long> BLOOM_FILTER_NDV = Setting.longSetting(
87+
"index.parquet.bloom_filter_ndv",
88+
100_000L,
89+
1L,
90+
Setting.Property.IndexScope
91+
);
92+
93+
/** Maximum native memory allocation for Arrow buffers, as a percentage of non-heap memory (default 10%). */
3994
public static final Setting<String> MAX_NATIVE_ALLOCATION = Setting.simpleString(
4095
"parquet.max_native_allocation",
4196
DEFAULT_MAX_NATIVE_ALLOCATION,
4297
Setting.Property.NodeScope
4398
);
4499

45-
/** Maximum number of rows per VectorSchemaRoot before rotation is triggered. */
100+
/** Maximum rows per VectorSchemaRoot before rotation is triggered (default 50000). */
46101
public static final Setting<Integer> MAX_ROWS_PER_VSR = Setting.intSetting(
47102
"parquet.max_rows_per_vsr",
48103
DEFAULT_MAX_ROWS_PER_VSR,
49104
1,
50105
Setting.Property.NodeScope
51106
);
52107

108+
/** File size threshold for in-memory sort vs streaming merge sort (default 32MB). */
109+
public static final Setting<ByteSizeValue> SORT_IN_MEMORY_THRESHOLD = Setting.byteSizeSetting(
110+
"index.parquet.sort_in_memory_threshold",
111+
new ByteSizeValue(32, ByteSizeUnit.MB),
112+
Setting.Property.IndexScope
113+
);
114+
115+
/** Batch size for streaming merge sort (default 8192 rows). */
116+
public static final Setting<Integer> SORT_BATCH_SIZE = Setting.intSetting(
117+
"index.parquet.sort_batch_size",
118+
8192,
119+
1,
120+
Setting.Property.IndexScope
121+
);
122+
123+
/** Maximum number of rows per row group (default 1000000). */
124+
public static final Setting<Integer> ROW_GROUP_MAX_ROWS = Setting.intSetting(
125+
"index.parquet.row_group_max_rows",
126+
1_000_000,
127+
1,
128+
Setting.Property.IndexScope
129+
);
130+
131+
/** Batch size for reading records during merge (default 100000 rows). */
132+
public static final Setting<Integer> MERGE_BATCH_SIZE = Setting.intSetting(
133+
"index.parquet.merge_batch_size",
134+
100_000,
135+
1,
136+
Setting.Property.IndexScope
137+
);
138+
139+
/** Number of Rayon threads for parallel column encoding during merge (default num_cores/8, min 1). */
140+
public static final Setting<Integer> MERGE_RAYON_THREADS = Setting.intSetting(
141+
"parquet.merge_rayon_threads",
142+
Math.max(1, Runtime.getRuntime().availableProcessors() / 8),
143+
1,
144+
Setting.Property.NodeScope
145+
);
146+
147+
/** Number of Tokio IO threads for async disk writes during merge (default num_cores/8, min 1). */
148+
public static final Setting<Integer> MERGE_IO_THREADS = Setting.intSetting(
149+
"parquet.merge_io_threads",
150+
Math.max(1, Runtime.getRuntime().availableProcessors() / 8),
151+
1,
152+
Setting.Property.NodeScope
153+
);
154+
53155
/** Returns all settings defined by the Parquet plugin. */
54156
public static List<Setting<?>> getSettings() {
55-
return List.of(MAX_NATIVE_ALLOCATION, MAX_ROWS_PER_VSR);
157+
return List.of(
158+
PARQUET_SETTINGS,
159+
PAGE_SIZE_BYTES,
160+
PAGE_ROW_LIMIT,
161+
DICT_SIZE_BYTES,
162+
COMPRESSION_TYPE,
163+
COMPRESSION_LEVEL,
164+
BLOOM_FILTER_ENABLED,
165+
BLOOM_FILTER_FPP,
166+
BLOOM_FILTER_NDV,
167+
MAX_NATIVE_ALLOCATION,
168+
MAX_ROWS_PER_VSR,
169+
SORT_IN_MEMORY_THRESHOLD,
170+
SORT_BATCH_SIZE,
171+
ROW_GROUP_MAX_ROWS,
172+
MERGE_BATCH_SIZE,
173+
MERGE_RAYON_THREADS,
174+
MERGE_IO_THREADS
175+
);
56176
}
57177
}

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/NativeParquetWriter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*
1919
* <p>Wraps the stateless JNI methods in {@link RustBridge} with a file-scoped lifecycle:
2020
* <ol>
21-
* <li>{@code new NativeParquetWriter(filePath, schemaAddress)} — creates the native writer</li>
21+
* <li>{@code new NativeParquetWriter(filePath, indexName, schemaAddress, sortConfig)} — creates the native writer</li>
2222
* <li>{@link #write(long, long)} — sends one or more Arrow batches (repeatable)</li>
2323
* <li>{@link #flush()} — finalizes the Parquet file and returns metadata</li>
2424
* <li>{@link #sync()} — fsyncs the file to durable storage (calls flush if needed)</li>
@@ -37,12 +37,14 @@ public class NativeParquetWriter {
3737
* Creates a new NativeParquetWriter.
3838
*
3939
* @param filePath the path to the Parquet file to write
40+
* @param indexName the index name for settings lookup
4041
* @param schemaAddress the native memory address of the Arrow schema
42+
* @param sortConfig the sort configuration for the Parquet file
4143
* @throws IOException if the native writer creation fails
4244
*/
43-
public NativeParquetWriter(String filePath, long schemaAddress) throws IOException {
45+
public NativeParquetWriter(String filePath, String indexName, long schemaAddress, ParquetSortConfig sortConfig) throws IOException {
4446
this.filePath = filePath;
45-
RustBridge.createWriter(filePath, schemaAddress);
47+
RustBridge.createWriter(filePath, indexName, schemaAddress, sortConfig);
4648
}
4749

4850
/**

0 commit comments

Comments
 (0)