-
Notifications
You must be signed in to change notification settings - Fork 0
Datafusion codec - adding libs , data source plugin and data source aware plugin #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/datafusion
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

// Build script for the vectorized execution SPI library.
apply plugin: 'opensearch.build'

description = 'Vectorized engine common interfaces for OpenSearch'

dependencies {
    api project(':libs:opensearch-core')
    api project(':libs:opensearch-common')

    // Test framework; excluding this module itself avoids a circular dependency.
    testImplementation(project(":test:framework")) {
        exclude group: 'org.opensearch', module: 'vectorized-exec-spi'
    }
}

// Check main sources against the JDK forbidden-API signature set only.
tasks.named('forbiddenApisMain').configure {
    replaceSignatureFiles 'jdk-signatures'
}

// NOTE(review): jar-hell checking is disabled for this library — confirm this
// is intentional before merging.
jarHell.enabled = false

test {
    // SPI tests run without the Java security manager.
    systemProperty 'tests.security.manager', 'false'
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

/**
 * DataFusion integration for OpenSearch.
 *
 * <p>Provides JNI bindings and core functionality for the DataFusion query
 * engine used by the vectorized execution path.
 */
package org.opensearch.vectorized.execution;
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,56 @@ | ||
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.vectorized.execution.spi;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Service Provider Interface for DataFusion data source codecs.
 *
 * <p>Implementations provide access to different data formats (CSV, Parquet, etc.)
 * through the DataFusion query engine. All operations are asynchronous and return
 * {@link CompletableFuture}s; the {@code long} ids are opaque handles owned by the
 * native (JNI) layer.
 */
public interface DataSourceCodec {

    /**
     * Register a directory containing data files with the runtime environment to
     * pre-warm the cache.
     *
     * <p>Intended to be invoked as part of each refresh — the equivalent of
     * acquiring a searcher — registering the files associated with that
     * particular refresh point.
     *
     * @param directoryPath the path to the directory containing data files
     * @param fileNames the list of file names to register
     * @param runtimeId the runtime environment ID
     * @return a CompletableFuture that completes when registration is done
     */
    CompletableFuture<Void> registerDirectory(String directoryPath, List<String> fileNames, long runtimeId);

    /**
     * Create a new session context for query execution.
     *
     * @param globalRuntimeEnvId the global runtime environment ID
     * @return a CompletableFuture containing the new session context ID
     */
    CompletableFuture<Long> createSessionContext(long globalRuntimeEnvId);

    /**
     * Execute a serialized Substrait query plan against a session context.
     *
     * @param sessionContextId the session context ID
     * @param substraitPlanBytes the serialized Substrait query plan
     * @return a CompletableFuture containing the result stream
     */
    CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes);

    /**
     * Close a session context and free associated native resources.
     *
     * @param sessionContextId the session context ID to close
     * @return a CompletableFuture that completes when the context is closed
     */
    CompletableFuture<Void> closeSessionContext(long sessionContextId);
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.vectorized.execution.spi;

import java.util.concurrent.CompletableFuture;

/**
 * Represents a stream of record batches from a DataFusion query execution.
 *
 * <p>This interface provides access to query results in a streaming fashion.
 * NOTE(review): schema and batches are currently exposed as {@code Object};
 * a generically typed {@code RecordBatchStream<T>} was suggested in review —
 * confirm the concrete batch representation before tightening these types.
 */
public interface RecordBatchStream extends AutoCloseable {

    /**
     * Check if there are more record batches available in the stream.
     *
     * @return true if more batches are available, false otherwise
     */
    boolean hasNext();

    /**
     * Get the schema of the record batches in this stream.
     *
     * @return the schema object (implementation-specific representation)
     */
    Object getSchema();

    /**
     * Get the next record batch from the stream.
     *
     * @return a CompletableFuture containing the next record batch
     *         (implementation-specific representation)
     */
    CompletableFuture<Object> next();

    /**
     * Close the stream and free associated resources.
     * Narrows {@link AutoCloseable#close()} to not declare a checked exception.
     */
    @Override
    void close();
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
| /** | ||
| * Service Provider Interface (SPI) for DataFusion data source codecs. | ||
| * Defines interfaces for implementing different data format support. | ||
| */ | ||
| package org.opensearch.vectorized.execution.spi; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,112 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| * | ||
| * The OpenSearch Contributors require contributions made to | ||
| * this file be licensed under the Apache-2.0 license or a | ||
| * compatible open source license. | ||
| */ | ||
|
|
||
// CSV data-format plugin: packages the JNI-backed DataFusion CSV codec.
apply plugin: 'opensearch.opensearchplugin'

opensearchplugin {
    name = 'dataformat-csv'
    description = 'CSV data format plugin for OpenSearch DataFusion'
    // NOTE(review): the package namespace is expected to change — this
    // data-format plugin will likely be redesigned so it can serve both the
    // query and indexing paths; see TODOs in the plugin class.
    classname = 'org.opensearch.datafusion.csv.CsvDataFormatPlugin'
    hasNativeController = false
}
|
|
||
dependencies {
    // SPI this codec implements, plus core/common OpenSearch libraries.
    api project(':libs:opensearch-vectorized-exec-spi')
    api project(':libs:opensearch-core')
    api project(':libs:opensearch-common')

    // Test framework; excluding this plugin's own module avoids a cycle.
    testImplementation(project(":test:framework")) {
        exclude group: 'org.opensearch', module: 'opensearch-dataformat-csv'
    }
}
|
|
||
// Build the Rust JNI library with Cargo.
task buildJni(type: Exec) {
    description = 'Build the Rust JNI library using Cargo'
    group = 'build'

    workingDir 'jni'

    // Native library prefix/extension differ per platform.
    def osName = System.getProperty('os.name').toLowerCase()
    def libPrefix = osName.contains('windows') ? '' : 'lib'
    def libExtension = osName.contains('windows') ? '.dll' : (osName.contains('mac') ? '.dylib' : '.so')

    // Locate cargo: prefer well-known install locations, otherwise rely on PATH.
    def possibleCargoPaths = [
        System.getenv('HOME') + '/.cargo/bin/cargo',
        '/usr/local/bin/cargo',
    ]
    def cargoExecutable = possibleCargoPaths.find { new File(it).exists() } ?: 'cargo'

    // Release build; the command is identical on every platform, so no
    // OS-specific branching is needed here.
    commandLine cargoExecutable, 'build', '--release'

    // Keep Cargo's output inside the module so the inputs/outputs below stay valid.
    environment 'CARGO_TARGET_DIR', file('jni/target').absolutePath

    // Incremental-build wiring: rebuild only when Rust sources or the manifest change.
    inputs.files fileTree('jni/src')
    inputs.file 'jni/Cargo.toml'
    outputs.files file("jni/target/release/${libPrefix}opensearch_datafusion_csv_jni${libExtension}")
}
|
|
||
// Copy the freshly built native library into the plugin's resources.
task copyJniLib(type: Copy, dependsOn: buildJni) {
    from 'jni/target/release'
    into 'src/main/resources'
    include '*.dylib', '*.so', '*.dll'

    doLast {
        // The libraries are loaded from the classpath rather than executed
        // directly, so drop the executable bit and normalise read/write bits.
        def copiedLibs = fileTree('src/main/resources').matching {
            include '*.dylib', '*.so', '*.dll'
        }
        copiedLibs.each { lib ->
            lib.setExecutable(false, false)
            lib.setReadable(true, false)
            lib.setWritable(true, false)
        }
    }
}
|
|
||
// The native library must be present before resources are packaged.
processResources.dependsOn copyJniLib
sourcesJar.dependsOn copyJniLib

// Ensure the file-permissions check runs after the JNI library is copied.
tasks.named('filepermissions').configure {
    dependsOn copyJniLib
}

// Ensure the forbidden-patterns check runs after the JNI library is copied;
// binary artifacts would trip the text-pattern scan, so they are excluded.
tasks.named('forbiddenPatterns').configure {
    dependsOn copyJniLib
    exclude '**/*.dylib', '**/*.so', '**/*.dll'
}

// Ensure the spotless check runs after the JNI library is copied.
tasks.named('spotlessJava').configure {
    dependsOn copyJniLib
}

test {
    // Plugin tests run without the Java security manager.
    systemProperty 'tests.security.manager', 'false'
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
[package]
name = "opensearch-datafusion-csv-jni"
version = "0.1.0"
edition = "2021"

[lib]
name = "opensearch_datafusion_csv_jni"
crate-type = ["cdylib"]

[dependencies]
# DataFusion query engine.
# NOTE(review): PR feedback asked whether the datafusion-substrait dependency
# can be avoided — evaluate before merging.
datafusion = "49.0.0"
datafusion-substrait = "49.0.0"

# Arrow memory format.
# NOTE(review): confirm these arrow versions match the arrow release that
# datafusion 49 itself depends on — a mismatch pulls two incompatible copies
# of the arrow crates into the build.
arrow = "54.0.0"
arrow-array = "54.0.0"
arrow-schema = "54.0.0"
arrow-buffer = "54.0.0"

# JNI bindings
jni = "0.21"

# Async runtime
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
futures-util = "0.3"

# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# Error handling
anyhow = "1.0"
thiserror = "1.0"

# Logging
log = "0.4"

# Parquet support
parquet = "54.0.0"

# Object store for file access
object_store = "0.11"
url = "2.0"

# Substrait support
substrait = "0.47"
prost = "0.13"

# Temporary directory support
tempfile = "3.0"

[build-dependencies]
# NOTE(review): cbindgen generates C headers; a JNI cdylib normally does not
# need it — remove if no build.rs consumes it.
cbindgen = "0.27"
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| /* | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| use datafusion::prelude::*; | ||
| use datafusion::execution::context::SessionContext; | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
| use anyhow::Result; | ||
|
|
||
| /// Manages DataFusion session contexts | ||
| pub struct SessionContextManager { | ||
| contexts: HashMap<*mut SessionContext, Arc<SessionContext>>, | ||
| next_runtime_id: u64, | ||
| } | ||
|
|
||
| impl SessionContextManager { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| contexts: HashMap::new(), | ||
| next_runtime_id: 1, | ||
| } | ||
| } | ||
|
|
||
| pub async fn register_directory( | ||
| &mut self, | ||
| table_name: &str, | ||
| directory_path: &str, | ||
| options: HashMap<String, String>, | ||
| ) -> Result<u64> { | ||
| // Placeholder implementation - would register csv directory as table | ||
| log::info!("Registering directory: {} at path: {} with options: {:?}", | ||
| table_name, directory_path, options); | ||
|
|
||
| let runtime_id = self.next_runtime_id; | ||
| self.next_runtime_id += 1; | ||
| Ok(runtime_id) | ||
| } | ||
|
|
||
| pub async fn create_session_context( | ||
| &mut self, | ||
| config: HashMap<String, String>, | ||
| ) -> Result<*mut SessionContext> { | ||
| // Create actual DataFusion session context | ||
| let mut session_config = SessionConfig::new(); | ||
|
|
||
| // Apply configuration options | ||
| if let Some(batch_size) = config.get("batch_size") { | ||
| if let Ok(size) = batch_size.parse::<usize>() { | ||
| session_config = session_config.with_batch_size(size); | ||
| } | ||
| } | ||
|
|
||
| let ctx = Arc::new(SessionContext::new_with_config(session_config)); | ||
| let ctx_ptr = Arc::as_ptr(&ctx) as *mut SessionContext; | ||
|
|
||
| self.contexts.insert(ctx_ptr, ctx); | ||
|
|
||
| Ok(ctx_ptr) | ||
| } | ||
|
|
||
| pub async fn close_session_context(&mut self, ctx_ptr: *mut SessionContext) -> Result<()> { | ||
| self.contexts.remove(&ctx_ptr); | ||
| Ok(()) | ||
| } | ||
|
|
||
| pub fn get_context(&self, ctx_ptr: *mut SessionContext) -> Option<&Arc<SessionContext>> { | ||
| self.contexts.get(&ctx_ptr) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this can be generically typed: `RecordBatchStream<T>`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes makes sense. I had a todo in CSV RBS to refactor a bit