Skip to content

Commit 0a67a44

Browse files
committed
sr: Swap in transport for kafka::client in service & seq_writer
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent b847f78 commit 0a67a44

10 files changed

Lines changed: 109 additions & 386 deletions

File tree

src/v/pandaproxy/schema_registry/BUILD

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -143,32 +143,19 @@ redpanda_cc_library(
143143
"util.h",
144144
],
145145
implementation_deps = [
146-
"//src/v/bytes:iobuf_parser",
147146
"//src/v/bytes:streambuf",
148147
"//src/v/cluster",
149148
"//src/v/container:chunked_hash_map",
150-
"//src/v/container:chunked_vector",
151149
"//src/v/container:json",
152150
"//src/v/hashing:jump_consistent",
153151
"//src/v/hashing:xx",
154-
"//src/v/json",
155152
"//src/v/kafka/protocol",
156-
"//src/v/metrics",
157-
"//src/v/model",
158-
"//src/v/model:batch_compression",
159153
"//src/v/pandaproxy:core",
160-
"//src/v/pandaproxy:logger",
161-
"//src/v/random:time_jitter",
162154
"//src/v/schema/protobuf:confluent_type_cc_proto",
163155
"//src/v/schema/protobuf:google_type_cc_proto",
164156
"//src/v/ssx:future_util",
165-
"//src/v/ssx:semaphore",
166-
"//src/v/ssx:sformat",
167-
"//src/v/storage:record_batch_builder",
168-
"//src/v/strings:string_switch",
169157
"//src/v/utils:base64",
170158
"//src/v/utils:named_type",
171-
"//src/v/utils:retry",
172159
"//src/v/utils:to_string",
173160
"@abseil-cpp//absl/algorithm:container",
174161
"@abseil-cpp//absl/container:btree",
@@ -181,7 +168,6 @@ redpanda_cc_library(
181168
"@boost//:algorithm",
182169
"@boost//:graph",
183170
"@boost//:math",
184-
"@boost//:multi_index",
185171
"@boost//:outcome",
186172
"@boost//:range",
187173
"@fmt",
@@ -199,15 +185,26 @@ redpanda_cc_library(
199185
":config",
200186
":rjson",
201187
":subject_name_strategy",
188+
":transport",
202189
":types",
203190
"//src/v/base",
191+
"//src/v/bytes:iobuf_parser",
204192
"//src/v/config",
205193
"//src/v/config:startup_config",
206-
"//src/v/kafka/client",
207-
"//src/v/kafka/client:config_utils",
208-
"//src/v/kafka/client:configuration",
209-
"//src/v/kafka/client:exceptions",
194+
"//src/v/container:chunked_vector",
195+
"//src/v/json",
196+
"//src/v/metrics",
197+
"//src/v/model",
198+
"//src/v/model:batch_compression",
210199
"//src/v/pandaproxy:json",
200+
"//src/v/pandaproxy:logger",
201+
"//src/v/random:time_jitter",
202+
"//src/v/ssx:semaphore",
203+
"//src/v/ssx:sformat",
204+
"//src/v/storage:record_batch_builder",
205+
"//src/v/strings:string_switch",
206+
"//src/v/utils:retry",
207+
"@boost//:multi_index",
211208
"@seastar",
212209
],
213210
)
@@ -246,6 +243,7 @@ redpanda_cc_library(
246243
],
247244
implementation_deps = [
248245
":core",
246+
":kafka_client_transport",
249247
"//src/v/base",
250248
"//src/v/bytes:iobuf_parser",
251249
"//src/v/cluster",
@@ -264,9 +262,7 @@ redpanda_cc_library(
264262
"//src/v/container:chunked_vector",
265263
"//src/v/container:json",
266264
"//src/v/json",
267-
"//src/v/kafka/client:config_utils",
268265
"//src/v/kafka/client:configuration",
269-
"//src/v/kafka/client:exceptions",
270266
"//src/v/kafka/data/rpc",
271267
"//src/v/kafka/protocol",
272268
"//src/v/kafka/protocol:create_topics",
@@ -303,6 +299,7 @@ redpanda_cc_library(
303299
":config",
304300
":rjson",
305301
":subject_name_strategy",
302+
":transport",
306303
":types",
307304
"//src/v/cluster:client_quota_backend",
308305
"//src/v/cluster:cluster_link_table",
@@ -314,7 +311,6 @@ redpanda_cc_library(
314311
"//src/v/cluster:plugin_table",
315312
"//src/v/cluster:rpc_utils",
316313
"//src/v/config:startup_config",
317-
"//src/v/kafka/client",
318314
],
319315
)
320316

@@ -343,6 +339,7 @@ redpanda_cc_library(
343339
visibility = ["//visibility:public"],
344340
deps = [
345341
":server",
342+
":transport",
346343
"//src/v/config:startup_config",
347344
"//src/v/pandaproxy:core",
348345
],

src/v/pandaproxy/schema_registry/api.cc

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "kafka/data/rpc/deps.h"
1717
#include "pandaproxy/logger.h"
1818
#include "pandaproxy/schema_registry/configuration.h"
19+
#include "pandaproxy/schema_registry/kafka_client_transport.h"
1920
#include "pandaproxy/schema_registry/schema_id_cache.h"
2021
#include "pandaproxy/schema_registry/service.h"
2122
#include "pandaproxy/schema_registry/sharded_store.h"
@@ -83,35 +84,34 @@ ss::future<> api::start() {
8384
return config::shard_local_cfg()
8485
.kafka_schema_id_validation_cache_capacity.bind();
8586
}));
86-
co_await _client.start(
87-
config::to_yaml(_client_cfg, config::redact_secrets::no),
88-
[this](std::exception_ptr ex) {
89-
return _service.local().mitigate_error(ex);
90-
});
87+
co_await _transport.start(
88+
std::ref(_client_cfg),
89+
std::ref(*_controller),
90+
ss::sharded_parameter([this]() {
91+
return kafka::data::rpc::topic_creator::make_default(
92+
_controller.get());
93+
}));
9194
co_await _sequencer.start(
9295
_node_id,
9396
_sg,
94-
std::ref(_client),
97+
ss::sharded_parameter(
98+
[this]() -> transport* { return &_transport.local(); }),
9599
std::ref(*_store),
96100
ss::sharded_parameter([this] {
97101
return std::make_unique<sequence_state_checker_impl>(_controller);
98102
}));
99103
co_await _service.start(
100104
config::to_yaml(_cfg, config::redact_secrets::no),
101-
config::to_yaml(_client_cfg, config::redact_secrets::no),
102105
_sg,
103106
_max_memory,
104-
std::ref(_client),
107+
ss::sharded_parameter(
108+
[this]() -> transport* { return &_transport.local(); }),
105109
std::ref(*_store),
106110
std::ref(_sequencer),
107111
ss::sharded_parameter([this]() {
108112
return kafka::data::rpc::topic_metadata_cache::make_default(
109113
_metadata_cache);
110114
}),
111-
ss::sharded_parameter([this]() {
112-
return kafka::data::rpc::topic_creator::make_default(
113-
_controller.get());
114-
}),
115115
std::ref(_controller),
116116
std::ref(_audit_mgr));
117117

@@ -143,10 +143,10 @@ ss::future<> api::stop() {
143143
// Reset gate to support api restart
144144
_metrics_gate = ss::gate{};
145145
}
146-
co_await _client.invoke_on_all(&kafka::client::client::stop);
146+
co_await _transport.invoke_on_all(&kafka_client_transport::stop);
147147
co_await _service.stop();
148148
co_await _sequencer.stop();
149-
co_await _client.stop();
149+
co_await _transport.stop();
150150
co_await _schema_id_cache.stop();
151151
co_await _schema_id_validation_probe.stop();
152152
if (_store) {

src/v/pandaproxy/schema_registry/api.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
#include "base/seastarx.h"
1515
#include "cluster/metrics_reporter.h"
16-
#include "kafka/client/fwd.h"
16+
#include "kafka/client/configuration.h"
1717
#include "model/metadata.h"
1818
#include "pandaproxy/schema_registry/fwd.h"
1919
#include "security/fwd.h"
@@ -73,7 +73,7 @@ class api {
7373
ss::sharded<cluster::metadata_cache>* _metadata_cache;
7474
std::unique_ptr<cluster::controller>& _controller;
7575

76-
ss::sharded<kafka::client::client> _client;
76+
ss::sharded<kafka_client_transport> _transport;
7777
std::unique_ptr<pandaproxy::schema_registry::sharded_store> _store;
7878
ss::sharded<schema_id_validation_probe> _schema_id_validation_probe;
7979
ss::sharded<schema_id_cache> _schema_id_cache;

src/v/pandaproxy/schema_registry/fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ class service;
2323
class sharded_store;
2424
class store;
2525
class transport;
26+
class kafka_client_transport;
2627

2728
} // namespace pandaproxy::schema_registry

src/v/pandaproxy/schema_registry/seq_writer.cc

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111

1212
#include "base/vassert.h"
1313
#include "base/vlog.h"
14-
#include "kafka/client/client_fetch_batch_reader.h"
1514
#include "model/namespace.h"
1615
#include "pandaproxy/logger.h"
1716
#include "pandaproxy/schema_registry/error.h"
1817
#include "pandaproxy/schema_registry/errors.h"
1918
#include "pandaproxy/schema_registry/exceptions.h"
2019
#include "pandaproxy/schema_registry/sharded_store.h"
2120
#include "pandaproxy/schema_registry/storage.h"
21+
#include "pandaproxy/schema_registry/transport.h"
2222
#include "pandaproxy/schema_registry/types.h"
2323
#include "storage/record_batch_builder.h"
2424

@@ -106,17 +106,7 @@ struct batch_builder : public storage::record_batch_builder {
106106
/// a REST API endpoint that requires global knowledge of latest
107107
/// data (i.e. any listings)
108108
ss::future<> seq_writer::read_sync() {
109-
auto offsets = co_await _client.local().list_offsets(
110-
model::schema_registry_internal_tp);
111-
if (
112-
offsets.data.topics.size() != 1
113-
|| offsets.data.topics[0].partitions.size() != 1) {
114-
throw kafka::exception(
115-
kafka::error_code::unknown_server_error,
116-
"Malformed ListOffsets Kafka response for internal topic");
117-
}
118-
119-
auto max_offset = offsets.data.topics[0].partitions[0].offset;
109+
auto max_offset = co_await _transport->get_high_watermark();
120110
co_await wait_for(max_offset - model::offset{1});
121111
co_await _store.process_marked_schemas();
122112
}
@@ -145,14 +135,12 @@ ss::future<> seq_writer::wait_for(model::offset offset) {
145135
"wait_for dirty! Reading {}..{}",
146136
seq._loaded_offset,
147137
offset);
148-
149-
return kafka::client::make_client_fetch_batch_reader(
150-
seq._client.local(),
151-
model::schema_registry_internal_tp,
152-
seq._loaded_offset + model::offset{1},
153-
offset + model::offset{1})
154-
.consume(
155-
consume_to_store{seq._store, seq}, model::no_timeout);
138+
consume_to_store c{seq._store, seq};
139+
return seq._transport->consume_range(
140+
seq._loaded_offset + model::offset{1},
141+
offset + model::offset{1},
142+
[c = std::move(c)](this auto, model::record_batch batch)
143+
-> ss::future<> { co_await c(std::move(batch)); });
156144
} else {
157145
vlog(srlog.trace, "wait_for clean (offset {})", offset);
158146
return ss::make_ready_future<>();
@@ -174,29 +162,25 @@ ss::future<bool> seq_writer::produce_and_apply(
174162
write_at.value_or(batch.base_offset()) == batch.base_offset(),
175163
"Set the base_offset to the expected write_at");
176164

177-
kafka::partition_produce_response res
178-
= co_await _client.local().produce_record_batch(
179-
model::schema_registry_internal_tp, batch.copy());
180-
181-
if (res.error_code != kafka::error_code::none) {
182-
throw kafka::exception(res.error_code, res.error_message.value_or(""));
183-
}
165+
auto result = co_await _transport->produce(batch.copy());
184166

185-
auto success = write_at.value_or(res.base_offset) == res.base_offset;
167+
auto success = write_at.value_or(result.base_offset) == result.base_offset;
186168
if (success) {
187169
vlog(
188-
srlog.debug, "seq_writer: Successful write at {}", res.base_offset);
170+
srlog.debug,
171+
"seq_writer: Successful write at {}",
172+
result.base_offset);
189173
co_await consume_to_store(_store, *this)(std::move(batch));
190174
co_await _store.process_marked_schemas();
191175
} else {
192176
vlog(
193177
srlog.debug,
194178
"seq_writer: Failed write at {} (wrote at {})",
195-
write_at,
196-
res.base_offset);
179+
write_at.value_or(model::offset{-1}),
180+
result.base_offset);
197181
}
198182
co_return success;
199-
};
183+
}
200184

201185
ss::future<> seq_writer::advance_offset(model::offset offset) {
202186
auto remote = [offset](seq_writer& s) { s.advance_offset_inner(offset); };

src/v/pandaproxy/schema_registry/seq_writer.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
#pragma once
1111

1212
#include "base/outcome.h"
13-
#include "kafka/client/client.h"
1413
#include "pandaproxy/logger.h"
1514
#include "pandaproxy/schema_registry/error.h"
1615
#include "pandaproxy/schema_registry/errors.h"
1716
#include "pandaproxy/schema_registry/exceptions.h"
1817
#include "pandaproxy/schema_registry/sharded_store.h"
18+
#include "pandaproxy/schema_registry/transport.h"
1919
#include "pandaproxy/schema_registry/types.h"
2020
#include "random/simple_time_jitter.h"
2121
#include "ssx/semaphore.h"
@@ -48,14 +48,16 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
4848
seq_writer(
4949
model::node_id node_id,
5050
ss::smp_service_group smp_group,
51-
ss::sharded<kafka::client::client>& client,
51+
transport* transport,
5252
sharded_store& store,
5353
std::unique_ptr<sequence_state_checker> state_checker)
5454
: _smp_opts(ss::smp_submit_to_options{smp_group})
55-
, _client(client)
55+
, _transport(transport)
5656
, _store(store)
5757
, _node_id(node_id)
58-
, _state_checker(std::move(state_checker)) {}
58+
, _state_checker(std::move(state_checker)) {
59+
vassert(_transport != nullptr, "Transport unexpectedly null");
60+
}
5961

6062
ss::future<> read_sync();
6163

@@ -92,7 +94,7 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
9294
private:
9395
ss::smp_submit_to_options _smp_opts;
9496

95-
ss::sharded<kafka::client::client>& _client;
97+
transport* _transport;
9698
sharded_store& _store;
9799

98100
model::node_id _node_id;

0 commit comments

Comments
 (0)