[PECOBLR-348] Added putFiles method in DBFSVolume client#860
Conversation
|
Please ensure that the NEXT_CHANGELOG.md file is updated with any relevant changes. |
jayantsing-db
left a comment
There was a problem hiding this comment.
thanks a lot @shivam2680 for making these changes and perf improvements with async client. I have added my comments to the PR.
Additionally, in general, I feel the implementation could have been much more simple. Right now, we have a lot of nested callbacks, nested retries, a semaphore whole lifecycle needs to manually managed, and nested error handling. This makes it a lot more complex than it has to be. If possible, let's see if we can discuss and simplify this.
|
|
||
| @ParameterizedTest | ||
| @MethodSource("provideParametersForFileOrderTest") | ||
| void testResultOrderMatches( |
There was a problem hiding this comment.
we can get rid of this
There was a problem hiding this comment.
hopefully the stubs are removed as well?
| "test_volume1", | ||
| Arrays.asList("multi_upload_4.txt", "missing_file.txt", "multi_upload_5.txt"), | ||
| Arrays.asList( | ||
| "/tmp/multi_upload_0.txt", "/tmp/multi_upload_100.txt", "/tmp/multi_upload_2.txt"), |
There was a problem hiding this comment.
is it testing the case when the 2nd file is missing? You create a /tmp/multi_upload_1.txt instead of /tmp/multi_upload_100.txt and so file is missing. is that correct?
| // Handle stream reset if needed | ||
| if (!request.isFile() && request.inputStream.markSupported()) { | ||
| try { | ||
| request.inputStream.reset(); | ||
| } catch (IOException e) { | ||
| LOGGER.warn("Could not reset input stream for retry: " + e.getMessage()); | ||
| } | ||
| } |
| String presignedUrl = response.getUrl(); | ||
| LOGGER.debug( | ||
| String.format( | ||
| "Got new presigned URL for retry of %s (attempt %d)", | ||
| request.objectPath, attempt + 1)); | ||
|
|
||
| try { | ||
| // Create upload producer | ||
| AsyncRequestProducer uploadProducer; | ||
| if (request.isFile()) { | ||
| // File upload | ||
| uploadProducer = | ||
| AsyncRequestBuilder.put() | ||
| .setUri(URI.create(presignedUrl)) | ||
| .setEntity( | ||
| AsyncEntityProducers.create( | ||
| request.file.toFile(), ContentType.DEFAULT_BINARY)) | ||
| .build(); | ||
| } else { | ||
| // Stream upload | ||
| AsyncEntityProducer entity = | ||
| new InputStreamFixedLenProducer( | ||
| request.inputStream, request.contentLength); | ||
| uploadProducer = | ||
| AsyncRequestBuilder.put() | ||
| .setUri(URI.create(presignedUrl)) | ||
| .setEntity(entity) | ||
| .build(); | ||
| } | ||
|
|
||
| AsyncResponseConsumer<SimpleHttpResponse> uploadConsumer = | ||
| SimpleResponseConsumer.create(); | ||
|
|
||
| // Create callback with incremented attempt count | ||
| VolumeUploadCallback uploadCallback = | ||
| new VolumeUploadCallback( |
There was a problem hiding this comment.
is this exactly the same code as the one in volume client class? if so, all comments apply to this one as well.
| presignedUrlSemaphore.acquire(); | ||
|
|
||
| // The whenComplete block acts as a "finally" for the async operation. | ||
| // It guarantees the semaphore is released when the future is done, for any reason. | ||
| future.whenComplete( | ||
| (response, throwable) -> { | ||
| LOGGER.debug("Releasing semaphore permit for {}", objectPath); | ||
| presignedUrlSemaphore.release(); | ||
| }); |
There was a problem hiding this comment.
i think during the retry of the same url for which semaphore was acquired earlier, it needs to contest for the same semaphore again. i am not entirely sure if this is a good approach. A failed URL might end up starving to the end.
i think better to retain the semaphore for entire number of attempts.
as a side note/separately, i think this entire impl can be simplified like:
- requestPresignedUrlWithRetry (acquires the semaphore) calls makeRequest (releases semaphore when complete)
- makeRequest does async call and composes the future with processResponse
- processResponse either completes the future or composes the future with a retry till a number of attempts
There was a problem hiding this comment.
i think during the retry of the same url for which semaphore was acquired earlier, it needs to contest for the same semaphore again. i am not entirely sure if this is a good approach. A failed URL might end up starving to the end.
oh sorry, it looks like it retains the semaphore and calls handleRetry with the incomplete future
| * Maximum delay in milliseconds between retry attempts. Caps the exponential backoff to prevent | ||
| * excessively long delays. | ||
| */ | ||
| private static final long MAX_RETRY_DELAY_MS = 10000; // 10 seconds max delay |
There was a problem hiding this comment.
can this be made configurable? even 10s may be higher for some cases
There was a problem hiding this comment.
I don't think we need this. This is just to cap exponential backoff delay. we anyways start from a lower value
|
|
||
| // Get a new presigned URL and retry the upload | ||
| urlGenerator | ||
| .apply(request.ucVolumePath, request.objectPath, 1) |
There was a problem hiding this comment.
add param name for readability: 1 /*.paramName */
jayantsing-db
left a comment
There was a problem hiding this comment.
thanks. could you please log a todo/backlog to revisit some of the retry-unification/callback-handling?
|
|
||
| @ParameterizedTest | ||
| @MethodSource("provideParametersForMultiFilePutTest") | ||
| void testPutFiles( |
There was a problem hiding this comment.
minor qq: what was the need to update stubs for testputobject?
There was a problem hiding this comment.
Might have run this test to verify pre-signed url setup
|
Tracking suggested improvements here |
gopalldb
left a comment
There was a problem hiding this comment.
Rename the flag for maxPresignedUrls
Description
Added putFiles in Volume client to support async upload of files.
Design Doc
Old PR link: #823
Testing
Driver Test
Integration Test
E2E test
Ingestion Team Benchmarks
Benchmark Results
Additional Notes to the Reviewer