Skip to content

Commit 58f0e01

Browse files
authored
Merge pull request #30254 from redpanda-data/sr/noticket/kafka-transport
2 parents 04a314c + 57d80b7 commit 58f0e01

15 files changed

Lines changed: 719 additions & 399 deletions

src/v/pandaproxy/schema_registry/BUILD

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,53 @@ redpanda_cc_library(
6666
],
6767
)
6868

69+
redpanda_cc_library(
70+
name = "transport",
71+
hdrs = [
72+
"transport.h",
73+
],
74+
visibility = ["//visibility:public"],
75+
deps = [
76+
"//src/v/cluster:errc",
77+
"//src/v/cluster:topic_configuration",
78+
"//src/v/model",
79+
"@seastar",
80+
],
81+
)
82+
83+
redpanda_cc_library(
84+
name = "kafka_client_transport",
85+
srcs = [
86+
"kafka_client_transport.cc",
87+
],
88+
hdrs = [
89+
"kafka_client_transport.h",
90+
],
91+
implementation_deps = [
92+
":exceptions",
93+
"//src/v/cluster",
94+
"//src/v/cluster:ephemeral_credential_frontend",
95+
"//src/v/cluster:members_table",
96+
"//src/v/config",
97+
"//src/v/kafka/client:config_utils",
98+
"//src/v/kafka/client:exceptions",
99+
"//src/v/kafka/data/rpc",
100+
"//src/v/kafka/protocol:create_topics",
101+
"//src/v/kafka/server:topic_config_utils",
102+
"//src/v/model",
103+
"//src/v/pandaproxy:logger",
104+
"//src/v/security",
105+
],
106+
visibility = ["//visibility:public"],
107+
deps = [
108+
":transport",
109+
"//src/v/cluster:fwd",
110+
"//src/v/kafka/client",
111+
"//src/v/kafka/client:configuration",
112+
"@seastar",
113+
],
114+
)
115+
69116
redpanda_cc_library(
70117
name = "core",
71118
srcs = [
@@ -99,32 +146,19 @@ redpanda_cc_library(
99146
"util.h",
100147
],
101148
implementation_deps = [
102-
"//src/v/bytes:iobuf_parser",
103149
"//src/v/bytes:streambuf",
104150
"//src/v/cluster",
105151
"//src/v/container:chunked_hash_map",
106-
"//src/v/container:chunked_vector",
107152
"//src/v/container:json",
108153
"//src/v/hashing:jump_consistent",
109154
"//src/v/hashing:xx",
110-
"//src/v/json",
111155
"//src/v/kafka/protocol",
112-
"//src/v/metrics",
113-
"//src/v/model",
114-
"//src/v/model:batch_compression",
115156
"//src/v/pandaproxy:core",
116-
"//src/v/pandaproxy:logger",
117-
"//src/v/random:time_jitter",
118157
"//src/v/schema/protobuf:confluent_type_cc_proto",
119158
"//src/v/schema/protobuf:google_type_cc_proto",
120159
"//src/v/ssx:future_util",
121-
"//src/v/ssx:semaphore",
122-
"//src/v/ssx:sformat",
123-
"//src/v/storage:record_batch_builder",
124-
"//src/v/strings:string_switch",
125160
"//src/v/utils:base64",
126161
"//src/v/utils:named_type",
127-
"//src/v/utils:retry",
128162
"//src/v/utils:to_string",
129163
"@abseil-cpp//absl/algorithm:container",
130164
"@abseil-cpp//absl/container:btree",
@@ -137,7 +171,6 @@ redpanda_cc_library(
137171
"@boost//:algorithm",
138172
"@boost//:graph",
139173
"@boost//:math",
140-
"@boost//:multi_index",
141174
"@boost//:outcome",
142175
"@boost//:range",
143176
"@fmt",
@@ -155,16 +188,27 @@ redpanda_cc_library(
155188
":config",
156189
":rjson",
157190
":subject_name_strategy",
191+
":transport",
158192
":types",
159193
"//src/v/base",
194+
"//src/v/bytes:iobuf_parser",
160195
"//src/v/config",
161196
"//src/v/config:startup_config",
162-
"//src/v/kafka/client",
163-
"//src/v/kafka/client:config_utils",
164-
"//src/v/kafka/client:configuration",
165-
"//src/v/kafka/client:exceptions",
197+
"//src/v/container:chunked_vector",
198+
"//src/v/json",
199+
"//src/v/metrics",
200+
"//src/v/model",
201+
"//src/v/model:batch_compression",
166202
"//src/v/pandaproxy:json",
203+
"//src/v/pandaproxy:logger",
167204
"//src/v/pandaproxy:parsing",
205+
"//src/v/random:time_jitter",
206+
"//src/v/ssx:semaphore",
207+
"//src/v/ssx:sformat",
208+
"//src/v/storage:record_batch_builder",
209+
"//src/v/strings:string_switch",
210+
"//src/v/utils:retry",
211+
"@boost//:multi_index",
168212
"@seastar",
169213
],
170214
)
@@ -203,6 +247,7 @@ redpanda_cc_library(
203247
],
204248
implementation_deps = [
205249
":core",
250+
":kafka_client_transport",
206251
"//src/v/base",
207252
"//src/v/bytes:iobuf_parser",
208253
"//src/v/cluster",
@@ -221,9 +266,7 @@ redpanda_cc_library(
221266
"//src/v/container:chunked_vector",
222267
"//src/v/container:json",
223268
"//src/v/json",
224-
"//src/v/kafka/client:config_utils",
225269
"//src/v/kafka/client:configuration",
226-
"//src/v/kafka/client:exceptions",
227270
"//src/v/kafka/data/rpc",
228271
"//src/v/kafka/protocol",
229272
"//src/v/kafka/protocol:create_topics",
@@ -260,6 +303,7 @@ redpanda_cc_library(
260303
":config",
261304
":rjson",
262305
":subject_name_strategy",
306+
":transport",
263307
":types",
264308
"//src/v/cluster:client_quota_backend",
265309
"//src/v/cluster:cluster_link_table",
@@ -271,7 +315,6 @@ redpanda_cc_library(
271315
"//src/v/cluster:plugin_table",
272316
"//src/v/cluster:rpc_utils",
273317
"//src/v/config:startup_config",
274-
"//src/v/kafka/client",
275318
"//src/v/utils:variant",
276319
],
277320
)
@@ -301,6 +344,7 @@ redpanda_cc_library(
301344
visibility = ["//visibility:public"],
302345
deps = [
303346
":server",
347+
":transport",
304348
"//src/v/config:startup_config",
305349
"//src/v/pandaproxy:core",
306350
],

src/v/pandaproxy/schema_registry/api.cc

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "kafka/data/rpc/deps.h"
1717
#include "pandaproxy/logger.h"
1818
#include "pandaproxy/schema_registry/configuration.h"
19+
#include "pandaproxy/schema_registry/kafka_client_transport.h"
1920
#include "pandaproxy/schema_registry/schema_id_cache.h"
2021
#include "pandaproxy/schema_registry/service.h"
2122
#include "pandaproxy/schema_registry/sharded_store.h"
@@ -83,38 +84,36 @@ ss::future<> api::start() {
8384
return config::shard_local_cfg()
8485
.kafka_schema_id_validation_cache_capacity.bind();
8586
}));
86-
co_await _client.start(
87-
config::to_yaml(_client_cfg, config::redact_secrets::no),
88-
[this](std::exception_ptr ex) {
89-
return _service.local().mitigate_error(ex);
90-
});
87+
co_await _transport.start(
88+
std::ref(_client_cfg),
89+
std::ref(*_controller),
90+
ss::sharded_parameter([this]() {
91+
return kafka::data::rpc::topic_creator::make_default(
92+
_controller.get());
93+
}));
9194
co_await _sequencer.start(
9295
_node_id,
9396
_sg,
94-
std::ref(_client),
97+
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
9598
std::ref(*_store),
9699
ss::sharded_parameter([this] {
97100
return std::make_unique<sequence_state_checker_impl>(_controller);
98101
}));
99102
co_await _service.start(
100103
config::to_yaml(_cfg, config::redact_secrets::no),
101-
config::to_yaml(_client_cfg, config::redact_secrets::no),
102104
_sg,
103105
_max_memory,
104-
std::ref(_client),
106+
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
105107
std::ref(*_store),
106108
std::ref(_sequencer),
107109
ss::sharded_parameter([this]() {
108110
return kafka::data::rpc::topic_metadata_cache::make_default(
109111
_metadata_cache);
110112
}),
111-
ss::sharded_parameter([this]() {
112-
return kafka::data::rpc::topic_creator::make_default(
113-
_controller.get());
114-
}),
115113
std::ref(_controller),
116114
std::ref(_audit_mgr));
117115

116+
co_await _transport.invoke_on_all(&kafka_client_transport::configure);
118117
co_await _service.invoke_on_all(&service::start);
119118

120119
if (ss::this_shard_id() == 0) {
@@ -143,10 +142,10 @@ ss::future<> api::stop() {
143142
// Reset gate to support api restart
144143
_metrics_gate = ss::gate{};
145144
}
146-
co_await _client.invoke_on_all(&kafka::client::client::stop);
145+
co_await _transport.invoke_on_all(&kafka_client_transport::stop);
147146
co_await _service.stop();
148147
co_await _sequencer.stop();
149-
co_await _client.stop();
148+
co_await _transport.stop();
150149
co_await _schema_id_cache.stop();
151150
co_await _schema_id_validation_probe.stop();
152151
if (_store) {
@@ -171,7 +170,7 @@ const kafka::client::configuration& api::get_client_config() const {
171170
}
172171

173172
bool api::has_ephemeral_credentials() const {
174-
return _service.local().has_ephemeral_credentials();
173+
return _transport.local().has_ephemeral_credentials();
175174
}
176175

177176
ss::future<> api::contribute_metrics(

src/v/pandaproxy/schema_registry/api.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
#include "base/seastarx.h"
1515
#include "cluster/metrics_reporter.h"
16-
#include "kafka/client/fwd.h"
16+
#include "kafka/client/configuration.h"
1717
#include "model/metadata.h"
1818
#include "pandaproxy/schema_registry/fwd.h"
1919
#include "security/fwd.h"
@@ -73,7 +73,7 @@ class api {
7373
ss::sharded<cluster::metadata_cache>* _metadata_cache;
7474
std::unique_ptr<cluster::controller>& _controller;
7575

76-
ss::sharded<kafka::client::client> _client;
76+
ss::sharded<kafka_client_transport> _transport;
7777
std::unique_ptr<pandaproxy::schema_registry::sharded_store> _store;
7878
ss::sharded<schema_id_validation_probe> _schema_id_validation_probe;
7979
ss::sharded<schema_id_cache> _schema_id_cache;

src/v/pandaproxy/schema_registry/fwd.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,7 @@ class seq_writer;
2222
class service;
2323
class sharded_store;
2424
class store;
25+
class transport;
26+
class kafka_client_transport;
2527

2628
} // namespace pandaproxy::schema_registry

0 commit comments

Comments
 (0)