Skip to content

Commit 2bc48d6

Browse files
authored
Non throwing object write stream (#1749)
Unless the application explicitly requests exceptions by setting the exception mask. By default `std::ios_base::badbit` is set when an error is detected, and the ObjectWriteStream saves the status with a more detailed error message. This change will reduce code coverage because many errors that used to be propagated as exceptions now require an extra if(), but we do not have test infrastructure to produce these error conditions on demand. Wrote a test to verify that resumable upload errors are reported correctly. Fixed the testbench to properly validate preconditions on a resumable upload. Then found a bug in how we handle error status for resumable upload failures.
1 parent 196e435 commit 2bc48d6

21 files changed

Lines changed: 339 additions & 147 deletions

ci/test-install/storage_install_test.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,19 @@ int main(int argc, char* argv[]) try {
3232
gcs::Client client;
3333

3434
gcs::ObjectWriteStream os = client.WriteObject(bucket_name, object_name);
35+
os.exceptions(std::ios_base::badbit | std::ios_base::failbit);
3536
os << "Hello World" << std::endl;
36-
gcs::ObjectMetadata meta = os.Close();
37+
os.Close();
38+
gcs::ObjectMetadata meta = os.metadata().value();
3739
std::cout << "Successfully created object, generation=" << meta.generation()
3840
<< std::endl;
3941

4042
gcs::ObjectReadStream stream = client.ReadObject(bucket_name, object_name);
43+
stream.exceptions(std::ios_base::badbit | std::ios_base::failbit);
4144

4245
int count = 0;
43-
while (not stream.eof()) {
44-
std::string line;
45-
std::getline(stream, line, '\n');
46+
std::string line;
47+
while (std::getline(stream, line, '\n')) {
4648
++count;
4749
}
4850
client.DeleteObject(bucket_name, object_name,

google/cloud/storage/client_write_object_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class WriteObjectTest : public ::testing::Test {
5959
class MockStreambuf : public internal::ObjectWriteStreambuf {
6060
public:
6161
MOCK_CONST_METHOD0(IsOpen, bool());
62-
MOCK_METHOD0(DoClose, internal::HttpResponse());
62+
MOCK_METHOD0(DoClose, StatusOr<internal::HttpResponse>());
6363
MOCK_METHOD1(ValidateHash, void(ObjectMetadata const&));
6464
MOCK_CONST_METHOD0(received_hash, std::string const&());
6565
MOCK_CONST_METHOD0(computed_hash, std::string const&());
@@ -88,7 +88,8 @@ TEST_F(WriteObjectTest, WriteObject) {
8888

8989
auto stream = client->WriteObject("test-bucket-name", "test-object-name");
9090
stream << "Hello World!";
91-
ObjectMetadata actual = stream.Close();
91+
stream.Close();
92+
ObjectMetadata actual = stream.metadata().value();
9293
EXPECT_EQ(expected, actual);
9394
}
9495

google/cloud/storage/examples/storage_bucket_samples.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ void WriteObjectRequesterPays(google::cloud::storage::Client client, int& argc,
477477
stream << (lineno + 1) << ": I will write better examples\n";
478478
}
479479

480-
gcs::ObjectMetadata meta = stream.Close();
480+
stream.Close();
481+
gcs::ObjectMetadata meta = stream.metadata().value();
481482
std::cout << "The resulting object size is: " << meta.size() << std::endl;
482483
}
483484
//! [write object requester pays]

google/cloud/storage/examples/storage_object_samples.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,8 @@ void WriteObject(google::cloud::storage::Client client, int& argc,
279279
stream << (lineno + 1) << ": " << text << "\n";
280280
}
281281

282-
gcs::ObjectMetadata meta = stream.Close();
282+
stream.Close();
283+
gcs::ObjectMetadata meta = stream.metadata().value();
283284
std::cout << "The resulting object size is: " << meta.size() << std::endl;
284285
}
285286
//! [write object]
@@ -382,7 +383,8 @@ non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
382383
)""";
383384
}
384385

385-
gcs::ObjectMetadata metadata = stream.Close();
386+
stream.Close();
387+
gcs::ObjectMetadata metadata = stream.metadata().value();
386388
std::cout << "Upload completed, the new object metadata is: " << metadata
387389
<< std::endl;
388390
}
@@ -754,7 +756,8 @@ void WriteObjectWithKmsKey(google::cloud::storage::Client client, int& argc,
754756
stream << lineno << ": placeholder text for CMEK example.\n";
755757
}
756758

757-
gcs::ObjectMetadata meta = stream.Close();
759+
stream.Close();
760+
gcs::ObjectMetadata meta = stream.metadata().value();
758761
std::cout << "The resulting object size is: " << meta.size() << std::endl;
759762
}
760763
//! [write object with kms key] [END storage_upload_with_kms_key]

google/cloud/storage/internal/curl_client.cc

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ StatusOr<ResumableUploadResponse> CurlClient::UploadChunk(
282282
bool success_with_308 =
283283
response->status_code == 308 and
284284
response->headers.find("range") != response->headers.end();
285-
if (status.ok() or success_with_308) {
285+
if (response->status_code < 300 or success_with_308) {
286286
return ResumableUploadResponse::FromHttpResponse(*std::move(response));
287287
}
288288
return AsStatus(*response);
@@ -305,7 +305,7 @@ StatusOr<ResumableUploadResponse> CurlClient::QueryResumableUpload(
305305
bool success_with_308 =
306306
response->status_code == 308 and
307307
response->headers.find("range") != response->headers.end();
308-
if (status.ok() or success_with_308) {
308+
if (response->status_code < 300 or success_with_308) {
309309
return ResumableUploadResponse::FromHttpResponse(*std::move(response));
310310
}
311311
return AsStatus(*response);
@@ -1335,11 +1335,8 @@ StatusOr<ObjectMetadata> CurlClient::InsertObjectMediaMultipart(
13351335
writer << crlf << request.contents() << crlf << marker << "--" << crlf;
13361336

13371337
// 6. Return the results as usual.
1338-
auto response = writer.CloseRaw();
1339-
if (response.status_code >= 300) {
1340-
return Status(response.status_code, std::move(response.payload));
1341-
}
1342-
return ObjectMetadata::ParseFromString(response.payload);
1338+
writer.Close();
1339+
return std::move(writer).metadata();
13431340
}
13441341

13451342
std::string CurlClient::PickBoundary(std::string const& text_to_avoid) {

google/cloud/storage/internal/curl_resumable_streambuf.cc

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,13 @@ void CurlResumableStreambuf::ValidateHash(ObjectMetadata const& meta) {
4242
}
4343

4444
CurlResumableStreambuf::int_type CurlResumableStreambuf::overflow(int_type ch) {
45-
Validate(__func__);
46-
Flush(false);
45+
if (not IsOpen()) {
46+
return traits_type::eof();
47+
}
48+
auto status = Flush(false);
49+
if (not status.ok()) {
50+
return traits_type::eof();
51+
}
4752
if (not traits_type::eq_int_type(ch, traits_type::eof())) {
4853
current_ios_buffer_.push_back(traits_type::to_char_type(ch));
4954
pbump(1);
@@ -52,34 +57,34 @@ CurlResumableStreambuf::int_type CurlResumableStreambuf::overflow(int_type ch) {
5257
}
5358

5459
int CurlResumableStreambuf::sync() {
55-
Flush(false);
60+
auto status = Flush(false);
61+
if (not status.ok()) {
62+
return traits_type::eof();
63+
}
5664
return 0;
5765
}
5866

5967
std::streamsize CurlResumableStreambuf::xsputn(char const* s,
6068
std::streamsize count) {
61-
Validate(__func__);
62-
Flush(false);
69+
if (not IsOpen()) {
70+
return traits_type::eof();
71+
}
72+
auto status = Flush(false);
73+
if (not status.ok()) {
74+
return traits_type::eof();
75+
}
76+
6377
current_ios_buffer_.append(s, static_cast<std::size_t>(count));
6478
pbump(static_cast<int>(count));
6579
return count;
6680
}
6781

68-
HttpResponse CurlResumableStreambuf::DoClose() {
82+
StatusOr<HttpResponse> CurlResumableStreambuf::DoClose() {
6983
GCP_LOG(INFO) << __func__ << "()";
7084
return Flush(true);
7185
}
7286

73-
void CurlResumableStreambuf::Validate(char const* where) const {
74-
if (IsOpen()) {
75-
return;
76-
}
77-
std::string msg = "Attempting to use closed CurlResumableStreambuf in ";
78-
msg += where;
79-
google::cloud::internal::RaiseRuntimeError(msg);
80-
}
81-
82-
HttpResponse CurlResumableStreambuf::Flush(bool final_chunk) {
87+
StatusOr<HttpResponse> CurlResumableStreambuf::Flush(bool final_chunk) {
8388
if (not IsOpen()) {
8489
return last_response_;
8590
}
@@ -105,6 +110,10 @@ HttpResponse CurlResumableStreambuf::Flush(bool final_chunk) {
105110
hash_validator_->Update(current_ios_buffer_);
106111

107112
auto result = upload_session_->UploadChunk(current_ios_buffer_, upload_size);
113+
if (not result.ok()) {
114+
// This was an unrecoverable error, time to signal an error.
115+
return std::move(result).status();
116+
}
108117
current_ios_buffer_.clear();
109118
current_ios_buffer_.reserve(max_buffer_size_);
110119
setp(&current_ios_buffer_[0], &current_ios_buffer_[0] + max_buffer_size_);

google/cloud/storage/internal/curl_resumable_streambuf.h

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,11 @@ class CurlResumableStreambuf : public ObjectWriteStreambuf {
5757
int sync() override;
5858
std::streamsize xsputn(char const* s, std::streamsize count) override;
5959
int_type overflow(int_type ch) override;
60-
// TODO(coryan) this is an ugly return type.
61-
HttpResponse DoClose() override;
60+
StatusOr<HttpResponse> DoClose() override;
6261

6362
private:
64-
/// Raise an exception if the stream is closed.
65-
void Validate(char const* where) const;
66-
6763
/// Flush the libcurl buffer and swap it with the iostream buffer.
68-
HttpResponse Flush(bool final_chunk);
64+
StatusOr<HttpResponse> Flush(bool final_chunk);
6965

7066
std::unique_ptr<ResumableUploadSession> upload_session_;
7167

google/cloud/storage/internal/curl_streambuf.cc

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -159,37 +159,48 @@ std::streamsize CurlWriteStreambuf::xsputn(char const* s,
159159
return count;
160160
}
161161

162-
HttpResponse CurlWriteStreambuf::DoClose() {
163-
GCP_LOG(INFO) << __func__ << "()";
164-
Validate(__func__);
165-
SwapBuffers();
166-
// TODO(#1735) - return StatusOr<> from here.
167-
auto response = upload_.Close().value();
168-
for (auto const& kv : response.headers) {
169-
hash_validator_->ProcessHeader(kv.first, kv.second);
162+
StatusOr<HttpResponse> CurlWriteStreambuf::DoClose() {
163+
GCP_LOG(DEBUG) << __func__ << "()";
164+
Status status = Validate(__func__);
165+
if (not status.ok()) {
166+
return status;
167+
}
168+
status = SwapBuffers();
169+
if (not status.ok()) {
170+
return status;
171+
}
172+
auto response = upload_.Close();
173+
if (response.ok()) {
174+
for (auto const& kv : response->headers) {
175+
hash_validator_->ProcessHeader(kv.first, kv.second);
176+
}
170177
}
171178
return response;
172179
}
173180

174-
void CurlWriteStreambuf::Validate(char const* where) const {
181+
Status CurlWriteStreambuf::Validate(char const* where) const {
175182
if (upload_.IsOpen()) {
176-
return;
183+
return Status();
177184
}
178185
std::string msg = "Attempting to use closed CurlStream in ";
179186
msg += where;
180-
google::cloud::internal::RaiseRuntimeError(msg);
187+
return Status(StatusCode::FAILED_PRECONDITION, std::move(msg));
181188
}
182189

183-
void CurlWriteStreambuf::SwapBuffers() {
190+
Status CurlWriteStreambuf::SwapBuffers() {
184191
// Shorten the buffer to the actual used size.
185192
current_ios_buffer_.resize(pptr() - pbase());
186193
// Push the buffer to the libcurl wrapper to be written as needed
187194
hash_validator_->Update(current_ios_buffer_);
188-
upload_.NextBuffer(current_ios_buffer_);
195+
auto status = upload_.NextBuffer(current_ios_buffer_);
196+
if (not status.ok()) {
197+
return status;
198+
}
189199
// Make the buffer big enough to receive more data before needing a flush.
190200
current_ios_buffer_.clear();
191201
current_ios_buffer_.reserve(max_buffer_size_);
192202
setp(&current_ios_buffer_[0], &current_ios_buffer_[0] + max_buffer_size_);
203+
return Status();
193204
}
194205

195206
} // namespace internal

google/cloud/storage/internal/curl_streambuf.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,14 @@ class CurlWriteStreambuf : public ObjectWriteStreambuf {
9595
int sync() override;
9696
std::streamsize xsputn(char const* s, std::streamsize count) override;
9797
int_type overflow(int_type ch) override;
98-
HttpResponse DoClose() override;
98+
StatusOr<HttpResponse> DoClose() override;
9999

100100
private:
101101
/// Raise an exception if the stream is closed.
102-
void Validate(char const* where) const;
102+
Status Validate(char const* where) const;
103103

104104
/// Flush the libcurl buffer and swap it with the iostream buffer.
105-
void SwapBuffers();
105+
Status SwapBuffers();
106106

107107
CurlUploadRequest upload_;
108108
std::string current_ios_buffer_;

google/cloud/storage/internal/curl_upload_request.cc

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,35 @@ CurlUploadRequest::CurlUploadRequest(std::size_t initial_buffer_size)
3636
buffer_rdptr_ = buffer_.end();
3737
}
3838

39-
void CurlUploadRequest::Flush() {
39+
Status CurlUploadRequest::Flush() {
4040
ValidateOpen(__func__);
4141
handle_.FlushDebug(__func__);
4242
GCP_LOG(DEBUG) << __func__ << "(), curl.size=" << buffer_.size()
4343
<< ", curl.rdptr="
4444
<< std::distance(buffer_.begin(), buffer_rdptr_)
4545
<< ", curl.end="
4646
<< std::distance(buffer_.begin(), buffer_.end());
47-
Wait([this] { return buffer_rdptr_ == buffer_.end(); });
47+
return Wait([this] { return buffer_rdptr_ == buffer_.end(); });
4848
}
4949

5050
StatusOr<HttpResponse> CurlUploadRequest::Close() {
51-
ValidateOpen(__func__);
51+
Status status = ValidateOpen(__func__);
52+
if (not status.ok()) {
53+
return status;
54+
}
5255
handle_.FlushDebug(__func__);
53-
Flush();
56+
status = Flush();
57+
if (not status.ok()) {
58+
return status;
59+
}
5460
// Set the the closing_ flag to trigger a return 0 from the next read
5561
// callback, see the comments in the header file for more details.
5662
closing_ = true;
5763
// Block until that callback is made.
58-
Wait([this] { return curl_closed_; });
64+
status = Wait([this] { return curl_closed_; });
65+
if (not status.ok()) {
66+
return status;
67+
}
5968

6069
// Now remove the handle from the CURLM* interface and wait for the response.
6170
auto error = curl_multi_remove_handle(multi_.get(), handle_.handle_.get());
@@ -71,11 +80,15 @@ StatusOr<HttpResponse> CurlUploadRequest::Close() {
7180
std::move(received_headers_)};
7281
}
7382

74-
void CurlUploadRequest::NextBuffer(std::string& next_buffer) {
75-
ValidateOpen(__func__);
76-
Flush();
83+
Status CurlUploadRequest::NextBuffer(std::string& next_buffer) {
84+
Status status = ValidateOpen(__func__);
85+
if (not status.ok()) {
86+
return status;
87+
}
88+
status = Flush();
7789
next_buffer.swap(buffer_);
7890
buffer_rdptr_ = buffer_.begin();
91+
return status;
7992
}
8093

8194
Status CurlUploadRequest::SetOptions() {
@@ -216,13 +229,13 @@ Status CurlUploadRequest::AsStatus(CURLMcode result, char const* where) {
216229
return Status(StatusCode::UNKNOWN, std::move(os).str());
217230
}
218231

219-
void CurlUploadRequest::ValidateOpen(char const* where) {
232+
Status CurlUploadRequest::ValidateOpen(char const* where) {
220233
if (not closing_) {
221-
return;
234+
return Status();
222235
}
223-
std::ostringstream os;
224-
os << "Attempting to use closed CurlUploadRequest in " << where;
225-
google::cloud::internal::RaiseRuntimeError(os.str());
236+
std::string msg = "Attempting to use closed CurlUploadRequest in ";
237+
msg += where;
238+
return Status(StatusCode::FAILED_PRECONDITION, std::move(msg));
226239
}
227240

228241
} // namespace internal

0 commit comments

Comments
 (0)