From 214157bb44ff903e34f5e2516a014785605d974b Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sat, 19 Jul 2025 14:25:03 +0000 Subject: [PATCH 01/11] adding otel tracing decorator for parallel_upload_file --- .../cloud/storage/internal/connection_impl.cc | 23 +++++++++++++++ .../cloud/storage/internal/connection_impl.h | 3 ++ .../storage/internal/storage_connection.h | 4 +++ .../internal/storage_connection_test.cc | 10 +++++++ .../storage/internal/tracing_connection.cc | 11 +++++++ .../storage/internal/tracing_connection.h | 4 +++ .../internal/tracing_connection_test.cc | 29 +++++++++++++++++++ google/cloud/storage/parallel_upload.h | 20 ++----------- google/cloud/storage/testing/mock_client.h | 3 ++ 9 files changed, 89 insertions(+), 18 deletions(-) diff --git a/google/cloud/storage/internal/connection_impl.cc b/google/cloud/storage/internal/connection_impl.cc index 158dbdeb2e58d..2778b74fc237c 100644 --- a/google/cloud/storage/internal/connection_impl.cc +++ b/google/cloud/storage/internal/connection_impl.cc @@ -14,6 +14,7 @@ #include "google/cloud/storage/internal/connection_impl.h" #include "google/cloud/storage/internal/retry_object_read_source.h" +#include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/filesystem.h" #include "google/cloud/internal/opentelemetry.h" #include "google/cloud/internal/rest_retry_loop.h" @@ -854,6 +855,28 @@ integrity checks using the DisableMD5Hash() and DisableCrc32cChecksum() options. return std::unique_ptr(std::move(source)); } +StatusOr StorageConnectionImpl::ExecuteParallelUploadFile( + std::vector shards, bool ignore_cleanup_failures) { + std::vector threads; + threads.reserve(shards.size()); + for (auto& shard : shards) { + threads.emplace_back([&shard] { + // We can safely ignore the status - if something fails we'll know + // when obtaining final metadata. + shard.Upload(); + }); + } + for (auto& thread : threads) { + thread.join(); + } + auto res = shards[0].WaitForCompletion().get(); + auto cleanup_res = shards[0].EagerCleanup(); + if (!cleanup_res.ok() && !ignore_cleanup_failures) { + return cleanup_res; + } + return res; +} + StatusOr StorageConnectionImpl::ListBucketAcl( ListBucketAclRequest const& request) { auto const idempotency = current_idempotency_policy().IsIdempotent(request) diff --git a/google/cloud/storage/internal/connection_impl.h b/google/cloud/storage/internal/connection_impl.h index c42d806f78d84..bc1a8e40c7c5c 100644 --- a/google/cloud/storage/internal/connection_impl.h +++ b/google/cloud/storage/internal/connection_impl.h @@ -104,6 +104,9 @@ class StorageConnectionImpl InsertObjectMediaRequest& request) override; StatusOr> UploadFileResumable( std::string const& file_name, ResumableUploadRequest& request) override; + StatusOr ExecuteParallelUploadFile( + std::vector shards, + bool ignore_cleanup_failures) override; StatusOr ListBucketAcl( ListBucketAclRequest const& request) override; diff --git a/google/cloud/storage/internal/storage_connection.h b/google/cloud/storage/internal/storage_connection.h index e33638fad695f..eb2411a2650b1 100644 --- a/google/cloud/storage/internal/storage_connection.h +++ b/google/cloud/storage/internal/storage_connection.h @@ -118,6 +118,10 @@ class StorageConnection { std::string const&, ResumableUploadRequest&) { return Status(StatusCode::kUnimplemented, "unimplemented"); } + virtual StatusOr ExecuteParallelUploadFile( + std::vector, bool) { + return Status(StatusCode::kUnimplemented, "unimplemented"); + } ///@} ///@{ diff --git a/google/cloud/storage/internal/storage_connection_test.cc b/google/cloud/storage/internal/storage_connection_test.cc index 6e56c82a1dbdb..374c5dc88392f 100644 --- a/google/cloud/storage/internal/storage_connection_test.cc +++ b/google/cloud/storage/internal/storage_connection_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/internal/storage_connection.h" +#include "google/cloud/storage/parallel_upload.h" #include "google/cloud/testing_util/status_matchers.h" #include @@ -160,6 +161,15 @@ TEST(StorageConnectionTest, UploadFileResumableUnimplemented) { EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented)); } +TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) { + TestStorageConnection connection; + std::vector shards; + bool ignore_cleanup_failures; + auto response = connection.ExecuteParallelUploadFile(std::move(shards), + ignore_cleanup_failures); + EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented)); +} + } // namespace } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 661a1a2c5d976..1a8645ef7b562 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -255,6 +255,17 @@ StatusOr> TracingConnection::UploadFileResumable( impl_->UploadFileResumable(file_name, request)); } +StatusOr TracingConnection::ExecuteParallelUploadFile( + std::vector shards, + bool ignore_cleanup_failures) { + auto span = internal::MakeSpan( + "storage::ParallelUploadFile/ExecuteParallelUploadFile"); + auto scope = opentelemetry::trace::Scope(span); + return internal::EndSpan( + *span, impl_->ExecuteParallelUploadFile(std::move(shards), + ignore_cleanup_failures)); +} + StatusOr TracingConnection::ListBucketAcl( storage::internal::ListBucketAclRequest const& request) { diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 60bf303074214..9fe0d7184dd57 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_TRACING_CONNECTION_H #include "google/cloud/storage/internal/storage_connection.h" +#include "google/cloud/storage/parallel_upload.h" #include "google/cloud/storage/version.h" #include #include @@ -104,6 +105,9 @@ class TracingConnection : public storage::internal::StorageConnection { StatusOr> UploadFileResumable( std::string const& file_name, storage::internal::ResumableUploadRequest& request) override; + StatusOr ExecuteParallelUploadFile( + std::vector shards, + bool ignore_cleanup_failures) override; StatusOr ListBucketAcl( storage::internal::ListBucketAclRequest const& request) override; diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 5e6f7dc47f261..13a6ed563b140 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -746,6 +746,35 @@ TEST(TracingClientTest, UploadFileResumable) { "gl-cpp.status_code", code_str))))); } +TEST(TracingClientTest, ExecuteParallelUploadFile) { + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ExecuteParallelUploadFile) + .WillOnce( + [](std::vector, bool) { + EXPECT_TRUE(ThereIsAnActiveSpan()); + return PermanentError(); + }); + auto under_test = TracingConnection(mock); + std::vector shards; + bool ignore_cleanup_failures; + auto actual = + under_test.ExecuteParallelUploadFile(shards, ignore_cleanup_failures); + + auto const code = PermanentError().code(); + auto const code_str = StatusCodeToString(code); + auto const msg = PermanentError().message(); + EXPECT_THAT(actual, StatusIs(code)); + EXPECT_THAT( + span_catcher->GetSpans(), + ElementsAre(AllOf( + SpanNamed("storage::ParallelUploadFile/ExecuteParallelUploadFile"), + SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanWithStatus(opentelemetry::trace::StatusCode::kError, msg), + SpanHasAttributes( + OTelAttribute("gl-cpp.status_code", code_str))))); +} + TEST(TracingClientTest, ListBucketAcl) { auto span_catcher = InstallSpanCatcher(); auto mock = std::make_shared(); diff --git a/google/cloud/storage/parallel_upload.h b/google/cloud/storage/parallel_upload.h index f752c978a78d7..e094212c3e9a2 100644 --- a/google/cloud/storage/parallel_upload.h +++ b/google/cloud/storage/parallel_upload.h @@ -1198,24 +1198,8 @@ StatusOr ParallelUploadFile( return shards.status(); } - std::vector threads; - threads.reserve(shards->size()); - for (auto& shard : *shards) { - threads.emplace_back([&shard] { - // We can safely ignore the status - if something fails we'll know - // when obtaining final metadata. - shard.Upload(); - }); - } - for (auto& thread : threads) { - thread.join(); - } - auto res = (*shards)[0].WaitForCompletion().get(); - auto cleanup_res = (*shards)[0].EagerCleanup(); - if (!cleanup_res.ok() && !ignore_cleanup_failures) { - return cleanup_res; - } - return res; + return internal::ClientImplDetails::GetConnection(client) + ->ExecuteParallelUploadFile(*std::move(shards), ignore_cleanup_failures); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/testing/mock_client.h b/google/cloud/storage/testing/mock_client.h index 5aa1d2fa05de5..e2bd06ea915b0 100644 --- a/google/cloud/storage/testing/mock_client.h +++ b/google/cloud/storage/testing/mock_client.h @@ -108,6 +108,9 @@ class MockClient : public google::cloud::storage::internal::StorageConnection { MOCK_METHOD(StatusOr>, UploadFileResumable, (std::string const&, storage::internal::ResumableUploadRequest&), (override)); + MOCK_METHOD(StatusOr, ExecuteParallelUploadFile, + (std::vector, bool), + (override)); MOCK_METHOD(StatusOr, ListBucketAcl, (internal::ListBucketAclRequest const&), (override)); From bce82b1e36b5f24dd7d95011f75dce2408cfd4c1 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sat, 19 Jul 2025 16:02:27 +0000 Subject: [PATCH 02/11] fixing build failure --- google/cloud/storage/internal/storage_connection.h | 1 + google/cloud/storage/internal/tracing_connection.cc | 1 + google/cloud/storage/parallel_upload.h | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/storage_connection.h b/google/cloud/storage/internal/storage_connection.h index eb2411a2650b1..43fc82c825cfd 100644 --- a/google/cloud/storage/internal/storage_connection.h +++ b/google/cloud/storage/internal/storage_connection.h @@ -45,6 +45,7 @@ namespace cloud { namespace storage { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace internal { +class ParallelUploadFileShard; class ObjectReadStreambuf; /** diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index 1a8645ef7b562..b81c77f805c0f 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -14,6 +14,7 @@ #include "google/cloud/storage/internal/tracing_connection.h" #include "google/cloud/storage/internal/tracing_object_read_source.h" +#include "google/cloud/storage/parallel_upload.h" #include "google/cloud/internal/opentelemetry.h" #include #include diff --git a/google/cloud/storage/parallel_upload.h b/google/cloud/storage/parallel_upload.h index e094212c3e9a2..3e827a8ea9dfb 100644 --- a/google/cloud/storage/parallel_upload.h +++ b/google/cloud/storage/parallel_upload.h @@ -1191,7 +1191,7 @@ StatusOr ParallelUploadFile( "Provided Option not found in ParallelUploadFileSupportedOptions."); auto shards = internal::CreateParallelUploadShards::Create( - std::move(client), std::move(file_name), std::move(bucket_name), + client, std::move(file_name), std::move(bucket_name), std::move(object_name), std::move(prefix), std::forward(options)...); if (!shards) { From f118e76a4f3140936741dadce8119d66b7c52960 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sat, 19 Jul 2025 18:47:33 +0000 Subject: [PATCH 03/11] fixing presubmits --- google/cloud/storage/internal/storage_connection.cc | 8 ++++++++ google/cloud/storage/internal/storage_connection.h | 4 +--- google/cloud/storage/internal/storage_connection_test.cc | 2 +- google/cloud/storage/internal/tracing_connection_test.cc | 6 +++--- google/cloud/storage/parallel_upload.h | 2 +- google/cloud/storage/testing/mock_client.h | 1 + 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/google/cloud/storage/internal/storage_connection.cc b/google/cloud/storage/internal/storage_connection.cc index 4b3533ae35c95..541f91889445e 100644 --- a/google/cloud/storage/internal/storage_connection.cc +++ b/google/cloud/storage/internal/storage_connection.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/internal/storage_connection.h" +#include "google/cloud/storage/parallel_upload.h" #include #include @@ -44,6 +45,13 @@ StatusOr CreateOrResume( std::move(response->payload)}; } +StatusOr StorageConnection::ExecuteParallelUploadFile( + std::vector, bool) { + return Status( + StatusCode::kUnimplemented, + "ExecuteParallelUploadFile() is not implemented by this Object"); +} + } // namespace internal GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage diff --git a/google/cloud/storage/internal/storage_connection.h b/google/cloud/storage/internal/storage_connection.h index 43fc82c825cfd..6e301f56bfb7d 100644 --- a/google/cloud/storage/internal/storage_connection.h +++ b/google/cloud/storage/internal/storage_connection.h @@ -120,9 +120,7 @@ class StorageConnection { return Status(StatusCode::kUnimplemented, "unimplemented"); } virtual StatusOr ExecuteParallelUploadFile( - std::vector, bool) { - return Status(StatusCode::kUnimplemented, "unimplemented"); - } + std::vector, bool); ///@} ///@{ diff --git a/google/cloud/storage/internal/storage_connection_test.cc b/google/cloud/storage/internal/storage_connection_test.cc index 374c5dc88392f..59b350c2f09aa 100644 --- a/google/cloud/storage/internal/storage_connection_test.cc +++ b/google/cloud/storage/internal/storage_connection_test.cc @@ -164,7 +164,7 @@ TEST(StorageConnectionTest, UploadFileResumableUnimplemented) { TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) { TestStorageConnection connection; std::vector shards; - bool ignore_cleanup_failures; + bool ignore_cleanup_failures = false; auto response = connection.ExecuteParallelUploadFile(std::move(shards), ignore_cleanup_failures); EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented)); diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 13a6ed563b140..48f97cf5483a9 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -757,9 +757,9 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) { }); auto under_test = TracingConnection(mock); std::vector shards; - bool ignore_cleanup_failures; - auto actual = - under_test.ExecuteParallelUploadFile(shards, ignore_cleanup_failures); + bool ignore_cleanup_failures = false; + auto actual = under_test.ExecuteParallelUploadFile(std::move(shards), + ignore_cleanup_failures); auto const code = PermanentError().code(); auto const code_str = StatusCodeToString(code); diff --git a/google/cloud/storage/parallel_upload.h b/google/cloud/storage/parallel_upload.h index 3e827a8ea9dfb..d749a3bb436ed 100644 --- a/google/cloud/storage/parallel_upload.h +++ b/google/cloud/storage/parallel_upload.h @@ -1199,7 +1199,7 @@ StatusOr ParallelUploadFile( } return internal::ClientImplDetails::GetConnection(client) - ->ExecuteParallelUploadFile(*std::move(shards), ignore_cleanup_failures); + ->ExecuteParallelUploadFile(std::move(*shards), ignore_cleanup_failures); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/testing/mock_client.h b/google/cloud/storage/testing/mock_client.h index e2bd06ea915b0..c86f64ab34a26 100644 --- a/google/cloud/storage/testing/mock_client.h +++ b/google/cloud/storage/testing/mock_client.h @@ -17,6 +17,7 @@ #include "google/cloud/storage/client.h" #include "google/cloud/storage/internal/storage_connection.h" +#include "google/cloud/storage/parallel_upload.h" #include #include #include From 2042eb8eb55e7ed4c04633aa0cfaf4c73d09d39c Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sat, 19 Jul 2025 19:08:42 +0000 Subject: [PATCH 04/11] fix linting --- google/cloud/storage/internal/storage_connection.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/storage/internal/storage_connection.cc b/google/cloud/storage/internal/storage_connection.cc index 541f91889445e..86abdabfb2c0e 100644 --- a/google/cloud/storage/internal/storage_connection.cc +++ b/google/cloud/storage/internal/storage_connection.cc @@ -45,6 +45,7 @@ StatusOr CreateOrResume( std::move(response->payload)}; } +// NOLINTNEXTLINE(performance-unnecessary-value-param) StatusOr StorageConnection::ExecuteParallelUploadFile( std::vector, bool) { return Status( From 61f8bfae597c229a8ee1aa0aba31a6df5ca13330 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sat, 19 Jul 2025 19:57:42 +0000 Subject: [PATCH 05/11] fix linting --- google/cloud/storage/internal/storage_connection.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/storage_connection.cc b/google/cloud/storage/internal/storage_connection.cc index 86abdabfb2c0e..4a12b8f5b5167 100644 --- a/google/cloud/storage/internal/storage_connection.cc +++ b/google/cloud/storage/internal/storage_connection.cc @@ -45,9 +45,9 @@ StatusOr CreateOrResume( std::move(response->payload)}; } -// NOLINTNEXTLINE(performance-unnecessary-value-param) StatusOr StorageConnection::ExecuteParallelUploadFile( - std::vector, bool) { + std::vector, + bool) { // NOLINT(performance-unnecessary-value-param) return Status( StatusCode::kUnimplemented, "ExecuteParallelUploadFile() is not implemented by this Object"); From e6245e2251d0e412fc1327a576b3b89b494785b9 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sat, 19 Jul 2025 20:01:16 +0000 Subject: [PATCH 06/11] fix linting --- google/cloud/storage/internal/storage_connection.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/storage_connection.cc b/google/cloud/storage/internal/storage_connection.cc index 4a12b8f5b5167..3e646299f6c7a 100644 --- a/google/cloud/storage/internal/storage_connection.cc +++ b/google/cloud/storage/internal/storage_connection.cc @@ -46,8 +46,9 @@ StatusOr CreateOrResume( } StatusOr StorageConnection::ExecuteParallelUploadFile( - std::vector, - bool) { // NOLINT(performance-unnecessary-value-param) + std::vector< + ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param) + bool) { return Status( StatusCode::kUnimplemented, "ExecuteParallelUploadFile() is not implemented by this Object"); From e2203347b5f77b09747f7edd45b7c4f1dfdc721e Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sun, 20 Jul 2025 07:49:47 +0000 Subject: [PATCH 07/11] fix linting --- google/cloud/storage/internal/tracing_connection_test.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 48f97cf5483a9..6d058d5b056e9 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -751,7 +751,10 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) { auto mock = std::make_shared(); EXPECT_CALL(*mock, ExecuteParallelUploadFile) .WillOnce( - [](std::vector, bool) { + [](std::vector< + storage::internal:: + ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param) + bool) { EXPECT_TRUE(ThereIsAnActiveSpan()); return PermanentError(); }); From 70a7ea6e6753e4a5cd366d57600b456fe94b3213 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sun, 20 Jul 2025 10:19:46 +0000 Subject: [PATCH 08/11] fix linting --- google/cloud/storage/internal/connection_impl.cc | 10 +--------- google/cloud/storage/internal/connection_impl.h | 1 + google/cloud/storage/internal/storage_connection.cc | 1 + google/cloud/storage/internal/storage_connection.h | 3 ++- .../storage/internal/storage_connection_test.cc | 5 +++-- google/cloud/storage/internal/tracing_connection.cc | 7 ++++--- google/cloud/storage/internal/tracing_connection.h | 1 + .../storage/internal/tracing_connection_test.cc | 9 ++++++--- google/cloud/storage/parallel_upload.h | 12 +++++++++++- google/cloud/storage/testing/mock_client.h | 3 ++- 10 files changed, 32 insertions(+), 20 deletions(-) diff --git a/google/cloud/storage/internal/connection_impl.cc b/google/cloud/storage/internal/connection_impl.cc index 2778b74fc237c..ec0fc632b0c3d 100644 --- a/google/cloud/storage/internal/connection_impl.cc +++ b/google/cloud/storage/internal/connection_impl.cc @@ -856,16 +856,8 @@ integrity checks using the DisableMD5Hash() and DisableCrc32cChecksum() options. } StatusOr StorageConnectionImpl::ExecuteParallelUploadFile( + std::vector threads, std::vector shards, bool ignore_cleanup_failures) { - std::vector threads; - threads.reserve(shards.size()); - for (auto& shard : shards) { - threads.emplace_back([&shard] { - // We can safely ignore the status - if something fails we'll know - // when obtaining final metadata. - shard.Upload(); - }); - } for (auto& thread : threads) { thread.join(); } diff --git a/google/cloud/storage/internal/connection_impl.h b/google/cloud/storage/internal/connection_impl.h index bc1a8e40c7c5c..82338cbb65b48 100644 --- a/google/cloud/storage/internal/connection_impl.h +++ b/google/cloud/storage/internal/connection_impl.h @@ -105,6 +105,7 @@ class StorageConnectionImpl StatusOr> UploadFileResumable( std::string const& file_name, ResumableUploadRequest& request) override; StatusOr ExecuteParallelUploadFile( + std::vector threads, std::vector shards, bool ignore_cleanup_failures) override; diff --git a/google/cloud/storage/internal/storage_connection.cc b/google/cloud/storage/internal/storage_connection.cc index 3e646299f6c7a..793f88abeb021 100644 --- a/google/cloud/storage/internal/storage_connection.cc +++ b/google/cloud/storage/internal/storage_connection.cc @@ -46,6 +46,7 @@ StatusOr CreateOrResume( } StatusOr StorageConnection::ExecuteParallelUploadFile( + std::vector, // NOLINT(performance-unnecessary-value-param) std::vector< ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param) bool) { diff --git a/google/cloud/storage/internal/storage_connection.h b/google/cloud/storage/internal/storage_connection.h index 6e301f56bfb7d..3496ef97d1f20 100644 --- a/google/cloud/storage/internal/storage_connection.h +++ b/google/cloud/storage/internal/storage_connection.h @@ -38,6 +38,7 @@ #include "google/cloud/status_or.h" #include #include +#include #include namespace google { @@ -120,7 +121,7 @@ class StorageConnection { return Status(StatusCode::kUnimplemented, "unimplemented"); } virtual StatusOr ExecuteParallelUploadFile( - std::vector, bool); + std::vector, std::vector, bool); ///@} ///@{ diff --git a/google/cloud/storage/internal/storage_connection_test.cc b/google/cloud/storage/internal/storage_connection_test.cc index 59b350c2f09aa..9093163557f10 100644 --- a/google/cloud/storage/internal/storage_connection_test.cc +++ b/google/cloud/storage/internal/storage_connection_test.cc @@ -163,10 +163,11 @@ TEST(StorageConnectionTest, UploadFileResumableUnimplemented) { TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) { TestStorageConnection connection; + std::vector threads; std::vector shards; bool ignore_cleanup_failures = false; - auto response = connection.ExecuteParallelUploadFile(std::move(shards), - ignore_cleanup_failures); + auto response = connection.ExecuteParallelUploadFile( + std::move(threads), std::move(shards), ignore_cleanup_failures); EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented)); } diff --git a/google/cloud/storage/internal/tracing_connection.cc b/google/cloud/storage/internal/tracing_connection.cc index b81c77f805c0f..9827f3f52cdce 100644 --- a/google/cloud/storage/internal/tracing_connection.cc +++ b/google/cloud/storage/internal/tracing_connection.cc @@ -257,14 +257,15 @@ StatusOr> TracingConnection::UploadFileResumable( } StatusOr TracingConnection::ExecuteParallelUploadFile( + std::vector threads, std::vector shards, bool ignore_cleanup_failures) { auto span = internal::MakeSpan( "storage::ParallelUploadFile/ExecuteParallelUploadFile"); auto scope = opentelemetry::trace::Scope(span); - return internal::EndSpan( - *span, impl_->ExecuteParallelUploadFile(std::move(shards), - ignore_cleanup_failures)); + return internal::EndSpan(*span, impl_->ExecuteParallelUploadFile( + std::move(threads), std::move(shards), + ignore_cleanup_failures)); } StatusOr diff --git a/google/cloud/storage/internal/tracing_connection.h b/google/cloud/storage/internal/tracing_connection.h index 9fe0d7184dd57..f06b4d5914752 100644 --- a/google/cloud/storage/internal/tracing_connection.h +++ b/google/cloud/storage/internal/tracing_connection.h @@ -106,6 +106,7 @@ class TracingConnection : public storage::internal::StorageConnection { std::string const& file_name, storage::internal::ResumableUploadRequest& request) override; StatusOr ExecuteParallelUploadFile( + std::vector threads, std::vector shards, bool ignore_cleanup_failures) override; diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 6d058d5b056e9..93fe917aa34a2 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -751,7 +751,9 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) { auto mock = std::make_shared(); EXPECT_CALL(*mock, ExecuteParallelUploadFile) .WillOnce( - [](std::vector< + [](std::vector + threads, // NOLINT(performance-unnecessary-value-param) + std::vector< storage::internal:: ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param) bool) { @@ -759,10 +761,11 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) { return PermanentError(); }); auto under_test = TracingConnection(mock); + std::vector threads; std::vector shards; bool ignore_cleanup_failures = false; - auto actual = under_test.ExecuteParallelUploadFile(std::move(shards), - ignore_cleanup_failures); + auto actual = under_test.ExecuteParallelUploadFile( + std::move(threads), std::move(shards), ignore_cleanup_failures); auto const code = PermanentError().code(); auto const code_str = StatusCodeToString(code); diff --git a/google/cloud/storage/parallel_upload.h b/google/cloud/storage/parallel_upload.h index d749a3bb436ed..d1648a606f86a 100644 --- a/google/cloud/storage/parallel_upload.h +++ b/google/cloud/storage/parallel_upload.h @@ -1198,8 +1198,18 @@ StatusOr ParallelUploadFile( return shards.status(); } + std::vector threads; + threads.reserve(shards->size()); + for (auto& shard : *shards) { + threads.emplace_back([&shard] { + // We can safely ignore the status - if something fails we'll know + // when obtaining final metadata. + shard.Upload(); + }); + } return internal::ClientImplDetails::GetConnection(client) - ->ExecuteParallelUploadFile(std::move(*shards), ignore_cleanup_failures); + ->ExecuteParallelUploadFile(std::move(threads), std::move(*shards), + ignore_cleanup_failures); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/testing/mock_client.h b/google/cloud/storage/testing/mock_client.h index c86f64ab34a26..32e39445abe03 100644 --- a/google/cloud/storage/testing/mock_client.h +++ b/google/cloud/storage/testing/mock_client.h @@ -110,7 +110,8 @@ class MockClient : public google::cloud::storage::internal::StorageConnection { (std::string const&, storage::internal::ResumableUploadRequest&), (override)); MOCK_METHOD(StatusOr, ExecuteParallelUploadFile, - (std::vector, bool), + (std::vector, + std::vector, bool), (override)); MOCK_METHOD(StatusOr, ListBucketAcl, From d1ca85849dfaf4101d0ae30cf382f9a14847818b Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sun, 20 Jul 2025 10:27:35 +0000 Subject: [PATCH 09/11] fix linting --- google/cloud/storage/internal/tracing_connection_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 93fe917aa34a2..60bd3d3b8c17e 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -751,8 +751,7 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) { auto mock = std::make_shared(); EXPECT_CALL(*mock, ExecuteParallelUploadFile) .WillOnce( - [](std::vector - threads, // NOLINT(performance-unnecessary-value-param) + [](std::vector, // NOLINT(performance-unnecessary-value-param) std::vector< storage::internal:: ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param) From 19c118a408eb722ccc0b689deef74f2b10a3dedf Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Sun, 20 Jul 2025 10:36:33 +0000 Subject: [PATCH 10/11] fix linting --- google/cloud/storage/internal/tracing_connection_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/internal/tracing_connection_test.cc b/google/cloud/storage/internal/tracing_connection_test.cc index 60bd3d3b8c17e..07d2d25235574 100644 --- a/google/cloud/storage/internal/tracing_connection_test.cc +++ b/google/cloud/storage/internal/tracing_connection_test.cc @@ -751,7 +751,8 @@ TEST(TracingClientTest, ExecuteParallelUploadFile) { auto mock = std::make_shared(); EXPECT_CALL(*mock, ExecuteParallelUploadFile) .WillOnce( - [](std::vector, // NOLINT(performance-unnecessary-value-param) + [](std::vector< + std::thread>, // NOLINT(performance-unnecessary-value-param) std::vector< storage::internal:: ParallelUploadFileShard>, // NOLINT(performance-unnecessary-value-param) From 4d35a00527fc7d2ccf456445e16aa87984638132 Mon Sep 17 00:00:00 2001 From: Shubham Kaushal Date: Tue, 29 Jul 2025 10:22:55 +0000 Subject: [PATCH 11/11] creating client connection early --- google/cloud/storage/parallel_upload.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/parallel_upload.h b/google/cloud/storage/parallel_upload.h index d1648a606f86a..0d0e08db1b0ac 100644 --- a/google/cloud/storage/parallel_upload.h +++ b/google/cloud/storage/parallel_upload.h @@ -1190,8 +1190,9 @@ StatusOr ParallelUploadFile( internal::IsOptionSupportedWithParallelUpload::value, "Provided Option not found in ParallelUploadFileSupportedOptions."); + auto connection = internal::ClientImplDetails::GetConnection(client); auto shards = internal::CreateParallelUploadShards::Create( - client, std::move(file_name), std::move(bucket_name), + std::move(client), std::move(file_name), std::move(bucket_name), std::move(object_name), std::move(prefix), std::forward(options)...); if (!shards) { @@ -1207,9 +1208,8 @@ StatusOr ParallelUploadFile( shard.Upload(); }); } - return internal::ClientImplDetails::GetConnection(client) - ->ExecuteParallelUploadFile(std::move(threads), std::move(*shards), - ignore_cleanup_failures); + return connection->ExecuteParallelUploadFile( + std::move(threads), std::move(*shards), ignore_cleanup_failures); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END