Skip to content

Commit d5286a6

Browse files
add parquet merge support through a K way merge streaming merge sort
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
1 parent ec4dd15 commit d5286a6

38 files changed

Lines changed: 3919 additions & 367 deletions

sandbox/libs/dataformat-native/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ once_cell = "1.21.3"
5151
crc32fast = "1.4"
5252
parking_lot = "0.12.5"
5353
lazy_static = "1.4.0"
54+
rayon = "1.10"
5455
criterion = { version = "0.5", features = ["async_tokio"] }
5556

5657
# Internal

sandbox/plugins/parquet-data-format/benchmarks/src/main/java/org/opensearch/parquet/benchmark/VSRRotationBenchmark.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010

1111
import org.apache.arrow.vector.types.pojo.Field;
1212
import org.apache.arrow.vector.types.pojo.Schema;
13+
import org.opensearch.Version;
14+
import org.opensearch.cluster.metadata.IndexMetadata;
1315
import org.opensearch.common.settings.Settings;
16+
import org.opensearch.index.IndexSettings;
1417
import org.opensearch.index.mapper.KeywordFieldMapper;
1518
import org.opensearch.index.mapper.MappedFieldType;
1619
import org.opensearch.index.mapper.NumberFieldMapper;
@@ -80,6 +83,7 @@ public class VSRRotationBenchmark {
8083
private List<MappedFieldType> fieldTypes;
8184
private VSRManager vsrManager;
8285
private String filePath;
86+
private IndexSettings indexSettings;
8387

8488
@Setup(Level.Trial)
8589
public void setupTrial() {
@@ -123,7 +127,10 @@ public void setupTrial() {
123127
public void setup() throws IOException {
124128
bufferPool = new ArrowBufferPool(Settings.EMPTY);
125129
filePath = Path.of(System.getProperty("java.io.tmpdir"), "benchmark_vsr_" + System.nanoTime() + ".parquet").toString();
126-
vsrManager = new VSRManager(filePath, schema, bufferPool, maxRowsPerVSR, threadPool, runAsync);
130+
Settings idxSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT).build();
131+
IndexMetadata indexMetadata = IndexMetadata.builder("benchmark-index").settings(idxSettings).build();
132+
indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY);
133+
vsrManager = new VSRManager(filePath, indexSettings, schema, bufferPool, maxRowsPerVSR, threadPool, runAsync);
127134
}
128135

129136
@Benchmark

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,40 +9,81 @@
99
package org.opensearch.parquet;
1010

1111
import org.opensearch.common.settings.Setting;
12+
import org.opensearch.common.settings.Settings;
13+
import org.opensearch.core.common.unit.ByteSizeUnit;
14+
import org.opensearch.core.common.unit.ByteSizeValue;
1215

1316
import java.util.List;
1417

1518
/**
16-
* Node-scoped settings for the Parquet data format plugin.
17-
*
18-
* <p>All settings are registered with OpenSearch via
19-
* {@link ParquetDataFormatPlugin#getSettings()} and can be configured in
20-
* {@code opensearch.yml} or via cluster settings API.
21-
*
22-
* <ul>
23-
* <li>{@link #MAX_NATIVE_ALLOCATION} — Maximum native memory allocation for Arrow buffers,
24-
* expressed as a percentage of available non-heap system memory (default {@code "10%"}).</li>
25-
* <li>{@link #MAX_ROWS_PER_VSR} — Row count threshold that triggers VectorSchemaRoot rotation
26-
* during document ingestion (default {@code 50000}).</li>
27-
* </ul>
19+
* Settings for Parquet data format.
2820
*/
2921
public final class ParquetSettings {
3022

3123
private ParquetSettings() {}
3224

33-
/** Default maximum native memory allocation as a percentage of available non-heap memory. */
3425
public static final String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";
35-
/** Default maximum number of rows per VectorSchemaRoot before rotation. */
3626
public static final int DEFAULT_MAX_ROWS_PER_VSR = 50000;
3727

38-
/** Maximum native memory allocation for Arrow buffers, as a percentage of non-heap memory. */
28+
/** Group setting prefix for all Parquet settings. */
29+
public static final Setting<Settings> PARQUET_SETTINGS = Setting.groupSetting(
30+
"parquet.",
31+
Setting.Property.IndexScope
32+
);
33+
34+
/** Maximum row group size in bytes (default 128MB). */
35+
public static final Setting<ByteSizeValue> ROW_GROUP_SIZE_BYTES = Setting.byteSizeSetting(
36+
"parquet.row_group_size_bytes",
37+
new ByteSizeValue(128, ByteSizeUnit.MB),
38+
Setting.Property.IndexScope
39+
);
40+
41+
/** Data page size limit in bytes (default 1MB). */
42+
public static final Setting<ByteSizeValue> PAGE_SIZE_BYTES = Setting.byteSizeSetting(
43+
"parquet.page_size_bytes",
44+
new ByteSizeValue(1, ByteSizeUnit.MB),
45+
Setting.Property.IndexScope
46+
);
47+
48+
/** Maximum number of rows per data page (default 20000). */
49+
public static final Setting<Integer> PAGE_ROW_LIMIT = Setting.intSetting(
50+
"parquet.page_row_limit",
51+
20000,
52+
1,
53+
Setting.Property.IndexScope
54+
);
55+
56+
/** Dictionary page size limit in bytes (default 2MB). */
57+
public static final Setting<ByteSizeValue> DICT_SIZE_BYTES = Setting.byteSizeSetting(
58+
"parquet.dict_size_bytes",
59+
new ByteSizeValue(2, ByteSizeUnit.MB),
60+
Setting.Property.IndexScope
61+
);
62+
63+
/** Compression codec for Parquet files, e.g. ZSTD, SNAPPY, LZ4_RAW (default LZ4_RAW). */
64+
public static final Setting<String> COMPRESSION_TYPE = Setting.simpleString(
65+
"parquet.compression_type",
66+
"LZ4_RAW",
67+
Setting.Property.IndexScope
68+
);
69+
70+
/** Compression level for the chosen codec (default 2, range 1–9). */
71+
public static final Setting<Integer> COMPRESSION_LEVEL = Setting.intSetting(
72+
"parquet.compression_level",
73+
2,
74+
1,
75+
9,
76+
Setting.Property.IndexScope
77+
);
78+
79+
/** Maximum native memory allocation for Arrow buffers, as a percentage of non-heap memory (default 10%). */
3980
public static final Setting<String> MAX_NATIVE_ALLOCATION = Setting.simpleString(
4081
"parquet.max_native_allocation",
4182
DEFAULT_MAX_NATIVE_ALLOCATION,
4283
Setting.Property.NodeScope
4384
);
4485

45-
/** Maximum number of rows per VectorSchemaRoot before rotation is triggered. */
86+
/** Maximum rows per VectorSchemaRoot before rotation is triggered (default 50000). */
4687
public static final Setting<Integer> MAX_ROWS_PER_VSR = Setting.intSetting(
4788
"parquet.max_rows_per_vsr",
4889
DEFAULT_MAX_ROWS_PER_VSR,
@@ -52,6 +93,11 @@ private ParquetSettings() {}
5293

5394
/** Returns all settings defined by the Parquet plugin. */
5495
public static List<Setting<?>> getSettings() {
55-
return List.of(MAX_NATIVE_ALLOCATION, MAX_ROWS_PER_VSR);
96+
return List.of(
97+
PARQUET_SETTINGS,
98+
ROW_GROUP_SIZE_BYTES, PAGE_SIZE_BYTES, PAGE_ROW_LIMIT, DICT_SIZE_BYTES,
99+
COMPRESSION_TYPE, COMPRESSION_LEVEL,
100+
MAX_NATIVE_ALLOCATION, MAX_ROWS_PER_VSR
101+
);
56102
}
57103
}

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/NativeParquetWriter.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@
1111
import org.opensearch.common.SetOnce;
1212

1313
import java.io.IOException;
14+
import java.util.List;
1415
import java.util.concurrent.atomic.AtomicBoolean;
1516

1617
/**
1718
* Type-safe handle for the native Rust Parquet writer with lifecycle management.
1819
*
1920
* <p>Wraps the stateless JNI methods in {@link RustBridge} with a file-scoped lifecycle:
2021
* <ol>
21-
* <li>{@code new NativeParquetWriter(filePath, schemaAddress)} — creates the native writer</li>
22+
* <li>{@code new NativeParquetWriter(filePath, indexName, schemaAddress, sortColumns, reverseSorts, nullsFirst)} — creates the native writer</li>
2223
* <li>{@link #write(long, long)} — sends one or more Arrow batches (repeatable)</li>
2324
* <li>{@link #flush()} — finalizes the Parquet file and returns metadata</li>
2425
* <li>{@link #sync()} — fsyncs the file to durable storage (calls flush if needed)</li>
@@ -37,12 +38,23 @@ public class NativeParquetWriter {
3738
* Creates a new NativeParquetWriter.
3839
*
3940
* @param filePath the path to the Parquet file to write
41+
* @param indexName the index name for settings lookup
4042
* @param schemaAddress the native memory address of the Arrow schema
43+
* @param sortColumns the columns to sort by, or empty list for no sorting
44+
* @param reverseSorts whether each sort column is descending, or empty list
45+
* @param nullsFirst whether nulls sort first for each column, or empty list
4146
* @throws IOException if the native writer creation fails
4247
*/
43-
public NativeParquetWriter(String filePath, long schemaAddress) throws IOException {
48+
public NativeParquetWriter(
49+
String filePath,
50+
String indexName,
51+
long schemaAddress,
52+
List<String> sortColumns,
53+
List<Boolean> reverseSorts,
54+
List<Boolean> nullsFirst
55+
) throws IOException {
4456
this.filePath = filePath;
45-
RustBridge.createWriter(filePath, schemaAddress);
57+
RustBridge.createWriter(filePath, indexName, schemaAddress, sortColumns, reverseSorts, nullsFirst);
4658
}
4759

4860
/**
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.parquet.bridge;
10+
11+
/**
12+
* Immutable settings passed to the native Rust writer via JNI.
13+
* The Rust side reads values through the getter methods.
14+
* All fields are nullable; the native side falls back to defaults when null.
15+
*/
16+
public class NativeSettings {
17+
18+
private final String indexName;
19+
private final String compressionType;
20+
private final Integer compressionLevel;
21+
private final Long pageSizeBytes;
22+
private final Integer pageRowLimit;
23+
private final Long dictSizeBytes;
24+
private final Long rowGroupSizeBytes;
25+
private final Boolean bloomFilterEnabled;
26+
private final Double bloomFilterFpp;
27+
private final Long bloomFilterNdv;
28+
29+
private NativeSettings(Builder builder) {
30+
this.indexName = builder.indexName;
31+
this.compressionType = builder.compressionType;
32+
this.compressionLevel = builder.compressionLevel;
33+
this.pageSizeBytes = builder.pageSizeBytes;
34+
this.pageRowLimit = builder.pageRowLimit;
35+
this.dictSizeBytes = builder.dictSizeBytes;
36+
this.rowGroupSizeBytes = builder.rowGroupSizeBytes;
37+
this.bloomFilterEnabled = builder.bloomFilterEnabled;
38+
this.bloomFilterFpp = builder.bloomFilterFpp;
39+
this.bloomFilterNdv = builder.bloomFilterNdv;
40+
}
41+
42+
public String getIndexName() { return indexName; }
43+
public String getCompressionType() { return compressionType; }
44+
public Integer getCompressionLevel() { return compressionLevel; }
45+
public Long getPageSizeBytes() { return pageSizeBytes; }
46+
public Integer getPageRowLimit() { return pageRowLimit; }
47+
public Long getDictSizeBytes() { return dictSizeBytes; }
48+
public Long getRowGroupSizeBytes() { return rowGroupSizeBytes; }
49+
public Boolean getBloomFilterEnabled() { return bloomFilterEnabled; }
50+
public Double getBloomFilterFpp() { return bloomFilterFpp; }
51+
public Long getBloomFilterNdv() { return bloomFilterNdv; }
52+
53+
public static Builder builder() { return new Builder(); }
54+
55+
public static class Builder {
56+
private String indexName;
57+
private String compressionType;
58+
private Integer compressionLevel;
59+
private Long pageSizeBytes;
60+
private Integer pageRowLimit;
61+
private Long dictSizeBytes;
62+
private Long rowGroupSizeBytes;
63+
private Boolean bloomFilterEnabled;
64+
private Double bloomFilterFpp;
65+
private Long bloomFilterNdv;
66+
67+
public Builder indexName(String v) { this.indexName = v; return this; }
68+
public Builder compressionType(String v) { this.compressionType = v; return this; }
69+
public Builder compressionLevel(Integer v) { this.compressionLevel = v; return this; }
70+
public Builder pageSizeBytes(Long v) { this.pageSizeBytes = v; return this; }
71+
public Builder pageRowLimit(Integer v) { this.pageRowLimit = v; return this; }
72+
public Builder dictSizeBytes(Long v) { this.dictSizeBytes = v; return this; }
73+
public Builder rowGroupSizeBytes(Long v) { this.rowGroupSizeBytes = v; return this; }
74+
public Builder bloomFilterEnabled(Boolean v) { this.bloomFilterEnabled = v; return this; }
75+
public Builder bloomFilterFpp(Double v) { this.bloomFilterFpp = v; return this; }
76+
public Builder bloomFilterNdv(Long v) { this.bloomFilterNdv = v; return this; }
77+
78+
public NativeSettings build() { return new NativeSettings(this); }
79+
}
80+
}

0 commit comments

Comments
 (0)