Skip to content

Commit 1160a9d

Browse files
authored
fix(storage): Reset write offset on gRPC BidiWriteObject resumable uploads (#16083)
When a stream breaks, the connection enters a resume state. A key step in OnResume is to create a new underlying connection. This new connection is initialized to start writing data from the persisted size reported by the GCS backend. The existing logic did not work as expected in some cases because it adjusted write_offset_ with a relative decrement (write_offset_ -= n), which became invalid after the resend buffer was truncated. This could happen if the size of the payload persisted by the server did not match the size sent by the client. This change introduces an is_resume flag to the OnQuery method. When OnQuery is called as part of the Resume() flow (i.e., after a new underlying connection is established), write_offset_ is now explicitly set to 0. This ensures that the WriteLoop correctly starts sending from the beginning of the truncated resend_buffer_ on the resumed stream.
1 parent 1b9b3c1 commit 1160a9d

4 files changed

Lines changed: 215 additions & 5 deletions

File tree

google/cloud/storage/internal/async/writer_connection_buffered.cc

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,8 @@ class AsyncWriterConnectionBufferedState
281281
return tmp;
282282
}
283283

284-
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size) {
284+
void OnQuery(std::unique_lock<std::mutex> lk, std::int64_t persisted_size,
285+
bool is_resume = false) {
285286
if (persisted_size < buffer_offset_) {
286287
auto id = UploadId(lk);
287288
return SetError(std::move(lk),
@@ -297,7 +298,22 @@ class AsyncWriterConnectionBufferedState
297298
}
298299
resend_buffer_.RemovePrefix(static_cast<std::size_t>(n));
299300
buffer_offset_ = persisted_size;
300-
write_offset_ -= static_cast<std::size_t>(n);
301+
if (is_resume) {
302+
// Since the buffer has been modified to start exactly at the point of the
303+
// resume, the next write on this new stream should start from the
304+
// beginning of this truncated buffer.
305+
write_offset_ = 0;
306+
} else {
307+
// While rare, it is possible that n >= write_offset_ (i.e. the server has
308+
// persisted more than we have sent) if, for example, multiple clients
309+
// resume the same upload. If that is the case, all the bytes covered by
310+
// write_offset_ have been flushed and we can reset it to 0.
311+
if (static_cast<std::size_t>(n) >= write_offset_) {
312+
write_offset_ = 0;
313+
} else {
314+
write_offset_ -= static_cast<std::size_t>(n);
315+
}
316+
}
301317
// If the buffer is small enough, collect all the handlers to notify them.
302318
auto const handlers = ClearHandlersIfEmpty(lk);
303319
// SetFlushed will release the lock before returning.
@@ -382,7 +398,7 @@ class AsyncWriterConnectionBufferedState
382398
std::move(state)));
383399
}
384400
// Regular resume succeeded, object not finalized. Continue writing.
385-
OnQuery(std::move(lk), absl::get<std::int64_t>(state));
401+
OnQuery(std::move(lk), absl::get<std::int64_t>(state), /*is_resume=*/true);
386402
}
387403

388404
void SetFinalized(std::unique_lock<std::mutex> lk,

google/cloud/storage/internal/async/writer_connection_buffered_test.cc

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,6 +1266,67 @@ TEST(WriteConnectionBuffered, SetFinalizedIsIdempotent) {
12661266
next.first.set_value(true);
12671267
}
12681268

1269+
TEST(WriteConnectionBuffered, ResetWriteOffsetOnResume) {
1270+
AsyncSequencer<bool> sequencer;
1271+
auto mock = std::make_unique<MockAsyncWriterConnection>();
1272+
auto* mock_ptr = mock.get();
1273+
1274+
EXPECT_CALL(*mock_ptr, UploadId).WillRepeatedly(Return("test-upload-id"));
1275+
EXPECT_CALL(*mock_ptr, PersistedState)
1276+
.WillOnce(
1277+
Return(MakePersistedState(0))); // Initial state: 0 bytes persisted.
1278+
1279+
EXPECT_CALL(*mock_ptr, Write).WillOnce([&](auto) {
1280+
return sequencer.PushBack("Write").then([](auto f) {
1281+
if (!f.get()) return TransientError(); // This write will fail.
1282+
return Status{};
1283+
});
1284+
});
1285+
1286+
MockFactory mock_factory;
1287+
auto resumed_mock = std::make_unique<MockAsyncWriterConnection>();
1288+
auto* resumed_mock_ptr = resumed_mock.get();
1289+
1290+
EXPECT_CALL(mock_factory, Call).WillOnce([&]() {
1291+
return sequencer.PushBack("Resume").then([&](auto) {
1292+
// The resumed connection reports that 1024 bytes have been persisted.
1293+
EXPECT_CALL(*resumed_mock_ptr, PersistedState)
1294+
.WillRepeatedly(Return(MakePersistedState(1024)));
1295+
// We expect the next write on the resumed stream to send the remaining
1296+
// 1024 bytes. If the write offset was not reset to 0, this size would be
1297+
// incorrect.
1298+
EXPECT_CALL(*resumed_mock_ptr, Write).WillOnce([&](auto payload) {
1299+
EXPECT_EQ(payload.size(), 1024);
1300+
return sequencer.PushBack("ResumedWrite").then([](auto) {
1301+
return Status{};
1302+
});
1303+
});
1304+
return make_status_or(std::unique_ptr<storage::AsyncWriterConnection>(
1305+
std::move(resumed_mock)));
1306+
});
1307+
});
1308+
1309+
auto connection = MakeWriterConnectionBuffered(
1310+
mock_factory.AsStdFunction(), std::move(mock), TestOptions());
1311+
1312+
// Write a total of 2048 bytes.
1313+
auto write = connection->Write(TestPayload(2048));
1314+
1315+
auto next = sequencer.PopFrontWithName();
1316+
EXPECT_EQ(next.second, "Write");
1317+
next.first.set_value(false);
1318+
1319+
next = sequencer.PopFrontWithName();
1320+
EXPECT_EQ(next.second, "Resume");
1321+
next.first.set_value(true);
1322+
1323+
next = sequencer.PopFrontWithName();
1324+
EXPECT_EQ(next.second, "ResumedWrite");
1325+
next.first.set_value(true);
1326+
1327+
EXPECT_STATUS_OK(write.get());
1328+
}
1329+
12691330
} // namespace
12701331
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
12711332
} // namespace storage_internal

google/cloud/storage/internal/async/writer_connection_resumed.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,22 @@ class AsyncWriterConnectionResumedState
317317
}
318318
resend_buffer_.RemovePrefix(static_cast<std::size_t>(n));
319319
buffer_offset_ = persisted_size;
320-
write_offset_ -= static_cast<std::size_t>(n);
320+
if (state_ == State::kResuming) {
321+
// Since the buffer has been modified to start exactly at the point of the
322+
// resume, the next write on this new stream should start from the
323+
// beginning of this truncated buffer.
324+
write_offset_ = 0;
325+
} else {
326+
// While rare, it is possible that n >= write_offset_ (i.e. the server has
327+
// persisted more than we have sent) if, for example, multiple clients
328+
// resume the same upload. If that is the case, all the bytes covered by
329+
// write_offset_ have been flushed and we can reset it to 0.
330+
if (static_cast<std::size_t>(n) >= write_offset_) {
331+
write_offset_ = 0;
332+
} else {
333+
write_offset_ -= static_cast<std::size_t>(n);
334+
}
335+
}
321336
// If the buffer is small enough, collect all the handlers to notify them.
322337
auto const handlers = ClearHandlersIfEmpty(lk);
323338
state_ = State::kIdle;

google/cloud/storage/internal/async/writer_connection_resumed_test.cc

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14-
1514
#include "google/cloud/storage/internal/async/writer_connection_resumed.h"
15+
#include "google/cloud/mocks/mock_async_streaming_read_write_rpc.h"
1616
#include "google/cloud/storage/async/connection.h"
17+
#include "google/cloud/storage/internal/grpc/ctype_cord_workaround.h"
1718
#include "google/cloud/storage/mocks/mock_async_writer_connection.h"
1819
#include "google/cloud/storage/testing/canonical_errors.h"
1920
#include "google/cloud/storage/testing/mock_hash_function.h"
@@ -615,6 +616,123 @@ TEST(WriterConnectionResumed, OnQueryUpdatesWriteHandle) {
615616
EXPECT_EQ(current_handle->handle(), "updated-handle");
616617
}
617618

619+
TEST(WriterConnectionResumed, ResetWriteOffsetOnResume) {
620+
AsyncSequencer<bool> sequencer;
621+
auto mock = std::make_unique<MockAsyncWriterConnection>();
622+
auto* mock_ptr = mock.get();
623+
624+
auto initial_request = google::storage::v2::BidiWriteObjectRequest{};
625+
google::storage::v2::BidiWriteObjectResponse first_response;
626+
first_response.mutable_write_handle()->set_handle("initial-handle");
627+
628+
auto mock_hash =
629+
std::make_shared<google::cloud::storage::testing::MockHashFunction>();
630+
EXPECT_CALL(*mock_hash, Update(::testing::An<std::int64_t>(),
631+
::testing::An<absl::Cord const&>(),
632+
::testing::An<std::uint32_t>()))
633+
.WillRepeatedly(Return(Status()));
634+
635+
EXPECT_CALL(*mock_ptr, PersistedState)
636+
.WillOnce(Return(MakePersistedState(0)))
637+
.WillOnce(Return(MakePersistedState(1024)));
638+
639+
auto const payload = TestPayload(2048);
640+
641+
EXPECT_CALL(*mock_ptr, Flush(_)).WillOnce([&](auto) {
642+
return sequencer.PushBack("Flush").then([](auto f) {
643+
if (f.get()) return Status{};
644+
return TransientError();
645+
});
646+
});
647+
648+
MockFactory mock_factory;
649+
auto mock_stream =
650+
std::make_unique<google::cloud::mocks::MockAsyncStreamingReadWriteRpc<
651+
google::storage::v2::BidiWriteObjectRequest,
652+
google::storage::v2::BidiWriteObjectResponse>>();
653+
auto* mock_stream_ptr = mock_stream.get();
654+
655+
EXPECT_CALL(mock_factory, Call(_))
656+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const&) {
657+
WriteObject::WriteResult result;
658+
result.stream = std::move(mock_stream);
659+
result.first_response.mutable_write_handle()->set_handle("new-handle");
660+
return sequencer.PushBack("Factory").then(
661+
[r = std::move(result)](auto) mutable {
662+
return StatusOr<WriteObject::WriteResult>(std::move(r));
663+
});
664+
});
665+
666+
EXPECT_CALL(*mock_stream_ptr, Write(_, _))
667+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
668+
grpc::WriteOptions) {
669+
EXPECT_EQ(GetContent(request.checksummed_data()).size(), 1024);
670+
EXPECT_EQ(GetContent(request.checksummed_data()),
671+
std::string(1024, 'A'));
672+
return sequencer.PushBack("StreamWrite").then([](auto) {
673+
return true;
674+
});
675+
})
676+
.WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request,
677+
grpc::WriteOptions) {
678+
EXPECT_TRUE(GetContent(request.checksummed_data()).empty());
679+
EXPECT_TRUE(request.flush());
680+
return sequencer.PushBack("GhostWrite").then([](auto) { return true; });
681+
});
682+
683+
google::storage::v2::BidiWriteObjectResponse read_response1;
684+
read_response1.set_persisted_size(2048);
685+
google::storage::v2::BidiWriteObjectResponse read_response2;
686+
read_response2.set_persisted_size(2048);
687+
EXPECT_CALL(*mock_stream_ptr, Read)
688+
.WillOnce([&, read_response1]() {
689+
return sequencer.PushBack("StreamRead1").then([read_response1](auto) {
690+
return absl::make_optional(read_response1);
691+
});
692+
})
693+
.WillOnce([&, read_response2]() {
694+
return sequencer.PushBack("StreamRead2").then([read_response2](auto) {
695+
return absl::make_optional(read_response2);
696+
});
697+
});
698+
699+
EXPECT_CALL(*mock_stream_ptr, Finish)
700+
.WillOnce(Return(make_ready_future(Status{})));
701+
EXPECT_CALL(*mock_stream_ptr, Cancel).WillRepeatedly(Return());
702+
703+
auto connection = MakeWriterConnectionResumed(
704+
mock_factory.AsStdFunction(), std::move(mock), initial_request, mock_hash,
705+
first_response, Options{});
706+
707+
auto write = connection->Write(payload);
708+
709+
auto next = sequencer.PopFrontWithName();
710+
EXPECT_EQ(next.second, "Flush");
711+
next.first.set_value(false);
712+
713+
next = sequencer.PopFrontWithName();
714+
EXPECT_EQ(next.second, "Factory");
715+
next.first.set_value(true);
716+
717+
next = sequencer.PopFrontWithName();
718+
EXPECT_EQ(next.second, "StreamWrite");
719+
next.first.set_value(true);
720+
721+
next = sequencer.PopFrontWithName();
722+
EXPECT_EQ(next.second, "StreamRead1");
723+
next.first.set_value(true);
724+
725+
next = sequencer.PopFrontWithName();
726+
EXPECT_EQ(next.second, "GhostWrite");
727+
next.first.set_value(true);
728+
729+
next = sequencer.PopFrontWithName();
730+
EXPECT_EQ(next.second, "StreamRead2");
731+
next.first.set_value(true);
732+
733+
EXPECT_THAT(write.get(), StatusIs(StatusCode::kOk));
734+
}
735+
618736
} // namespace
619737
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
620738
} // namespace storage_internal

0 commit comments

Comments
 (0)