Skip to content

Commit f3de489

Browse files
authored
feat(storage): add IsOpen API for zonal read operation (#16063)
1 parent be0df4b commit f3de489

11 files changed

+162
-22
lines changed
9.3 KB
Binary file not shown.

google/cloud/storage/async/object_descriptor.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ std::pair<AsyncReader, AsyncToken> ObjectDescriptor::ReadLast(
5252
return {AsyncReader(std::move(reader)), std::move(token)};
5353
}
5454

55+
bool ObjectDescriptor::IsOpen() const { return impl_->IsOpen(); }
56+
5557
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
5658
} // namespace storage
5759
} // namespace cloud

google/cloud/storage/async/object_descriptor.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ class ObjectDescriptor {
7676
*/
7777
std::pair<AsyncReader, AsyncToken> ReadLast(std::int64_t limit);
7878

79+
/**
80+
* Returns true if the descriptor is open.
81+
*
82+
* A descriptor is open if it has not been cancelled and has not hit a
83+
* permanent failure.
84+
*/
85+
bool IsOpen() const;
86+
7987
private:
8088
std::shared_ptr<ObjectDescriptorConnection> impl_;
8189
};

google/cloud/storage/async/object_descriptor_connection.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ class ObjectDescriptorConnection {
6161
virtual std::unique_ptr<AsyncReaderConnection> Read(ReadParams p) = 0;
6262

6363
virtual void MakeSubsequentStream() = 0;
64+
65+
/**
66+
* Returns true if the descriptor is open.
67+
*
68+
* A descriptor is open if it has not been cancelled and has not hit a
69+
* permanent failure.
70+
*/
71+
virtual bool IsOpen() const = 0;
6472
};
6573

6674
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/async/object_descriptor_test.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,15 @@ TEST(ObjectDescriptor, ReadExceedsMaxRange) {
186186
EXPECT_FALSE(token.valid());
187187
}
188188

189+
TEST(ObjectDescriptor, IsOpen) {
190+
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
191+
EXPECT_CALL(*mock, IsOpen).WillOnce(Return(true)).WillOnce(Return(false));
192+
193+
auto tested = ObjectDescriptor(mock);
194+
EXPECT_TRUE(tested.IsOpen());
195+
EXPECT_FALSE(tested.IsOpen());
196+
}
197+
189198
} // namespace
190199
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
191200
} // namespace storage

google/cloud/storage/internal/async/connection_impl.cc

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include "google/cloud/internal/async_streaming_read_rpc_timeout.h"
5656
#include "google/cloud/internal/async_streaming_write_rpc_timeout.h"
5757
#include "google/cloud/internal/make_status.h"
58+
#include <grpcpp/grpcpp.h>
5859
#include <memory>
5960
#include <utility>
6061

@@ -231,19 +232,33 @@ AsyncConnectionImpl::Open(OpenParams p) {
231232

232233
auto pending = factory(std::move(initial_request));
233234
using ReturnType = std::shared_ptr<storage::ObjectDescriptorConnection>;
234-
return pending.then(
235-
[rp = std::move(resume_policy), fa = std::move(factory),
236-
rs = std::move(p.read_spec),
237-
options = std::move(p.options)](auto f) mutable -> StatusOr<ReturnType> {
238-
auto result = f.get();
239-
if (!result) return std::move(result).status();
240-
241-
auto impl = std::make_shared<ObjectDescriptorImpl>(
242-
std::move(rp), std::move(fa), std::move(rs),
243-
std::move(result->stream), std::move(options));
244-
impl->Start(std::move(result->first_response));
245-
return ReturnType(impl);
246-
});
235+
return pending.then([rp = std::move(resume_policy), fa = std::move(factory),
236+
rs = std::move(p.read_spec),
237+
options = std::move(p.options), refresh = refresh_](
238+
auto f) mutable -> StatusOr<ReturnType> {
239+
auto result = f.get();
240+
if (!result) return std::move(result).status();
241+
242+
// The descriptor remains open if at least one gRPC channel is in a
243+
// functional state. We consider READY, IDLE, and CONNECTING to be
244+
// functional. TRANSIENT_FAILURE and SHUTDOWN are not included because they
245+
// indicate a definitive loss of connectivity or terminal closure.
246+
auto transport_ok = [refresh] {
247+
if (!refresh) return true;
248+
auto const& channels = refresh->channels();
249+
return std::any_of(
250+
channels.begin(), channels.end(), [](auto const& channel) {
251+
auto state = channel->GetState(false);
252+
return state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_IDLE ||
253+
state == GRPC_CHANNEL_CONNECTING;
254+
});
255+
};
256+
auto impl = std::make_shared<ObjectDescriptorImpl>(
257+
std::move(rp), std::move(fa), std::move(rs), std::move(result->stream),
258+
std::move(options), std::move(transport_ok));
259+
impl->Start(std::move(result->first_response));
260+
return ReturnType(impl);
261+
});
247262
}
248263

249264
future<StatusOr<std::unique_ptr<storage::AsyncReaderConnection>>>

google/cloud/storage/internal/async/object_descriptor_connection_tracing.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class AsyncObjectDescriptorConnectionTracing
4646
absl::optional<google::storage::v2::Object> metadata() const override {
4747
return impl_->metadata();
4848
}
49+
bool IsOpen() const override { return impl_->IsOpen(); }
4950

5051
std::unique_ptr<storage::AsyncReaderConnection> Read(ReadParams p) override {
5152
internal::OTelScope scope(span_);

google/cloud/storage/internal/async/object_descriptor_impl.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
3434
std::unique_ptr<storage::ResumePolicy> resume_policy,
3535
OpenStreamFactory make_stream,
3636
google::storage::v2::BidiReadObjectSpec read_object_spec,
37-
std::shared_ptr<OpenStream> stream, Options options)
37+
std::shared_ptr<OpenStream> stream, Options options,
38+
std::function<bool()> transport_ok)
3839
: resume_policy_prototype_(std::move(resume_policy)),
3940
make_stream_(std::move(make_stream)),
4041
read_object_spec_(std::move(read_object_spec)),
41-
options_(std::move(options)) {
42+
options_(std::move(options)),
43+
transport_ok_(std::move(transport_ok)) {
4244
stream_manager_ = std::make_unique<StreamManager>(
4345
[]() -> std::shared_ptr<ReadStream> { return nullptr; }, // NOLINT
4446
std::make_shared<ReadStream>(std::move(stream),
@@ -62,8 +64,18 @@ void ObjectDescriptorImpl::Start(
6264
}
6365
}
6466

67+
bool ObjectDescriptorImpl::IsOpen() const {
68+
{
69+
std::scoped_lock<std::mutex> lk(mu_);
70+
if (cancelled_) return false;
71+
if (stream_manager_->Empty()) return false;
72+
}
73+
return !transport_ok_ || transport_ok_();
74+
}
75+
6576
void ObjectDescriptorImpl::Cancel() {
6677
std::unique_lock<std::mutex> lk(mu_);
78+
if (cancelled_) return;
6779
cancelled_ = true;
6880
if (stream_manager_) stream_manager_->CancelAll();
6981
if (pending_stream_.valid()) pending_stream_.cancel();

google/cloud/storage/internal/async/object_descriptor_impl.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "absl/types/optional.h"
2828
#include "google/storage/v2/storage.pb.h"
2929
#include <cstdint>
30+
#include <functional>
3031
#include <memory>
3132
#include <mutex>
3233
#include <unordered_map>
@@ -59,8 +60,8 @@ class ObjectDescriptorImpl
5960
ObjectDescriptorImpl(std::unique_ptr<storage::ResumePolicy> resume_policy,
6061
OpenStreamFactory make_stream,
6162
google::storage::v2::BidiReadObjectSpec read_object_spec,
62-
std::shared_ptr<OpenStream> stream,
63-
Options options = {});
63+
std::shared_ptr<OpenStream> stream, Options options = {},
64+
std::function<bool()> transport_ok = {});
6465
~ObjectDescriptorImpl() override;
6566

6667
// Start the read loop.
@@ -82,6 +83,8 @@ class ObjectDescriptorImpl
8283

8384
std::size_t StreamSize() const;
8485

86+
bool IsOpen() const override;
87+
8588
private:
8689
using StreamManager = MultiStreamManager<ReadStream, ReadRange>;
8790
using StreamIterator =
@@ -123,6 +126,7 @@ class ObjectDescriptorImpl
123126
google::cloud::StatusOr<storage_internal::OpenStreamResult>>
124127
pending_stream_;
125128
bool cancelled_ = false;
129+
std::function<bool()> transport_ok_;
126130
};
127131

128132
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/storage/internal/async/object_descriptor_impl_test.cc

Lines changed: 85 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,18 @@ auto constexpr kMetadataText = R"pb(
6767

6868
auto NoResume() { return storage::LimitedErrorCountResumePolicy(0)(); }
6969

70-
auto MakeTested(std::unique_ptr<storage::ResumePolicy> resume_policy,
71-
OpenStreamFactory make_stream,
72-
google::storage::v2::BidiReadObjectSpec read_object_spec,
73-
std::shared_ptr<OpenStream> stream) {
70+
auto MakeTested(
71+
std::unique_ptr<storage::ResumePolicy> resume_policy,
72+
OpenStreamFactory make_stream,
73+
google::storage::v2::BidiReadObjectSpec read_object_spec,
74+
std::shared_ptr<OpenStream> stream,
75+
std::function<bool()> transport_ok = [] { return true; }) {
7476
Options options;
7577
options.set<storage::EnableMultiStreamOptimizationOption>(true);
7678
return std::make_shared<ObjectDescriptorImpl>(
7779
std::move(resume_policy), std::move(make_stream),
78-
std::move(read_object_spec), std::move(stream), std::move(options));
80+
std::move(read_object_spec), std::move(stream), std::move(options),
81+
std::move(transport_ok));
7982
}
8083

8184
MATCHER_P(IsProtoEqualModuloRepeatedFieldOrdering, value,
@@ -1784,6 +1787,83 @@ TEST(ObjectDescriptorImpl, MultiStreamOptimizationDisabled) {
17841787
tested.reset();
17851788
}
17861789

1790+
/// @test Verify that IsOpen() is true by default.
1791+
TEST(ObjectDescriptorImpl, IsOpenTrueByDefault) {
1792+
MockFactory factory;
1793+
auto stream = std::make_unique<MockStream>();
1794+
EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{})));
1795+
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
1796+
auto tested = MakeTested(NoResume(), factory.AsStdFunction(),
1797+
google::storage::v2::BidiReadObjectSpec{},
1798+
std::make_shared<OpenStream>(std::move(stream)));
1799+
EXPECT_TRUE(tested->IsOpen());
1800+
}
1801+
1802+
/// @test Verify that IsOpen() is false after Cancel().
1803+
TEST(ObjectDescriptorImpl, IsOpenFalseOnCancel) {
1804+
MockFactory factory;
1805+
auto stream = std::make_unique<MockStream>();
1806+
EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{})));
1807+
EXPECT_CALL(*stream, Cancel).Times(AtMost(2));
1808+
auto tested = MakeTested(NoResume(), factory.AsStdFunction(),
1809+
google::storage::v2::BidiReadObjectSpec{},
1810+
std::make_shared<OpenStream>(std::move(stream)));
1811+
EXPECT_TRUE(tested->IsOpen());
1812+
tested->Cancel();
1813+
EXPECT_FALSE(tested->IsOpen());
1814+
}
1815+
1816+
/// @test Verify that IsOpen() is false if transport health check fails.
1817+
TEST(ObjectDescriptorImpl, IsOpenFalseOnPermanentError) {
1818+
MockFactory factory;
1819+
auto stream = std::make_unique<MockStream>();
1820+
EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{})));
1821+
EXPECT_CALL(*stream, Cancel).Times(AtMost(1));
1822+
bool transport_ok = true;
1823+
auto transport_ok_callback = [&transport_ok] { return transport_ok; };
1824+
1825+
auto tested = MakeTested(NoResume(), factory.AsStdFunction(),
1826+
google::storage::v2::BidiReadObjectSpec{},
1827+
std::make_shared<OpenStream>(std::move(stream)),
1828+
transport_ok_callback);
1829+
1830+
EXPECT_TRUE(tested->IsOpen());
1831+
transport_ok = false;
1832+
EXPECT_FALSE(tested->IsOpen());
1833+
}
1834+
1835+
TEST(ObjectDescriptorImpl, IsOpenFalseOnTransportFailure) {
1836+
auto stream = std::make_unique<MockStream>();
1837+
EXPECT_CALL(*stream, Cancel).Times(1);
1838+
EXPECT_CALL(*stream, Finish).WillOnce([] {
1839+
return make_ready_future(Status{});
1840+
});
1841+
MockFactory factory;
1842+
auto transport_ok = [] { return false; };
1843+
auto tested = std::make_shared<ObjectDescriptorImpl>(
1844+
NoResume(), factory.AsStdFunction(),
1845+
google::storage::v2::BidiReadObjectSpec{},
1846+
std::make_shared<OpenStream>(std::move(stream)), Options{},
1847+
std::move(transport_ok));
1848+
EXPECT_FALSE(tested->IsOpen());
1849+
}
1850+
1851+
TEST(ObjectDescriptorImpl, IsOpenTrueOnTransportSuccess) {
1852+
auto stream = std::make_unique<MockStream>();
1853+
EXPECT_CALL(*stream, Cancel).Times(1);
1854+
EXPECT_CALL(*stream, Finish).WillOnce([] {
1855+
return make_ready_future(Status{});
1856+
});
1857+
MockFactory factory;
1858+
auto transport_ok = [] { return true; };
1859+
auto tested = std::make_shared<ObjectDescriptorImpl>(
1860+
NoResume(), factory.AsStdFunction(),
1861+
google::storage::v2::BidiReadObjectSpec{},
1862+
std::make_shared<OpenStream>(std::move(stream)), Options{},
1863+
std::move(transport_ok));
1864+
EXPECT_TRUE(tested->IsOpen());
1865+
}
1866+
17871867
} // namespace
17881868
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
17891869
} // namespace storage_internal

0 commit comments

Comments
 (0)