Skip to content

Commit 49cc0c6

Browse files
committed
feat ydb: support retries for interactive tx
1 parent 84a80a8 commit 49cc0c6

File tree

21 files changed

+733
-240
lines changed

21 files changed

+733
-240
lines changed

cmake/SetupYdbCppSDK.cmake

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ write_package_stub(jwt-cpp)
2525

2626
set(RAPIDJSON_INCLUDE_DIRS "${USERVER_THIRD_PARTY_DIRS}/rapidjson/include")
2727

28+
if(Protobuf_INCLUDE_DIR)
29+
set(Protobuf_INCLUDE_DIR "${Protobuf_INCLUDE_DIR}" CACHE PATH "" FORCE)
30+
endif()
31+
2832
if(TARGET userver-api-common-protos)
2933
set(YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET userver-api-common-protos)
3034
else()
@@ -37,6 +41,7 @@ cpmaddpackage(
3741
GIT_TAG v3.13.0
3842
GITHUB_REPOSITORY ydb-platform/ydb-cpp-sdk
3943
GIT_SHALLOW TRUE
44+
PATCHES ydb-cpp-sdk_protobuf_include.patch
4045
OPTIONS "Brotli_VERSION ${Brotli_VERSION}" "RAPIDJSON_INCLUDE_DIRS ${RAPIDJSON_INCLUDE_DIRS}"
4146
"YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET ${YDB_SDK_GOOGLE_COMMON_PROTOS_TARGET}" "YDB_SDK_EXAMPLES OFF"
4247
)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
2+
From: userver <userver@yandex-team.ru>
3+
Date: Tue, 25 Feb 2026 00:00:00 +0000
4+
Subject: [PATCH] Fix protobuf 4.24 compatibility
5+
6+
1. cmake/protobuf: Add Protobuf well-known types include path so that
7+
protoc can find google/protobuf/struct.proto when Protobuf is fetched
8+
via CPM.
9+
10+
2. operation.h: Remove include of google/protobuf/stubs/status.h which
11+
was removed in protobuf 4.22+. The include is unnecessary because
12+
MessageToJsonString now returns absl::Status.
13+
14+
---
15+
cmake/protobuf.cmake | 4 ++++
16+
include/ydb-cpp-sdk/client/types/operation/operation.h | 1 -
17+
2 files changed, 4 insertions(+), 1 deletion(-)
18+
19+
diff --git a/cmake/protobuf.cmake b/cmake/protobuf.cmake
20+
index 1111111..2222222 100644
21+
--- a/cmake/protobuf.cmake
22+
+++ b/cmake/protobuf.cmake
23+
@@ -55,6 +55,10 @@ function(_ydb_sdk_init_proto_library_impl Tgt USE_API_COMMON_PROTOS)
24+
25+
set(proto_incls ${YDB_SDK_SOURCE_DIR})
26+
27+
+ if(Protobuf_INCLUDE_DIR)
28+
+ list(APPEND proto_incls ${Protobuf_INCLUDE_DIR})
29+
+ endif()
30+
+
31+
if (USE_API_COMMON_PROTOS)
32+
target_link_libraries(${Tgt} PUBLIC
33+
api-common-protos
34+
diff --git a/include/ydb-cpp-sdk/client/types/operation/operation.h b/include/ydb-cpp-sdk/client/types/operation/operation.h
35+
index 3333333..4444444 100644
36+
--- a/include/ydb-cpp-sdk/client/types/operation/operation.h
37+
+++ b/include/ydb-cpp-sdk/client/types/operation/operation.h
38+
@@ -6,7 +6,6 @@
39+
40+
#include <library/cpp/threading/future/future.h>
41+
42+
-#include <google/protobuf/stubs/status.h>
43+
#include <google/protobuf/timestamp.pb.h>
44+
#include <google/protobuf/util/json_util.h>
45+

samples/ydb_service/views/upsert-2rows/post/view.cpp

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,29 +27,31 @@ VALUES ($id_key, $name_key, $service_key, $channel_key, CurrentUtcTimestamp(), $
2727
ydb::Query::LogMode::kNameOnly,
2828
};
2929

30-
auto trx = Ydb().Begin("trx", ydb::TransactionMode::kSerializableRW);
31-
32-
for (auto i : {1, 2}) {
33-
auto response = trx.Execute(
34-
kUpsertQuery, //
35-
"$id_key",
36-
request["id"].As<std::string>() + std::to_string(i), //
37-
"$name_key",
38-
ydb::Utf8{request["name"].As<std::string>() + std::to_string(i)}, //
39-
"$service_key",
40-
request["service"].As<std::string>(), //
41-
"$channel_key",
42-
request["channel"].As<int64_t>(), //
43-
"$state_key",
44-
request["state"].As<std::optional<formats::json::Value>>() //
45-
);
46-
47-
if (response.GetCursorCount() != 0) {
48-
throw std::runtime_error("Unexpected response data");
30+
Ydb().RetryTx("trx", {.tx_mode = ydb::TransactionMode::kSerializableRW},
31+
[&](ydb::TxActor& tx) {
32+
for (auto i : {1, 2}) {
33+
auto response = tx.Execute(
34+
kUpsertQuery, //
35+
"$id_key",
36+
request["id"].As<std::string>() + std::to_string(i), //
37+
"$name_key",
38+
ydb::Utf8{request["name"].As<std::string>() + std::to_string(i)}, //
39+
"$service_key",
40+
request["service"].As<std::string>(), //
41+
"$channel_key",
42+
request["channel"].As<int64_t>(), //
43+
"$state_key",
44+
request["state"].As<std::optional<formats::json::Value>>() //
45+
);
46+
47+
if (response.GetCursorCount() != 0) {
48+
throw std::runtime_error("Unexpected response data");
49+
}
50+
}
51+
52+
return ydb::TxAction::kCommit;
4953
}
50-
}
51-
52-
trx.Commit();
54+
);
5355

5456
return formats::json::MakeObject();
5557
}

ydb/functional_tests/basic/views/upsert-row/post/view.cpp

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -34,26 +34,29 @@ UpsertRowHandler::HandleRequestJsonThrow(const server::http::HttpRequest&, const
3434
const {
3535
engine::SleepFor(std::chrono::milliseconds(10));
3636

37-
auto trx = Ydb().Begin("trx", ydb::TransactionMode::kSerializableRW);
38-
auto response = trx.Execute(
39-
kUpsertQuery, //
40-
"$id_key",
41-
request["id"].As<std::string>(), //
42-
"$name_key",
43-
ydb::Utf8{request["name"].As<std::string>()}, //
44-
"$service_key",
45-
request["service"].As<std::string>(), //
46-
"$channel_key",
47-
request["channel"].As<int64_t>(), //
48-
"$state_key",
49-
request["state"].As<std::optional<formats::json::Value>>() //
50-
);
37+
Ydb().RetryTx("trx", {.tx_mode = ydb::TransactionMode::kSerializableRW},
38+
[&](ydb::TxActor& tx) {
39+
auto response = tx.Execute(
40+
kUpsertQuery, //
41+
"$id_key",
42+
request["id"].As<std::string>(), //
43+
"$name_key",
44+
ydb::Utf8{request["name"].As<std::string>()}, //
45+
"$service_key",
46+
request["service"].As<std::string>(), //
47+
"$channel_key",
48+
request["channel"].As<int64_t>(), //
49+
"$state_key",
50+
request["state"].As<std::optional<formats::json::Value>>() //
51+
);
5152

52-
if (response.GetCursorCount()) {
53-
throw std::runtime_error("Unexpected response data");
54-
}
53+
if (response.GetCursorCount()) {
54+
throw std::runtime_error("Unexpected response data");
55+
}
5556

56-
trx.Commit();
57+
return ydb::TxAction::kCommit;
58+
}
59+
);
5760

5861
return formats::json::MakeObject();
5962
}

ydb/include/userver/ydb/builder.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class PreparedArgsBuilder final {
3737
private:
3838
friend class Transaction;
3939
friend class TableClient;
40+
friend class TxActor;
4041
struct PreparedArgsWithKey;
4142

4243
NYdb::TParams Build() && { return std::move(builder_).Build(); }

ydb/include/userver/ydb/io/structs.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include <boost/pfr/core.hpp>
1414
#include <boost/pfr/core_name.hpp>
1515

16+
#include <fmt/ranges.h>
17+
1618
#include <userver/utils/assert.hpp>
1719
#include <userver/utils/constexpr_indices.hpp>
1820
#include <userver/utils/enumerate.hpp>

ydb/include/userver/ydb/settings.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <chrono>
44
#include <cstdint>
55
#include <optional>
6+
#include <string>
67
#include <string_view>
78

89
#include <ydb-cpp-sdk/client/table/query_stats/stats.h>
@@ -33,6 +34,26 @@ struct QuerySettings final {
3334
std::optional<NYdb::NTable::ECollectQueryStatsMode> collect_query_stats{std::nullopt};
3435
};
3536

37+
struct RequestSettings final {
38+
std::chrono::milliseconds timeout_ms{0};
39+
40+
std::string trace_id{};
41+
};
42+
43+
using ExecuteSettings = RequestSettings;
44+
using CommitSettings = RequestSettings;
45+
using RollbackSettings = RequestSettings;
46+
47+
struct RetryTxSettings final {
48+
TransactionMode tx_mode{TransactionMode::kSerializableRW};
49+
std::chrono::milliseconds timeout_ms{0};
50+
std::uint32_t retries{10};
51+
bool is_idempotent{false};
52+
53+
CommitSettings commit_settings;
54+
RollbackSettings rollback_settings;
55+
};
56+
3657
} // namespace ydb
3758

3859
namespace formats::parse {

ydb/include/userver/ydb/table.hpp

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace impl {
3333
struct Stats;
3434
struct TableSettings;
3535
class Driver;
36+
template <typename Settings = OperationSettings>
3637
struct RequestContext;
3738
enum class IsStreaming : bool {};
3839
} // namespace impl
@@ -48,7 +49,8 @@ using DescribeTableSettings = NYdb::NTable::TDescribeTableSettings;
4849
using DropTableSettings = NYdb::NTable::TDropTableSettings;
4950
using ScanQuerySettings = NYdb::NTable::TStreamExecScanQuerySettings;
5051

51-
/// @brief A dynamic transaction name for @see TableClient::Begin.
52+
/// @brief A dynamic transaction name for @see TableClient::Begin or
53+
/// @see TableClient::RetryTx.
5254
///
5355
/// @warning Make sure that transaction name has low cardinality.
5456
/// If transaction name is unique for every call, per-transaction metrics will overflow metrics quota,
@@ -119,19 +121,45 @@ class TableClient final {
119121
);
120122
/// @}
121123

122-
/// @name Transactions
124+
/// @name Transactions with retry
125+
/// @brief Execute a transactional function with automatic retries.
126+
///
127+
/// The user-provided function receives a TxActor for executing queries
128+
/// and returns TxAction::kCommit or TxAction::kRollback. On transient
129+
/// errors the whole function is retried automatically.
130+
///
131+
/// @code
132+
/// client.RetryTx("my_tx", {.retries = 3},
133+
/// [](ydb::TxActor& tx) {
134+
/// tx.Execute(query, "$id", 1);
135+
/// return ydb::TxAction::kCommit;
136+
/// });
137+
/// @endcode
138+
///
139+
/// @{
140+
void RetryTx(utils::StringLiteral transaction_name, RetryTxSettings retry_settings, RetryTxFunction fn);
141+
142+
/// @warning Make sure that `transaction_name` has low cardinality.
143+
void RetryTx(DynamicTransactionName transaction_name, RetryTxSettings retry_settings, RetryTxFunction fn);
144+
/// @}
145+
146+
/// @name Transactions (deprecated)
123147
/// @brief Begin a transaction with the specified name. The settings are used
124148
/// for the `BEGIN` statement.
149+
/// @deprecated Use RetryTx instead for automatic retry support.
125150
/// @see ydb::Transaction
126151
///
127152
/// @{
153+
[[deprecated("Use RetryTx instead")]]
128154
Transaction Begin(utils::StringLiteral transaction_name, OperationSettings settings = {});
129155

130156
/// @warning Make sure that `transaction_name` has low cardinality.
131157
/// If `transaction_name` is unique for every call, per-transaction metrics will overflow metrics quota,
132158
/// and metrics will become unusable.
159+
[[deprecated("Use RetryTx instead")]]
133160
Transaction Begin(DynamicTransactionName transaction_name, OperationSettings settings = {});
134161

162+
[[deprecated("Use RetryTx instead")]]
135163
Transaction Begin(utils::StringLiteral transaction_name, TransactionMode tx_mode);
136164
/// @}
137165

@@ -233,6 +261,8 @@ class TableClient final {
233261

234262
private:
235263
friend class Transaction;
264+
friend class TxActor;
265+
template <typename Settings>
236266
friend struct impl::RequestContext;
237267

238268
std::string JoinDbPath(std::string_view path) const;
@@ -248,7 +278,7 @@ class TableClient final {
248278
// (TTableClient&, const std::string& full_path, const Settings&)
249279
// -> NThreading::TFuture<T>
250280
// ExecuteSchemeQueryImpl -> T
251-
template <typename QuerySettings, typename Func>
281+
template <typename FuncResult, typename QuerySettings, typename Func>
252282
auto ExecuteWithPathImpl(
253283
std::string_view path,
254284
std::string_view operation_name,

ydb/include/userver/ydb/transaction.hpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <functional>
34
#include <string>
45

56
#include <ydb-cpp-sdk/client/query/client.h>
@@ -18,8 +19,84 @@ USERVER_NAMESPACE_BEGIN
1819

1920
namespace ydb {
2021

22+
/// @brief Transaction actor for use with TableClient::RetryTx.
23+
///
24+
/// Provides only query execution within a transaction. Commit and rollback
25+
/// are controlled by returning TxAction from the retry function.
26+
/// https://ydb.tech/docs/en/concepts/transactions
27+
class TxActor {
28+
public:
29+
TxActor(const TxActor&) = delete;
30+
TxActor& operator=(const TxActor&) = delete;
31+
TxActor(TxActor&&) noexcept = delete;
32+
TxActor& operator=(TxActor&&) = delete;
33+
34+
/// Execute a single data query as a part of the transaction. Query parameters
35+
/// are passed in `Args` as "string key - value" pairs:
36+
///
37+
/// @code
38+
/// tx.Execute(query, "name1", value1, "name2", value2, ...);
39+
/// @endcode
40+
///
41+
/// Use ydb::PreparedArgsBuilder for storing a generic buffer of query params
42+
/// if needed.
43+
///
44+
/// @{
45+
template <typename... Args>
46+
ExecuteResponse Execute(const Query& query, Args&&... args);
47+
48+
template <typename... Args>
49+
ExecuteResponse Execute(ExecuteSettings settings, const Query& query, Args&&... args);
50+
51+
ExecuteResponse Execute(ExecuteSettings settings, const Query& query, PreparedArgsBuilder&& builder);
52+
/// @}
53+
54+
PreparedArgsBuilder GetBuilder() const;
55+
56+
private:
57+
friend class TableClient;
58+
59+
TxActor(
60+
TableClient& table_client,
61+
NYdb::NQuery::TTransaction ydb_tx,
62+
std::string name
63+
) noexcept;
64+
65+
TableClient& table_client_;
66+
std::string name_;
67+
impl::StatsScope stats_scope_;
68+
tracing::Span span_;
69+
NYdb::NQuery::TTransaction ydb_tx_;
70+
};
71+
72+
template <typename... Args>
73+
ExecuteResponse TxActor::Execute(const Query& query, Args&&... args) {
74+
auto builder = GetBuilder();
75+
builder.AddParams(std::forward<Args>(args)...);
76+
return Execute(ExecuteSettings{}, query, std::move(builder));
77+
}
78+
79+
template <typename... Args>
80+
ExecuteResponse TxActor::Execute(ExecuteSettings settings, const Query& query, Args&&... args) {
81+
auto builder = GetBuilder();
82+
builder.AddParams(std::forward<Args>(args)...);
83+
return Execute(std::move(settings), query, std::move(builder));
84+
}
85+
86+
/// Action to take after the retry function completes.
87+
enum class TxAction {
88+
kCommit,
89+
kRollback,
90+
};
91+
92+
/// Signature for the function passed to TableClient::RetryTx.
93+
using RetryTxFunction = std::function<TxAction(TxActor&)>;
94+
2195
/// @brief YDB Transaction
2296
///
97+
/// @deprecated Use TableClient::RetryTx instead of manually managing
98+
/// transactions with Begin/Commit/Rollback.
99+
///
23100
/// https://ydb.tech/docs/en/concepts/transactions
24101
class Transaction final {
25102
public:

0 commit comments

Comments
 (0)