Skip to content

Commit 530d2ea

Browse files
authored
emulator: Implement SampleRowKeys.
This implements the SampleRowKeys bigtable RPC. This change (when combined with the MutateRows PR in an integration branch) fixes the bigtable_table_sample_rows_integration_test tests, which were all failing, and thus fixes TBL-60. Care has been taken to ensure that the RPC can return any number of sample rows using constant memory. TESTED=bigtable_table_sample_rows_integration_test now passes. Fixes: TBL-60. References: TBL-60.
1 parent f5e14cf commit 530d2ea

4 files changed

Lines changed: 169 additions & 10 deletions

File tree

google/cloud/bigtable/emulator/column_family.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <google/bigtable/v2/types.pb.h>
3131
#include <absl/strings/str_format.h>
3232
#include <chrono>
33+
#include <cstddef>
3334
#include <cstdint>
3435
#include <map>
3536
#include <memory>
@@ -374,6 +375,8 @@ class ColumnFamily {
374375
return rows_.upper_bound(row_key);
375376
}
376377

378+
std::size_t size() { return rows_.size(); }
379+
377380
std::map<std::string, ColumnFamilyRow>::iterator find(
378381
std::string const& row_key) {
379382
return rows_.find(row_key);

google/cloud/bigtable/emulator/server.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <grpcpp/server_builder.h>
2828
#include <grpcpp/support/client_callback.h>
2929
#include <grpcpp/support/status.h>
30+
#include <cstddef>
3031
#include <cstdint>
3132

3233
namespace google {
@@ -56,10 +57,16 @@ class EmulatorService final : public btproto::Bigtable::Service {
5657

5758
// Handles the SampleRowKeys RPC.
//
// This span is a diff hunk: the lines marked `-` are the previous stub, which
// ignored its arguments and returned grpc::Status::OK; the lines marked `+`
// are the new implementation, which looks up the table named in the request
// and delegates to Table::SampleRowKeys().
grpc::Status SampleRowKeys(
5859
grpc::ServerContext* /* context */,
59-
btproto::SampleRowKeysRequest const* /* request */,
60-
grpc::ServerWriter<btproto::SampleRowKeysResponse>* /* writer */)
61-
override {
62-
return grpc::Status::OK;
60+
btproto::SampleRowKeysRequest const* request,
61+
grpc::ServerWriter<btproto::SampleRowKeysResponse>* writer) override {
62+
// Resolve the table by name; a failed lookup is returned to the client as the
// corresponding gRPC status.
auto maybe_table = cluster_->FindTable(request->table_name());
63+
if (!maybe_table) {
64+
return ToGrpcStatus(maybe_table.status());
65+
}
66+
67+
auto& table = maybe_table.value();
68+
69+
// 0.0001 is the pass probability handed to Table::SampleRowKeys(), which
// turns it into ceil(1 / 0.0001) when estimating offsets. The table writes
// its responses directly to `writer`.
return ToGrpcStatus(table->SampleRowKeys(0.0001, writer));
6370
}
6471

6572
grpc::Status MutateRow(grpc::ServerContext* /* context */,

google/cloud/bigtable/emulator/table.cc

Lines changed: 146 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,42 @@
1616
#include "google/cloud/bigtable/emulator/column_family.h"
1717
#include "google/cloud/bigtable/emulator/filter.h"
1818
#include "google/cloud/bigtable/emulator/range_set.h"
19-
#include "google/cloud/bigtable/internal/google_bytes_traits.h"
19+
#include "google/cloud/bigtable/emulator/row_streamer.h"
2020
#include "google/cloud/internal/big_endian.h"
2121
#include "google/cloud/internal/make_status.h"
2222
#include "google/cloud/status.h"
23+
#include "google/cloud/status_or.h"
2324
#include "google/protobuf/util/field_mask_util.h"
24-
#include <grpc/grpc_security_constants.h>
25+
#include <google/bigtable/admin/v2/bigtable_table_admin.pb.h>
26+
#include <google/bigtable/admin/v2/table.pb.h>
2527
#include <google/bigtable/admin/v2/types.pb.h>
2628
#include <google/bigtable/v2/bigtable.pb.h>
2729
#include <google/bigtable/v2/data.pb.h>
30+
#include <google/protobuf/field_mask.pb.h>
2831
#include <absl/strings/match.h>
2932
#include <absl/strings/str_format.h>
3033
#include <absl/types/optional.h>
34+
#include <absl/types/variant.h>
35+
#include <grpcpp/support/sync_stream.h>
3136
#include <re2/re2.h>
37+
#include <cassert>
3238
#include <chrono>
39+
#include <climits>
40+
#include <cmath>
41+
#include <cstddef>
3342
#include <cstdint>
3443
#include <cstdlib>
44+
#include <functional>
45+
#include <iostream>
46+
#include <map>
3547
#include <memory>
3648
#include <mutex>
37-
#include <optional>
49+
#include <ostream>
50+
#include <stack>
3851
#include <string>
3952
#include <type_traits>
53+
#include <utility>
54+
#include <vector>
4055

4156
namespace google {
4257
namespace cloud {
@@ -602,6 +617,134 @@ bool Table::IsDeleteProtectedNoLock() const {
602617
return schema_.deletion_protection();
603618
}
604619

620+
// Streams a sample of this table's row keys to `writer`.
//
// Each sampled row produces one SampleRowKeysResponse holding the row key and
// an estimated byte offset. The offsets are estimates, not measurements: the
// size of the first row in the table is used as the size of every row. The
// stream always ends with a response whose row key is empty.
//
// `mu_` is held from just after argument validation until the function
// returns.
//
// @param pass_probability probability with which a row is sampled; must be
//     greater than zero (NaN is rejected as well).
// @param writer the server-side stream the responses are written to.
// @return an InvalidArgumentError when `pass_probability` is not positive,
//     the error from CreateCellStream() when a cell stream cannot be created,
//     and an OK Status otherwise.
Status Table::SampleRowKeys(
    double pass_probability,
    grpc::ServerWriter<google::bigtable::v2::SampleRowKeysResponse>* writer) {
  // Written as a negated `>` so that NaN, which compares false against
  // everything, is rejected here instead of reaching the integer cast below.
  if (!(pass_probability > 0.0)) {
    return InvalidArgumentError(
        "The sampling probabality must be positive",
        GCP_ERROR_INFO().WithMetadata("provided sampling probability",
                                      absl::StrFormat("%f", pass_probability)));
  }

  // Sampling with probability p is treated as "one row out of every
  // ceil(1 / p)" when estimating offsets.
  auto const sample_every =
      static_cast<std::uint64_t>(std::ceil(1.0 / pass_probability));

  std::lock_guard<std::mutex> lock(mu_);

  // First, stream all rows and cells and compute the offsets.
  auto all_rows_set = std::make_shared<StringRangeSet>(StringRangeSet::All());
  auto maybe_all_rows_stream = CreateCellStream(all_rows_set, absl::nullopt);
  if (!maybe_all_rows_stream) {
    return maybe_all_rows_stream.status();
  }

  auto& stream = *maybe_all_rows_stream;

  absl::optional<std::string> first_row_key;
  // The first row read will be used as a constant estimate of row
  // sizes. If we are sampling 1/n rows, the value added to the offset
  // (which is to be regarded as the size of all the rows before the
  // sampled one) will be (n * row_size_estimate).
  //
  // That is, every time a row is sampled, we do: offset += (n *
  // row_size_estimate).
  std::size_t row_size_estimate = 0;

  // Consume only the cells of the first row; stop at the first cell that
  // belongs to a different row key.
  for (; stream; ++stream) {
    if (first_row_key.has_value() &&
        stream->row_key() != first_row_key.value()) {
      break;
    }

    first_row_key = stream->row_key();

    row_size_estimate += stream->row_key().size();
    row_size_estimate += stream->column_qualifier().size();
    row_size_estimate += stream->value().size();
    row_size_estimate += sizeof(stream->timestamp());
  }

  if (!first_row_key.has_value()) {
    // No rows in the table: reply with a single terminating response.
    google::bigtable::v2::SampleRowKeysResponse resp;
    resp.set_row_key("");
    resp.set_offset_bytes(0);

    auto opts = grpc::WriteOptions();
    opts.set_last_message();

    writer->WriteLast(std::move(resp), opts);
    return Status();
  }

  // The product is computed in unsigned arithmetic, exactly as before; the
  // cast only makes the conversion to the signed proto field type explicit.
  auto const offset_delta =
      static_cast<std::int64_t>(sample_every * row_size_estimate);

  google::bigtable::v2::RowFilter sample_filter;
  sample_filter.set_row_sample_filter(pass_probability);

  auto maybe_stream = CreateCellStream(all_rows_set, sample_filter);
  if (!maybe_stream) {
    return maybe_stream.status();
  }

  auto& sampled_stream = *maybe_stream;

  std::int64_t offset = 0;

  // Must start out false: when the sampled stream yields no rows the loop
  // body never runs, and the flag is read right after the loop.
  bool wrote_a_sample = false;

  // Advance row by row so that each sampled row yields exactly one response.
  for (; sampled_stream; sampled_stream.Next(NextMode::kRow)) {
    google::bigtable::v2::SampleRowKeysResponse resp;
    offset += offset_delta;
    resp.set_row_key(sampled_stream->row_key());
    resp.set_offset_bytes(offset);

    writer->Write(std::move(resp));

    wrote_a_sample = true;
  }

  // Cloud bigtable client tests expect that, if they populated the
  // table with at least one row, then at least one row sample is
  // returned.
  //
  // In such a case, return any string that represents the last key,
  // and an offset that is the estimated row size * the number of rows
  // in the largest column family. We can return any string because
  // the keys returned need not be in the table. See the proto
  // specification.
  if (!wrote_a_sample) {
    std::size_t row_count_estimate = 0;

    for (auto const& cf : *get()) {
      if (cf.second->size() > row_count_estimate) {
        row_count_estimate = cf.second->size();
      }
    }

    auto const this_offset =
        static_cast<std::int64_t>(row_count_estimate * row_size_estimate);

    google::bigtable::v2::SampleRowKeysResponse resp;
    resp.set_row_key("last_key");
    resp.set_offset_bytes(this_offset);
    writer->Write(std::move(resp));

    offset += this_offset;
  }

  // Terminating response: an empty row key.
  google::bigtable::v2::SampleRowKeysResponse resp;
  resp.set_row_key("");
  // Client test code expects offset_bytes to be strictly
  // increasing.
  resp.set_offset_bytes(offset + 1);
  auto opts = grpc::WriteOptions();
  opts.set_last_message();
  writer->WriteLast(std::move(resp), opts);

  return Status();
}
747+
605748
Status Table::DropRowRange(
606749
::google::bigtable::admin::v2::DropRowRangeRequest const& request) {
607750
std::lock_guard<std::mutex> lock(mu_);

google/cloud/bigtable/emulator/table.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,22 @@
2323
#include "google/cloud/status_or.h"
2424
#include "absl/types/variant.h"
2525
#include "google/protobuf/repeated_ptr_field.h"
26-
#include <google/bigtable/admin/v2/bigtable_table_admin.grpc.pb.h>
26+
#include <google/bigtable/admin/v2/bigtable_table_admin.pb.h>
2727
#include <google/bigtable/admin/v2/table.pb.h>
28-
#include <google/bigtable/v2/bigtable.grpc.pb.h>
2928
#include <google/bigtable/v2/bigtable.pb.h>
3029
#include <google/bigtable/v2/data.pb.h>
3130
#include <google/protobuf/field_mask.pb.h>
32-
#include <google/protobuf/util/time_util.h>
3331
#include <absl/types/optional.h>
32+
#include <grpcpp/support/sync_stream.h>
3433
#include <chrono>
34+
#include <functional>
3535
#include <map>
3636
#include <memory>
3737
#include <mutex>
3838
#include <stack>
3939
#include <string>
40+
#include <utility>
41+
#include <vector>
4042

4143
namespace google {
4244
namespace cloud {
@@ -93,6 +95,10 @@ class Table : public std::enable_shared_from_this<Table> {
9395
return column_families_.find(column_family);
9496
}
9597

98+
// Writes a sample of this table's row keys, each with an estimated byte
// offset, to `writer`, followed by a final response whose row key is empty.
//
// `pass_probability` is the probability with which a row is sampled; the
// implementation returns an InvalidArgumentError when it is not positive.
// Returns an OK Status once the final response has been written.
Status SampleRowKeys(
99+
double pass_probability,
100+
grpc::ServerWriter<google::bigtable::v2::SampleRowKeysResponse>* writer);
101+
96102
// Returns a std::shared_ptr to this Table, obtained from shared_from_this().
std::shared_ptr<Table> get() { return shared_from_this(); }
97103

98104
Status DropRowRange(

0 commit comments

Comments
 (0)