Skip to content

[PECOBLR-348] Added putFiles method in DBFSVolume client#823

Closed
shivam2680 wants to merge 1605 commits into
mainfrom
shivam2680/dbfs-async
Closed

[PECOBLR-348] Added putFiles method in DBFSVolume client#823
shivam2680 wants to merge 1605 commits into
mainfrom
shivam2680/dbfs-async

Conversation

@shivam2680

@shivam2680 shivam2680 commented May 12, 2025

Copy link
Copy Markdown
Collaborator

Description

Added putFiles in Volume client to support async upload of files.
Design Doc

Testing

Driver Test
Integration Test
E2E test
Ingestion Team Benchmarks

Benchmark Results

Number of files File Size (bytes) Duration (ms) Files/sec
100 8 4404 22
100 16 2164 46
100 160 2258 44
100 500 2504 39
1000 8 15961 62
1000 16 13116 76
1000 160 15285 65
1000 500 16396 60
4000 8 56740 70
4000 16 58010 68
4000 160 61217 65
4000 500 56372 70
10000 8 144293 69
10000 16 147235 67
10000 160 155257 64

Additional Notes to the Reviewer

gopalldb and others added 30 commits December 10, 2024 15:22
* Fix UC volume GET operation

* Add tests
* Added the GET and DELETE inside the DBFS client

* Added java doc and refractored workspace client initialisation

* Added unit tests

* Improved the unit test coverage to 85%

* Fixed the DBFS client to use ConnectionContext for initializing

* Refractored the code for Volume Factory

* Added java docs
* Adding changes for Fixing async execution

* Add tests

* Fixes as per comments

* Add tests
* Update SDK version

* fix tests as apiClient is being used in sdk now
* Revert "revert (#595)"

This reverts commit 56ef04f.

* Fix uber minimal

* fix version
* Telemetry

* Add client

* Add telemetry client

* Minor fix

* More changes

* Changes for telemetry

* Add more details in Telemetry impl

* Add tests

* Fix typo

* Add more details

* Updated error msg

* Fix coverage
* parser

* more complex data type logic

* complete complex data types

* more tests

* refine

* integration test

* add test to driver test

* removed secret

* fmt

* fix unsupported test

* integration test working

* fmt

* comments

* fmt

* update method sig

* fmt

* comments

* comments

* comments

* fmt

* fmt

* add

* more extensive testing

* more extensive testing

* more extensive testing

* more extensive testing

* more extensive testing
* Send intial connection log to unauth telemetry endpoint

* Add tests + remove debug logs

* fmt

* Add tests

* Add tests

* Add tests for classes i didn't add:)

* add getters/setters in systemConfiguration + improve coverage
- Run execution queries in `async` mode
- Run metadata queries in `async` mode
- Refactor result retrieval in Thrift by improving response handling and fixing polling
- Refactor credential refreshing by transitioning from Thrift accessor to Thrift transport
- Competing driver, SQL Gateway, and SQL Execution API also execute the queries in async mode
- Minor code cleanup

Running queries in async mode and using polling will be beneficial:
- Freeing up server resources under high load instead of holding the connection for ~5 seconds (direct results)
- If query takes more than 5 seconds, server will anyway require polling from client
- Didn't notice any regressions (slightly better performance) under high concurrent load when running queries in async mode. Some empirical results below:
  - Running a simple select query on a table with 10-15 columns for 10K rows in a concurrent environment (250 connections executing query in parallel)
- Average results when running on `main`
```
Average query time across connections: 73873.58ms
Min query time: 22075ms
Max query time: 88997ms
```
- Average results when running in `async` mode
```
Average query time across connections: 68382.95ms
Min query time: 24718ms
Max query time: 95282ms
```
…ch (#622)

This adds a workflow to run JDBC comparator in a separate branch for GA readiness. Note that this does not affect the main branch.
Add databricks protected runners
Use databricks runners
Setup PAT correctly
…y paths (#620)

* change file paths

* change file paths

* change file paths

* change file paths

* change file paths

* change file paths

* change file paths
}

@Override
public int getMaxConcurrentPresignedRequests() {

@gopalldb gopalldb May 19, 2025

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's rename this to UCVolumeUploadBatchSize

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not batch size. This is the number of parallel requests that driver sends for fetching pre-signed urls.

List<CompletableFuture<VolumePutResult>> futures = new ArrayList<>(objectPaths.size());

// Initialize lists with null. Later, set values at the correct index
for (int i = 0; i < objectPaths.size(); i++) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this step needed? To handle partial failures?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and maintain result order.

@jayantsing-db jayantsing-db left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few comments inline. Could you please remove redundant fake service tests and stubs? i think fake-service does not provide the required fidelity as it is a client upload.

Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/DBFSVolumeClient.java Outdated
}

// Remove null entries from uploadRequests. Files that don't exist will be null
uploadRequests.removeIf(Objects::isNull);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

futures seem to be a parallel array. no modifications needed to that array?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are 2 different arrays,
futures is just track result order
uploadRequests is used to send parallel requests, if a file doesn't exist, we won't send upload request for that file.


this.source = source;
this.contentLength = contentLength;
this.contentType = Objects.requireNonNull(contentType, "contentType");

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this throws exception, are resources like inputstream closed?

@shivam2680 shivam2680 May 27, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this case won't arise as we are setting ContentType.APPLICATION_OCTET_STREAM always while constructing this producer. I can maybe remove this field itself.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay thanks

channel.endStream();
return;
}
int read = source.read(buf, 0, (int) Math.min(buf.length, contentLength - totalSent));

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be a blocking call for async thread pool. can we use nio channels if not too complex?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel based on the current upload latencies, current impl is fine. I agree this is blocking, but is limited by buf size (16KB). A pure NIO pipeline might get complicated

channel.endStream();
return;
}
int read = source.read(buf, 0, (int) Math.min(buf.length, contentLength - totalSent));

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overflow risks with (int) Math.min(buf.length, contentLength - totalSent)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really,
buf.length is always 16384(16KB), so Math.min(buf.length, contentLength - totalSent) has max value 16384 only.

}
try {
source.close();
} catch (IOException ignore) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

silent exception swallowing?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throwing ex now.

channel.endStream();
} else {
totalSent += read;
channel.write(ByteBuffer.wrap(buf, 0, read));

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check if the channel can accept data?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introduced a currentChunk ByteBuffer. helps in identifying how many bytes channel accepts and retries until buffer is empty.

Comment on lines +120 to +121
endOfStream = true;
channel.endStream();

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check if we have actually sent expected number of bytes and fail accordingly?

final WorkspaceClient workspaceClient;
final ApiClient apiClient;
private final String allowedVolumeIngestionPaths;
private static int MAX_CONCURRENT_PRESIGNED_REQUESTS = 50;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this default value used?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used to initialize the presignedUrlSemaphore that controls the rate limiting for presigned URL requests

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants