Skip to content

Commit d667401

Browse files
committed
Merge branch 'sample_row_keys' into for_google
2 parents 8ac5df7 + 7d7ce68 commit d667401

File tree

4 files changed

+137
-4
lines changed

4 files changed

+137
-4
lines changed

google/cloud/bigtable/emulator/column_family.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <google/bigtable/v2/types.pb.h>
3131
#include <absl/strings/str_format.h>
3232
#include <chrono>
33+
#include <cstddef>
3334
#include <cstdint>
3435
#include <map>
3536
#include <memory>

google/cloud/bigtable/emulator/server.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <grpcpp/server_builder.h>
2828
#include <grpcpp/support/client_callback.h>
2929
#include <grpcpp/support/status.h>
30+
#include <cstddef>
3031
#include <cstdint>
3132

3233
namespace google {
@@ -56,10 +57,16 @@ class EmulatorService final : public btproto::Bigtable::Service {
5657

5758
// Handles the SampleRowKeys RPC.
//
// Looks up the table named by `request->table_name()` in `cluster_`. If the
// lookup fails, its status is converted with ToGrpcStatus() and returned.
// Otherwise the call is delegated to Table::SampleRowKeys(), which receives
// `writer`, and that call's status is converted and returned.
//
// NOTE(review): this span is a rendered diff. The lines following an `N-`
// gutter marker appear to be the previous stub (it ignored its arguments and
// returned grpc::Status::OK); the lines following an `N+` marker are the
// replacement implementation.
grpc::Status SampleRowKeys(
5859
grpc::ServerContext* /* context */,
59-
btproto::SampleRowKeysRequest const* /* request */,
60-
grpc::ServerWriter<btproto::SampleRowKeysResponse>* /* writer */)
61-
override {
62-
return grpc::Status::OK;
60+
btproto::SampleRowKeysRequest const* request,
61+
grpc::ServerWriter<btproto::SampleRowKeysResponse>* writer) override {
62+
// Resolve the target table; a failed lookup is reported to the client as-is.
auto maybe_table = cluster_->FindTable(request->table_name());
63+
if (!maybe_table) {
64+
return ToGrpcStatus(maybe_table.status());
65+
}
66+
67+
auto& table = maybe_table.value();
68+
69+
// 0.0001 is passed as Table::SampleRowKeys()'s `pass_probability` argument.
return ToGrpcStatus(table->SampleRowKeys(0.0001, writer));
6370
}
6471

6572
grpc::Status MutateRow(grpc::ServerContext* /* context */,

google/cloud/bigtable/emulator/table.cc

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,18 @@
2828
#include <absl/strings/match.h>
2929
#include <absl/strings/str_format.h>
3030
#include <absl/types/optional.h>
31+
#include <grpcpp/support/sync_stream.h>
3132
#include <re2/re2.h>
3233
#include <chrono>
34+
#include <climits>
35+
#include <cstddef>
3336
#include <cstdint>
3437
#include <cstdlib>
38+
#include <exception>
3539
#include <memory>
3640
#include <mutex>
41+
#include <random>
42+
#include <ratio>
3743
#include <optional>
3844
#include <string>
3945
#include <type_traits>
@@ -593,6 +599,119 @@ bool Table::IsDeleteProtectedNoLock() const {
593599
return schema_.deletion_protection();
594600
}
595601

602+
// Streams a sample of this table's row keys, with approximate byte offsets,
// to `writer`.
//
// The table is scanned twice while holding `mu_`:
//   1. An unfiltered scan records, for every row, the approximate number of
//      bytes stored in all rows that precede it. A row's size is estimated as
//      its key size, plus the sizes of its distinct column family names and
//      distinct column qualifiers, plus sizeof(timestamp) and the value size
//      of each cell.
//   2. A scan filtered by a row-sample filter with `pass_probability` selects
//      which rows are reported.
//
// Exactly one response is written per sampled row. If the table is not empty
// but the filter selected nothing, the last row is reported so that clients
// always receive at least one real sample. The stream always ends with a
// response whose row key is empty and whose offset is greater than every
// offset written before it.
//
// @param pass_probability probability handed to the row-sample filter.
// @param writer destination of the streamed responses.
// @return the error from CreateCellStream() if either stream cannot be
//     created, otherwise an OK status.
Status Table::SampleRowKeys(
    double pass_probability,
    grpc::ServerWriter<google::bigtable::v2::SampleRowKeysResponse>* writer) {
  std::lock_guard<std::mutex> lock(mu_);

  // First, stream all rows and cells and compute the offsets.
  auto all_rows_set = std::make_shared<StringRangeSet>(StringRangeSet::All());
  auto maybe_all_rows_stream = CreateCellStream(all_rows_set, absl::nullopt);
  if (!maybe_all_rows_stream) {
    return maybe_all_rows_stream.status();
  }

  auto& stream = *maybe_all_rows_stream;

  std::map<std::string, std::size_t> row_offset_map;
  std::size_t row_offset = 0;

  std::string current_row_key;
  bool first_row = true;

  // Per-row accumulators; each distinct family / qualifier name is counted
  // once per row.
  std::map<std::string, std::size_t> column_family_size_map;
  std::map<std::string, std::size_t> column_qualifier_size_map;
  std::size_t timestamp_total_row_size = 0;
  std::size_t value_total_row_size = 0;

  // Adds the size of the row accumulated so far to `row_offset` and resets
  // the per-row accumulators. It is a no-op before the first row is seen
  // because every accumulator is still zero/empty.
  auto add_pending_row_size = [&] {
    row_offset += current_row_key.size();
    for (auto const& cf : column_family_size_map) {
      row_offset += cf.second;
    }
    for (auto const& cq : column_qualifier_size_map) {
      row_offset += cq.second;
    }
    row_offset += timestamp_total_row_size;
    row_offset += value_total_row_size;

    column_family_size_map.clear();
    column_qualifier_size_map.clear();
    timestamp_total_row_size = 0;
    value_total_row_size = 0;
  };

  for (; stream; ++stream) {
    auto const& row_key = stream->row_key();

    if (first_row || row_key != current_row_key) {
      add_pending_row_size();

      // The rows before this (row_key) have this size in total.
      row_offset_map[row_key] = row_offset;

      current_row_key = row_key;
      first_row = false;
    }

    column_family_size_map.emplace(stream->column_family(),
                                   stream->column_family().size());
    column_qualifier_size_map.emplace(stream->column_qualifier(),
                                      stream->column_qualifier().size());
    timestamp_total_row_size += sizeof(stream->timestamp());
    value_total_row_size += stream->value().size();
  }

  // Account for the last row as well, so that `row_offset` now holds the
  // approximate size of the whole table.
  add_pending_row_size();

  google::bigtable::v2::RowFilter sample_filter;
  sample_filter.set_row_sample_filter(pass_probability);

  auto maybe_stream = CreateCellStream(all_rows_set, sample_filter);
  if (!maybe_stream) {
    return maybe_stream.status();
  }

  auto& sampled_stream = *maybe_stream;

  bool wrote_a_sample = false;
  std::string last_sampled_row_key;

  for (; sampled_stream; ++sampled_stream) {
    auto const& row_key = sampled_stream->row_key();

    // The stream yields one entry per cell; report each sampled row only
    // once so that the offsets seen by the client are strictly increasing.
    if (wrote_a_sample && row_key == last_sampled_row_key) {
      continue;
    }

    // Look the offset up without inserting. Every key is expected to be
    // present because both scans run under the same lock.
    auto const offset_it = row_offset_map.find(row_key);
    if (offset_it == row_offset_map.end()) {
      continue;
    }

    google::bigtable::v2::SampleRowKeysResponse resp;
    resp.set_row_key(offset_it->first);
    resp.set_offset_bytes(offset_it->second);
    writer->Write(resp);

    last_sampled_row_key = row_key;
    wrote_a_sample = true;
  }

  // Cloud Bigtable client tests expect that, if they populated the
  // table with at least one row, then at least one row sample is
  // returned.
  //
  // In such a case, return the last row key.
  if (!wrote_a_sample && !row_offset_map.empty()) {
    auto const last_row = std::prev(row_offset_map.end());

    google::bigtable::v2::SampleRowKeysResponse resp;
    resp.set_row_key(last_row->first);
    resp.set_offset_bytes(last_row->second);
    writer->Write(resp);
  }

  // Client code expects the last response to have an empty row key
  // and an offset that is larger than every other offset.
  google::bigtable::v2::SampleRowKeysResponse resp;
  resp.set_row_key("");
  // The `+ 1` keeps offset_bytes strictly increasing even in degenerate
  // cases (e.g. an empty table).
  resp.set_offset_bytes(row_offset + 1);
  auto opts = grpc::WriteOptions();
  opts.set_last_message();
  writer->WriteLast(resp, opts);

  return Status();
}
714+
596715
Status Table::DropRowRange(
597716
::google::bigtable::admin::v2::DropRowRangeRequest const& request) {
598717
std::lock_guard<std::mutex> lock(mu_);

google/cloud/bigtable/emulator/table.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@
3232
#include <google/protobuf/util/time_util.h>
3333
#include <absl/types/optional.h>
3434
#include <chrono>
35+
#include <cstddef>
3536
#include <map>
3637
#include <memory>
3738
#include <mutex>
3839
#include <stack>
3940
#include <string>
41+
#include <utility>
4042

4143
namespace google {
4244
namespace cloud {
@@ -93,6 +95,10 @@ class Table : public std::enable_shared_from_this<Table> {
9395
return column_families_.find(column_family);
9496
}
9597

98+
// Streams sampled row keys, each paired with an approximate byte offset, to
// `writer`, and finishes the stream with a response whose row key is empty.
//
// `pass_probability` is used as the probability of the row-sample filter that
// selects which rows are reported. Returns a non-OK status if the underlying
// cell stream cannot be created.
Status SampleRowKeys(
99+
double pass_probability,
100+
grpc::ServerWriter<google::bigtable::v2::SampleRowKeysResponse>* writer);
101+
96102
std::shared_ptr<Table> get() { return shared_from_this(); }
97103

98104
Status DropRowRange(

0 commit comments

Comments
 (0)