Skip to content
33 changes: 2 additions & 31 deletions google/cloud/storage/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,39 +225,10 @@ StatusOr<ObjectMetadata> Client::UploadStreamResumable(

Status Client::DownloadFileImpl(internal::ReadObjectRangeRequest const& request,
std::string const& file_name) {
auto const* func = __func__;
auto msg = [&request, &file_name, func](char const* what) {
std::ostringstream os;
os << func << "(" << request << ", " << file_name << "): " << what;
return std::move(os).str();
};

auto stream = ReadObjectImpl(request);
if (stream.bad()) return stream.status();

// Open the destination file, and immediate raise an exception on failure.
std::ofstream os(file_name, std::ios::binary);
if (!os.is_open()) {
return google::cloud::internal::InvalidArgumentError(
msg("cannot open download destination file - ofstream::open()"),
GCP_ERROR_INFO());
}

auto const& current = google::cloud::internal::CurrentOptions();
auto const size = current.get<DownloadBufferSizeOption>();
std::unique_ptr<char[]> buffer(new char[size]);
do {
stream.read(buffer.get(), size);
os.write(buffer.get(), stream.gcount());
} while (os.good() && stream.good());
os.close();
if (!os.good()) {
return google::cloud::internal::UnknownError(
msg("cannot close download destination file - ofstream::close()"),
GCP_ERROR_INFO());
}
if (stream.bad()) return stream.status();
return Status();
return connection_->DownloadStreamToFile(std::move(stream), file_name,
request);
}

std::string Client::SigningEmail(SigningAccount const& signing_account) const {
Expand Down
35 changes: 35 additions & 0 deletions google/cloud/storage/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,41 @@ integrity checks using the DisableMD5Hash() and DisableCrc32cChecksum() options.
return std::unique_ptr<std::istream>(std::move(source));
}

Status StorageConnectionImpl::DownloadStreamToFile(
ObjectReadStream&& stream, std::string const& file_name,
ReadObjectRangeRequest const& request) {
auto const* func = __func__;
auto msg = [&request, &file_name, func](char const* what) {
std::ostringstream os;
os << func << "(" << request << ", " << file_name << "): " << what;
return std::move(os).str();
};

// Open the destination file, and immediate raise an exception on failure.
std::ofstream os(file_name, std::ios::binary);
if (!os.is_open()) {
return google::cloud::internal::InvalidArgumentError(
msg("cannot open download destination file - ofstream::open()"),
GCP_ERROR_INFO());
}

auto const& current = google::cloud::internal::CurrentOptions();
auto const size = current.get<DownloadBufferSizeOption>();
std::unique_ptr<char[]> buffer(new char[size]);
do {
stream.read(buffer.get(), size);
os.write(buffer.get(), stream.gcount());
} while (os.good() && stream.good());
os.close();
if (!os.good()) {
return google::cloud::internal::UnknownError(
msg("cannot close download destination file - ofstream::close()"),
GCP_ERROR_INFO());
}
if (stream.bad()) return stream.status();
return Status();
}

StatusOr<ObjectMetadata> StorageConnectionImpl::ExecuteParallelUploadFile(
std::vector<std::thread> threads,
std::vector<ParallelUploadFileShard> shards, bool ignore_cleanup_failures) {
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/storage/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/storage/idempotency_policy.h"
#include "google/cloud/storage/internal/generic_stub.h"
#include "google/cloud/storage/internal/storage_connection.h"
#include "google/cloud/storage/object_read_stream.h"
#include "google/cloud/storage/retry_policy.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/internal/invocation_id_generator.h"
Expand Down Expand Up @@ -104,6 +105,9 @@ class StorageConnectionImpl
InsertObjectMediaRequest& request) override;
StatusOr<std::unique_ptr<std::istream>> UploadFileResumable(
std::string const& file_name, ResumableUploadRequest& request) override;
Status DownloadStreamToFile(ObjectReadStream&& stream,
std::string const& file_name,
ReadObjectRangeRequest const& request) override;
StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
std::vector<std::thread> threads,
std::vector<ParallelUploadFileShard> shards,
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/storage/internal/storage_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "google/cloud/storage/internal/sign_blob_requests.h"
#include "google/cloud/storage/oauth2/credentials.h"
#include "google/cloud/storage/object_metadata.h"
#include "google/cloud/storage/object_read_stream.h"
#include "google/cloud/storage/service_account.h"
#include "google/cloud/storage/version.h"
#include "google/cloud/options.h"
Expand Down Expand Up @@ -120,6 +121,10 @@ class StorageConnection {
std::string const&, ResumableUploadRequest&) {
return Status(StatusCode::kUnimplemented, "unimplemented");
}
virtual Status DownloadStreamToFile(ObjectReadStream&&, std::string const&,
ReadObjectRangeRequest const&) {
return Status(StatusCode::kUnimplemented, "unimplemented");
}
virtual StatusOr<ObjectMetadata> ExecuteParallelUploadFile(
std::vector<std::thread>, std::vector<ParallelUploadFileShard>, bool);
///@}
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/storage/internal/storage_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/storage/internal/storage_connection.h"
#include "google/cloud/storage/object_read_stream.h"
#include "google/cloud/storage/parallel_upload.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>
Expand Down Expand Up @@ -161,6 +162,15 @@ TEST(StorageConnectionTest, UploadFileResumableUnimplemented) {
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
}

TEST(StorageConnectionTest, DownloadStreamToFileUnimplemented) {
TestStorageConnection connection;
ObjectReadStream stream;
ReadObjectRangeRequest request;
auto response = connection.DownloadStreamToFile(std::move(stream),
"test-file.txt", request);
EXPECT_THAT(response, StatusIs(StatusCode::kUnimplemented));
}

TEST(StorageConnectionTest, ExecuteParallelUploadFileUnimplemented) {
TestStorageConnection connection;
std::vector<std::thread> threads;
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/storage/internal/tracing_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,16 @@ StatusOr<std::unique_ptr<std::istream>> TracingConnection::UploadFileResumable(
impl_->UploadFileResumable(file_name, request));
}

Status TracingConnection::DownloadStreamToFile(
storage::ObjectReadStream&& stream, std::string const& file_name,
storage::internal::ReadObjectRangeRequest const& request) {
auto span = internal::MakeSpan(
"storage::Client::DownloadToFile/DownloadStreamToFile");
auto scope = opentelemetry::trace::Scope(span);
return internal::EndSpan(*span, impl_->DownloadStreamToFile(
std::move(stream), file_name, request));
}

StatusOr<storage::ObjectMetadata> TracingConnection::ExecuteParallelUploadFile(
std::vector<std::thread> threads,
std::vector<storage::internal::ParallelUploadFileShard> shards,
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/storage/internal/tracing_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ class TracingConnection : public storage::internal::StorageConnection {
StatusOr<std::unique_ptr<std::istream>> UploadFileResumable(
std::string const& file_name,
storage::internal::ResumableUploadRequest& request) override;
Status DownloadStreamToFile(
storage::ObjectReadStream&&, std::string const&,
storage::internal::ReadObjectRangeRequest const&) override;
StatusOr<storage::ObjectMetadata> ExecuteParallelUploadFile(
std::vector<std::thread> threads,
std::vector<storage::internal::ParallelUploadFileShard> shards,
Expand Down
30 changes: 30 additions & 0 deletions google/cloud/storage/internal/tracing_connection_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY

#include "google/cloud/storage/internal/tracing_connection.h"
#include "google/cloud/storage/object_read_stream.h"
#include "google/cloud/storage/testing/canonical_errors.h"
#include "google/cloud/storage/testing/mock_client.h"
#include "google/cloud/internal/opentelemetry.h"
Expand Down Expand Up @@ -746,6 +747,35 @@ TEST(TracingClientTest, UploadFileResumable) {
"gl-cpp.status_code", code_str)))));
}

TEST(TracingClientTest, DownloadStreamToFile) {
auto span_catcher = InstallSpanCatcher();
auto mock = std::make_shared<MockClient>();
EXPECT_CALL(*mock, DownloadStreamToFile)
.WillOnce([](auto&&, auto const&, auto const&) {
EXPECT_TRUE(ThereIsAnActiveSpan());
return PermanentError();
});
auto under_test = TracingConnection(mock);
storage::ObjectReadStream stream;
storage::internal::ReadObjectRangeRequest request("test-bucket",
"test-object");
auto actual = under_test.DownloadStreamToFile(std::move(stream),
"test-file.txt", request);

auto const code = PermanentError().code();
auto const code_str = StatusCodeToString(code);
auto const msg = PermanentError().message();
EXPECT_THAT(actual, StatusIs(code, msg));
EXPECT_THAT(
span_catcher->GetSpans(),
ElementsAre(AllOf(
SpanNamed("storage::Client::DownloadToFile/DownloadStreamToFile"),
SpanHasInstrumentationScope(), SpanKindIsClient(),
SpanWithStatus(opentelemetry::trace::StatusCode::kError, msg),
SpanHasAttributes(
OTelAttribute<std::string>("gl-cpp.status_code", code_str)))));
}

TEST(TracingClientTest, ExecuteParallelUploadFile) {
auto span_catcher = InstallSpanCatcher();
auto mock = std::make_shared<MockClient>();
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/storage/testing/mock_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ class MockClient : public google::cloud::storage::internal::StorageConnection {
MOCK_METHOD(StatusOr<std::unique_ptr<std::istream>>, UploadFileResumable,
(std::string const&, storage::internal::ResumableUploadRequest&),
(override));
MOCK_METHOD(Status, DownloadStreamToFile,
(ObjectReadStream&&, std::string const&,
storage::internal::ReadObjectRangeRequest const&),
(override));
MOCK_METHOD(StatusOr<ObjectMetadata>, ExecuteParallelUploadFile,
(std::vector<std::thread>,
std::vector<storage::internal::ParallelUploadFileShard>, bool),
Expand Down
Loading