Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
382bd58
Initial commit of plugin engine-datafusion
mch2 Jul 31, 2025
a70103d
Fixed the build failures, added javadocs (#19398)
vinaykpud Sep 24, 2025
4109473
Add JNI layer and rust methods to execute substrait plan (#19399)
vinaykpud Sep 24, 2025
2f83116
Add License, make precommit checks work
vinaykpud Sep 24, 2025
d180365
Ensure precommit succeeds
vinaykpud Sep 24, 2025
47f6d59
Added Integration of Search with Datafusion plugin
vinaykpud Sep 25, 2025
7b62823
Add extensions for csv codec
bharath-techie Aug 7, 2025
55582ed
adding libs , data source plugin and data source aware plugin
bharath-techie Aug 15, 2025
e4e9584
search interface changes
bharath-techie Aug 25, 2025
812ad4f
Search and indexing engine integration changes - inprogesss commit
bharath-techie Sep 3, 2025
b8ddfb2
in-progress read engine / query phase abstractions
bharath-techie Sep 4, 2025
19db171
Tying searcher and reader with rust
bharath-techie Sep 5, 2025
847aa7e
Add changes for searcher integration
bharath-techie Sep 25, 2025
1276efd
Fix datafusion rust
alchemist51 Sep 26, 2025
c79e554
added global row id optimizer and tests for query phase
Sep 26, 2025
a649b6f
Feature/datafusion (#38)
bharath-techie Aug 19, 2025
803fe6a
Fixing tests
bharath-techie Sep 26, 2025
065c88d
Fix Listing Cache
alchemist51 Sep 28, 2025
c011a9c
Integrate aggregators to convert result from datafusion (#19441)
expani Sep 29, 2025
00dd39a
Changes in dataformat for CSVEngine
alchemist51 Sep 29, 2025
eba3575
Working changes (#44)
alchemist51 Sep 29, 2025
0a9b038
Changes to make plugin contexts work with source parse
bharath-techie Sep 29, 2025
93fcf57
Fixing end to end flow for pure aggregations (#19494)
expani Oct 3, 2025
02bd3cc
Indexing integration
bharath-techie Oct 3, 2025
7f6d309
removing CSV codec and integrating with parquet module
bharath-techie Oct 3, 2025
43b5937
fixes for publishToMavenLocal
bharath-techie Oct 6, 2025
958186c
add readme
bharath-techie Oct 7, 2025
ac9f5d1
Added support for average metric aggregation (#19559)
expani Oct 8, 2025
3d06c82
Fixing rust build for parquet-data-format (#19611)
raghuvanshraj Oct 13, 2025
77b320c
Commiter integration and build fixes (#19612)
bharath-techie Oct 13, 2025
f476533
Add initial draft for datafusion and plugin interaction
alchemist51 Oct 19, 2025
381da87
Minor changes for removing pub/sub model
alchemist51 Oct 21, 2025
4527d5d
Testing changes
alchemist51 Oct 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ CLAUDE.md
build-idea/
out/

modules/parquet-data-format/src/main/rust/target/*
libs/dataformat-csv/jni/target/*
libs/dataformat-csv/src/main/resources/*
plugins/dataformat-csv/src/main/resources/*
libs/dataformat-csv/jni/Cargo.lock

# include shared intellij config
!.idea/inspectionProfiles/Project_Default.xml
!.idea/runConfigurations/Debug_OpenSearch.xml
Expand Down Expand Up @@ -68,3 +74,14 @@ testfixtures_shared/

# build files generated
doc-tools/missing-doclet/bin/
/plugins/dataformat-csv/jni/target
/plugins/dataformat-csv/jni/Cargo.lock

/modules/parquet-data-format/src/main/rust/target
/modules/parquet-data-format/src/main/rust/debug
/modules/parquet-data-format/src/main/resources/native/
/modules/parquet-data-format/jni/target/debug

/modules/parquet-data-format/jni/target/release
**/Cargo.lock
/modules/parquet-data-format/jni/
24 changes: 14 additions & 10 deletions .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ public void beforeStart() {
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {
// TODO : remove this - this disables assertions
node.jvmArgs(" -da ");
if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
Expand Down
6 changes: 5 additions & 1 deletion gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ configure([
project(":test:fixtures:hdfs-fixture"),
project(":test:fixtures:s3-fixture"),
project(":test:framework"),
project(":test:logger-usage")
project(":test:logger-usage"),
project(":libs:opensearch-vectorized-exec-spi"), // TODO
project(":plugins:engine-datafusion"), //TODO
project(":server"),
project(":modules:parquet-data-format"),
]) {
project.tasks.withType(MissingJavadocTask) {
isExcluded = true
Expand Down
24 changes: 24 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,30 @@ testClusters {
}
}
}

if (findProperty("remotePlugins")) {
remotePlugins = Eval.me(remotePlugins)
for (String coords : remotePlugins) {
if (coords.startsWith('/') || coords.startsWith('file:')) {
// Direct file path
plugin(project.layout.file(project.provider { new File(coords) }))
} else {
// Maven coordinates
def config = project.configurations.detachedConfiguration(
project.dependencies.create(coords + '@zip')
)
config.resolutionStrategy.cacheChangingModulesFor 0, 'seconds'
project.repositories.mavenLocal()
project.repositories {
maven {
name = 'OpenSearch Snapshots'
url = 'https://central.sonatype.com/repository/maven-snapshots/'
}
}
plugin(project.layout.file(project.provider { config.singleFile }))
}
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment
Set.of(PublicApi.class, ExperimentalApi.class, DeprecatedApi.class)
);

for (var element : elements) {
validate(element);

if (!checkPackage(element)) {
continue;
}

// Skip all not-public elements
checkPublicVisibility(null, element);

if (element instanceof TypeElement) {
process((TypeElement) element);
}
}
// for (var element : elements) {
// validate(element);
//
// if (!checkPackage(element)) {
// continue;
// }
//
// // Skip all not-public elements
// checkPublicVisibility(null, element);
//
// if (element instanceof TypeElement) {
// process((TypeElement) element);
// }
// }

return false;
}
Expand Down
30 changes: 30 additions & 0 deletions libs/vectorized-exec-spi/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

apply plugin: 'opensearch.build'

description = 'Vectorized engine common interfaces for OpenSearch'

dependencies {
api project(':libs:opensearch-core')
api project(':libs:opensearch-common')

testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'vectorized-exec-spi'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false

test {
systemProperty 'tests.security.manager', 'false'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* DataFusion integration for OpenSearch.
* Provides JNI bindings and core functionality for DataFusion query engine.
*/
package org.opensearch.vectorized.execution;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.search;

public class CatalogSearcher {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.search;

import org.opensearch.common.annotation.ExperimentalApi;

/**
DataFormat supported by OpenSearch
*/
@ExperimentalApi
public enum DataFormat {
/** CSV Format*/
CSV("parquet"),
PARQUET("parquet"),

/** Text Format */
Text("text");

private final String name;

DataFormat(String name) {
this.name = name;
}

public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.search;

public class IndexReader {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.search.spi;

import org.opensearch.vectorized.execution.search.DataFormat;

/**
* Listener for configuration updates.
* DataFusionPlugin implements this to receive updates from format-specific plugins (e.g., Parquet).
*/
public interface ConfigUpdateListener {
void onSessionConfigUpdate(SessionConfig sessionConfig);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.search.spi;

import org.opensearch.vectorized.execution.search.DataFormat;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Service Provider Interface for DataFusion data source codecs.
* Implementations provide access to different data formats (CSV, Parquet, etc.)
* through the DataFusion query engine.
*/
public interface DataSourceCodec {

/**
* Register a directory containing data files with the runtime environment to prewarm cache
* This ideally should be used as part of each refresh - equivalent of acquire searcher
* where we register the files associated with this particular refresh point
* @param directoryPath the path to the directory containing data files
* @param fileNames the list of file names to register
* @param runtimeId the runtime environment ID
* @return a CompletableFuture that completes when registration is done
*/
CompletableFuture<Void> registerDirectory(String directoryPath, List<String> fileNames, long runtimeId);

/**
* Create a new session context for query execution.
*
* @param globalRuntimeEnvId the global runtime environment ID
* @return a CompletableFuture containing the session context ID
*/
CompletableFuture<Long> createSessionContext(long globalRuntimeEnvId);

/**
* Execute a Substrait query plan.
*
* @param sessionContextId the session context ID
* @param substraitPlanBytes the serialized Substrait query plan
* @return a CompletableFuture containing the result stream
*/
CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes);

/**
* Close a session context and free associated resources.
*
* @param sessionContextId the session context ID to close
* @return a CompletableFuture that completes when the context is closed
*/
CompletableFuture<Void> closeSessionContext(long sessionContextId);

/**
* Returns the data format name
*/
DataFormat getDataFormat();

/**
* Override the engine config with the session config
* @return a CompletableFuture containing the overridden engine config
*/
EngineConfig updateEngineConfig(EngineConfig config);

void attachListener(ConfigUpdateListener listener);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.search.spi;

/**
* Configuration interface for data formats.
* Format-specific configs (e.g., Parquet) publish updates when settings change.
* DataFusionPlugin subscribes to receive updates and holds the final config.
*/
public interface EngineConfig {

/**
* Gets the session config
* @return The session config
*/
SessionConfig getSessionConfig();

NativeConfiguration getNativeConfiguration();

/**
* Updates the session config by merging values
*
* @param sessionConfig The session config to merge from
* @return
*/
EngineConfig updateSessionConfig(SessionConfig sessionConfig);

/**
* Updates the listing table options
* @param nativeConfiguration The new listing table options
*/
EngineConfig updateNativeConfiguration(NativeConfiguration nativeConfiguration);

}
Loading