Skip to content

Commit 7d7ce68

Browse files
committed
Merge branch 'upstream_emulator' into sample_row_keys
2 parents 94cdc27 + ab313de commit 7d7ce68

File tree

15 files changed

+1228
-221
lines changed

15 files changed

+1228
-221
lines changed

google/cloud/bigtable/emulator/column_family.cc

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,6 @@ namespace cloud {
2929
namespace bigtable {
3030
namespace emulator {
3131

32-
// FIXME: Workaround our current incorrect ordering of
33-
// timestamps. Remove when that is fixed and they are in decreasing
34-
// order, at which point we can just pick the first element.
35-
std::map<std::chrono::milliseconds, std::string>::iterator latest(
36-
std::map<std::chrono::milliseconds, std::string>& cells_not_empty) {
37-
assert(!cells_not_empty.empty());
38-
39-
auto first_it = cells_not_empty.begin();
40-
auto last_it = std::prev(cells_not_empty.end());
41-
auto latest_it = first_it->first >= last_it->first ? first_it : last_it;
42-
43-
return latest_it;
44-
}
45-
4632
StatusOr<ReadModifyWriteCellResult> ColumnRow::ReadModifyWrite(
4733
std::int64_t inc_value) {
4834
auto system_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
@@ -56,10 +42,7 @@ StatusOr<ReadModifyWriteCellResult> ColumnRow::ReadModifyWrite(
5642
absl::nullopt};
5743
}
5844

59-
// FIXME: Workaround our current incorrect ordering of
60-
// timestamps. Remove when that is fixed and they are in decreasing
61-
// order, at which point we can just pick the first element.
62-
auto latest_it = latest(cells_);
45+
auto latest_it = cells_.begin();
6346

6447
auto maybe_old_value =
6548
google::cloud::internal::DecodeBigEndian<std::int64_t>(latest_it->second);
@@ -97,10 +80,7 @@ ReadModifyWriteCellResult ColumnRow::ReadModifyWrite(
9780
absl::nullopt};
9881
}
9982

100-
// FIXME: Workaround our current incorrect ordering of
101-
// timestamps. Remove when that is fixed and they are in decreasing
102-
// order, at which point we can just pick the first element.
103-
auto latest_it = latest(cells_);
83+
auto latest_it = cells_.begin();
10484

10585
auto value = latest_it->second + append_value;
10686

@@ -159,14 +139,21 @@ StatusOr<absl::optional<std::string>> ColumnRow::UpdateCell(
159139
std::vector<Cell> ColumnRow::DeleteTimeRange(
160140
::google::bigtable::v2::TimestampRange const& time_range) {
161141
std::vector<Cell> deleted_cells;
162-
for (auto cell_it = cells_.lower_bound(
163-
std::chrono::duration_cast<std::chrono::milliseconds>(
164-
std::chrono::microseconds(time_range.start_timestamp_micros())));
142+
absl::optional<std::int64_t> maybe_end_micros =
143+
time_range.end_timestamp_micros();
144+
if (maybe_end_micros.value_or(0) == 0) {
145+
maybe_end_micros.reset();
146+
}
147+
for (auto cell_it =
148+
maybe_end_micros
149+
? upper_bound(
150+
std::chrono::duration_cast<std::chrono::milliseconds>(
151+
std::chrono::microseconds(*maybe_end_micros)))
152+
: begin();
165153
cell_it != cells_.end() &&
166-
(time_range.end_timestamp_micros() == 0 ||
167-
cell_it->first < std::chrono::duration_cast<std::chrono::milliseconds>(
154+
cell_it->first >= std::chrono::duration_cast<std::chrono::milliseconds>(
168155
std::chrono::microseconds(
169-
time_range.end_timestamp_micros())));) {
156+
time_range.start_timestamp_micros()));) {
170157
Cell cell = {std::move(cell_it->first), std::move(cell_it->second)};
171158
deleted_cells.emplace_back(std::move(cell));
172159
cells_.erase(cell_it++);
@@ -346,9 +333,9 @@ FilteredColumnFamilyStream::FilteredColumnFamilyStream(
346333
row_ranges_(std::move(row_set)),
347334
column_ranges_(StringRangeSet::All()),
348335
timestamp_ranges_(TimestampRangeSet::All()),
349-
rows_(RangeFilteredMapView<ColumnFamily, StringRangeSet>(column_family,
350-
*row_ranges_),
351-
std::cref(row_regexes_)) {}
336+
rows_(
337+
StringRangeFilteredMapView<ColumnFamily>(column_family, *row_ranges_),
338+
std::cref(row_regexes_)) {}
352339

353340
bool FilteredColumnFamilyStream::ApplyFilter(
354341
InternalFilter const& internal_filter) {
@@ -404,7 +391,7 @@ void FilteredColumnFamilyStream::InitializeIfNeeded() const {
404391

405392
bool FilteredColumnFamilyStream::PointToFirstCellAfterColumnChange() const {
406393
for (; column_it_.value() != columns_.value().end(); ++(column_it_.value())) {
407-
cells_ = RangeFilteredMapView<ColumnRow, TimestampRangeSet>(
394+
cells_ = TimestampRangeFilteredMapView<ColumnRow>(
408395
column_it_.value()->second, timestamp_ranges_);
409396
cell_it_ = cells_.value().begin();
410397
if (cell_it_.value() != cells_.value().end()) {
@@ -416,10 +403,9 @@ bool FilteredColumnFamilyStream::PointToFirstCellAfterColumnChange() const {
416403

417404
bool FilteredColumnFamilyStream::PointToFirstCellAfterRowChange() const {
418405
for (; (*row_it_) != rows_.end(); ++(*row_it_)) {
419-
columns_ = RegexFiteredMapView<
420-
RangeFilteredMapView<ColumnFamilyRow, StringRangeSet>>(
421-
RangeFilteredMapView<ColumnFamilyRow, StringRangeSet>(
422-
(*row_it_)->second, column_ranges_),
406+
columns_ = RegexFiteredMapView<StringRangeFilteredMapView<ColumnFamilyRow>>(
407+
StringRangeFilteredMapView<ColumnFamilyRow>((*row_it_)->second,
408+
column_ranges_),
423409
column_regexes_);
424410
column_it_ = columns_.value().begin();
425411
if (PointToFirstCellAfterColumnChange()) {

google/cloud/bigtable/emulator/column_family.h

Lines changed: 22 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ class ColumnRow {
8585
* Insert or update and existing cell at a given timestamp.
8686
*
8787
* @param timestamp the time stamp at which the value will be inserted or
88-
* updated. If it equals zero then number of milliseconds since epoch will
89-
* be used instead.
88+
* updated.
9089
* @param value the value to insert/update.
9190
*
9291
* @return no value if the timestamp had no value before, otherwise
@@ -121,32 +120,28 @@ class ColumnRow {
121120
absl::optional<Cell> DeleteTimeStamp(std::chrono::milliseconds timestamp);
122121

123122
bool HasCells() const { return !cells_.empty(); }
124-
using const_iterator =
125-
std::map<std::chrono::milliseconds, std::string>::const_iterator;
126-
using iterator = std::map<std::chrono::milliseconds, std::string>::iterator;
123+
124+
using const_iterator = std::map<std::chrono::milliseconds, std::string,
125+
std::greater<>>::const_iterator;
126+
127127
const_iterator begin() const { return cells_.begin(); }
128128
const_iterator end() const { return cells_.end(); }
129-
iterator begin() { return cells_.begin(); }
130-
iterator end() { return cells_.end(); }
131129
const_iterator lower_bound(std::chrono::milliseconds timestamp) const {
132130
return cells_.lower_bound(timestamp);
133131
}
134132
const_iterator upper_bound(std::chrono::milliseconds timestamp) const {
135133
return cells_.upper_bound(timestamp);
136134
}
137135

138-
std::map<std::chrono::milliseconds, std::string>::iterator find(
139-
std::chrono::milliseconds const& timestamp) {
136+
const_iterator find(std::chrono::milliseconds const& timestamp) {
140137
return cells_.find(timestamp);
141138
}
142139

143-
void erase(
144-
std::map<std::chrono::milliseconds, std::string>::iterator timestamp_it) {
145-
cells_.erase(timestamp_it);
146-
}
140+
void erase(const_iterator timestamp_it) { cells_.erase(timestamp_it); }
147141

148142
private:
149-
std::map<std::chrono::milliseconds, std::string> cells_;
143+
// Note the order - the iterator return the freshest cells first.
144+
std::map<std::chrono::milliseconds, std::string, std::greater<>> cells_;
150145
};
151146

152147
/**
@@ -174,8 +169,7 @@ class ColumnFamilyRow {
174169
*
175170
* @param column_qualifier the column qualifier at which to update the value.
176171
* @param timestamp the time stamp at which the value will be inserted or
177-
* updated. If it equals zero then number of milliseconds since epoch will
178-
* be used instead.
172+
* updated.
179173
* @param value the value to insert/update.
180174
*
181175
* @return no value if the timestamp had no value before, otherwise
@@ -288,8 +282,7 @@ class ColumnFamily {
288282
* @param row_key the row key at which to update the value.
289283
* @param column_qualifier the column qualifier at which to update the value.
290284
* @param timestamp the time stamp at which the value will be inserted or
291-
* updated. If it equals zero then number of milliseconds since epoch will
292-
* be used instead.
285+
* updated.
293286
* @param value the value to insert/update.
294287
*
295288
* @return no value if the timestamp had no value before, otherwise
@@ -432,8 +425,8 @@ class ColumnFamily {
432425
if (!existing_int) {
433426
return existing_int.status();
434427
}
435-
auto new_int = google::cloud::internal::DecodeBigEndian<std::int64_t>(
436-
std::move(new_value));
428+
auto new_int =
429+
google::cloud::internal::DecodeBigEndian<std::int64_t>(new_value);
437430
if (!new_int) {
438431
return new_int.status();
439432
}
@@ -452,8 +445,8 @@ class ColumnFamily {
452445
if (!existing_int) {
453446
return existing_int.status();
454447
}
455-
auto new_int = google::cloud::internal::DecodeBigEndian<std::int64_t>(
456-
std::move(new_value));
448+
auto new_int =
449+
google::cloud::internal::DecodeBigEndian<std::int64_t>(new_value);
457450
if (!new_int) {
458451
return new_int.status();
459452
}
@@ -540,25 +533,24 @@ class FilteredColumnFamilyStream : public AbstractCellStreamImpl {
540533
std::vector<std::shared_ptr<re2::RE2 const>> column_regexes_;
541534
mutable TimestampRangeSet timestamp_ranges_;
542535

543-
RegexFiteredMapView<RangeFilteredMapView<ColumnFamily, StringRangeSet>> rows_;
544-
mutable absl::optional<RegexFiteredMapView<
545-
RangeFilteredMapView<ColumnFamilyRow, StringRangeSet>>>
536+
RegexFiteredMapView<StringRangeFilteredMapView<ColumnFamily>> rows_;
537+
mutable absl::optional<
538+
RegexFiteredMapView<StringRangeFilteredMapView<ColumnFamilyRow>>>
546539
columns_;
547-
mutable absl::optional<RangeFilteredMapView<ColumnRow, TimestampRangeSet>>
548-
cells_;
540+
mutable absl::optional<TimestampRangeFilteredMapView<ColumnRow>> cells_;
549541

550542
// If row_it_ == rows_.end() we've reached the end.
551543
// We maintain the following invariant:
552544
// if (row_it_ != rows_.end()) then
553545
// cell_it_ != cells.end() && column_it_ != columns_.end().
554546
mutable absl::optional<RegexFiteredMapView<
555-
RangeFilteredMapView<ColumnFamily, StringRangeSet>>::const_iterator>
547+
StringRangeFilteredMapView<ColumnFamily>>::const_iterator>
556548
row_it_;
557549
mutable absl::optional<RegexFiteredMapView<
558-
RangeFilteredMapView<ColumnFamilyRow, StringRangeSet>>::const_iterator>
550+
StringRangeFilteredMapView<ColumnFamilyRow>>::const_iterator>
559551
column_it_;
560552
mutable absl::optional<
561-
RangeFilteredMapView<ColumnRow, TimestampRangeSet>::const_iterator>
553+
TimestampRangeFilteredMapView<ColumnRow>::const_iterator>
562554
cell_it_;
563555
mutable absl::optional<CellView> cur_value_;
564556
mutable bool initialized_{false};

google/cloud/bigtable/emulator/column_family_test.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/emulator/column_family.h"
16-
#include "google/cloud/bigtable/row_range.h"
1716
#include "google/cloud/testing_util/chrono_literals.h"
18-
#include "google/cloud/testing_util/is_proto_equal.h"
1917
#include <google/protobuf/text_format.h>
2018
#include <gmock/gmock.h>
2119
#include <array>
@@ -69,8 +67,14 @@ TEST(ColumnRow, Trivial) {
6967

7068
col_row.SetCell(0_ms, "baz");
7169
col_row.SetCell(20_ms, "qux");
70+
EXPECT_EQ("qux", col_row.lower_bound(30_ms)->second);
71+
EXPECT_EQ("qux", col_row.lower_bound(20_ms)->second);
7272
EXPECT_EQ("bar", col_row.lower_bound(10_ms)->second);
73-
EXPECT_EQ("qux", col_row.upper_bound(10_ms)->second);
73+
EXPECT_EQ("baz", col_row.lower_bound(0_ms)->second);
74+
EXPECT_EQ("qux", col_row.upper_bound(30_ms)->second);
75+
EXPECT_EQ("bar", col_row.upper_bound(20_ms)->second);
76+
EXPECT_EQ("baz", col_row.upper_bound(10_ms)->second);
77+
EXPECT_EQ(col_row.end(), col_row.upper_bound(0_ms));
7478
}
7579

7680
TEST(ColumnRow, DeleteTimeRangeFinite) {
@@ -225,14 +229,14 @@ TEST(FilteredColumnFamilyStream, Unfiltered) {
225229
FilteredColumnFamilyStream filtered_stream(fam, "cf1", included_rows);
226230
EXPECT_EQ(R"""(
227231
row0 cf1:col0 @10ms: foo
228-
row0 cf1:col1 @20ms: bar
229232
row0 cf1:col1 @30ms: baz
233+
row0 cf1:col1 @20ms: bar
230234
row1 cf1:col0 @10ms: foo
231-
row1 cf1:col1 @20ms: foo
232235
row1 cf1:col1 @30ms: foo
236+
row1 cf1:col1 @20ms: foo
233237
row2 cf1:col0 @10ms: qux
234-
row2 cf1:col2 @40ms: qux
235238
row2 cf1:col2 @50ms: qux
239+
row2 cf1:col2 @40ms: qux
236240
)""",
237241
"\n" + DumpFilteredColumnFamilyStream(filtered_stream));
238242
}
@@ -272,12 +276,12 @@ TEST(FilteredColumnFamilyStream, FilterByTimestampRange) {
272276
TimestampRange{TimestampRangeSet::Range(100_ms, 200_ms)});
273277
EXPECT_EQ(R"""(
274278
row0 cf1:col0 @100ms: foo
275-
row0 cf1:col2 @100ms: foo
276-
row0 cf1:col2 @120ms: foo
277279
row0 cf1:col2 @140ms: foo
278-
row1 cf1:col2 @100ms: foo
279-
row1 cf1:col2 @120ms: foo
280+
row0 cf1:col2 @120ms: foo
281+
row0 cf1:col2 @100ms: foo
280282
row1 cf1:col2 @140ms: foo
283+
row1 cf1:col2 @120ms: foo
284+
row1 cf1:col2 @100ms: foo
281285
)""",
282286
"\n" + DumpFilteredColumnFamilyStream(filtered_stream));
283287
}

google/cloud/bigtable/emulator/drop_row_range_test.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/emulator/column_family.h"
16-
#include "google/cloud/bigtable/emulator/row_streamer.h"
1716
#include "google/cloud/bigtable/emulator/table.h"
1817
#include "google/cloud/internal/make_status.h"
1918
#include "google/cloud/status.h"
2019
#include "google/cloud/status_or.h"
2120
#include "google/cloud/testing_util/status_matchers.h"
22-
#include "gmock/gmock.h"
2321
#include <google/bigtable/admin/v2/bigtable_table_admin.grpc.pb.h>
2422
#include <google/bigtable/admin/v2/bigtable_table_admin.pb.h>
2523
#include <google/bigtable/admin/v2/table.pb.h>

google/cloud/bigtable/emulator/filter.cc

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include "google/cloud/internal/make_status.h"
2020
#include "google/cloud/status_or.h"
2121
#include <re2/re2.h>
22-
#include <queue>
2322
#include <random>
2423

2524
namespace google {
@@ -325,8 +324,8 @@ class TrivialFilter : public AbstractCellStreamImpl {
325324
* @param filter_filter a functor which given an `InternalFilter` decides
326325
* whether filtering this cell stream's results and then applying the
327326
* `InternalFilter` would yield the same results as applying
328-
* `InternalFilter` to the underlying stream and the perform this stream's
329-
* filtering.
327+
* `InternalFilter` to the underlying stream and then performing this
328+
* stream's filtering.
330329
*/
331330
template <typename Filter>
332331
CellStream MakeTrivialFilter(
@@ -539,14 +538,14 @@ class ConditionStream : public AbstractCellStreamImpl {
539538
if (condition_true_) {
540539
true_stream_.Next(mode);
541540
if (!true_stream_ ||
542-
!internal::CompareRowKey(current_row_, true_stream_->row_key())) {
541+
internal::CompareRowKey(current_row_, true_stream_->row_key()) != 0) {
543542
source_.Next(NextMode::kRow);
544543
OnNewRow();
545544
}
546545
} else {
547546
false_stream_.Next(mode);
548-
if (!false_stream_ ||
549-
!internal::CompareRowKey(current_row_, false_stream_->row_key())) {
547+
if (!false_stream_ || internal::CompareRowKey(
548+
current_row_, false_stream_->row_key()) != 0) {
550549
source_.Next(NextMode::kRow);
551550
OnNewRow();
552551
}
@@ -870,7 +869,7 @@ StatusOr<CellStreamConstructor> CreateFilterImpl(
870869
if (per_row_state-- <= 0) {
871870
return {};
872871
}
873-
return NextMode::kRow;
872+
return NextMode::kCell;
874873
},
875874
[cells_per_row_offset]() { return cells_per_row_offset; },
876875
[](InternalFilter const& internal_filter) {
@@ -971,10 +970,14 @@ StatusOr<CellStreamConstructor> CreateFilterImpl(
971970
}
972971
CellStreamConstructor res = [source_ctor = std::move(source_ctor)] {
973972
auto source = source_ctor();
974-
return MakeTrivialTransformer(std::move(source), [](CellView cell_view) {
975-
cell_view.SetValue("");
976-
return cell_view;
977-
});
973+
// We need to ensure that the value outlives the reference.
974+
std::string const stripped_value;
975+
return MakeTrivialTransformer(
976+
std::move(source),
977+
[stripped_value = std::move(stripped_value)](CellView cell_view) {
978+
cell_view.SetValue(stripped_value);
979+
return cell_view;
980+
});
978981
};
979982
return res;
980983
}
@@ -1046,6 +1049,16 @@ StatusOr<CellStreamConstructor> CreateFilterImpl(
10461049
"`condition` must have a `predicate_filter` set.",
10471050
GCP_ERROR_INFO().WithMetadata("filter", filter.DebugString()));
10481051
}
1052+
if (!filter.condition().has_true_filter() &&
1053+
!filter.condition().has_false_filter()) {
1054+
return InvalidArgumentError(
1055+
"`condition` must have `true_filter` or `false_filter` set.",
1056+
GCP_ERROR_INFO().WithMetadata("filter", filter.DebugString()));
1057+
}
1058+
// FIXME: validate that `sink` is not present in condition's predicate.
1059+
// Expected error:
1060+
// INVALID_ARGUMENT: Error in field 'condition filter predicate' : sink
1061+
// cannot be nested in a condition filter
10491062

10501063
auto maybe_predicate_stream_ctor = CreateFilterImpl(
10511064
filter.condition().predicate_filter(), source_ctor, direct_sinks);

0 commit comments

Comments
 (0)