Skip to content

Commit e028da1

Browse files
committed
sr: Swap in transport for kafka::client in service & seq_writer
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent ed36548 commit e028da1

10 files changed

Lines changed: 104 additions & 394 deletions

File tree

src/v/pandaproxy/schema_registry/BUILD

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -145,32 +145,19 @@ redpanda_cc_library(
145145
"util.h",
146146
],
147147
implementation_deps = [
148-
"//src/v/bytes:iobuf_parser",
149148
"//src/v/bytes:streambuf",
150149
"//src/v/cluster",
151150
"//src/v/container:chunked_hash_map",
152-
"//src/v/container:chunked_vector",
153151
"//src/v/container:json",
154152
"//src/v/hashing:jump_consistent",
155153
"//src/v/hashing:xx",
156-
"//src/v/json",
157154
"//src/v/kafka/protocol",
158-
"//src/v/metrics",
159-
"//src/v/model",
160-
"//src/v/model:batch_compression",
161155
"//src/v/pandaproxy:core",
162-
"//src/v/pandaproxy:logger",
163-
"//src/v/random:time_jitter",
164156
"//src/v/schema/protobuf:confluent_type_cc_proto",
165157
"//src/v/schema/protobuf:google_type_cc_proto",
166158
"//src/v/ssx:future_util",
167-
"//src/v/ssx:semaphore",
168-
"//src/v/ssx:sformat",
169-
"//src/v/storage:record_batch_builder",
170-
"//src/v/strings:string_switch",
171159
"//src/v/utils:base64",
172160
"//src/v/utils:named_type",
173-
"//src/v/utils:retry",
174161
"//src/v/utils:to_string",
175162
"@abseil-cpp//absl/algorithm:container",
176163
"@abseil-cpp//absl/container:btree",
@@ -183,7 +170,6 @@ redpanda_cc_library(
183170
"@boost//:algorithm",
184171
"@boost//:graph",
185172
"@boost//:math",
186-
"@boost//:multi_index",
187173
"@boost//:outcome",
188174
"@boost//:range",
189175
"@fmt",
@@ -201,15 +187,26 @@ redpanda_cc_library(
201187
":config",
202188
":rjson",
203189
":subject_name_strategy",
190+
":transport",
204191
":types",
205192
"//src/v/base",
193+
"//src/v/bytes:iobuf_parser",
206194
"//src/v/config",
207195
"//src/v/config:startup_config",
208-
"//src/v/kafka/client",
209-
"//src/v/kafka/client:config_utils",
210-
"//src/v/kafka/client:configuration",
211-
"//src/v/kafka/client:exceptions",
196+
"//src/v/container:chunked_vector",
197+
"//src/v/json",
198+
"//src/v/metrics",
199+
"//src/v/model",
200+
"//src/v/model:batch_compression",
212201
"//src/v/pandaproxy:json",
202+
"//src/v/pandaproxy:logger",
203+
"//src/v/random:time_jitter",
204+
"//src/v/ssx:semaphore",
205+
"//src/v/ssx:sformat",
206+
"//src/v/storage:record_batch_builder",
207+
"//src/v/strings:string_switch",
208+
"//src/v/utils:retry",
209+
"@boost//:multi_index",
213210
"@seastar",
214211
],
215212
)
@@ -248,6 +245,7 @@ redpanda_cc_library(
248245
],
249246
implementation_deps = [
250247
":core",
248+
":kafka_client_transport",
251249
"//src/v/base",
252250
"//src/v/bytes:iobuf_parser",
253251
"//src/v/cluster",
@@ -266,9 +264,7 @@ redpanda_cc_library(
266264
"//src/v/container:chunked_vector",
267265
"//src/v/container:json",
268266
"//src/v/json",
269-
"//src/v/kafka/client:config_utils",
270267
"//src/v/kafka/client:configuration",
271-
"//src/v/kafka/client:exceptions",
272268
"//src/v/kafka/data/rpc",
273269
"//src/v/kafka/protocol",
274270
"//src/v/kafka/protocol:create_topics",
@@ -305,6 +301,7 @@ redpanda_cc_library(
305301
":config",
306302
":rjson",
307303
":subject_name_strategy",
304+
":transport",
308305
":types",
309306
"//src/v/cluster:client_quota_backend",
310307
"//src/v/cluster:cluster_link_table",
@@ -316,7 +313,6 @@ redpanda_cc_library(
316313
"//src/v/cluster:plugin_table",
317314
"//src/v/cluster:rpc_utils",
318315
"//src/v/config:startup_config",
319-
"//src/v/kafka/client",
320316
],
321317
)
322318

@@ -345,6 +341,7 @@ redpanda_cc_library(
345341
visibility = ["//visibility:public"],
346342
deps = [
347343
":server",
344+
":transport",
348345
"//src/v/config:startup_config",
349346
"//src/v/pandaproxy:core",
350347
],

src/v/pandaproxy/schema_registry/api.cc

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "kafka/data/rpc/deps.h"
1717
#include "pandaproxy/logger.h"
1818
#include "pandaproxy/schema_registry/configuration.h"
19+
#include "pandaproxy/schema_registry/kafka_client_transport.h"
1920
#include "pandaproxy/schema_registry/schema_id_cache.h"
2021
#include "pandaproxy/schema_registry/service.h"
2122
#include "pandaproxy/schema_registry/sharded_store.h"
@@ -83,38 +84,36 @@ ss::future<> api::start() {
8384
return config::shard_local_cfg()
8485
.kafka_schema_id_validation_cache_capacity.bind();
8586
}));
86-
co_await _client.start(
87-
config::to_yaml(_client_cfg, config::redact_secrets::no),
88-
[this](std::exception_ptr ex) {
89-
return _service.local().mitigate_error(ex);
90-
});
87+
co_await _transport.start(
88+
std::ref(_client_cfg),
89+
std::ref(*_controller),
90+
ss::sharded_parameter([this]() {
91+
return kafka::data::rpc::topic_creator::make_default(
92+
_controller.get());
93+
}));
9194
co_await _sequencer.start(
9295
_node_id,
9396
_sg,
94-
std::ref(_client),
97+
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
9598
std::ref(*_store),
9699
ss::sharded_parameter([this] {
97100
return std::make_unique<sequence_state_checker_impl>(_controller);
98101
}));
99102
co_await _service.start(
100103
config::to_yaml(_cfg, config::redact_secrets::no),
101-
config::to_yaml(_client_cfg, config::redact_secrets::no),
102104
_sg,
103105
_max_memory,
104-
std::ref(_client),
106+
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
105107
std::ref(*_store),
106108
std::ref(_sequencer),
107109
ss::sharded_parameter([this]() {
108110
return kafka::data::rpc::topic_metadata_cache::make_default(
109111
_metadata_cache);
110112
}),
111-
ss::sharded_parameter([this]() {
112-
return kafka::data::rpc::topic_creator::make_default(
113-
_controller.get());
114-
}),
115113
std::ref(_controller),
116114
std::ref(_audit_mgr));
117115

116+
co_await _transport.invoke_on_all(&kafka_client_transport::configure);
118117
co_await _service.invoke_on_all(&service::start);
119118

120119
if (ss::this_shard_id() == 0) {
@@ -143,10 +142,10 @@ ss::future<> api::stop() {
143142
// Reset gate to support api restart
144143
_metrics_gate = ss::gate{};
145144
}
146-
co_await _client.invoke_on_all(&kafka::client::client::stop);
145+
co_await _transport.invoke_on_all(&kafka_client_transport::stop);
147146
co_await _service.stop();
148147
co_await _sequencer.stop();
149-
co_await _client.stop();
148+
co_await _transport.stop();
150149
co_await _schema_id_cache.stop();
151150
co_await _schema_id_validation_probe.stop();
152151
if (_store) {
@@ -171,7 +170,7 @@ const kafka::client::configuration& api::get_client_config() const {
171170
}
172171

173172
bool api::has_ephemeral_credentials() const {
174-
return _service.local().has_ephemeral_credentials();
173+
return _transport.local().has_ephemeral_credentials();
175174
}
176175

177176
ss::future<> api::contribute_metrics(

src/v/pandaproxy/schema_registry/api.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
#include "base/seastarx.h"
1515
#include "cluster/metrics_reporter.h"
16-
#include "kafka/client/fwd.h"
16+
#include "kafka/client/configuration.h"
1717
#include "model/metadata.h"
1818
#include "pandaproxy/schema_registry/fwd.h"
1919
#include "security/fwd.h"
@@ -73,7 +73,7 @@ class api {
7373
ss::sharded<cluster::metadata_cache>* _metadata_cache;
7474
std::unique_ptr<cluster::controller>& _controller;
7575

76-
ss::sharded<kafka::client::client> _client;
76+
ss::sharded<kafka_client_transport> _transport;
7777
std::unique_ptr<pandaproxy::schema_registry::sharded_store> _store;
7878
ss::sharded<schema_id_validation_probe> _schema_id_validation_probe;
7979
ss::sharded<schema_id_cache> _schema_id_cache;

src/v/pandaproxy/schema_registry/fwd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,6 @@ class service;
2323
class sharded_store;
2424
class store;
2525
class transport;
26+
class kafka_client_transport;
2627

2728
} // namespace pandaproxy::schema_registry

src/v/pandaproxy/schema_registry/seq_writer.cc

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111

1212
#include "base/vassert.h"
1313
#include "base/vlog.h"
14-
#include "kafka/client/client_fetch_batch_reader.h"
1514
#include "model/namespace.h"
1615
#include "pandaproxy/logger.h"
1716
#include "pandaproxy/schema_registry/error.h"
1817
#include "pandaproxy/schema_registry/errors.h"
1918
#include "pandaproxy/schema_registry/exceptions.h"
2019
#include "pandaproxy/schema_registry/sharded_store.h"
2120
#include "pandaproxy/schema_registry/storage.h"
21+
#include "pandaproxy/schema_registry/transport.h"
2222
#include "pandaproxy/schema_registry/types.h"
2323
#include "storage/record_batch_builder.h"
2424

@@ -106,17 +106,7 @@ struct batch_builder : public storage::record_batch_builder {
106106
/// a REST API endpoint that requires global knowledge of latest
107107
/// data (i.e. any listings)
108108
ss::future<> seq_writer::read_sync() {
109-
auto offsets = co_await _client.local().list_offsets(
110-
model::schema_registry_internal_tp);
111-
if (
112-
offsets.data.topics.size() != 1
113-
|| offsets.data.topics[0].partitions.size() != 1) {
114-
throw kafka::exception(
115-
kafka::error_code::unknown_server_error,
116-
"Malformed ListOffsets Kafka response for internal topic");
117-
}
118-
119-
auto max_offset = offsets.data.topics[0].partitions[0].offset;
109+
auto max_offset = co_await _transport->get_high_watermark();
120110
co_await wait_for(max_offset - model::offset{1});
121111
co_await _store.process_marked_schemas();
122112
}
@@ -145,14 +135,10 @@ ss::future<> seq_writer::wait_for(model::offset offset) {
145135
"wait_for dirty! Reading {}..{}",
146136
seq._loaded_offset,
147137
offset);
148-
149-
return kafka::client::make_client_fetch_batch_reader(
150-
seq._client.local(),
151-
model::schema_registry_internal_tp,
152-
seq._loaded_offset + model::offset{1},
153-
offset + model::offset{1})
154-
.consume(
155-
consume_to_store{seq._store, seq}, model::no_timeout);
138+
return seq._transport->consume_range(
139+
seq._loaded_offset + model::offset{1},
140+
offset + model::offset{1},
141+
consume_to_store{seq._store, seq});
156142
} else {
157143
vlog(srlog.trace, "wait_for clean (offset {})", offset);
158144
return ss::make_ready_future<>();
@@ -174,29 +160,25 @@ ss::future<bool> seq_writer::produce_and_apply(
174160
write_at.value_or(batch.base_offset()) == batch.base_offset(),
175161
"Set the base_offset to the expected write_at");
176162

177-
kafka::partition_produce_response res
178-
= co_await _client.local().produce_record_batch(
179-
model::schema_registry_internal_tp, batch.copy());
180-
181-
if (res.error_code != kafka::error_code::none) {
182-
throw kafka::exception(res.error_code, res.error_message.value_or(""));
183-
}
163+
auto result = co_await _transport->produce(batch.copy());
184164

185-
auto success = write_at.value_or(res.base_offset) == res.base_offset;
165+
auto success = write_at.value_or(result.base_offset) == result.base_offset;
186166
if (success) {
187167
vlog(
188-
srlog.debug, "seq_writer: Successful write at {}", res.base_offset);
168+
srlog.debug,
169+
"seq_writer: Successful write at {}",
170+
result.base_offset);
189171
co_await consume_to_store(_store, *this)(std::move(batch));
190172
co_await _store.process_marked_schemas();
191173
} else {
192174
vlog(
193175
srlog.debug,
194176
"seq_writer: Failed write at {} (wrote at {})",
195-
write_at,
196-
res.base_offset);
177+
write_at.value_or(model::offset{-1}),
178+
result.base_offset);
197179
}
198180
co_return success;
199-
};
181+
}
200182

201183
ss::future<> seq_writer::advance_offset(model::offset offset) {
202184
auto remote = [offset](seq_writer& s) { s.advance_offset_inner(offset); };

src/v/pandaproxy/schema_registry/seq_writer.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
#pragma once
1111

1212
#include "base/outcome.h"
13-
#include "kafka/client/client.h"
1413
#include "pandaproxy/logger.h"
1514
#include "pandaproxy/schema_registry/error.h"
1615
#include "pandaproxy/schema_registry/errors.h"
1716
#include "pandaproxy/schema_registry/exceptions.h"
1817
#include "pandaproxy/schema_registry/sharded_store.h"
18+
#include "pandaproxy/schema_registry/transport.h"
1919
#include "pandaproxy/schema_registry/types.h"
2020
#include "random/simple_time_jitter.h"
2121
#include "ssx/semaphore.h"
@@ -48,11 +48,11 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
4848
seq_writer(
4949
model::node_id node_id,
5050
ss::smp_service_group smp_group,
51-
ss::sharded<kafka::client::client>& client,
51+
transport& transport,
5252
sharded_store& store,
5353
std::unique_ptr<sequence_state_checker> state_checker)
5454
: _smp_opts(ss::smp_submit_to_options{smp_group})
55-
, _client(client)
55+
, _transport(&transport)
5656
, _store(store)
5757
, _node_id(node_id)
5858
, _state_checker(std::move(state_checker)) {}
@@ -92,7 +92,7 @@ class seq_writer final : public ss::peering_sharded_service<seq_writer> {
9292
private:
9393
ss::smp_submit_to_options _smp_opts;
9494

95-
ss::sharded<kafka::client::client>& _client;
95+
transport* _transport;
9696
sharded_store& _store;
9797

9898
model::node_id _node_id;

0 commit comments

Comments
 (0)