Skip to content

Commit 94cdc27

Browse files
committed
Merge branch 'upstream_emulator' into sample_row_keys
2 parents 4db993a + e2155d9 commit 94cdc27

6 files changed

Lines changed: 656 additions & 55 deletions

File tree

google/cloud/bigtable/emulator/column_family.cc

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414

1515
#include "google/cloud/bigtable/emulator/column_family.h"
1616
#include "google/cloud/internal/big_endian.h"
17+
#include <google/bigtable/v2/types.pb.h>
1718
#include <absl/types/optional.h>
19+
#include <array>
1820
#include <chrono>
1921
#include <cstdint>
22+
#include <iterator>
2023
#include <map>
24+
#include <string>
25+
#include <utility>
2126

2227
namespace google {
2328
namespace cloud {
@@ -118,9 +123,32 @@ ReadModifyWriteCellResult ColumnRow::ReadModifyWrite(
118123
absl::optional<std::string> ColumnRow::SetCell(
119124
std::chrono::milliseconds timestamp, std::string const& value) {
120125
absl::optional<std::string> ret = absl::nullopt;
126+
127+
auto cell_it = cells_.find(timestamp);
128+
if (!(cell_it == cells_.end())) {
129+
ret = std::move(cell_it->second);
130+
}
131+
132+
cells_[timestamp] = value;
133+
134+
return ret;
135+
}
136+
137+
StatusOr<absl::optional<std::string>> ColumnRow::UpdateCell(
138+
std::chrono::milliseconds timestamp, std::string& value,
139+
std::function<StatusOr<std::string>(std::string const&,
140+
std::string&&)> const& update_fn) {
141+
absl::optional<std::string> ret = absl::nullopt;
142+
121143
auto cell_it = cells_.find(timestamp);
122144
if (!(cell_it == cells_.end())) {
145+
auto maybe_update_value = update_fn(cell_it->second, std::move(value));
146+
if (!maybe_update_value) {
147+
return maybe_update_value.status();
148+
}
123149
ret = std::move(cell_it->second);
150+
maybe_update_value.value().swap(cell_it->second);
151+
return ret;
124152
}
125153

126154
cells_[timestamp] = value;
@@ -166,6 +194,15 @@ absl::optional<std::string> ColumnFamilyRow::SetCell(
166194
return columns_[column_qualifier].SetCell(timestamp, value);
167195
}
168196

197+
StatusOr<absl::optional<std::string>> ColumnFamilyRow::UpdateCell(
198+
std::string const& column_qualifier, std::chrono::milliseconds timestamp,
199+
std::string& value,
200+
std::function<StatusOr<std::string>(std::string const&,
201+
std::string&&)> const& update_fn) {
202+
return columns_[column_qualifier].UpdateCell(timestamp, value,
203+
std::move(update_fn));
204+
}
205+
169206
std::vector<Cell> ColumnFamilyRow::DeleteColumn(
170207
std::string const& column_qualifier,
171208
::google::bigtable::v2::TimestampRange const& time_range) {
@@ -201,6 +238,13 @@ absl::optional<std::string> ColumnFamily::SetCell(
201238
return rows_[row_key].SetCell(column_qualifier, timestamp, value);
202239
}
203240

241+
StatusOr<absl::optional<std::string>> ColumnFamily::UpdateCell(
242+
std::string const& row_key, std::string const& column_qualifier,
243+
std::chrono::milliseconds timestamp, std::string& value) {
244+
return rows_[row_key].UpdateCell(column_qualifier, timestamp, value,
245+
update_cell_);
246+
}
247+
204248
std::map<std::string, std::vector<Cell>> ColumnFamily::DeleteRow(
205249
std::string const& row_key) {
206250
std::map<std::string, std::vector<Cell>> res;
@@ -215,11 +259,11 @@ std::map<std::string, std::vector<Cell>> ColumnFamily::DeleteRow(
215259
::google::bigtable::v2::TimestampRange time_range;
216260
auto deleted_cells = column.second.DeleteTimeRange(time_range);
217261
if (!deleted_cells.empty()) {
218-
res[std::move(column.first)] = std::move(deleted_cells);
262+
res[column.first] = std::move(deleted_cells);
219263
}
220264
}
221265

222-
rows_.erase(row_key);
266+
rows_.erase(row_it);
223267

224268
return res;
225269
}
@@ -385,6 +429,41 @@ bool FilteredColumnFamilyStream::PointToFirstCellAfterRowChange() const {
385429
return false;
386430
}
387431

432+
StatusOr<std::shared_ptr<ColumnFamily>>
433+
ColumnFamily::ConstructAggregateColumnFamily(
434+
google::bigtable::admin::v2::Type value_type) {
435+
auto cf = std::make_shared<ColumnFamily>();
436+
437+
if (value_type.has_aggregate_type()) {
438+
auto const& aggregate_type = value_type.aggregate_type();
439+
switch (aggregate_type.aggregator_case()) {
440+
case google::bigtable::admin::v2::Type::Aggregate::kSum:
441+
cf->update_cell_ = cf->SumUpdateCellBEInt64;
442+
break;
443+
case google::bigtable::admin::v2::Type::Aggregate::kMin:
444+
cf->update_cell_ = cf->MinUpdateCellBEInt64;
445+
break;
446+
case google::bigtable::admin::v2::Type::Aggregate::kMax:
447+
cf->update_cell_ = cf->MaxUpdateCellBEInt64;
448+
break;
449+
default:
450+
return InvalidArgumentError(
451+
"unsupported aggregation type",
452+
GCP_ERROR_INFO().WithMetadata(
453+
"aggregation case",
454+
absl::StrFormat("%d", aggregate_type.aggregator_case())));
455+
}
456+
457+
cf->value_type_ = std::move(value_type);
458+
459+
return cf;
460+
}
461+
462+
return InvalidArgumentError(
463+
"no aggregate type set in the supplied value_type",
464+
GCP_ERROR_INFO().WithMetadata("supplied value type",
465+
value_type.DebugString()));
466+
}
388467
} // namespace emulator
389468
} // namespace bigtable
390469
} // namespace cloud

google/cloud/bigtable/emulator/column_family.h

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,23 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_EMULATOR_COLUMN_FAMILY_H
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_EMULATOR_COLUMN_FAMILY_H
1717

18+
#include "google/cloud/bigtable/cell.h"
1819
#include "google/cloud/bigtable/emulator/cell_view.h"
1920
#include "google/cloud/bigtable/emulator/filter.h"
2021
#include "google/cloud/bigtable/emulator/filtered_map.h"
2122
#include "google/cloud/bigtable/emulator/range_set.h"
2223
#include "google/cloud/bigtable/read_modify_write_rule.h"
2324
#include "google/cloud/internal/big_endian.h"
2425
#include "google/cloud/internal/make_status.h"
26+
#include "google/cloud/status_or.h"
2527
#include "absl/types/optional.h"
2628
#include <google/bigtable/admin/v2/table.pb.h>
2729
#include <google/bigtable/v2/data.pb.h>
30+
#include <google/bigtable/v2/types.pb.h>
31+
#include <absl/strings/str_format.h>
2832
#include <chrono>
2933
#include <cstddef>
34+
#include <cstdint>
3035
#include <map>
3136
#include <memory>
3237
#include <optional>
@@ -89,6 +94,12 @@ class ColumnRow {
8994
*/
9095
absl::optional<std::string> SetCell(std::chrono::milliseconds timestamp,
9196
std::string const& value);
97+
98+
StatusOr<absl::optional<std::string>> UpdateCell(
99+
std::chrono::milliseconds timestamp, std::string& value,
100+
std::function<StatusOr<std::string>(std::string const&,
101+
std::string&&)> const& update_fn);
102+
92103
/**
93104
* Delete cells falling into a given timestamp range.
94105
*
@@ -174,6 +185,13 @@ class ColumnFamilyRow {
174185
absl::optional<std::string> SetCell(std::string const& column_qualifier,
175186
std::chrono::milliseconds timestamp,
176187
std::string const& value);
188+
189+
StatusOr<absl::optional<std::string>> UpdateCell(
190+
std::string const& column_qualifier, std::chrono::milliseconds timestamp,
191+
std::string& value,
192+
std::function<StatusOr<std::string>(std::string const&,
193+
std::string&&)> const& update_fn);
194+
177195
/**
178196
* Delete cells falling into a given timestamp range in one column.
179197
*
@@ -238,6 +256,13 @@ class ColumnFamilyRow {
238256
class ColumnFamily {
239257
public:
240258
ColumnFamily() = default;
259+
// ConstructAggregateColumnFamily can be used to return an aggregate
260+
// ColumnFamily that can support AddToCell or MergeToCell and
261+
// similar aggregate complex types. To construct an ordinary
262+
// ColumnFamily, use the default constructor ColumnFamily().
263+
static StatusOr<std::shared_ptr<ColumnFamily>> ConstructAggregateColumnFamily(
264+
google::bigtable::admin::v2::Type value_type);
265+
241266
// Disable copying.
242267
ColumnFamily(ColumnFamily const&) = delete;
243268
ColumnFamily& operator=(ColumnFamily const&) = delete;
@@ -275,6 +300,21 @@ class ColumnFamily {
275300
std::string const& column_qualifier,
276301
std::chrono::milliseconds timestamp,
277302
std::string const& value);
303+
304+
/**
305+
* UpdateCell is like SetCell except that, when a cell exists with
306+
* the same timestamp, an update function (that depends on the column
307+
* family type) is called to derive a new value from the new and
308+
* existing value, and that is the value that is written.
309+
*
310+
* Simple (non-aggregate) column families have a default update
311+
* function that just returns the new value.
312+
*
313+
*/
314+
StatusOr<absl::optional<std::string>> UpdateCell(
315+
std::string const& row_key, std::string const& column_qualifier,
316+
std::chrono::milliseconds timestamp, std::string& value);
317+
278318
/**
279319
* Delete the whole row from this column family.
280320
*
@@ -352,9 +392,81 @@ class ColumnFamily {
352392
}
353393

354394
void clear() { rows_.clear(); }
395+
absl::optional<google::bigtable::admin::v2::Type> GetValueType() {
396+
return value_type_;
397+
};
355398

356399
private:
357400
std::map<std::string, ColumnFamilyRow> rows_;
401+
402+
// Support for aggregate and other complex types.
403+
absl::optional<google::bigtable::admin::v2::Type> value_type_ = absl::nullopt;
404+
405+
static StatusOr<std::string> DefaultUpdateCell(
406+
std::string const& /*existing_value*/, std::string&& new_value) {
407+
return new_value;
408+
};
409+
410+
static StatusOr<std::string> SumUpdateCellBEInt64(
411+
std::string const& existing_value, std::string&& new_value) {
412+
auto existing_value_int =
413+
google::cloud::internal::DecodeBigEndian<std::int64_t>(existing_value);
414+
if (!existing_value_int) {
415+
return existing_value_int.status();
416+
}
417+
418+
auto new_value_int =
419+
google::cloud::internal::DecodeBigEndian<std::int64_t>(new_value);
420+
if (!new_value_int) {
421+
return new_value_int.status();
422+
}
423+
424+
return google::cloud::internal::EncodeBigEndian(existing_value_int.value() +
425+
new_value_int.value());
426+
};
427+
428+
static StatusOr<std::string> MaxUpdateCellBEInt64(
429+
std::string const& existing_value, std::string&& new_value) {
430+
auto existing_int =
431+
google::cloud::internal::DecodeBigEndian<std::int64_t>(existing_value);
432+
if (!existing_int) {
433+
return existing_int.status();
434+
}
435+
auto new_int = google::cloud::internal::DecodeBigEndian<std::int64_t>(
436+
std::move(new_value));
437+
if (!new_int) {
438+
return new_int.status();
439+
}
440+
441+
if (existing_int.value() > new_int.value()) {
442+
return existing_value;
443+
}
444+
445+
return new_value;
446+
};
447+
448+
static StatusOr<std::string> MinUpdateCellBEInt64(
449+
std::string const& existing_value, std::string&& new_value) {
450+
auto existing_int =
451+
google::cloud::internal::DecodeBigEndian<std::int64_t>(existing_value);
452+
if (!existing_int) {
453+
return existing_int.status();
454+
}
455+
auto new_int = google::cloud::internal::DecodeBigEndian<std::int64_t>(
456+
std::move(new_value));
457+
if (!new_int) {
458+
return new_int.status();
459+
}
460+
461+
if (existing_int.value() < new_int.value()) {
462+
return existing_value;
463+
}
464+
465+
return new_value;
466+
};
467+
468+
std::function<StatusOr<std::string>(std::string const&, std::string&&)>
469+
update_cell_ = DefaultUpdateCell;
358470
};
359471

360472
/**

google/cloud/bigtable/emulator/column_family_test.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "google/cloud/testing_util/is_proto_equal.h"
1919
#include <google/protobuf/text_format.h>
2020
#include <gmock/gmock.h>
21+
#include <array>
22+
#include <cstdint>
2123

2224
namespace google {
2325
namespace cloud {

0 commit comments

Comments
 (0)