Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google/cloud/storage/async/object_descriptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/storage/async/object_descriptor.h"
#include "google/cloud/storage/async/options.h"
#include "google/cloud/internal/make_status.h"

namespace google {
Expand All @@ -26,6 +27,11 @@ absl::optional<google::storage::v2::Object> ObjectDescriptor::metadata() const {

std::pair<AsyncReader, AsyncToken> ObjectDescriptor::Read(std::int64_t offset,
std::int64_t limit) {
std::int64_t max_range =
impl_->options().get<storage_experimental::MaximumRangeSizeOption>();
if (limit > max_range) {
impl_->MakeSubsequentStream();
}
auto reader = impl_->Read({offset, limit});
auto token = storage_internal::MakeAsyncToken(reader.get());
return {AsyncReader(std::move(reader)), std::move(token)};
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/storage/async/object_descriptor_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class ObjectDescriptorConnection {
* Starts a new range read in the current descriptor.
*/
virtual std::unique_ptr<AsyncReaderConnection> Read(ReadParams p) = 0;

virtual void MakeSubsequentStream() = 0;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
43 changes: 43 additions & 0 deletions google/cloud/storage/async/object_descriptor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/storage/async/object_descriptor.h"
#include "google/cloud/storage/async/options.h"
#include "google/cloud/storage/mocks/mock_async_object_descriptor_connection.h"
#include "google/cloud/storage/mocks/mock_async_reader_connection.h"
#include "google/cloud/testing_util/status_matchers.h"
Expand Down Expand Up @@ -147,6 +148,48 @@ TEST(ObjectDescriptor, ReadLast) {
EXPECT_FALSE(token.valid());
}

TEST(ObjectDescriptor, ReadExceedsMaxRange) {
auto mock = std::make_shared<MockAsyncObjectDescriptorConnection>();
auto constexpr kMaxRange = 1024;
EXPECT_CALL(*mock, options)
.WillRepeatedly(
Return(Options{}.set<storage_experimental::MaximumRangeSizeOption>(
kMaxRange)));

EXPECT_CALL(*mock, MakeSubsequentStream).Times(1);

EXPECT_CALL(*mock, Read)
.WillOnce([&](ReadParams p) -> std::unique_ptr<AsyncReaderConnection> {
EXPECT_EQ(p.start, 100);
EXPECT_EQ(p.length, kMaxRange + 1);
auto reader = std::make_unique<MockAsyncReaderConnection>();
EXPECT_CALL(*reader, Read)
.WillOnce([] {
return make_ready_future(
ReadResponse(ReadPayload(std::string("some data"))));
})
.WillOnce([] { return make_ready_future(ReadResponse(Status{})); });
return reader;
});

auto tested = ObjectDescriptor(mock);
AsyncReader reader;
AsyncToken token;
std::tie(reader, token) = tested.Read(100, kMaxRange + 1);
ASSERT_TRUE(token.valid());

auto r1 = reader.Read(std::move(token)).get();
ASSERT_STATUS_OK(r1);
ReadPayload payload;
std::tie(payload, token) = *std::move(r1);
EXPECT_THAT(payload.contents(), ElementsAre("some data"));

auto r2 = reader.Read(std::move(token)).get();
ASSERT_STATUS_OK(r2);
std::tie(payload, token) = *std::move(r2);
EXPECT_FALSE(token.valid());
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_experimental
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/storage/async/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ struct UseMD5ValueOption {
* @endcode
*/
struct MaximumRangeSizeOption {
using Type = std::uint64_t;
using Type = std::int64_t;
Comment thread
bajajneha27 marked this conversation as resolved.
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
12 changes: 8 additions & 4 deletions google/cloud/storage/async/resume_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
public:
explicit LimitedErrorCountResumePolicyImpl(int maximum_resumes)
: maximum_resumes_(maximum_resumes) {}
~LimitedErrorCountResumePolicyImpl() override = default;

std::unique_ptr<ResumePolicy> clone() const override {
return std::make_unique<LimitedErrorCountResumePolicyImpl>(*this);
}

void OnStartSuccess() override {
// For this policy we are only interested in the number of failures.
Expand All @@ -46,7 +49,10 @@ class LimitedErrorCountResumePolicyImpl : public ResumePolicy {
class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy {
public:
StopOnConsecutiveErrorsResumePolicyImpl() = default;
~StopOnConsecutiveErrorsResumePolicyImpl() override = default;

std::unique_ptr<ResumePolicy> clone() const override {
return std::make_unique<StopOnConsecutiveErrorsResumePolicyImpl>(*this);
}

void OnStartSuccess() override { next_action_ = kContinue; }
Action OnFinish(Status const&) override {
Expand All @@ -59,8 +65,6 @@ class StopOnConsecutiveErrorsResumePolicyImpl : public ResumePolicy {

} // namespace

ResumePolicy::~ResumePolicy() = default;

ResumePolicyFactory LimitedErrorCountResumePolicy(int maximum_resumes) {
return [maximum_resumes]() -> std::unique_ptr<ResumePolicy> {
return std::make_unique<LimitedErrorCountResumePolicyImpl>(maximum_resumes);
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/storage/async/resume_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class ResumePolicy {
public:
enum Action { kStop, kContinue };

virtual ~ResumePolicy() = 0;
virtual ~ResumePolicy() = default;

virtual std::unique_ptr<ResumePolicy> clone() const = 0;

/**
* Notifies the policy about successful connections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class AsyncObjectDescriptorConnectionTracing
return MakeTracingReaderConnection(span_, std::move(result));
}

void MakeSubsequentStream() override {
return impl_->MakeSubsequentStream();
};

private:
opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span> span_;
std::shared_ptr<storage_experimental::ObjectDescriptorConnection> impl_;
Expand Down
61 changes: 44 additions & 17 deletions google/cloud/storage/internal/async/object_descriptor_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,50 @@ ObjectDescriptorImpl::ObjectDescriptorImpl(
OpenStreamFactory make_stream,
google::storage::v2::BidiReadObjectSpec read_object_spec,
std::shared_ptr<OpenStream> stream, Options options)
: resume_policy_(std::move(resume_policy)),
: resume_policy_prototype_(std::move(resume_policy)),
make_stream_(std::move(make_stream)),
read_object_spec_(std::move(read_object_spec)),
stream_(std::move(stream)),
options_(std::move(options)) {}
options_(std::move(options)) {
streams_.push_back(
Stream{std::move(stream), {}, resume_policy_prototype_->clone()});
}

ObjectDescriptorImpl::~ObjectDescriptorImpl() { stream_->Cancel(); }
ObjectDescriptorImpl::~ObjectDescriptorImpl() {
for (auto const& stream : streams_) {
stream.stream->Cancel();
}
}

void ObjectDescriptorImpl::Start(
google::storage::v2::BidiReadObjectResponse first_response) {
OnRead(std::move(first_response));
}

void ObjectDescriptorImpl::Cancel() { stream_->Cancel(); }
void ObjectDescriptorImpl::Cancel() {
for (auto const& stream : streams_) {
stream.stream->Cancel();
}
}

absl::optional<google::storage::v2::Object> ObjectDescriptorImpl::metadata()
const {
std::unique_lock<std::mutex> lk(mu_);
return metadata_;
}

void ObjectDescriptorImpl::MakeSubsequentStream() {
auto request = google::storage::v2::BidiReadObjectRequest{};

*request.mutable_read_object_spec() = read_object_spec_;
auto stream_result = make_stream_(std::move(request)).get();

std::unique_lock<std::mutex> lk(mu_);
streams_.push_back(Stream{
std::move(stream_result->stream), {}, resume_policy_prototype_->clone()});
lk.unlock();
OnRead(std::move(stream_result->first_response));
}

std::unique_ptr<storage_experimental::AsyncReaderConnection>
ObjectDescriptorImpl::Read(ReadParams p) {
std::shared_ptr<storage::internal::HashFunction> hash_function =
Expand All @@ -69,7 +92,7 @@ ObjectDescriptorImpl::Read(ReadParams p) {

std::unique_lock<std::mutex> lk(mu_);
auto const id = ++read_id_generator_;
active_ranges_.emplace(id, range);
streams_.back().active_ranges.emplace(id, range);
auto& read_range = *next_request_.add_read_ranges();
read_range.set_read_id(id);
read_range.set_read_offset(p.start);
Expand All @@ -85,8 +108,10 @@ ObjectDescriptorImpl::Read(ReadParams p) {
}

void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk) {
if (write_pending_ || next_request_.read_ranges().empty()) return;
write_pending_ = true;
if (streams_.back().write_pending || next_request_.read_ranges().empty()) {
return;
}
streams_.back().write_pending = true;
google::storage::v2::BidiReadObjectRequest request;
request.Swap(&next_request_);

Expand All @@ -102,7 +127,7 @@ void ObjectDescriptorImpl::Flush(std::unique_lock<std::mutex> lk) {
void ObjectDescriptorImpl::OnWrite(bool ok) {
std::unique_lock<std::mutex> lk(mu_);
if (!ok) return DoFinish(std::move(lk));
write_pending_ = false;
streams_.back().write_pending = false;
Flush(std::move(lk));
}

Expand Down Expand Up @@ -146,9 +171,11 @@ void ObjectDescriptorImpl::OnRead(

void ObjectDescriptorImpl::CleanupDoneRanges(
std::unique_lock<std::mutex> const&) {
for (auto i = active_ranges_.begin(); i != active_ranges_.end();) {
if (streams_.empty()) return;
auto& active_ranges = streams_.back().active_ranges;
for (auto i = active_ranges.begin(); i != active_ranges.end();) {
if (i->second->IsDone()) {
i = active_ranges_.erase(i);
i = active_ranges.erase(i);
} else {
++i;
}
Expand Down Expand Up @@ -185,12 +212,12 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
ApplyRedirectErrors(read_object_spec_, proto_status);
auto request = google::storage::v2::BidiReadObjectRequest{};
*request.mutable_read_object_spec() = read_object_spec_;
for (auto const& kv : active_ranges_) {
for (auto const& kv : streams_.back().active_ranges) {
auto range = kv.second->RangeForResume(kv.first);
if (!range) continue;
*request.add_read_ranges() = *std::move(range);
}
write_pending_ = true;
streams_.back().write_pending = true;
lk.unlock();
make_stream_(std::move(request)).then([w = WeakFromThis()](auto f) {
if (auto self = w.lock()) self->OnResume(f.get());
Expand All @@ -200,7 +227,8 @@ void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) {
void ObjectDescriptorImpl::OnResume(StatusOr<OpenStreamResult> result) {
if (!result) return OnFinish(std::move(result).status());
std::unique_lock<std::mutex> lk(mu_);
stream_ = std::move(result->stream);
streams_.push_back(
Stream{std::move(result->stream), {}, resume_policy_prototype_->clone()});
// TODO(#15105) - this should be done without release the lock.
Flush(std::move(lk));
OnRead(std::move(result->first_response));
Expand All @@ -211,7 +239,6 @@ bool ObjectDescriptorImpl::IsResumable(
for (auto const& any : proto_status.details()) {
auto error = google::storage::v2::BidiReadObjectError{};
if (!any.UnpackTo(&error)) continue;
auto ranges = CopyActiveRanges();
for (auto const& range : CopyActiveRanges()) {
for (auto const& range_error : error.read_range_errors()) {
if (range.first != range_error.read_id()) continue;
Expand All @@ -221,8 +248,8 @@ bool ObjectDescriptorImpl::IsResumable(
CleanupDoneRanges(std::unique_lock<std::mutex>(mu_));
return true;
}

return resume_policy_->OnFinish(status) ==
std::unique_lock<std::mutex> lk(mu_);
return streams_.back().resume_policy->OnFinish(status) ==
storage_experimental::ResumePolicy::kContinue;
}

Expand Down
22 changes: 16 additions & 6 deletions google/cloud/storage/internal/async/object_descriptor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
class ObjectDescriptorImpl
: public storage_experimental::ObjectDescriptorConnection,
public std::enable_shared_from_this<ObjectDescriptorImpl> {
private:
struct Stream {
std::shared_ptr<OpenStream> stream;
std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges;
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy;
bool write_pending = false;
};

public:
ObjectDescriptorImpl(
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy,
Expand All @@ -62,6 +70,8 @@ class ObjectDescriptorImpl
std::unique_ptr<storage_experimental::AsyncReaderConnection> Read(
ReadParams p) override;

void MakeSubsequentStream() override;

private:
std::weak_ptr<ObjectDescriptorImpl> WeakFromThis() {
return shared_from_this();
Expand All @@ -70,14 +80,16 @@ class ObjectDescriptorImpl
// This may seem expensive, but it is less bug-prone than iterating over
// the map with the lock held.
auto CopyActiveRanges(std::unique_lock<std::mutex> const&) const {
return active_ranges_;
return streams_.back().active_ranges;
}

auto CopyActiveRanges() const {
return CopyActiveRanges(std::unique_lock<std::mutex>(mu_));
}

auto CurrentStream(std::unique_lock<std::mutex>) const { return stream_; }
auto CurrentStream(std::unique_lock<std::mutex>) const {
return streams_.back().stream;
}

void Flush(std::unique_lock<std::mutex> lk);
void OnWrite(bool ok);
Expand All @@ -92,19 +104,17 @@ class ObjectDescriptorImpl
bool IsResumable(Status const& status,
google::rpc::Status const& proto_status);

std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_;
std::unique_ptr<storage_experimental::ResumePolicy> resume_policy_prototype_;
OpenStreamFactory make_stream_;

mutable std::mutex mu_;
google::storage::v2::BidiReadObjectSpec read_object_spec_;
std::shared_ptr<OpenStream> stream_;
absl::optional<google::storage::v2::Object> metadata_;
std::int64_t read_id_generator_ = 0;
bool write_pending_ = false;
google::storage::v2::BidiReadObjectRequest next_request_;

std::unordered_map<std::int64_t, std::shared_ptr<ReadRange>> active_ranges_;
Options options_;
std::vector<Stream> streams_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
Loading
Loading