Skip to content

Commit d97d3ce

Browse files
add IntegTests, CRC in merge and address comments
Signed-off-by: Shailesh-Kumar-Singh <shaileshkumarsingh260@gmail.com>
1 parent d47427d commit d97d3ce

23 files changed

Lines changed: 680 additions & 223 deletions

File tree

gradle.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,3 @@ systemProp.jdk.tls.client.protocols=TLSv1.2,TLSv1.3
3232

3333
# jvm args for faster test execution by default
3434
systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m
35-
systemProp.sandbox.enabled=true

sandbox/plugins/composite-engine/src/internalClusterTest/java/org/opensearch/composite/CompositeMergeIT.java

Lines changed: 211 additions & 66 deletions
Large diffs are not rendered by default.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,6 @@ private ParquetSettings() {}
2828
/** Group setting prefix for all Parquet settings. */
2929
public static final Setting<Settings> PARQUET_SETTINGS = Setting.groupSetting("index.parquet.", Setting.Property.IndexScope);
3030

31-
/** Maximum row group size in bytes (default 128MB). */
32-
public static final Setting<ByteSizeValue> ROW_GROUP_SIZE_BYTES = Setting.byteSizeSetting(
33-
"index.parquet.row_group_size_bytes",
34-
new ByteSizeValue(128, ByteSizeUnit.MB),
35-
Setting.Property.IndexScope
36-
);
37-
3831
/** Data page size limit in bytes (default 1MB). */
3932
public static final Setting<ByteSizeValue> PAGE_SIZE_BYTES = Setting.byteSizeSetting(
4033
"index.parquet.page_size_bytes",
@@ -127,11 +120,42 @@ private ParquetSettings() {}
127120
Setting.Property.IndexScope
128121
);
129122

123+
/** Maximum number of rows per row group (default 1,000,000 rows, minimum 1). */
124+
public static final Setting<Integer> ROW_GROUP_MAX_ROWS = Setting.intSetting(
125+
"index.parquet.row_group_max_rows",
126+
1_000_000,
127+
1,
128+
Setting.Property.IndexScope
129+
);
130+
131+
/** Batch size for reading records during merge (default 100,000 rows, minimum 1). */
132+
public static final Setting<Integer> MERGE_BATCH_SIZE = Setting.intSetting(
133+
"index.parquet.merge_batch_size",
134+
100_000,
135+
1,
136+
Setting.Property.IndexScope
137+
);
138+
139+
/** Number of Rayon threads for parallel column encoding during merge (default num_cores/8, min 1). */
140+
public static final Setting<Integer> MERGE_RAYON_THREADS = Setting.intSetting(
141+
"parquet.merge_rayon_threads",
142+
Math.max(1, Runtime.getRuntime().availableProcessors() / 8),
143+
1,
144+
Setting.Property.NodeScope
145+
);
146+
147+
/** Number of Tokio IO threads for async disk writes during merge (default num_cores/8, min 1). */
148+
public static final Setting<Integer> MERGE_IO_THREADS = Setting.intSetting(
149+
"parquet.merge_io_threads",
150+
Math.max(1, Runtime.getRuntime().availableProcessors() / 8),
151+
1,
152+
Setting.Property.NodeScope
153+
);
154+
130155
/** Returns all settings defined by the Parquet plugin. */
131156
public static List<Setting<?>> getSettings() {
132157
return List.of(
133158
PARQUET_SETTINGS,
134-
ROW_GROUP_SIZE_BYTES,
135159
PAGE_SIZE_BYTES,
136160
PAGE_ROW_LIMIT,
137161
DICT_SIZE_BYTES,
@@ -143,7 +167,11 @@ public static List<Setting<?>> getSettings() {
143167
MAX_NATIVE_ALLOCATION,
144168
MAX_ROWS_PER_VSR,
145169
SORT_IN_MEMORY_THRESHOLD,
146-
SORT_BATCH_SIZE
170+
SORT_BATCH_SIZE,
171+
ROW_GROUP_MAX_ROWS,
172+
MERGE_BATCH_SIZE,
173+
MERGE_RAYON_THREADS,
174+
MERGE_IO_THREADS
147175
);
148176
}
149177
}

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/NativeSettings.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@ public class NativeSettings {
2121
private final Long pageSizeBytes;
2222
private final Integer pageRowLimit;
2323
private final Long dictSizeBytes;
24-
private final Long rowGroupSizeBytes;
2524
private final Boolean bloomFilterEnabled;
2625
private final Double bloomFilterFpp;
2726
private final Long bloomFilterNdv;
2827
private final Long sortInMemoryThresholdBytes;
2928
private final Integer sortBatchSize;
29+
private final Integer rowGroupMaxRows;
30+
private final Integer mergeBatchSize;
31+
private final Integer mergeRayonThreads;
32+
private final Integer mergeIoThreads;
3033

3134
private NativeSettings(Builder builder) {
3235
this.indexName = builder.indexName;
@@ -35,12 +38,15 @@ private NativeSettings(Builder builder) {
3538
this.pageSizeBytes = builder.pageSizeBytes;
3639
this.pageRowLimit = builder.pageRowLimit;
3740
this.dictSizeBytes = builder.dictSizeBytes;
38-
this.rowGroupSizeBytes = builder.rowGroupSizeBytes;
3941
this.bloomFilterEnabled = builder.bloomFilterEnabled;
4042
this.bloomFilterFpp = builder.bloomFilterFpp;
4143
this.bloomFilterNdv = builder.bloomFilterNdv;
4244
this.sortInMemoryThresholdBytes = builder.sortInMemoryThresholdBytes;
4345
this.sortBatchSize = builder.sortBatchSize;
46+
this.rowGroupMaxRows = builder.rowGroupMaxRows;
47+
this.mergeBatchSize = builder.mergeBatchSize;
48+
this.mergeRayonThreads = builder.mergeRayonThreads;
49+
this.mergeIoThreads = builder.mergeIoThreads;
4450
}
4551

4652
public String getIndexName() {
@@ -67,10 +73,6 @@ public Long getDictSizeBytes() {
6773
return dictSizeBytes;
6874
}
6975

70-
public Long getRowGroupSizeBytes() {
71-
return rowGroupSizeBytes;
72-
}
73-
7476
public Boolean getBloomFilterEnabled() {
7577
return bloomFilterEnabled;
7678
}
@@ -91,6 +93,22 @@ public Integer getSortBatchSize() {
9193
return sortBatchSize;
9294
}
9395

96+
public Integer getRowGroupMaxRows() {
97+
return rowGroupMaxRows;
98+
}
99+
100+
public Integer getMergeBatchSize() {
101+
return mergeBatchSize;
102+
}
103+
104+
public Integer getMergeRayonThreads() {
105+
return mergeRayonThreads;
106+
}
107+
108+
public Integer getMergeIoThreads() {
109+
return mergeIoThreads;
110+
}
111+
94112
public static Builder builder() {
95113
return new Builder();
96114
}
@@ -102,12 +120,15 @@ public static class Builder {
102120
private Long pageSizeBytes;
103121
private Integer pageRowLimit;
104122
private Long dictSizeBytes;
105-
private Long rowGroupSizeBytes;
106123
private Boolean bloomFilterEnabled;
107124
private Double bloomFilterFpp;
108125
private Long bloomFilterNdv;
109126
private Long sortInMemoryThresholdBytes;
110127
private Integer sortBatchSize;
128+
private Integer rowGroupMaxRows;
129+
private Integer mergeBatchSize;
130+
private Integer mergeRayonThreads;
131+
private Integer mergeIoThreads;
111132

112133
public Builder indexName(String v) {
113134
this.indexName = v;
@@ -139,11 +160,6 @@ public Builder dictSizeBytes(Long v) {
139160
return this;
140161
}
141162

142-
public Builder rowGroupSizeBytes(Long v) {
143-
this.rowGroupSizeBytes = v;
144-
return this;
145-
}
146-
147163
public Builder bloomFilterEnabled(Boolean v) {
148164
this.bloomFilterEnabled = v;
149165
return this;
@@ -169,6 +185,26 @@ public Builder sortBatchSize(Integer v) {
169185
return this;
170186
}
171187

188+
public Builder rowGroupMaxRows(Integer v) {
189+
this.rowGroupMaxRows = v;
190+
return this;
191+
}
192+
193+
public Builder mergeBatchSize(Integer v) {
194+
this.mergeBatchSize = v;
195+
return this;
196+
}
197+
198+
public Builder mergeRayonThreads(Integer v) {
199+
this.mergeRayonThreads = v;
200+
return this;
201+
}
202+
203+
public Builder mergeIoThreads(Integer v) {
204+
this.mergeIoThreads = v;
205+
return this;
206+
}
207+
172208
public NativeSettings build() {
173209
return new NativeSettings(this);
174210
}

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class RustBridge {
3333
private static final MethodHandle ON_SETTINGS_UPDATE;
3434
private static final MethodHandle REMOVE_SETTINGS;
3535
private static final MethodHandle MERGE_FILES;
36+
private static final MethodHandle READ_AS_JSON;
3637

3738
static {
3839
SymbolLookup lib = NativeLibraryLoader.symbolLookup();
@@ -112,12 +113,15 @@ public class RustBridge {
112113
ValueLayout.JAVA_LONG, // page_size_bytes
113114
ValueLayout.JAVA_LONG, // page_row_limit
114115
ValueLayout.JAVA_LONG, // dict_size_bytes
115-
ValueLayout.JAVA_LONG, // row_group_size_bytes
116116
ValueLayout.JAVA_LONG, // bloom_filter_enabled
117117
ValueLayout.JAVA_DOUBLE, // bloom_filter_fpp
118118
ValueLayout.JAVA_LONG, // bloom_filter_ndv
119119
ValueLayout.JAVA_LONG, // sort_in_memory_threshold_bytes
120-
ValueLayout.JAVA_LONG // sort_batch_size
120+
ValueLayout.JAVA_LONG, // sort_batch_size
121+
ValueLayout.JAVA_LONG, // row_group_max_rows
122+
ValueLayout.JAVA_LONG, // merge_batch_size
123+
ValueLayout.JAVA_LONG, // merge_rayon_threads
124+
ValueLayout.JAVA_LONG // merge_io_threads
121125
)
122126
);
123127
REMOVE_SETTINGS = linker.downcallHandle(
@@ -137,6 +141,17 @@ public class RustBridge {
137141
ValueLayout.JAVA_LONG // index_name
138142
)
139143
);
144+
READ_AS_JSON = linker.downcallHandle(
145+
lib.find("parquet_read_as_json").orElseThrow(),
146+
FunctionDescriptor.of(
147+
ValueLayout.JAVA_LONG,
148+
ValueLayout.ADDRESS,
149+
ValueLayout.JAVA_LONG, // file
150+
ValueLayout.ADDRESS, // out_buf
151+
ValueLayout.JAVA_LONG, // buf_capacity
152+
ValueLayout.ADDRESS // out_len
153+
)
154+
);
140155
}
141156

142157
public static void initLogger() {}
@@ -251,12 +266,15 @@ public static void onSettingsUpdate(NativeSettings nativeSettings) throws IOExce
251266
nativeSettings.getPageSizeBytes() != null ? nativeSettings.getPageSizeBytes() : -1L,
252267
nativeSettings.getPageRowLimit() != null ? (long) nativeSettings.getPageRowLimit() : -1L,
253268
nativeSettings.getDictSizeBytes() != null ? nativeSettings.getDictSizeBytes() : -1L,
254-
nativeSettings.getRowGroupSizeBytes() != null ? nativeSettings.getRowGroupSizeBytes() : -1L,
255269
nativeSettings.getBloomFilterEnabled() != null ? (nativeSettings.getBloomFilterEnabled() ? 1L : 0L) : -1L,
256270
nativeSettings.getBloomFilterFpp() != null ? nativeSettings.getBloomFilterFpp() : -1.0,
257271
nativeSettings.getBloomFilterNdv() != null ? nativeSettings.getBloomFilterNdv() : -1L,
258272
nativeSettings.getSortInMemoryThresholdBytes() != null ? nativeSettings.getSortInMemoryThresholdBytes() : -1L,
259-
nativeSettings.getSortBatchSize() != null ? (long) nativeSettings.getSortBatchSize() : -1L
273+
nativeSettings.getSortBatchSize() != null ? (long) nativeSettings.getSortBatchSize() : -1L,
274+
nativeSettings.getRowGroupMaxRows() != null ? (long) nativeSettings.getRowGroupMaxRows() : -1L,
275+
nativeSettings.getMergeBatchSize() != null ? (long) nativeSettings.getMergeBatchSize() : -1L,
276+
nativeSettings.getMergeRayonThreads() != null ? (long) nativeSettings.getMergeRayonThreads() : -1L,
277+
nativeSettings.getMergeIoThreads() != null ? (long) nativeSettings.getMergeIoThreads() : -1L
260278
);
261279
}
262280
}
@@ -291,5 +309,20 @@ private static java.lang.foreign.MemorySegment marshalBoolList(NativeCall call,
291309
return seg;
292310
}
293311

312+
/**
313+
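* Reads a Parquet file through the native library and returns its contents as a JSON string decoded as UTF-8; the native call writes into a fixed 10MB output buffer.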
314+
*/
315+
public static String readAsJson(String file) throws IOException {
316+
try (var call = new NativeCall()) {
317+
var f = call.str(file);
318+
int bufSize = 10 * 1024 * 1024; // 10MB
319+
var outBuf = call.buf(bufSize);
320+
var outLen = call.longOut();
321+
call.invokeIO(READ_AS_JSON, f.segment(), f.len(), outBuf, (long) bufSize, outLen);
322+
int len = (int) outLen.get(ValueLayout.JAVA_LONG, 0);
323+
return new String(outBuf.asSlice(0, len).toArray(ValueLayout.JAVA_BYTE), StandardCharsets.UTF_8);
324+
}
325+
}
326+
294327
private RustBridge() {}
295328
}

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/engine/ParquetIndexingEngine.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import org.opensearch.parquet.bridge.NativeSettings;
2828
import org.opensearch.parquet.bridge.RustBridge;
2929
import org.opensearch.parquet.memory.ArrowBufferPool;
30+
import org.opensearch.parquet.merge.NativeParquetMergeStrategy;
3031
import org.opensearch.parquet.merge.ParquetMergeExecutor;
31-
import org.opensearch.parquet.merge.StreamingParquetMergeStrategy;
3232
import org.opensearch.parquet.writer.ParquetDocumentInput;
3333
import org.opensearch.parquet.writer.ParquetWriter;
3434
import org.opensearch.threadpool.ThreadPool;
@@ -73,6 +73,7 @@ public class ParquetIndexingEngine implements IndexingExecutionEngine<ParquetDat
7373
private final Supplier<Schema> schemaSupplier;
7474
private final ArrowBufferPool bufferPool;
7575
private final IndexSettings indexSettings;
76+
private final Settings nodeSettings;
7677
private final ThreadPool threadPool;
7778
private final FormatChecksumStrategy checksumStrategy;
7879
private final Merger parquetMerger;
@@ -127,6 +128,7 @@ public ParquetIndexingEngine(
127128
this.schemaSupplier = schemaSupplier;
128129
this.bufferPool = new ArrowBufferPool(settings);
129130
this.indexSettings = indexSettings;
131+
this.nodeSettings = settings;
130132
this.threadPool = threadPool;
131133
this.checksumStrategy = checksumStrategy;
132134
try {
@@ -137,7 +139,7 @@ public ParquetIndexingEngine(
137139
throw new RuntimeException(e);
138140
}
139141
this.parquetMerger = new ParquetMergeExecutor(
140-
new StreamingParquetMergeStrategy(dataFormat, indexSettings.getIndex().getName(), shardPath.getDataPath())
142+
new NativeParquetMergeStrategy(dataFormat, indexSettings.getIndex().getName(), shardPath.getDataPath())
141143
);
142144
pushSettingsToRust();
143145
}
@@ -160,12 +162,15 @@ private void pushSettingsToRust() {
160162
.pageSizeBytes(ParquetSettings.PAGE_SIZE_BYTES.get(settings).getBytes())
161163
.pageRowLimit(ParquetSettings.PAGE_ROW_LIMIT.get(settings))
162164
.dictSizeBytes(ParquetSettings.DICT_SIZE_BYTES.get(settings).getBytes())
163-
.rowGroupSizeBytes(ParquetSettings.ROW_GROUP_SIZE_BYTES.get(settings).getBytes())
164165
.bloomFilterEnabled(ParquetSettings.BLOOM_FILTER_ENABLED.get(settings))
165166
.bloomFilterFpp(ParquetSettings.BLOOM_FILTER_FPP.get(settings))
166167
.bloomFilterNdv(ParquetSettings.BLOOM_FILTER_NDV.get(settings))
167168
.sortInMemoryThresholdBytes(ParquetSettings.SORT_IN_MEMORY_THRESHOLD.get(settings).getBytes())
168169
.sortBatchSize(ParquetSettings.SORT_BATCH_SIZE.get(settings))
170+
.rowGroupMaxRows(ParquetSettings.ROW_GROUP_MAX_ROWS.get(settings))
171+
.mergeBatchSize(ParquetSettings.MERGE_BATCH_SIZE.get(settings))
172+
.mergeRayonThreads(ParquetSettings.MERGE_RAYON_THREADS.get(nodeSettings))
173+
.mergeIoThreads(ParquetSettings.MERGE_IO_THREADS.get(nodeSettings))
169174
.build();
170175
try {
171176
RustBridge.onSettingsUpdate(config);

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/fields/ArrowSchemaBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.arrow.vector.types.pojo.Schema;
1313
import org.apache.logging.log4j.LogManager;
1414
import org.apache.logging.log4j.Logger;
15+
import org.opensearch.index.engine.dataformat.DocumentInput;
1516
import org.opensearch.index.mapper.FieldNamesFieldMapper;
1617
import org.opensearch.index.mapper.IndexFieldMapper;
1718
import org.opensearch.index.mapper.Mapper;
@@ -57,7 +58,7 @@ public static Schema getSchema(MapperService mapperService) {
5758
}
5859
// Add row ID field (long)
5960
LongParquetField longField = new LongParquetField();
60-
fields.add(new Field("_row_id", longField.getFieldType(), null));
61+
fields.add(new Field(DocumentInput.ROW_ID_FIELD, longField.getFieldType(), null));
6162
return new Schema(fields);
6263
}
6364

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/merge/StreamingParquetMergeStrategy.java renamed to sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/merge/NativeParquetMergeStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,15 @@
2828
/**
2929
* Implements merging of Parquet files.
3030
*/
31-
public class StreamingParquetMergeStrategy implements ParquetMergeStrategy {
31+
public class NativeParquetMergeStrategy implements ParquetMergeStrategy {
3232

33-
private static final Logger logger = LogManager.getLogger(StreamingParquetMergeStrategy.class);
33+
private static final Logger logger = LogManager.getLogger(NativeParquetMergeStrategy.class);
3434

3535
private final DataFormat dataFormat;
3636
private final String indexName;
3737
private final Path shardDataPath;
3838

39-
public StreamingParquetMergeStrategy(DataFormat dataFormat, String indexName, Path shardDataPath) {
39+
public NativeParquetMergeStrategy(DataFormat dataFormat, String indexName, Path shardDataPath) {
4040
this.dataFormat = dataFormat;
4141
this.indexName = indexName;
4242
this.shardDataPath = shardDataPath;

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/vsr/VSRManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.apache.logging.log4j.LogManager;
1515
import org.apache.logging.log4j.Logger;
1616
import org.opensearch.index.IndexSettings;
17+
import org.opensearch.index.engine.dataformat.DocumentInput;
1718
import org.opensearch.index.mapper.MappedFieldType;
1819
import org.opensearch.nativebridge.spi.ArrowExport;
1920
import org.opensearch.parquet.ParquetDataFormatPlugin;
@@ -130,7 +131,7 @@ public void addDocument(ParquetDocumentInput doc) throws IOException {
130131
parquetField.createField(fieldType, activeVSR, pair.getValue());
131132
}
132133
int rowIndex = activeVSR.getRowCount();
133-
BigIntVector rowIdVector = (BigIntVector) activeVSR.getVector("___row_id");
134+
BigIntVector rowIdVector = (BigIntVector) activeVSR.getVector(DocumentInput.ROW_ID_FIELD);
134135
if (rowIdVector != null) {
135136
rowIdVector.setSafe(rowIndex, doc.getRowId());
136137
}

sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ native-bridge-common = { workspace = true }
2222
rayon = { workspace = true }
2323
tokio = { workspace = true }
2424
crc32fast = { workspace = true }
25+
serde_json = { workspace = true }
2526

2627
[dev-dependencies]
2728
opensearch-parquet-format = { path = ".", features = ["test-utils"] }

0 commit comments

Comments
 (0)