30 changes: 30 additions & 0 deletions libs/vectorized-exec-spi/build.gradle
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

apply plugin: 'opensearch.build'

description = 'Vectorized engine common interfaces for OpenSearch'

dependencies {
api project(':libs:opensearch-core')
api project(':libs:opensearch-common')

testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'vectorized-exec-spi'
}
}

tasks.named('forbiddenApisMain').configure {
replaceSignatureFiles 'jdk-signatures'
}

jarHell.enabled = false

test {
systemProperty 'tests.security.manager', 'false'
}
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* DataFusion integration for OpenSearch.
* Provides JNI bindings and core functionality for the DataFusion query engine.
*/
package org.opensearch.vectorized.execution;
@@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.spi;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* Service Provider Interface for DataFusion data source codecs.
* Implementations provide access to different data formats (CSV, Parquet, etc.)
* through the DataFusion query engine.
*/
public interface DataSourceCodec {

/**
* Register a directory containing data files with the runtime environment to prewarm the cache.
* This should ideally be invoked as part of each refresh (the equivalent of acquiring a searcher),
* registering the files associated with that particular refresh point.
*
* @param directoryPath the path to the directory containing data files
* @param fileNames the list of file names to register
* @param runtimeId the runtime environment ID
* @return a CompletableFuture that completes when registration is done
*/
CompletableFuture<Void> registerDirectory(String directoryPath, List<String> fileNames, long runtimeId);

/**
* Create a new session context for query execution.
*
* @param globalRuntimeEnvId the global runtime environment ID
* @return a CompletableFuture containing the session context ID
*/
CompletableFuture<Long> createSessionContext(long globalRuntimeEnvId);

/**
* Execute a Substrait query plan.
*
* @param sessionContextId the session context ID
* @param substraitPlanBytes the serialized Substrait query plan
* @return a CompletableFuture containing the result stream
*/
CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes);

/**
* Close a session context and free associated resources.
*
* @param sessionContextId the session context ID to close
* @return a CompletableFuture that completes when the context is closed
*/
CompletableFuture<Void> closeSessionContext(long sessionContextId);
}
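
To make the intended lifecycle concrete, here is a minimal sketch of how a caller might drive a DataSourceCodec end to end: register the files for a refresh point, create a session, execute a plan, and release the session. The codec instance, directory path, file names, and Substrait plan bytes are hypothetical placeholders, not part of this change.

```java
package org.opensearch.vectorized.execution.spi;

// Usage sketch only (assumes an already-obtained DataSourceCodec implementation,
// a hypothetical data directory, and pre-serialized Substrait plan bytes).
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class DataSourceCodecUsageSketch {

    static CompletableFuture<Void> runQuery(DataSourceCodec codec, long globalRuntimeEnvId, byte[] substraitPlanBytes) {
        // Register the files for the current refresh point, then create a session, execute, and clean up.
        return codec.registerDirectory("/var/data/index-0", List.of("part-0.csv"), globalRuntimeEnvId)
            .thenCompose(ignored -> codec.createSessionContext(globalRuntimeEnvId))
            .thenCompose(sessionId -> codec.executeSubstraitQuery(sessionId, substraitPlanBytes)
                .thenCompose(stream -> {
                    // A real caller would consume the stream here; this sketch closes it immediately.
                    stream.close();
                    return codec.closeSessionContext(sessionId);
                }));
    }
}
```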
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.vectorized.execution.spi;

import java.util.concurrent.CompletableFuture;

/**
* Represents a stream of record batches from a DataFusion query execution.
* This interface provides access to query results in a streaming fashion.
*/
public interface RecordBatchStream extends AutoCloseable {
Collaborator: Maybe this can be generically typed, e.g. RecordBatchStream<T>?

Owner Author: Yes, makes sense. I had a TODO in the CSV RecordBatchStream to refactor a bit.

/**
* Check if there are more record batches available in the stream.
*
* @return true if more batches are available, false otherwise
*/
boolean hasNext();

/**
* Get the schema of the record batches in this stream.
* @return the schema object
*/
Object getSchema();

/**
* Get the next record batch from the stream.
*
* @return a CompletableFuture containing the next record batch, or null if no more batches are available
*/
CompletableFuture<Object> next();

/**
* Close the stream and free associated resources.
*/
@Override
void close();
}
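
A consumer iterates the stream batch by batch. The sketch below blocks on each future for simplicity; both the schema and the batch are plain Object in the current SPI (the generic RecordBatchStream<T> suggested in the review above would tighten this). It is illustrative only and assumes blocking is acceptable for the caller.

```java
package org.opensearch.vectorized.execution.spi;

// Consumption sketch only: drains a RecordBatchStream by blocking on each batch.
final class RecordBatchStreamUsageSketch {

    static void drain(RecordBatchStream stream) {
        Object schema = stream.getSchema(); // untyped schema handle in the current SPI
        while (stream.hasNext()) {
            Object batch = stream.next().join(); // block until the next batch is ready
            if (batch == null) {
                break; // defensive: the stream reported no further batches
            }
            // Process the batch; its concrete type depends on the codec implementation.
        }
        stream.close();
    }
}
```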
@@ -0,0 +1,13 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Service Provider Interface (SPI) for DataFusion data source codecs.
* Defines the interfaces to implement for supporting different data formats.
*/
package org.opensearch.vectorized.execution.spi;
112 changes: 112 additions & 0 deletions plugins/dataformat-csv/build.gradle
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

apply plugin: 'opensearch.opensearchplugin'

opensearchplugin {
name = 'dataformat-csv'
description = 'CSV data format plugin for OpenSearch DataFusion'
classname = 'org.opensearch.datafusion.csv.CsvDataFormatPlugin'
Collaborator: We want to decouple the package namespacing?

Owner Author: Yes, I'll work with Mohit; this whole dataformat plugin will probably be moved/redesigned as something that can work for both query and indexing. I have TODOs in the plugin class for the same.

hasNativeController = false
}

dependencies {
api project(':libs:opensearch-vectorized-exec-spi')
api project(':libs:opensearch-core')
api project(':libs:opensearch-common')

testImplementation(project(":test:framework")) {
exclude group: 'org.opensearch', module: 'opensearch-dataformat-csv'
}
}

// JNI library configuration
task buildJni(type: Exec) {
description = 'Build the Rust JNI library using Cargo'
group = 'build'

workingDir 'jni'

// Determine the target directory and library name based on OS
def osName = System.getProperty('os.name').toLowerCase()
def libPrefix = osName.contains('windows') ? '' : 'lib'
def libExtension = osName.contains('windows') ? '.dll' : (osName.contains('mac') ? '.dylib' : '.so')

// Find cargo executable - try common locations
def cargoExecutable = 'cargo'
def possibleCargoPaths = [
System.getenv('HOME') + '/.cargo/bin/cargo',
'/usr/local/bin/cargo',
'cargo'
]

for (String path : possibleCargoPaths) {
if (new File(path).exists()) {
cargoExecutable = path
break
}
}

// Build in release mode
def cargoArgs = [cargoExecutable, 'build', '--release']
commandLine cargoArgs

// Set environment variables for cross-compilation if needed
environment 'CARGO_TARGET_DIR', file('jni/target').absolutePath

inputs.files fileTree('jni/src')
inputs.file 'jni/Cargo.toml'
outputs.files file("jni/target/release/${libPrefix}opensearch_datafusion_csv_jni${libExtension}")
}

task copyJniLib(type: Copy, dependsOn: buildJni) {
from 'jni/target/release'
into 'src/main/resources'
include '*.dylib', '*.so', '*.dll'

doLast {
// Remove executable permissions from copied native libraries
fileTree('src/main/resources').matching {
include '*.dylib', '*.so', '*.dll'
}.each { file ->
file.setExecutable(false, false)
file.setReadable(true, false)
file.setWritable(true, false)
}
}
}

processResources.dependsOn copyJniLib
sourcesJar.dependsOn copyJniLib

// Ensure file permissions check runs after JNI library is copied
tasks.named('filepermissions').configure {
dependsOn copyJniLib
}

// Ensure forbidden patterns check runs after JNI library is copied
tasks.named('forbiddenPatterns').configure {
dependsOn copyJniLib
exclude '**/*.dylib', '**/*.so', '**/*.dll'
}

// Ensure spotless check runs after JNI library is copied
tasks.named('spotlessJava').configure {
dependsOn copyJniLib
}

test {
systemProperty 'tests.security.manager', 'false'
}
53 changes: 53 additions & 0 deletions plugins/dataformat-csv/jni/Cargo.toml
@@ -0,0 +1,53 @@
[package]
name = "opensearch-datafusion-csv-jni"
version = "0.1.0"
edition = "2021"

[lib]
name = "opensearch_datafusion_csv_jni"
crate-type = ["cdylib"]

[dependencies]
# DataFusion dependencies
datafusion = "49.0.0"
datafusion-substrait = "49.0.0"
Comment on lines +10 to +13
Collaborator: We should avoid this dependency?

arrow = "54.0.0"
arrow-array = "54.0.0"
arrow-schema = "54.0.0"
arrow-buffer = "54.0.0"

# JNI dependencies
jni = "0.21"

# Async runtime
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
futures-util = "0.3"

# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# Error handling
anyhow = "1.0"
thiserror = "1.0"

# Logging
log = "0.4"

# Parquet support
parquet = "54.0.0"

# Object store for file access
object_store = "0.11"
url = "2.0"

# Substrait support
substrait = "0.47"
prost = "0.13"

# Temporary directory support
tempfile = "3.0"

[build-dependencies]
cbindgen = "0.27"
70 changes: 70 additions & 0 deletions plugins/dataformat-csv/jni/src/context.rs
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*/

use datafusion::prelude::*;
use datafusion::execution::context::SessionContext;
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;

/// Manages DataFusion session contexts
pub struct SessionContextManager {
contexts: HashMap<*mut SessionContext, Arc<SessionContext>>,
next_runtime_id: u64,
}

impl SessionContextManager {
pub fn new() -> Self {
Self {
contexts: HashMap::new(),
next_runtime_id: 1,
}
}

pub async fn register_directory(
&mut self,
table_name: &str,
directory_path: &str,
options: HashMap<String, String>,
) -> Result<u64> {
// Placeholder implementation - would register the CSV directory as a table
log::info!("Registering directory: {} at path: {} with options: {:?}",
table_name, directory_path, options);

let runtime_id = self.next_runtime_id;
self.next_runtime_id += 1;
Ok(runtime_id)
}

pub async fn create_session_context(
&mut self,
config: HashMap<String, String>,
) -> Result<*mut SessionContext> {
// Create actual DataFusion session context
let mut session_config = SessionConfig::new();

// Apply configuration options
if let Some(batch_size) = config.get("batch_size") {
if let Ok(size) = batch_size.parse::<usize>() {
session_config = session_config.with_batch_size(size);
}
}

let ctx = Arc::new(SessionContext::new_with_config(session_config));
let ctx_ptr = Arc::as_ptr(&ctx) as *mut SessionContext;

self.contexts.insert(ctx_ptr, ctx);

Ok(ctx_ptr)
}

pub async fn close_session_context(&mut self, ctx_ptr: *mut SessionContext) -> Result<()> {
self.contexts.remove(&ctx_ptr);
Ok(())
}

pub fn get_context(&self, ctx_ptr: *mut SessionContext) -> Option<&Arc<SessionContext>> {
self.contexts.get(&ctx_ptr)
}
}