Skip to content

Commit 196e435

Browse files
authored
Non throwing object read stream (#1750)
When possible, use `status()` to report errors in `ObjectReadStream`. Unfortunately the standard library assumes that exceptions are enabled, and a `std::basic_istreambuf<>` can only report errors by raising exceptions. When exceptions are disabled this is, hmmm, difficult, so we must rely on the application dutifully checking the `status()` flag.
1 parent 37051bf commit 196e435

10 files changed

Lines changed: 204 additions & 53 deletions

google/cloud/storage/client.cc

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -147,39 +147,40 @@ StatusOr<ObjectMetadata> Client::UploadStreamResumable(
147147

148148
void Client::DownloadFileImpl(internal::ReadObjectRangeRequest const& request,
149149
std::string const& file_name) {
150+
// TODO(#1665) - use Status to report errors.
150151
std::unique_ptr<internal::ObjectReadStreambuf> streambuf =
151152
raw_client_->ReadObject(request).value();
153+
// Open the download stream and immediately raise an exception on failure.
152154
ObjectReadStream stream(std::move(streambuf));
153-
if (stream.eof() and not stream.IsOpen()) {
154-
std::string msg = __func__;
155-
msg += ": cannot open source object ";
156-
msg += request.object_name();
157-
msg += " in bucket ";
158-
msg += request.bucket_name();
159-
google::cloud::internal::RaiseRuntimeError(msg);
155+
156+
auto report_error = [&](char const* func, char const* what) {
157+
std::ostringstream msg;
158+
msg << func << "(" << request << ", " << file_name
159+
<< "): " << what << " - status=" << stream.status();
160+
google::cloud::internal::RaiseRuntimeError(std::move(msg).str());
161+
};
162+
if (not stream.status().ok()) {
163+
report_error(__func__, "cannot open download stream");
160164
}
165+
166+
// Open the destination file, and immediate raise an exception on failure.
161167
std::ofstream os(file_name);
162168
if (not os.is_open()) {
163-
std::string msg = __func__;
164-
msg += ": cannot open destination file ";
165-
msg += file_name;
166-
google::cloud::internal::RaiseRuntimeError(msg);
169+
report_error(__func__, "cannot open destination file");
167170
}
171+
168172
std::string buffer;
169173
buffer.resize(raw_client_->client_options().download_buffer_size(), '\0');
170174
do {
171175
stream.read(&buffer[0], buffer.size());
172176
os.write(buffer.data(), stream.gcount());
173-
if (not stream.good()) {
174-
break;
175-
}
176-
} while (os.good());
177+
} while (os.good() and stream.good());
177178
os.close();
178179
if (not os.good()) {
179-
std::string msg = __func__;
180-
msg += ": failure closing destination file ";
181-
msg += file_name;
182-
google::cloud::internal::RaiseRuntimeError(msg);
180+
report_error(__func__, "error closing destination file");
181+
}
182+
if (not stream.status().ok()) {
183+
report_error(__func__, "error in download stream");
183184
}
184185
}
185186

google/cloud/storage/client.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,14 @@ class Client {
786786
/**
787787
* Reads the contents of an object.
788788
*
789+
* Returns an object derived from `std::istream` which can be used to read the
790+
* contents of the GCS blob. The application should check the `badbit` (e.g.
791+
* by calling `stream.bad()`) on the returned object to detect if there was
792+
* an error reading from the blob. If `badbit` is set, the application can
793+
* check the `status()` variable to get details about the failure.
794+
* Applications can also set the exception mask on the returned stream, in
795+
* which case an exception is thrown if an error is detected.
796+
*
789797
* @param bucket_name the name of the bucket that contains the object.
790798
* @param object_name the name of the object to be read.
791799
* @param options a list of optional query parameters and/or request headers.

google/cloud/storage/internal/curl_download_request.cc

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,14 @@ StatusOr<HttpResponse> CurlDownloadRequest::Close() {
4242
// callback, see the comments in the header file for more details.
4343
closing_ = true;
4444
// Block until that callback is made.
45-
Wait([this] { return curl_closed_; });
45+
auto status = Wait([this] { return curl_closed_; });
46+
if (not status.ok()) {
47+
return status;
48+
}
4649

4750
// Now remove the handle from the CURLM* interface and wait for the response.
4851
auto error = curl_multi_remove_handle(multi_.get(), handle_.handle_.get());
49-
auto status = AsStatus(error, __func__);
52+
status = AsStatus(error, __func__);
5053
if (not status.ok()) {
5154
return status;
5255
}
@@ -61,15 +64,18 @@ StatusOr<HttpResponse> CurlDownloadRequest::Close() {
6164

6265
StatusOr<HttpResponse> CurlDownloadRequest::GetMore(std::string& buffer) {
6366
handle_.FlushDebug(__func__);
64-
Wait([this] {
67+
auto status = Wait([this] {
6568
return curl_closed_ or buffer_.size() >= initial_buffer_size_;
6669
});
70+
if (not status.ok()) {
71+
return status;
72+
}
6773
GCP_LOG(DEBUG) << __func__ << "(), curl.size=" << buffer_.size()
6874
<< ", closing=" << closing_ << ", closed=" << curl_closed_;
6975
if (curl_closed_) {
7076
// Remove the handle from the CURLM* interface and wait for the response.
7177
auto error = curl_multi_remove_handle(multi_.get(), handle_.handle_.get());
72-
Status status = AsStatus(error, __func__);
78+
status = AsStatus(error, __func__);
7379
if (not status.ok()) {
7480
return status;
7581
}
@@ -89,9 +95,9 @@ StatusOr<HttpResponse> CurlDownloadRequest::GetMore(std::string& buffer) {
8995
buffer_.swap(buffer);
9096
buffer_.clear();
9197
buffer_.reserve(initial_buffer_size_);
92-
Status pause = handle_.EasyPause(CURLPAUSE_RECV_CONT);
93-
if (not pause.ok()) {
94-
return pause;
98+
status = handle_.EasyPause(CURLPAUSE_RECV_CONT);
99+
if (not status.ok()) {
100+
return status;
95101
}
96102
GCP_LOG(DEBUG) << __func__ << "(), size=" << buffer.size()
97103
<< ", closing=" << closing_ << ", closed=" << curl_closed_

google/cloud/storage/internal/curl_streambuf.cc

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,20 @@ CurlReadStreambuf::CurlReadStreambuf(
3535

3636
bool CurlReadStreambuf::IsOpen() const { return download_.IsOpen(); }
3737

38-
HttpResponse CurlReadStreambuf::Close() {
39-
// TODO(#1736) - return StatusOr<> from here.
40-
return download_.Close().value();
38+
void CurlReadStreambuf::Close() {
39+
auto response = download_.Close();
40+
if (not response.ok()) {
41+
status_ = std::move(response).status();
42+
ReportError(status_);
43+
}
4144
}
4245

4346
CurlReadStreambuf::int_type CurlReadStreambuf::underflow() {
4447
if (not IsOpen()) {
45-
current_ios_buffer_.clear();
46-
current_ios_buffer_.push_back('\0');
47-
char* data = &current_ios_buffer_[0];
48-
setg(data, data + 1, data + 1);
48+
// The stream is closed, reading from a closed stream can happen if there is
49+
// no object to read from, or the object is empty. In that case just setup
50+
// an empty (but valid) region and verify the checksums.
51+
SetEmptyRegion();
4952
hash_validator_result_ = HashValidator::FinishAndCheck(
5053
__func__ + std::string(" mismatched hashes reading from closed stream"),
5154
std::move(*hash_validator_));
@@ -55,13 +58,15 @@ CurlReadStreambuf::int_type CurlReadStreambuf::underflow() {
5558
current_ios_buffer_.reserve(target_buffer_size_);
5659
StatusOr<HttpResponse> response = download_.GetMore(current_ios_buffer_);
5760
if (not response.ok()) {
58-
return traits_type::eof();
61+
return ReportError(std::move(response).status());
5962
}
6063
for (auto const& kv : response->headers) {
6164
hash_validator_->ProcessHeader(kv.first, kv.second);
65+
headers_.emplace(kv.first, kv.second);
6266
}
6367
if (response->status_code >= 300) {
64-
return traits_type::eof();
68+
return ReportError(
69+
Status(response->status_code, std::move(response->payload)));
6570
}
6671

6772
if (not current_ios_buffer_.empty()) {
@@ -70,15 +75,41 @@ CurlReadStreambuf::int_type CurlReadStreambuf::underflow() {
7075
setg(data, data, data + current_ios_buffer_.size());
7176
return traits_type::to_int_type(*data);
7277
}
73-
current_ios_buffer_.push_back('\0');
74-
char* data = &current_ios_buffer_[0];
75-
setg(data, data + 1, data + 1);
78+
79+
// This is an actual EOF, there is no more data to download, create an
80+
// empty (but valid) region:
81+
SetEmptyRegion();
82+
// Verify the checksums, and return the EOF character.
7683
hash_validator_result_ = HashValidator::FinishAndCheck(
7784
__func__ + std::string(" mismatched hashes at end of download"),
7885
std::move(*hash_validator_));
7986
return traits_type::eof();
8087
}
8188

89+
CurlReadStreambuf::int_type CurlReadStreambuf::ReportError(Status status) {
90+
// The only way to report errors from a std::basic_streambuf<> (which this
91+
// class derives from) is to throw exceptions:
92+
// https://stackoverflow.com/questions/50716688/how-to-set-the-badbit-of-a-stream-by-a-customized-streambuf
93+
// but we need to be able to report errors when the application has disabled
94+
// exceptions via `-fno-exceptions` or a similar option. In that case we set
95+
// `status_`, and report the error as an EOF. This is obviously not ideal,
96+
// but it is the best we can do when the application disables the standard
97+
// mechanism to signal errors.
98+
status_ = std::move(status);
99+
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
100+
internal::ThrowStatus(status_);
101+
#else
102+
return traits_type::eof();
103+
#endif // GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
104+
}
105+
106+
void CurlReadStreambuf::SetEmptyRegion() {
107+
current_ios_buffer_.clear();
108+
current_ios_buffer_.push_back('\0');
109+
char* data = &current_ios_buffer_[0];
110+
setg(data, data + 1, data + 1);
111+
}
112+
82113
CurlWriteStreambuf::CurlWriteStreambuf(
83114
CurlUploadRequest&& upload, std::size_t max_buffer_size,
84115
std::unique_ptr<HashValidator> hash_validator)

google/cloud/storage/internal/curl_streambuf.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,35 @@ class CurlReadStreambuf : public ObjectReadStreambuf {
3636

3737
~CurlReadStreambuf() override = default;
3838

39-
HttpResponse Close() override;
39+
void Close() override;
4040
bool IsOpen() const override;
41+
Status const& status() const override { return status_; }
4142
std::string const& received_hash() const override {
4243
return hash_validator_result_.received;
4344
}
4445
std::string const& computed_hash() const override {
4546
return hash_validator_result_.computed;
4647
}
48+
std::multimap<std::string, std::string> const& headers() const override {
49+
return headers_;
50+
}
4751

4852
protected:
4953
int_type underflow() override;
5054

55+
int_type ReportError(Status status);
56+
57+
void SetEmptyRegion();
58+
5159
private:
5260
CurlDownloadRequest download_;
5361
std::string current_ios_buffer_;
5462
std::size_t target_buffer_size_;
5563

5664
std::unique_ptr<HashValidator> hash_validator_;
5765
HashValidator::Result hash_validator_result_;
66+
Status status_;
67+
std::multimap<std::string, std::string> headers_;
5868
};
5969

6070
/**

google/cloud/storage/internal/object_streambuf.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_OBJECT_STREAMBUF_H_
1717

1818
#include "google/cloud/storage/internal/http_response.h"
19+
#include "google/cloud/storage/status.h"
1920
#include <iostream>
2021

2122
namespace google {
@@ -41,10 +42,12 @@ class ObjectReadStreambuf : public std::basic_streambuf<char> {
4142
ObjectReadStreambuf(ObjectReadStreambuf const&) = delete;
4243
ObjectReadStreambuf& operator=(ObjectReadStreambuf const&) = delete;
4344

44-
virtual HttpResponse Close() = 0;
45+
virtual void Close() = 0;
4546
virtual bool IsOpen() const = 0;
47+
virtual Status const& status() const = 0;
4648
virtual std::string const& received_hash() const = 0;
4749
virtual std::string const& computed_hash() const = 0;
50+
virtual std::multimap<std::string, std::string> const& headers() const = 0;
4851
};
4952

5053
/**

google/cloud/storage/object_stream.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ ObjectReadStream::~ObjectReadStream() {
4444
#endif // GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
4545
}
4646

47-
internal::HttpResponse ObjectReadStream::Close() {
47+
void ObjectReadStream::Close() {
4848
if (not IsOpen()) {
49-
google::cloud::internal::RaiseRuntimeError(
50-
"Attempting to Close() closed ObjectReadStream");
49+
return;
50+
}
51+
buf_->Close();
52+
if (not status().ok()) {
53+
setstate(std::ios_base::badbit);
5154
}
52-
return buf_->Close();
5355
}
5456

5557
ObjectWriteStream::~ObjectWriteStream() {

google/cloud/storage/object_stream.h

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,64 @@ class ObjectReadStream : public std::basic_istream<char> {
6363
/// Closes the stream (if necessary).
6464
~ObjectReadStream() override;
6565

66-
bool IsOpen() const { return buf_.get() != nullptr and buf_->IsOpen(); }
67-
internal::HttpResponse Close();
66+
bool IsOpen() const { return (bool)buf_ and buf_->IsOpen(); }
67+
68+
/**
69+
* Terminate the download, possibly before completing it.
70+
*/
71+
void Close();
72+
73+
//@{
74+
/**
75+
* Report any download errors.
76+
*
77+
* Note that errors may go undetected until the download completes.
78+
*/
79+
Status const& status() const& { return buf_->status(); }
80+
81+
/**
82+
* The received CRC32C checksum and the MD5 hash values as reported by GCS.
83+
*
84+
* When the download is finalized (via `Close()` or the end of file) the GCS
85+
* server reports the CRC32C checksum and, except for composite objects, the
86+
* MD5 hash of the data. This class compares the locally computed and received
87+
* hashes so applications can detect data download errors.
88+
*
89+
* The values are reported as comma separated `tag=value` pairs, e.g.
90+
* `crc32c=AAAAAA==,md5=1B2M2Y8AsgTpgAmY7PhCfg==`. The format of this string
91+
* is subject to change without notice, they are provided for informational
92+
* purposes only.
93+
*
94+
* @see https://cloud.google.com/storage/docs/hashes-etags for more
95+
* information on checksums and hashes in GCS.
96+
*/
6897
std::string const& received_hash() const { return buf_->received_hash(); }
98+
99+
/**
100+
* The locally computed checksum and hashes, as a string.
101+
*
102+
* This object computes the CRC32C checksum and MD5 hash of the downloaded
103+
* data. Note that there are several cases where these values may be empty or
104+
* irrelevant, for example:
105+
* - When reading only a portion of a blob the hash of that portion is
106+
* irrelevant, note that GCS only reports the hashes for the full blob.
107+
* - The application may disable the CRC32C and/or the MD5 hash computation.
108+
*
109+
* The string has the same format as the value returned by `received_hash()`.
110+
* Note that the format of this string is also subject to change without
111+
* notice.
112+
*
113+
* @see https://cloud.google.com/storage/docs/hashes-etags for more
114+
* information on checksums and hashes in GCS.
115+
*/
69116
std::string const& computed_hash() const { return buf_->computed_hash(); }
70117

118+
/// The headers returned by the service, for debugging only.
119+
std::multimap<std::string, std::string> const& headers() const {
120+
return buf_->headers();
121+
}
122+
//@}
123+
71124
private:
72125
std::unique_ptr<internal::ObjectReadStreambuf> buf_;
73126
};

google/cloud/storage/tests/object_integration_test.cc

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,12 @@ TEST_F(ObjectIntegrationTest, ReadNotFound) {
327327

328328
// Create a iostream to read the object back.
329329
auto stream = client.ReadObject(bucket_name, object_name);
330-
EXPECT_TRUE(stream.eof());
330+
EXPECT_FALSE(stream.status().ok());
331331
EXPECT_FALSE(stream.IsOpen());
332+
EXPECT_EQ(404, stream.status().status_code()) << "status=" << stream.status();
333+
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
334+
EXPECT_TRUE(stream.bad());
335+
#endif // GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
332336
}
333337

334338
TEST_F(ObjectIntegrationTest, Copy) {
@@ -1158,12 +1162,11 @@ TEST_F(ObjectIntegrationTest, StreamingWriteFailure) {
11581162

11591163
// This operation should fail because the object already exists.
11601164
#if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
1161-
EXPECT_THROW(
1162-
try { os.Close(); } catch (std::runtime_error const& ex) {
1163-
EXPECT_THAT(ex.what(), HasSubstr("[412]"));
1164-
throw;
1165-
},
1166-
std::runtime_error);
1165+
EXPECT_THROW(try { os.Close(); } catch (std::runtime_error const& ex) {
1166+
EXPECT_THAT(ex.what(), HasSubstr("[412]"));
1167+
throw;
1168+
},
1169+
std::runtime_error);
11671170
#else
11681171
EXPECT_DEATH_IF_SUPPORTED(os.Close(), "exceptions are disabled");
11691172
#endif // GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS

0 commit comments

Comments
 (0)