[PECOBLR-348] Added putFiles method in DBFSVolume client #860

Merged
shivam2680 merged 37 commits into databricks:main from shivam2680:dbfs-async
Jul 22, 2025

@shivam2680 shivam2680 commented Jun 23, 2025

Collaborator

Description

Added putFiles in Volume client to support async upload of files.
Design Doc

Old PR link: #823

Testing

Driver Test
Integration Test
E2E test
Ingestion Team Benchmarks

Benchmark Results

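| Number of files | File size (bytes) | Duration (ms) | Files/sec |
|---|---|---|---|
| 100 | 8 | 4404 | 22 |
| 100 | 16 | 2164 | 46 |
| 100 | 160 | 2258 | 44 |
| 100 | 500 | 2504 | 39 |
| 1000 | 8 | 15961 | 62 |
| 1000 | 16 | 13116 | 76 |
| 1000 | 160 | 15285 | 65 |
| 1000 | 500 | 16396 | 60 |
| 4000 | 8 | 56740 | 70 |
| 4000 | 16 | 58010 | 68 |
| 4000 | 160 | 61217 | 65 |
| 4000 | 500 | 56372 | 70 |
| 10000 | 8 | 144293 | 69 |
| 10000 | 16 | 147235 | 67 |
| 10000 | 160 | 155257 | 64 |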
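
The Files/sec figures in the benchmark rows above appear to be the file count divided by the elapsed time in seconds, truncated to a whole number. That is an inference from the numbers, not something the PR states. A minimal sketch of that arithmetic, with an illustrative class and method name that are not part of the PR:

```java
// Illustrative helper, not part of the PR: derives files/sec from one benchmark row.
final class ThroughputCalc {

    // Integer division truncates, which is consistent with the reported values.
    static long filesPerSecond(long fileCount, long durationMs) {
        if (durationMs <= 0) {
            throw new IllegalArgumentException("durationMs must be positive");
        }
        return fileCount * 1000 / durationMs;
    }

    public static void main(String[] args) {
        // Two rows taken from the benchmark results.
        System.out.println(filesPerSecond(100, 4404));
        System.out.println(filesPerSecond(10000, 144293));
    }
}
```

Read this way, throughput levels off at roughly 60 to 70 files per second from 1000 files upward, largely independent of file size in the tested range.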

Additional Notes to the Reviewer

@github-actions


Please ensure that the NEXT_CHANGELOG.md file is updated with any relevant changes.
If this is not necessary for your PR, please include the following in your PR description:
NO_CHANGELOG=true
and rerun the job.

@shivam2680 shivam2680 changed the title from "Dbfs async" to "[PECOBLR-348] Added putFiles method in DBFSVolume client" Jun 23, 2025

@jayantsing-db jayantsing-db left a comment

Collaborator

Thanks a lot @shivam2680 for making these changes and the perf improvements with the async client. I have added my comments to the PR.

Additionally, in general, I feel the implementation could have been much simpler. Right now we have a lot of nested callbacks, nested retries, a semaphore whose lifecycle needs to be manually managed, and nested error handling. This makes it a lot more complex than it has to be. If possible, let's discuss and see if we can simplify this.


@ParameterizedTest
@MethodSource("provideParametersForFileOrderTest")
void testResultOrderMatches(

Collaborator

we can get rid of this

Collaborator Author

removed

@jayantsing-db jayantsing-db Jul 9, 2025

Collaborator

hopefully the stubs are removed as well?

"test_volume1",
Arrays.asList("multi_upload_4.txt", "missing_file.txt", "multi_upload_5.txt"),
Arrays.asList(
"/tmp/multi_upload_0.txt", "/tmp/multi_upload_100.txt", "/tmp/multi_upload_2.txt"),

Collaborator

Is this testing the case where the 2nd file is missing? You create /tmp/multi_upload_1.txt instead of /tmp/multi_upload_100.txt, so the file is missing. Is that correct?

Comment thread src/main/java/com/databricks/jdbc/api/impl/DatabricksConnectionContext.java Outdated
Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/VolumeUploadCallback.java Outdated
Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/VolumeUploadCallback.java Outdated
Comment on lines +160 to +167
// Handle stream reset if needed
if (!request.isFile() && request.inputStream.markSupported()) {
  try {
    request.inputStream.reset();
  } catch (IOException e) {
    LOGGER.warn("Could not reset input stream for retry: " + e.getMessage());
  }
}

Collaborator

nice!
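
For context on the snippet above: `reset()` only rewinds a stream to a position that was marked earlier, so a retry can re-read the payload only if `mark()` was called before the first attempt with a large enough read limit. The sketch below illustrates that pattern in isolation. The class and method names are hypothetical and this is not the PR's code.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch, not the PR's code: rewinding an InputStream between attempts.
final class StreamRetrySketch {

    // Returns a stream that supports mark/reset and marks its current position.
    // readLimit should exceed the number of bytes read before reset() is called.
    static InputStream markForRetry(InputStream in, int readLimit) {
        InputStream markable = in.markSupported() ? in : new BufferedInputStream(in);
        markable.mark(readLimit);
        return markable;
    }

    // Stand-in for one upload attempt: drains the stream and returns its content.
    static String consume(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello volume".getBytes(StandardCharsets.UTF_8);
        InputStream in = markForRetry(new ByteArrayInputStream(payload), payload.length + 1);
        String first = consume(in);   // first attempt reads everything
        in.reset();                   // rewind to the mark before retrying
        String second = consume(in);  // the retry sees the same bytes
        System.out.println(first.equals(second));
    }
}
```

Streams that do not support marking are wrapped in a `BufferedInputStream` here, which buffers up to the read limit in memory; for large payloads that trade-off may not be acceptable.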

Comment on lines +174 to +209
String presignedUrl = response.getUrl();
LOGGER.debug(
    String.format(
        "Got new presigned URL for retry of %s (attempt %d)",
        request.objectPath, attempt + 1));

try {
  // Create upload producer
  AsyncRequestProducer uploadProducer;
  if (request.isFile()) {
    // File upload
    uploadProducer =
        AsyncRequestBuilder.put()
            .setUri(URI.create(presignedUrl))
            .setEntity(
                AsyncEntityProducers.create(
                    request.file.toFile(), ContentType.DEFAULT_BINARY))
            .build();
  } else {
    // Stream upload
    AsyncEntityProducer entity =
        new InputStreamFixedLenProducer(
            request.inputStream, request.contentLength);
    uploadProducer =
        AsyncRequestBuilder.put()
            .setUri(URI.create(presignedUrl))
            .setEntity(entity)
            .build();
  }

  AsyncResponseConsumer<SimpleHttpResponse> uploadConsumer =
      SimpleResponseConsumer.create();

  // Create callback with incremented attempt count
  VolumeUploadCallback uploadCallback =
      new VolumeUploadCallback(

Collaborator

is this exactly the same code as the one in volume client class? if so, all comments apply to this one as well.

@shivam2680 shivam2680 requested a review from jayantsing-db July 7, 2025 05:25
Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/DBFSVolumeClient.java Outdated
Comment on lines +857 to +865
presignedUrlSemaphore.acquire();

// The whenComplete block acts as a "finally" for the async operation.
// It guarantees the semaphore is released when the future is done, for any reason.
future.whenComplete(
    (response, throwable) -> {
      LOGGER.debug("Releasing semaphore permit for {}", objectPath);
      presignedUrlSemaphore.release();
    });

Collaborator

I think that during a retry of the same URL for which the semaphore was acquired earlier, it needs to contend for the semaphore again. I am not entirely sure this is a good approach: a failed URL might end up starving until the end.

I think it is better to retain the semaphore for the entire number of attempts.

as a side note/separately, i think this entire impl can be simplified like:

  • requestPresignedUrlWithRetry (acquires the semaphore) calls makeRequest (releases semaphore when complete)
  • makeRequest does async call and composes the future with processResponse
  • processResponse either completes the future or composes the future with a retry till a number of attempts
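
One way to read the three bullets above is sketched below with `CompletableFuture`. The method names loosely follow the review comment, but the bodies, the `RetrySketch` class, and the `Supplier`-based signature are assumptions made for illustration, not the PR's implementation. Backoff delays between attempts are left out to keep the structure visible.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative sketch of the suggested structure; not the PR's code.
final class RetrySketch {
    private final Semaphore permits;
    private final int maxAttempts;

    RetrySketch(int maxConcurrent, int maxAttempts) {
        this.permits = new Semaphore(maxConcurrent);
        this.maxAttempts = maxAttempts;
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    // Acquires one permit, holds it across all attempts, and releases it exactly once.
    <T> CompletableFuture<T> requestWithRetry(Supplier<CompletableFuture<T>> call)
            throws InterruptedException {
        permits.acquire();
        return makeRequest(call, 1).whenComplete((value, error) -> permits.release());
    }

    // Starts one async attempt and composes its outcome with processResponse.
    private <T> CompletableFuture<T> makeRequest(
            Supplier<CompletableFuture<T>> call, int attempt) {
        CompletableFuture<T> attemptFuture;
        try {
            attemptFuture = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous failure like a failed attempt.
            attemptFuture = new CompletableFuture<>();
            attemptFuture.completeExceptionally(e);
        }
        CompletableFuture<CompletableFuture<T>> next =
                attemptFuture.handle(
                        (value, error) -> processResponse(call, attempt, value, error));
        return next.thenCompose(f -> f);
    }

    // Completes on success; otherwise retries until maxAttempts is reached.
    private <T> CompletableFuture<T> processResponse(
            Supplier<CompletableFuture<T>> call, int attempt, T value, Throwable error) {
        if (error == null) {
            return CompletableFuture.completedFuture(value);
        }
        if (attempt >= maxAttempts) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(error);
            return failed;
        }
        return makeRequest(call, attempt + 1);
    }

    public static void main(String[] args) throws Exception {
        RetrySketch sketch = new RetrySketch(2, 3);
        AtomicInteger calls = new AtomicInteger();
        // Simulated call that fails twice and succeeds on the third attempt.
        String url = sketch.requestWithRetry(() -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new IllegalStateException("transient"));
            } else {
                f.complete("presigned-url");
            }
            return f;
        }).get();
        System.out.println(url + " after " + calls.get() + " attempts");
    }
}
```

Because the permit is released in a single `whenComplete` on the outermost future, a retried request never has to compete for the semaphore again, which addresses the starvation concern raised in the comment.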

Collaborator

> I think that during a retry of the same URL for which the semaphore was acquired earlier, it needs to contend for the semaphore again. I am not entirely sure this is a good approach: a failed URL might end up starving until the end.

oh sorry, it looks like it retains the semaphore and calls handleRetry with the incomplete future

* Maximum delay in milliseconds between retry attempts. Caps the exponential backoff to prevent
* excessively long delays.
*/
private static final long MAX_RETRY_DELAY_MS = 10000; // 10 seconds max delay

Collaborator

Can this be made configurable? Even 10s may be too high for some cases.

Collaborator Author

I don't think we need this. It only caps the exponential backoff delay, and we start from a lower value anyway.
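
To make the role of the cap concrete, here is a minimal sketch of capped exponential backoff. `MAX_RETRY_DELAY_MS` comes from the snippet under review. The initial delay of 200 ms, the doubling factor, and the `BackoffSketch` class are assumptions, since the PR's actual starting value and growth rule are not shown in this thread.

```java
// Illustrative sketch, not the PR's code: exponential backoff capped at a maximum delay.
final class BackoffSketch {
    // Value taken from the snippet under review.
    static final long MAX_RETRY_DELAY_MS = 10000;
    // Assumed starting delay; the real initial value is not shown in this thread.
    static final long INITIAL_RETRY_DELAY_MS = 200;

    // Delay before the given retry attempt (1-based): initial * 2^(attempt - 1), capped.
    static long delayMs(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        int shift = Math.min(attempt - 1, 30); // bound the shift so the product cannot overflow
        return Math.min(INITIAL_RETRY_DELAY_MS << shift, MAX_RETRY_DELAY_MS);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println("attempt " + attempt + ": " + delayMs(attempt) + " ms");
        }
    }
}
```

Under these assumed values the cap only takes effect from the seventh retry onward, which matches the author's point that it matters only when many attempts are allowed.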

Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/DBFSVolumeClient.java Outdated
Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/DBFSVolumeClient.java Outdated
Comment thread src/main/java/com/databricks/jdbc/api/impl/volume/DBFSVolumeClient.java Outdated

// Get a new presigned URL and retry the upload
urlGenerator
.apply(request.ucVolumePath, request.objectPath, 1)

Collaborator

Add the param name for readability: 1 /* paramName */

@jayantsing-db jayantsing-db left a comment

Collaborator

thanks. could you please log a todo/backlog to revisit some of the retry-unification/callback-handling?


@ParameterizedTest
@MethodSource("provideParametersForMultiFilePutTest")
void testPutFiles(

Collaborator

minor qq: what was the need to update stubs for testputobject?

Collaborator Author

Might have run this test to verify pre-signed url setup

@shivam2680

Collaborator Author

Tracking suggested improvements here
https://databricks.atlassian.net/browse/PECOBLR-664

Comment thread src/main/java/com/databricks/jdbc/common/DatabricksJdbcUrlParams.java Outdated
Comment thread src/main/java/com/databricks/jdbc/common/util/VolumeRetryUtil.java

@gopalldb gopalldb left a comment

Collaborator

Rename the flag for maxPresignedUrls

@shivam2680 shivam2680 merged commit 160deae into databricks:main Jul 22, 2025
15 of 19 checks passed
4 participants