Skip to content

Commit e2155d9

Browse files
authored
emulator: Complex Types: Implement AddToCell mutation
This implements the AddToCell mutation. * Unit tested and tests passing * Supports atomic rollback * Support Sum, Min and Max aggregations for non-negative Int64 numbers * It is now possible to create an aggregate column family (supporting AddToCell or MergeToCell)
1 parent f8d303b commit e2155d9

6 files changed

Lines changed: 657 additions & 55 deletions

File tree

google/cloud/bigtable/emulator/column_family.cc

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414

1515
#include "google/cloud/bigtable/emulator/column_family.h"
1616
#include "google/cloud/internal/big_endian.h"
17+
#include <google/bigtable/v2/types.pb.h>
1718
#include <absl/types/optional.h>
19+
#include <array>
1820
#include <chrono>
1921
#include <cstdint>
22+
#include <iterator>
2023
#include <map>
24+
#include <string>
25+
#include <utility>
2126

2227
namespace google {
2328
namespace cloud {
@@ -118,9 +123,32 @@ ReadModifyWriteCellResult ColumnRow::ReadModifyWrite(
118123
absl::optional<std::string> ColumnRow::SetCell(
119124
std::chrono::milliseconds timestamp, std::string const& value) {
120125
absl::optional<std::string> ret = absl::nullopt;
126+
127+
auto cell_it = cells_.find(timestamp);
128+
if (!(cell_it == cells_.end())) {
129+
ret = std::move(cell_it->second);
130+
}
131+
132+
cells_[timestamp] = value;
133+
134+
return ret;
135+
}
136+
137+
StatusOr<absl::optional<std::string>> ColumnRow::UpdateCell(
138+
std::chrono::milliseconds timestamp, std::string& value,
139+
std::function<StatusOr<std::string>(std::string const&,
140+
std::string&&)> const& update_fn) {
141+
absl::optional<std::string> ret = absl::nullopt;
142+
121143
auto cell_it = cells_.find(timestamp);
122144
if (!(cell_it == cells_.end())) {
145+
auto maybe_update_value = update_fn(cell_it->second, std::move(value));
146+
if (!maybe_update_value) {
147+
return maybe_update_value.status();
148+
}
123149
ret = std::move(cell_it->second);
150+
maybe_update_value.value().swap(cell_it->second);
151+
return ret;
124152
}
125153

126154
cells_[timestamp] = value;
@@ -166,6 +194,15 @@ absl::optional<std::string> ColumnFamilyRow::SetCell(
166194
return columns_[column_qualifier].SetCell(timestamp, value);
167195
}
168196

197+
StatusOr<absl::optional<std::string>> ColumnFamilyRow::UpdateCell(
198+
std::string const& column_qualifier, std::chrono::milliseconds timestamp,
199+
std::string& value,
200+
std::function<StatusOr<std::string>(std::string const&,
201+
std::string&&)> const& update_fn) {
202+
return columns_[column_qualifier].UpdateCell(timestamp, value,
203+
std::move(update_fn));
204+
}
205+
169206
std::vector<Cell> ColumnFamilyRow::DeleteColumn(
170207
std::string const& column_qualifier,
171208
::google::bigtable::v2::TimestampRange const& time_range) {
@@ -201,6 +238,13 @@ absl::optional<std::string> ColumnFamily::SetCell(
201238
return rows_[row_key].SetCell(column_qualifier, timestamp, value);
202239
}
203240

241+
StatusOr<absl::optional<std::string>> ColumnFamily::UpdateCell(
242+
std::string const& row_key, std::string const& column_qualifier,
243+
std::chrono::milliseconds timestamp, std::string& value) {
244+
return rows_[row_key].UpdateCell(column_qualifier, timestamp, value,
245+
update_cell_);
246+
}
247+
204248
std::map<std::string, std::vector<Cell>> ColumnFamily::DeleteRow(
205249
std::string const& row_key) {
206250
std::map<std::string, std::vector<Cell>> res;
@@ -215,11 +259,11 @@ std::map<std::string, std::vector<Cell>> ColumnFamily::DeleteRow(
215259
::google::bigtable::v2::TimestampRange time_range;
216260
auto deleted_cells = column.second.DeleteTimeRange(time_range);
217261
if (!deleted_cells.empty()) {
218-
res[std::move(column.first)] = std::move(deleted_cells);
262+
res[column.first] = std::move(deleted_cells);
219263
}
220264
}
221265

222-
rows_.erase(row_key);
266+
rows_.erase(row_it);
223267

224268
return res;
225269
}
@@ -385,6 +429,41 @@ bool FilteredColumnFamilyStream::PointToFirstCellAfterRowChange() const {
385429
return false;
386430
}
387431

432+
StatusOr<std::shared_ptr<ColumnFamily>>
433+
ColumnFamily::ConstructAggregateColumnFamily(
434+
google::bigtable::admin::v2::Type value_type) {
435+
auto cf = std::make_shared<ColumnFamily>();
436+
437+
if (value_type.has_aggregate_type()) {
438+
auto const& aggregate_type = value_type.aggregate_type();
439+
switch (aggregate_type.aggregator_case()) {
440+
case google::bigtable::admin::v2::Type::Aggregate::kSum:
441+
cf->update_cell_ = cf->SumUpdateCellBEInt64;
442+
break;
443+
case google::bigtable::admin::v2::Type::Aggregate::kMin:
444+
cf->update_cell_ = cf->MinUpdateCellBEInt64;
445+
break;
446+
case google::bigtable::admin::v2::Type::Aggregate::kMax:
447+
cf->update_cell_ = cf->MaxUpdateCellBEInt64;
448+
break;
449+
default:
450+
return InvalidArgumentError(
451+
"unsupported aggregation type",
452+
GCP_ERROR_INFO().WithMetadata(
453+
"aggregation case",
454+
absl::StrFormat("%d", aggregate_type.aggregator_case())));
455+
}
456+
457+
cf->value_type_ = std::move(value_type);
458+
459+
return cf;
460+
}
461+
462+
return InvalidArgumentError(
463+
"no aggregate type set in the supplied value_type",
464+
GCP_ERROR_INFO().WithMetadata("supplied value type",
465+
value_type.DebugString()));
466+
}
388467
} // namespace emulator
389468
} // namespace bigtable
390469
} // namespace cloud

google/cloud/bigtable/emulator/column_family.h

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_EMULATOR_COLUMN_FAMILY_H
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_EMULATOR_COLUMN_FAMILY_H
1717

18+
#include "google/cloud/bigtable/cell.h"
1819
#include "google/cloud/bigtable/emulator/cell_view.h"
1920
#include "google/cloud/bigtable/emulator/filter.h"
2021
#include "google/cloud/bigtable/emulator/filtered_map.h"
2122
#include "google/cloud/bigtable/emulator/range_set.h"
2223
#include "google/cloud/bigtable/read_modify_write_rule.h"
2324
#include "google/cloud/internal/big_endian.h"
2425
#include "google/cloud/internal/make_status.h"
26+
#include "google/cloud/status_or.h"
2527
#include "absl/types/optional.h"
2628
#include <google/bigtable/admin/v2/table.pb.h>
2729
#include <google/bigtable/v2/data.pb.h>
30+
#include <google/bigtable/v2/types.pb.h>
31+
#include <absl/strings/str_format.h>
2832
#include <chrono>
33+
#include <cstdint>
2934
#include <map>
3035
#include <memory>
3136
#include <optional>
@@ -88,6 +93,12 @@ class ColumnRow {
8893
*/
8994
absl::optional<std::string> SetCell(std::chrono::milliseconds timestamp,
9095
std::string const& value);
96+
97+
StatusOr<absl::optional<std::string>> UpdateCell(
98+
std::chrono::milliseconds timestamp, std::string& value,
99+
std::function<StatusOr<std::string>(std::string const&,
100+
std::string&&)> const& update_fn);
101+
91102
/**
92103
* Delete cells falling into a given timestamp range.
93104
*
@@ -173,6 +184,13 @@ class ColumnFamilyRow {
173184
absl::optional<std::string> SetCell(std::string const& column_qualifier,
174185
std::chrono::milliseconds timestamp,
175186
std::string const& value);
187+
188+
StatusOr<absl::optional<std::string>> UpdateCell(
189+
std::string const& column_qualifier, std::chrono::milliseconds timestamp,
190+
std::string& value,
191+
std::function<StatusOr<std::string>(std::string const&,
192+
std::string&&)> const& update_fn);
193+
176194
/**
177195
* Delete cells falling into a given timestamp range in one column.
178196
*
@@ -237,6 +255,13 @@ class ColumnFamilyRow {
237255
class ColumnFamily {
238256
public:
239257
ColumnFamily() = default;
258+
// ConstructAggregateColumnFamily can be used to return an aggregate
259+
// ColumnFamily that can support AddToCell or MergeToCell and
260+
// similar aggregate complex types. To construct an ordinary
261+
// ColumnFamily, use the default constructor ColumnFamily().
262+
static StatusOr<std::shared_ptr<ColumnFamily>> ConstructAggregateColumnFamily(
263+
google::bigtable::admin::v2::Type value_type);
264+
240265
// Disable copying.
241266
ColumnFamily(ColumnFamily const&) = delete;
242267
ColumnFamily& operator=(ColumnFamily const&) = delete;
@@ -274,6 +299,21 @@ class ColumnFamily {
274299
std::string const& column_qualifier,
275300
std::chrono::milliseconds timestamp,
276301
std::string const& value);
302+
303+
/**
304+
* UpdateCell is like SetCell except that, when a cell exists with
305+
* the same timestamp, an update function (that depends on the column
306+
* family type) is called to derive a new value from the new and
307+
* existing value, and that is the value that is written.
308+
*
309+
* Simple (non-aggregate) column families have a default update
310+
* function that just returns the new value.
311+
*
312+
*/
313+
StatusOr<absl::optional<std::string>> UpdateCell(
314+
std::string const& row_key, std::string const& column_qualifier,
315+
std::chrono::milliseconds timestamp, std::string& value);
316+
277317
/**
278318
* Delete the whole row from this column family.
279319
*
@@ -351,9 +391,81 @@ class ColumnFamily {
351391
}
352392

353393
void clear() { rows_.clear(); }
394+
absl::optional<google::bigtable::admin::v2::Type> GetValueType() {
395+
return value_type_;
396+
};
354397

355398
private:
356399
std::map<std::string, ColumnFamilyRow> rows_;
400+
401+
// Support for aggregate and other complex types.
402+
absl::optional<google::bigtable::admin::v2::Type> value_type_ = absl::nullopt;
403+
404+
static StatusOr<std::string> DefaultUpdateCell(
405+
std::string const& /*existing_value*/, std::string&& new_value) {
406+
return new_value;
407+
};
408+
409+
static StatusOr<std::string> SumUpdateCellBEInt64(
410+
std::string const& existing_value, std::string&& new_value) {
411+
auto existing_value_int =
412+
google::cloud::internal::DecodeBigEndian<std::int64_t>(existing_value);
413+
if (!existing_value_int) {
414+
return existing_value_int.status();
415+
}
416+
417+
auto new_value_int =
418+
google::cloud::internal::DecodeBigEndian<std::int64_t>(new_value);
419+
if (!new_value_int) {
420+
return new_value_int.status();
421+
}
422+
423+
return google::cloud::internal::EncodeBigEndian(existing_value_int.value() +
424+
new_value_int.value());
425+
};
426+
427+
static StatusOr<std::string> MaxUpdateCellBEInt64(
428+
std::string const& existing_value, std::string&& new_value) {
429+
auto existing_int =
430+
google::cloud::internal::DecodeBigEndian<std::int64_t>(existing_value);
431+
if (!existing_int) {
432+
return existing_int.status();
433+
}
434+
auto new_int = google::cloud::internal::DecodeBigEndian<std::int64_t>(
435+
std::move(new_value));
436+
if (!new_int) {
437+
return new_int.status();
438+
}
439+
440+
if (existing_int.value() > new_int.value()) {
441+
return existing_value;
442+
}
443+
444+
return new_value;
445+
};
446+
447+
static StatusOr<std::string> MinUpdateCellBEInt64(
448+
std::string const& existing_value, std::string&& new_value) {
449+
auto existing_int =
450+
google::cloud::internal::DecodeBigEndian<std::int64_t>(existing_value);
451+
if (!existing_int) {
452+
return existing_int.status();
453+
}
454+
auto new_int = google::cloud::internal::DecodeBigEndian<std::int64_t>(
455+
std::move(new_value));
456+
if (!new_int) {
457+
return new_int.status();
458+
}
459+
460+
if (existing_int.value() < new_int.value()) {
461+
return existing_value;
462+
}
463+
464+
return new_value;
465+
};
466+
467+
std::function<StatusOr<std::string>(std::string const&, std::string&&)>
468+
update_cell_ = DefaultUpdateCell;
357469
};
358470

359471
/**

google/cloud/bigtable/emulator/column_family_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "google/cloud/testing_util/is_proto_equal.h"
1919
#include <google/protobuf/text_format.h>
2020
#include <gmock/gmock.h>
21+
#include <array>
22+
#include <cstdint>
2123

2224
namespace google {
2325
namespace cloud {

0 commit comments

Comments
 (0)