[PECOBLR-348] Added putFiles method in DBFSVolume client #823
shivam2680 wants to merge 1605 commits into
* Fix UC volume GET operation * Add tests
…ed by workspace (#604) * init * fmt
* Added the GET and DELETE inside the DBFS client * Added Javadoc and refactored workspace client initialisation * Added unit tests * Improved the unit test coverage to 85% * Fixed the DBFS client to use ConnectionContext for initializing * Refactored the code for Volume Factory * Added Javadocs
* Adding changes for Fixing async execution * Add tests * Fixes as per comments * Add tests
* init * init * init * init
* Update SDK version * fix tests as apiClient is being used in sdk now
* Revert "revert (#595)" This reverts commit 56ef04f. * Fix uber minimal * fix version
* column size and nullable col fixed
…-and-deploy (#612)
* Telemetry * Add client * Add telemetry client * Minor fix * More changes * Changes for telemetry * Add more details in Telemetry impl * Add tests * Fix typo * Add more details * Updated error msg * Fix coverage
* parser * more complex data type logic * complete complex data types * more tests * refine * integration test * add test to driver test * removed secret * fmt * fix unsupported test * integration test working * fmt * comments * fmt * update method sig * fmt * comments * comments * comments * fmt * fmt * add * more extensive testing * more extensive testing * more extensive testing * more extensive testing * more extensive testing
* Send initial connection log to unauth telemetry endpoint * Add tests + remove debug logs * fmt * Add tests * Add tests * Add tests for classes i didn't add:) * add getters/setters in systemConfiguration + improve coverage
- Run execution queries in `async` mode
- Run metadata queries in `async` mode
- Refactor result retrieval in Thrift by improving response handling and fixing polling
- Refactor credential refreshing by transitioning from Thrift accessor to Thrift transport
- Competing driver, SQL Gateway, and SQL Execution API also execute the queries in async mode
- Minor code cleanup

Running queries in async mode and using polling will be beneficial:
- Freeing up server resources under high load instead of holding the connection for ~5 seconds (direct results)
- If query takes more than 5 seconds, server will anyway require polling from client
- Didn't notice any regressions (slightly better performance) under high concurrent load when running queries in async mode. Some empirical results below:
  - Running a simple select query on a table with 10-15 columns for 10K rows in a concurrent environment (250 connections executing query in parallel)
  - Average results when running on `main`
    ```
    Average query time across connections: 73873.58ms
    Min query time: 22075ms
    Max query time: 88997ms
    ```
  - Average results when running in `async` mode
    ```
    Average query time across connections: 68382.95ms
    Min query time: 24718ms
    Max query time: 95282ms
    ```
…ch (#622) This adds a workflow to run JDBC comparator in a separate branch for GA readiness. Note that this does not affect the main branch.
Add databricks protected runners
Use databricks runners
Setup PAT correctly
…y paths (#620) * change file paths * change file paths * change file paths * change file paths * change file paths * change file paths * change file paths
}
@Override
public int getMaxConcurrentPresignedRequests() {

Let's rename this to UCVolumeUploadBatchSize.

It's not a batch size. It is the number of parallel requests that the driver sends to fetch pre-signed URLs.
List<CompletableFuture<VolumePutResult>> futures = new ArrayList<>(objectPaths.size());
// Initialize lists with null. Later, set values at the correct index
for (int i = 0; i < objectPaths.size(); i++) {

Why is this step needed? To handle partial failures?

Yes, and to maintain result order.
jayantsing-db left a comment

A few comments inline. Could you please remove the redundant fake service tests and stubs? I think fake-service does not provide the required fidelity, as this is a client upload.
}
// Remove null entries from uploadRequests. Files that don't exist will be null
uploadRequests.removeIf(Objects::isNull);

futures seems to be a parallel array. No modifications needed to that array?

These are two different arrays. futures only tracks result order, while uploadRequests is used to send the parallel requests. If a file doesn't exist, we won't send an upload request for that file.
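
The pattern discussed in this thread (a result list pre-sized to the input, filled by index, with no upload attempted for missing files) might look roughly like the sketch below. It is not the PR's code: `OrderedUploadSketch`, `PutResult`, `putAll`, and the `exists` predicate (a stand-in for the local file check) are hypothetical names, and the "upload" is simulated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

class OrderedUploadSketch {
  // Hypothetical result type; the PR itself uses VolumePutResult.
  static final class PutResult {
    final String path;
    final boolean success;
    final String message;

    PutResult(String path, boolean success, String message) {
      this.path = path;
      this.success = success;
      this.message = message;
    }
  }

  static List<PutResult> putAll(List<String> localPaths, Predicate<String> exists) {
    // Pre-size the list with nulls so every result can be set at its input index.
    List<CompletableFuture<PutResult>> futures = new ArrayList<>(localPaths.size());
    for (int i = 0; i < localPaths.size(); i++) {
      futures.add(null);
    }

    for (int i = 0; i < localPaths.size(); i++) {
      String path = localPaths.get(i);
      if (!exists.test(path)) {
        // Missing file: record the failure in place and send no upload request.
        futures.set(i, CompletableFuture.completedFuture(
            new PutResult(path, false, "file not found")));
      } else {
        // Simulated upload; a real client would issue the HTTP request here.
        futures.set(i, CompletableFuture.supplyAsync(
            () -> new PutResult(path, true, "uploaded")));
      }
    }

    // Joining in index order yields results in the same order as the inputs,
    // regardless of which upload finished first.
    List<PutResult> results = new ArrayList<>(futures.size());
    for (CompletableFuture<PutResult> future : futures) {
      results.add(future.join());
    }
    return results;
  }
}
```

Because each slot is written by index, a failure for one file does not shift the positions of the others, which is what lets the caller match results to inputs.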
this.source = source;
this.contentLength = contentLength;
this.contentType = Objects.requireNonNull(contentType, "contentType");

If this throws an exception, are resources like the InputStream closed?

Actually, this case won't arise, as we always set ContentType.APPLICATION_OCTET_STREAM when constructing this producer. I can maybe remove this field itself.
channel.endStream();
return;
}
int read = source.read(buf, 0, (int) Math.min(buf.length, contentLength - totalSent));

This seems to be a blocking call on the async thread pool. Can we use NIO channels if not too complex?

I feel that, given the current upload latencies, the current impl is fine. I agree this is blocking, but each read is limited by the buffer size (16 KB). A pure NIO pipeline might get complicated.
channel.endStream();
return;
}
int read = source.read(buf, 0, (int) Math.min(buf.length, contentLength - totalSent));

Overflow risks with (int) Math.min(buf.length, contentLength - totalSent)?

Not really. buf.length is always 16384 (16 KB), so Math.min(buf.length, contentLength - totalSent) has a maximum value of 16384.
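
The argument above can be checked with a small sketch. `ReadSizeSketch` and `nextReadSize` are hypothetical names, not part of the PR. `Math.min(int, long)` resolves to the `long` overload, and its result never exceeds `bufLength`, so the narrowing cast is safe as long as the remaining byte count is non-negative; the explicit guard for that case is an addition of this sketch.

```java
class ReadSizeSketch {
  static final int BUFFER_SIZE = 16 * 1024; // 16384 bytes, as stated in the thread

  // Computes how many bytes to request for the next read.
  static int nextReadSize(int bufLength, long contentLength, long totalSent) {
    long remaining = contentLength - totalSent;
    if (remaining < 0) {
      // Assumption of this sketch: sending more than contentLength is a bug,
      // and a negative length would make InputStream.read throw anyway.
      throw new IllegalStateException("sent more bytes than contentLength");
    }
    // The int argument is widened to long; the result is at most bufLength,
    // so casting back to int cannot overflow.
    return (int) Math.min(bufLength, remaining);
  }
}
```

For example, with a 5,000,000,000-byte upload (larger than `Integer.MAX_VALUE`) the first call still returns 16384, and the final call returns only the bytes left.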
}
try {
  source.close();
} catch (IOException ignore) {

Silent exception swallowing?

Throwing the exception now.
channel.endStream();
} else {
totalSent += read;
channel.write(ByteBuffer.wrap(buf, 0, read));

Should we check if the channel can accept data?

Introduced a currentChunk ByteBuffer. It helps identify how many bytes the channel accepts, and the write is retried until the buffer is empty.
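
A minimal sketch of the partial-write handling described in the reply, under stated assumptions: `ChunkedProducerSketch`, its `Sink` interface (a stand-in for the HTTP client's stream channel), and the `drain` helper are hypothetical and only illustrate the idea of keeping a `currentChunk` until the sink has accepted all of it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

class ChunkedProducerSketch {
  // Hypothetical stand-in for the client's channel: write() may accept
  // fewer bytes than offered and returns the number it consumed.
  interface Sink {
    int write(ByteBuffer src) throws IOException;
    void endStream() throws IOException;
  }

  private final InputStream source;
  private final byte[] buf = new byte[16 * 1024];
  private ByteBuffer currentChunk = ByteBuffer.allocate(0); // starts empty
  private boolean done;

  ChunkedProducerSketch(InputStream source) {
    this.source = source;
  }

  boolean isDone() {
    return done;
  }

  // Called each time the sink signals that it can take more data.
  void produce(Sink sink) throws IOException {
    if (done) {
      return;
    }
    if (!currentChunk.hasRemaining()) {
      // Only refill once the previous chunk has been fully accepted.
      int read = source.read(buf, 0, buf.length);
      if (read == -1) {
        done = true;
        sink.endStream();
        return;
      }
      currentChunk = ByteBuffer.wrap(buf, 0, read);
    }
    // The sink advances the buffer position by the bytes it accepted;
    // any leftover bytes are offered again on the next call.
    sink.write(currentChunk);
  }

  // Usage example: push data through a sink that accepts at most
  // maxPerWrite bytes per call and return what the sink received.
  static byte[] drain(byte[] data, int maxPerWrite) {
    ByteArrayOutputStream received = new ByteArrayOutputStream();
    Sink slowSink = new Sink() {
      @Override
      public int write(ByteBuffer src) {
        int n = Math.min(maxPerWrite, src.remaining());
        for (int i = 0; i < n; i++) {
          received.write(src.get());
        }
        return n;
      }

      @Override
      public void endStream() {
        // Nothing to flush in this in-memory sink.
      }
    };
    ChunkedProducerSketch producer =
        new ChunkedProducerSketch(new ByteArrayInputStream(data));
    try {
      while (!producer.isDone()) {
        producer.produce(slowSink);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return received.toByteArray();
  }
}
```

Compared with `channel.write(ByteBuffer.wrap(buf, 0, read))`, which discards whatever the channel did not accept, keeping the buffer between calls ensures no bytes are dropped when the channel applies back-pressure.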
endOfStream = true;
channel.endStream();

Should we check that we have actually sent the expected number of bytes, and fail accordingly?
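
One way the check suggested here could look, as a sketch only: `UploadLengthCheckSketch` and `verifyAllBytesSent` are hypothetical names, and the choice of `IOException` is an assumption rather than what the PR does.

```java
import java.io.IOException;

class UploadLengthCheckSketch {
  // Intended to be called just before ending the stream: fails if the source
  // produced fewer (or more) bytes than the declared content length.
  static void verifyAllBytesSent(long contentLength, long totalSent) throws IOException {
    if (totalSent != contentLength) {
      throw new IOException(
          "Upload ended after " + totalSent + " of " + contentLength + " bytes");
    }
  }
}
```

Failing at this point would surface a truncated source stream as an error instead of ending the stream as if the upload had completed.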
final WorkspaceClient workspaceClient;
final ApiClient apiClient;
private final String allowedVolumeIngestionPaths;
private static int MAX_CONCURRENT_PRESIGNED_REQUESTS = 50;

Where is this default value used?

It is used to initialize the presignedUrlSemaphore, which rate-limits presigned URL requests.
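
To illustrate how a semaphore can cap the number of in-flight presigned URL requests, here is a minimal sketch. It is not the PR's implementation: `PresignedRequestLimiterSketch` and `maxObservedConcurrency` are hypothetical, the request is simulated with a short sleep, and the blocking `acquireUninterruptibly()` call is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class PresignedRequestLimiterSketch {
  // Runs `requests` simulated calls and returns the highest number that were
  // in flight at the same time; it should never exceed maxConcurrent.
  static int maxObservedConcurrency(int requests, int maxConcurrent) {
    Semaphore permits = new Semaphore(maxConcurrent);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, requests));
    try {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (int i = 0; i < requests; i++) {
        futures.add(CompletableFuture.runAsync(() -> {
          permits.acquireUninterruptibly(); // wait for a free slot
          try {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            sleepQuietly(5); // stand-in for the presigned URL request
          } finally {
            inFlight.decrementAndGet();
            permits.release(); // always hand the slot back
          }
        }, pool));
      }
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

With 50 permits, as in the default above, at most 50 requests would be outstanding at once, however many files are passed in.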
Description
Added putFiles in Volume client to support async upload of files.
Design Doc
Testing
Driver Test
Integration Test
E2E test
Ingestion Team Benchmarks
Benchmark Results
Additional Notes to the Reviewer