Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions .github/workflows/loggingTesting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ jobs:
javac \
-cp "target/databricks-jdbc-1.0.5-oss.jar" \
-d target/test-classes \
src/test/java/com/databricks/client/jdbc/LoggingTest.java
src/test/java/com/databricks/client/jdbc/LoggingTest.java \
src/test/java/com/databricks/client/jdbc/LoggingTestNoJVMFlag.java

echo "==== Checking compiled classes ===="
find target/test-classes -type f

- name: Run LoggingTest
- name: Run LoggingTest with JVM flag (original)
shell: bash
run: |
echo "==== Running LoggingTest with usethriftclient=${{ matrix.thrift-client }} ===="
echo "==== Running LoggingTest WITH JVM flag, usethriftclient=${{ matrix.thrift-client }} ===="
OS_TYPE=$(uname | tr '[:upper:]' '[:lower:]')
if [[ "$OS_TYPE" == "linux" ]]; then SEP=":"; else SEP=";"; fi
echo "Using classpath separator: '$SEP'"
Expand All @@ -91,6 +92,19 @@ jobs:
-cp "$CP" \
com.databricks.client.jdbc.LoggingTest

- name: Run LoggingTest without JVM flag (new approach)
shell: bash
run: |
echo "==== Running LoggingTest WITHOUT JVM flag, usethriftclient=${{ matrix.thrift-client }} ===="
OS_TYPE=$(uname | tr '[:upper:]' '[:lower:]')
if [[ "$OS_TYPE" == "linux" ]]; then SEP=":"; else SEP=";"; fi
echo "Using classpath separator: '$SEP'"
CP="target/test-classes${SEP}target/databricks-jdbc-1.0.5-oss.jar"

java \
-cp "$CP" \
com.databricks.client.jdbc.LoggingTestNoJVMFlag

- name: Verify log file contents
shell: bash
run: |
Expand Down
104 changes: 104 additions & 0 deletions .github/workflows/noAddOpensTest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
name: Test JDBC Without Add-Opens

on:
workflow_dispatch:
pull_request:

jobs:
test-without-add-opens:
strategy:
fail-fast: false
matrix:
github-runner: [linux-ubuntu-latest]
thrift-client: [0, 1]

runs-on:
group: databricks-protected-runner-group
labels: ${{ matrix.github-runner }}

steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: Set up Java
uses: actions/setup-java@v4
with:
distribution: 'adopt'
java-version: '21'

- name: Build JDBC driver
run: mvn clean package -DskipTests

- name: Set Environment Variables
shell: bash
env:
DATABRICKS_TOKEN: ${{ secrets.DATABRICKS_TOKEN }}
DATABRICKS_HOST: ${{ secrets.DATABRICKS_HOST }}
DATABRICKS_HTTP_PATH: ${{ secrets.DATABRICKS_HTTP_PATH }}
USE_THRIFT_CLIENT: ${{ matrix.thrift-client }}
run: |
echo "DATABRICKS_TOKEN=${DATABRICKS_TOKEN}" >> $GITHUB_ENV
echo "DATABRICKS_HOST=${DATABRICKS_HOST}" >> $GITHUB_ENV
echo "DATABRICKS_HTTP_PATH=${DATABRICKS_HTTP_PATH}" >> $GITHUB_ENV
echo "USE_THRIFT_CLIENT=${USE_THRIFT_CLIENT}" >> $GITHUB_ENV

- name: Clean & Compile LoggingTestNoJVMFlag
shell: bash
run: |
rm -rf target/test-classes
mkdir -p target/test-classes

javac \
-cp "target/databricks-jdbc-1.0.5-oss.jar" \
-d target/test-classes \
src/test/java/com/databricks/client/jdbc/LoggingTestNoJVMFlag.java

echo "==== Checking compiled classes ===="
find target/test-classes -type f

- name: Run LoggingTestNoJVMFlag WITHOUT the --add-opens flag
shell: bash
run: |
echo "==== Running LoggingTestNoJVMFlag WITHOUT JVM flag, usethriftclient=${{ matrix.thrift-client }} ===="
OS_TYPE=$(uname | tr '[:upper:]' '[:lower:]')
if [[ "$OS_TYPE" == "linux" ]]; then SEP=":"; else SEP=";"; fi
echo "Using classpath separator: '$SEP'"
CP="target/test-classes${SEP}target/databricks-jdbc-1.0.5-oss.jar"

java \
-cp "$CP" \
com.databricks.client.jdbc.LoggingTestNoJVMFlag

- name: Verify log file contents
shell: bash
run: |
LOG_DIR="${HOME}/logstest"
LOG_FILE="${LOG_DIR}/databricks_jdbc.log.0"
echo "Verifying log file contents in ${LOG_FILE}..."

if [ -f "$LOG_FILE" ]; then
echo "Log file found. Checking contents..."

REQUIRED_STRINGS=("sql = SELECT 1",
"Result retrieved successfully"
"Closing global async HTTP client"
"Global async HTTP client has been shut down")

for STRING in "${REQUIRED_STRINGS[@]}"; do
if ! grep -qF "$STRING" "$LOG_FILE"; then
echo "ERROR: Required log string not found: $STRING"
echo "Showing last 100 lines of log file:"
tail -n 100 "$LOG_FILE"
exit 1
fi
done

echo "All required log strings were found."
else
echo "Log file directory contents:"
ls -la "${LOG_DIR}" || echo "Directory does not exist"
echo "Log file ${LOG_FILE} does not exist. Failing the build."
exit 1
fi
4 changes: 4 additions & 0 deletions src/main/java/com/databricks/client/jdbc/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.DatabricksClientType;
import com.databricks.jdbc.common.util.*;
import com.databricks.jdbc.common.util.ArrowBootstrapHook;
import com.databricks.jdbc.dbclient.IDatabricksClient;
import com.databricks.jdbc.dbclient.impl.common.SessionId;
import com.databricks.jdbc.dbclient.impl.sqlexec.DatabricksSdkClient;
Expand All @@ -31,6 +32,9 @@ public class Driver implements IDatabricksDriver, java.sql.Driver {

static {
try {
// Initialize Arrow memory access utilities as early as possible
ArrowBootstrapHook.initialize();

DriverManager.registerDriver(INSTANCE = new Driver());
} catch (SQLException e) {
throw new IllegalStateException("Unable to register " + Driver.class, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import com.databricks.jdbc.api.impl.converters.ArrowToJavaObjectConverter;
import com.databricks.jdbc.common.CompressionCodec;
import com.databricks.jdbc.common.util.ArrowAllocatorFactory;
import com.databricks.jdbc.common.util.ArrowMemoryHandler;
import com.databricks.jdbc.common.util.DecompressionUtil;
import com.databricks.jdbc.common.util.DriverUtil;
import com.databricks.jdbc.dbclient.IDatabricksHttpClient;
Expand All @@ -30,7 +32,6 @@
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
Expand Down Expand Up @@ -103,14 +104,17 @@ enum ChunkStatus {
private List<String> arrowMetadata;

private ArrowResultChunk(Builder builder) throws DatabricksParsingException {
// Initialize Arrow memory access utilities first
ArrowMemoryHandler.initialize();

this.chunkIndex = builder.chunkIndex;
this.numRows = builder.numRows;
this.rowOffset = builder.rowOffset;
this.chunkLink = builder.chunkLink;
this.statementId = builder.statementId;
this.expiryTime = builder.expiryTime;
this.status = builder.status;
this.rootAllocator = new RootAllocator(/* limit= */ Integer.MAX_VALUE);
this.rootAllocator = ArrowAllocatorFactory.createAllocator(Integer.MAX_VALUE);
if (builder.inputStream != null) {
// Data is already available
try {
Expand Down Expand Up @@ -386,7 +390,10 @@ private static ArrowData getRecordBatchList(
throws IOException {
List<List<ValueVector>> recordBatchList = new ArrayList<>();
List<String> metadata = new ArrayList<>();
try (ArrowStreamReader arrowStreamReader = new ArrowStreamReader(inputStream, rootAllocator)) {

// Use our ArrowReaderProxy to handle Buffer.address access
try (ArrowStreamReader arrowStreamReader =
com.databricks.jdbc.common.util.ArrowReaderProxy.createReader(inputStream, rootAllocator)) {
VectorSchemaRoot vectorSchemaRoot = arrowStreamReader.getVectorSchemaRoot();
boolean fetchedMetadata = false;
while (arrowStreamReader.loadNextBatch()) {
Expand All @@ -408,7 +415,8 @@ private static ArrowData getRecordBatchList(
purgeArrowData(recordBatchList);
} catch (IOException e) {
LOGGER.error(
"Error while reading arrow data, purging the local list and rethrowing the exception.");
"Error while reading arrow data, purging the local list and rethrowing the exception: {}",
e.getMessage());
purgeArrowData(recordBatchList);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package com.databricks.jdbc.api.impl.arrow;

import com.databricks.jdbc.common.util.UnsafeAccessUtil;
import com.databricks.jdbc.log.JdbcLogger;
import com.databricks.jdbc.log.JdbcLoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Optional;

/**
* A helper class that provides utilities to work with Arrow buffers without requiring the
* --add-opens=java.base/java.nio=ALL-UNNAMED JVM flag.
*/
public class DatabricksArrowBufferHelper {
private static final JdbcLogger LOGGER =
JdbcLoggerFactory.getLogger(DatabricksArrowBufferHelper.class);

// Flag to check if we've installed the bridge
private static boolean BRIDGE_INSTALLED = false;

// Fields and methods for reflection
private static Optional<Field> addressField = Optional.empty();
private static Optional<Method> getAddressMethod = Optional.empty();

/**
* Initialize the system to use our safe ByteBuffer.address access. This should be called early
* during driver initialization.
*
* <p>This method installs hooks into the JVM that allow Arrow to access ByteBuffer.address
* without requiring the --add-opens flag.
*/
public static synchronized void initializeArrowBufferBridge() {
if (BRIDGE_INSTALLED) {
return;
}

try {
// Check if we even need this workaround
if (UnsafeAccessUtil.hasDirectAddressAccess()) {
LOGGER.info("Direct ByteBuffer.address access is available, no need for workaround");
BRIDGE_INSTALLED = true;
return;
}

// Try to initialize a buffer hook to intercept access to Buffer.address
// We'll use method handles to call our own UnsafeAccessUtil

// Find ArrowBufUnderlyingBuffer class using reflection
Class<?> arrowByteBufClass = null;
try {
// Different versions of Arrow use different package names
// Try different possibilities
arrowByteBufClass = Class.forName("org.apache.arrow.memory.ArrowByteBuf");
} catch (ClassNotFoundException e) {
try {
arrowByteBufClass = Class.forName("io.netty.buffer.ArrowByteBuf");
} catch (ClassNotFoundException ex) {
// Likely a different version of Arrow, we'll try a different approach
}
}

// Log success if we found the class
if (arrowByteBufClass != null) {
LOGGER.info("Found ArrowByteBuf class: {}", arrowByteBufClass.getName());
}

// Initialize access to buffer methods
try {
// Try to find java.nio.Buffer.address field
Field addrField = Class.forName("java.nio.Buffer").getDeclaredField("address");
addrField.setAccessible(true);
addressField = Optional.of(addrField);
LOGGER.info("Successfully accessed Buffer.address field");
} catch (Exception e) {
LOGGER.info("Could not access Buffer.address field: {}", e.getMessage());
}

BRIDGE_INSTALLED = true;
LOGGER.info("Arrow Buffer bridge has been installed");
} catch (Exception e) {
LOGGER.warn("Failed to initialize Arrow buffer bridge: {}", e.getMessage());
}
}

/**
* Get the memory address of a direct ByteBuffer using our safe access method. This is a
* workaround for the reflection-based approach that requires --add-opens.
*
* @param buffer The ByteBuffer to get the address of
* @return The memory address as a long
* @throws IllegalArgumentException If the buffer is not direct or address cannot be accessed
*/
public static long getBufferAddress(ByteBuffer buffer) {
return UnsafeAccessUtil.getBufferAddress(buffer);
}

/**
* Sets the address field of a Buffer using our safe access method. This can be used to override
* the address field when Arrow tries to access it.
*
* @param buffer The Buffer to set the address in
* @param address The address value to set
* @return true if successful, false otherwise
*/
public static boolean setBufferAddress(Object buffer, long address) {
if (addressField.isPresent()) {
try {
addressField.get().set(buffer, address);
return true;
} catch (Exception e) {
LOGGER.warn("Failed to set Buffer.address: {}", e.getMessage());
}
}
return false;
}

/**
* Check if a ByteBuffer is direct.
*
* @param buffer The buffer to check
* @return true if the buffer is direct
*/
public static boolean isDirectBuffer(ByteBuffer buffer) {
return buffer.isDirect();
}
}
Loading
Loading