Skip to content

Commit 5725fbc

Browse files
committed
Tests for subsequent stream
1 parent ccb1362 commit 5725fbc

File tree

5 files changed

+277
-2
lines changed

5 files changed

+277
-2
lines changed

google/cloud/storage/async/object_descriptor_test.cc

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/storage/async/object_descriptor.h"
16+
#include "google/cloud/storage/async/options.h"
1617
#include "google/cloud/storage/mocks/mock_async_object_descriptor_connection.h"
1718
#include "google/cloud/storage/mocks/mock_async_reader_connection.h"
1819
#include "google/cloud/testing_util/status_matchers.h"
@@ -147,6 +148,48 @@ TEST(ObjectDescriptor, ReadLast) {
147148
EXPECT_FALSE(token.valid());
148149
}
149150

151+
TEST(ObjectDescriptor, ReadExceedsMaxRange) {
152+
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
153+
auto constexpr kMaxRange = 1024;
154+
EXPECT_CALL(*mock, options)
155+
.WillRepeatedly(
156+
Return(Options{}.set<storage_experimental::MaximumRangeSizeOption>(
157+
kMaxRange)));
158+
159+
EXPECT_CALL(*mock, MakeSubsequentStream).Times(1);
160+
161+
EXPECT_CALL(*mock, Read)
162+
.WillOnce([&](ReadParams p) -> std::unique_ptr<AsyncReaderConnection> {
163+
EXPECT_EQ(p.start, 100);
164+
EXPECT_EQ(p.length, kMaxRange + 1);
165+
auto reader = std::make_unique<MockAsyncReaderConnection>();
166+
EXPECT_CALL(*reader, Read)
167+
.WillOnce([] {
168+
return make_ready_future(
169+
ReadResponse(ReadPayload(std::string("some data"))));
170+
})
171+
.WillOnce([] { return make_ready_future(ReadResponse(Status{})); });
172+
return reader;
173+
});
174+
175+
auto tested = ObjectDescriptor(mock);
176+
AsyncReader reader;
177+
AsyncToken token;
178+
std::tie(reader, token) = tested.Read(100, kMaxRange + 1);
179+
ASSERT_TRUE(token.valid());
180+
181+
auto r1 = reader.Read(std::move(token)).get();
182+
ASSERT_STATUS_OK(r1);
183+
ReadPayload payload;
184+
std::tie(payload, token) = *std::move(r1);
185+
EXPECT_THAT(payload.contents(), ElementsAre("some data"));
186+
187+
auto r2 = reader.Read(std::move(token)).get();
188+
ASSERT_STATUS_OK(r2);
189+
std::tie(payload, token) = *std::move(r2);
190+
EXPECT_FALSE(token.valid());
191+
}
192+
150193
} // namespace
151194
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
152195
} // namespace storage_experimental

google/cloud/storage/async/resume_policy.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
3535
}
3636
Action OnFinish(Status const& s) override {
3737
if (!s.ok()) ++error_count_;
38-
return error_count_ > maximum_resumes_ ? kStop : kContinue;
38+
return error_count_ >= maximum_resumes_ ? kStop : kContinue;
3939
}
4040

4141
private:

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ bool ObjectDescriptorImpl::IsResumable(
248248
for (auto const& any : proto_status.details()) {
249249
auto error = google::storage::v2::BidiReadObjectError{};
250250
if (!any.UnpackTo(&error)) continue;
251-
auto ranges = CopyActiveRanges();
252251
for (auto const& range : CopyActiveRanges()) {
253252
for (auto const& range_error : error.read_range_errors()) {
254253
if (range.first != range_error.read_id()) continue;

google/cloud/storage/internal/async/object_descriptor_impl_test.cc

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1190,6 +1190,183 @@ TEST(ObjectDescriptorImpl, RecoverFromPartialFailure) {
11901190
EXPECT_THAT(s3r1.get(), VariantWith<Status>(PermanentError()));
11911191
}
11921192

1193+
/// @test Verify that we can create a subsequent stream and read from it.
1194+
TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) {
1195+
// Setup
1196+
auto constexpr kResponse0 = R"pb(
1197+
metadata {
1198+
bucket: "projects/_/buckets/test-bucket"
1199+
name: "test-object"
1200+
generation: 42
1201+
}
1202+
read_handle { handle: "handle-12345" }
1203+
)pb";
1204+
auto constexpr kRequest1 = R"pb(
1205+
read_ranges { read_id: 1 read_offset: 100 read_length: 100 }
1206+
)pb";
1207+
auto constexpr kResponse1 = R"pb(
1208+
object_data_ranges {
1209+
range_end: true
1210+
read_range { read_id: 1 read_offset: 100 }
1211+
checksummed_data { content: "payload-for-stream-1" }
1212+
}
1213+
)pb";
1214+
auto constexpr kRequest2 = R"pb(
1215+
read_ranges { read_id: 2 read_offset: 200 read_length: 200 }
1216+
)pb";
1217+
auto constexpr kResponse2 = R"pb(
1218+
object_data_ranges {
1219+
range_end: true
1220+
read_range { read_id: 2 read_offset: 200 }
1221+
checksummed_data { content: "payload-for-stream-2" }
1222+
}
1223+
)pb";
1224+
1225+
AsyncSequencer<bool> sequencer;
1226+
1227+
// First stream setup
1228+
auto stream1 = std::make_unique<MockStream>();
1229+
EXPECT_CALL(*stream1, Write)
1230+
.WillOnce([&](Request const& request, grpc::WriteOptions) {
1231+
auto expected = Request{};
1232+
EXPECT_TRUE(TextFormat::ParseFromString(kRequest1, &expected));
1233+
EXPECT_THAT(request, IsProtoEqual(expected));
1234+
return sequencer.PushBack("Write[1]").then([](auto f) {
1235+
return f.get();
1236+
});
1237+
});
1238+
EXPECT_CALL(*stream1, Read)
1239+
.WillOnce([&]() {
1240+
return sequencer.PushBack("Read[1]").then([&](auto) {
1241+
auto response = Response{};
1242+
EXPECT_TRUE(TextFormat::ParseFromString(kResponse1, &response));
1243+
return absl::make_optional(response);
1244+
});
1245+
})
1246+
.WillOnce([&]() {
1247+
return sequencer.PushBack("Read[1.eos]").then([&](auto) {
1248+
return absl::optional<Response>{};
1249+
});
1250+
});
1251+
EXPECT_CALL(*stream1, Finish).WillOnce([&]() {
1252+
return sequencer.PushBack("Finish[1]").then([](auto) { return Status{}; });
1253+
});
1254+
EXPECT_CALL(*stream1, Cancel).Times(1);
1255+
1256+
// Second stream setup
1257+
auto stream2 = std::make_unique<MockStream>();
1258+
EXPECT_CALL(*stream2, Write)
1259+
.WillOnce([&](Request const& request, grpc::WriteOptions) {
1260+
auto expected = Request{};
1261+
EXPECT_TRUE(TextFormat::ParseFromString(kRequest2, &expected));
1262+
EXPECT_THAT(request, IsProtoEqual(expected));
1263+
return sequencer.PushBack("Write[2]").then([](auto f) {
1264+
return f.get();
1265+
});
1266+
});
1267+
EXPECT_CALL(*stream2, Read)
1268+
.WillOnce([&]() {
1269+
return sequencer.PushBack("Read[2]").then([&](auto) {
1270+
auto response = Response{};
1271+
EXPECT_TRUE(TextFormat::ParseFromString(kResponse2, &response));
1272+
return absl::make_optional(response);
1273+
});
1274+
})
1275+
.WillOnce([&]() {
1276+
return sequencer.PushBack("Read[2.eos]").then([](auto) {
1277+
return absl::optional<Response>{};
1278+
});
1279+
});
1280+
EXPECT_CALL(*stream2, Finish).WillOnce([&]() {
1281+
return sequencer.PushBack("Finish[2]").then([](auto) { return Status{}; });
1282+
});
1283+
EXPECT_CALL(*stream2, Cancel).Times(1);
1284+
1285+
// Mock factory for subsequent streams
1286+
MockFactory factory;
1287+
EXPECT_CALL(factory, Call).WillOnce([&](Request const&) {
1288+
auto stream_result = OpenStreamResult{
1289+
std::make_shared<OpenStream>(std::move(stream2)), Response{}};
1290+
return make_ready_future(make_status_or(std::move(stream_result)));
1291+
});
1292+
1293+
// Create the ObjectDescriptorImpl
1294+
auto tested = std::make_shared<ObjectDescriptorImpl>(
1295+
NoResume(), factory.AsStdFunction(),
1296+
google::storage::v2::BidiReadObjectSpec{},
1297+
std::make_shared<OpenStream>(std::move(stream1)));
1298+
1299+
auto response0 = Response{};
1300+
EXPECT_TRUE(TextFormat::ParseFromString(kResponse0, &response0));
1301+
tested->Start(std::move(response0));
1302+
1303+
auto read1 = sequencer.PopFrontWithName();
1304+
EXPECT_EQ(read1.second, "Read[1]");
1305+
// Start a read on the first stream
1306+
auto reader1 = tested->Read({100, 100});
1307+
auto future1 = reader1->Read();
1308+
// The implementation starts a read loop eagerly after Start(), and then
1309+
// the call to tested->Read() schedules a write.
1310+
auto write1 = sequencer.PopFrontWithName();
1311+
EXPECT_EQ(write1.second, "Write[1]");
1312+
write1.first.set_value(true);
1313+
1314+
// Now we can satisfy the read. This will deliver the data to the reader.
1315+
read1.first.set_value(true);
1316+
1317+
EXPECT_THAT(future1.get(),
1318+
VariantWith<storage_experimental::ReadPayload>(ResultOf(
1319+
"contents are",
1320+
[](storage_experimental::ReadPayload const& p) {
1321+
return p.contents();
1322+
},
1323+
ElementsAre(absl::string_view{"payload-for-stream-1"}))));
1324+
1325+
EXPECT_THAT(reader1->Read().get(), VariantWith<Status>(IsOk()));
1326+
1327+
auto next = sequencer.PopFrontWithName();
1328+
EXPECT_EQ(next.second, "Read[1.eos]");
1329+
next.first.set_value(true);
1330+
1331+
// The first stream should be finishing now.
1332+
auto finish1 = sequencer.PopFrontWithName();
1333+
EXPECT_EQ(finish1.second, "Finish[1]");
1334+
finish1.first.set_value(true);
1335+
1336+
// Create and switch to a new stream
1337+
tested->MakeSubsequentStream();
1338+
1339+
auto read2 = sequencer.PopFrontWithName();
1340+
EXPECT_EQ(read2.second, "Read[2]");
1341+
// Start a read on the second stream
1342+
auto reader2 = tested->Read({200, 200});
1343+
auto future2 = reader2->Read();
1344+
1345+
auto write2 = sequencer.PopFrontWithName();
1346+
EXPECT_EQ(write2.second, "Write[2]");
1347+
write2.first.set_value(true);
1348+
1349+
read2.first.set_value(true);
1350+
1351+
EXPECT_THAT(future2.get(),
1352+
VariantWith<storage_experimental::ReadPayload>(ResultOf(
1353+
"contents are",
1354+
[](storage_experimental::ReadPayload const& p) {
1355+
return p.contents();
1356+
},
1357+
ElementsAre(absl::string_view{"payload-for-stream-2"}))));
1358+
1359+
EXPECT_THAT(reader2->Read().get(), VariantWith<Status>(IsOk()));
1360+
1361+
auto read2_eos = sequencer.PopFrontWithName();
1362+
EXPECT_EQ(read2_eos.second, "Read[2.eos]");
1363+
read2_eos.first.set_value(true);
1364+
1365+
auto finish2 = sequencer.PopFrontWithName();
1366+
EXPECT_EQ(finish2.second, "Finish[2]");
1367+
finish2.first.set_value(true);
1368+
}
1369+
11931370
} // namespace
11941371
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
11951372
} // namespace storage_internal

google/cloud/storage/tests/async_client_integration_test.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "google/cloud/storage/async/bucket_name.h"
1818
#include "google/cloud/storage/async/client.h"
1919
#include "google/cloud/storage/async/idempotency_policy.h"
20+
#include "google/cloud/storage/async/options.h"
2021
#include "google/cloud/storage/async/read_all.h"
2122
#include "google/cloud/storage/grpc_plugin.h"
2223
#include "google/cloud/storage/testing/storage_integration_test.h"
@@ -962,6 +963,61 @@ TEST_F(AsyncClientIntegrationTest, Open) {
962963
storage::Generation(metadata->generation()));
963964
}
964965

966+
TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) {
967+
if (!UsingEmulator()) GTEST_SKIP();
968+
auto async = AsyncClient(
969+
TestOptions().set<storage_experimental::MaximumRangeSizeOption>(1024));
970+
auto client = MakeIntegrationTestClient(true, TestOptions());
971+
auto object_name = MakeRandomObjectName();
972+
973+
auto create = client.CreateBucket(
974+
bucket_name(), storage::BucketMetadata{}.set_location("us-west4"));
975+
if (!create && create.status().code() != StatusCode::kAlreadyExists) {
976+
GTEST_FAIL() << "cannot create bucket: " << create.status();
977+
}
978+
979+
auto constexpr kSize = 2048;
980+
auto const block = MakeRandomData(kSize);
981+
982+
auto w =
983+
async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name)
984+
.get();
985+
ASSERT_STATUS_OK(w);
986+
AsyncWriter writer;
987+
AsyncToken token;
988+
std::tie(writer, token) = *std::move(w);
989+
auto p = writer.Write(std::move(token), WritePayload(block)).get();
990+
ASSERT_STATUS_OK(p);
991+
token = *std::move(p);
992+
993+
auto metadata = writer.Finalize(std::move(token)).get();
994+
ASSERT_STATUS_OK(metadata);
995+
996+
auto spec = google::storage::v2::BidiReadObjectSpec{};
997+
spec.set_bucket(BucketName(bucket_name()).FullName());
998+
spec.set_object(object_name);
999+
auto descriptor = async.Open(spec).get();
1000+
ASSERT_STATUS_OK(descriptor);
1001+
1002+
AsyncReader r0;
1003+
AsyncToken t0;
1004+
auto actual0 = std::string{};
1005+
std::tie(r0, t0) = descriptor->Read(0, kSize);
1006+
while (t0.valid()) {
1007+
auto read = r0.Read(std::move(t0)).get();
1008+
ASSERT_STATUS_OK(read);
1009+
ReadPayload p;
1010+
AsyncToken t;
1011+
std::tie(p, t) = *std::move(read);
1012+
for (auto sv : p.contents()) actual0 += std::string(sv);
1013+
t0 = std::move(t);
1014+
}
1015+
1016+
EXPECT_EQ(actual0.size(), kSize);
1017+
client.DeleteObject(bucket_name(), object_name,
1018+
storage::Generation(metadata->generation()));
1019+
}
1020+
9651021
} // namespace
9661022
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
9671023
} // namespace storage_experimental

0 commit comments

Comments
 (0)