Skip to content

Commit 51b9d48

Browse files
committed
sr: Introduce kafka_client_transport
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
1 parent a535a27 commit 51b9d48

3 files changed

Lines changed: 445 additions & 0 deletions

File tree

src/v/pandaproxy/schema_registry/BUILD

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,38 @@ redpanda_cc_library(
7979
],
8080
)
8181

82+
redpanda_cc_library(
83+
name = "kafka_client_transport",
84+
srcs = [
85+
"kafka_client_transport.cc",
86+
],
87+
hdrs = [
88+
"kafka_client_transport.h",
89+
],
90+
implementation_deps = [
91+
":exceptions",
92+
"//src/v/cluster",
93+
"//src/v/cluster:ephemeral_credential_frontend",
94+
"//src/v/cluster:members_table",
95+
"//src/v/config",
96+
"//src/v/kafka/client:config_utils",
97+
"//src/v/kafka/client:exceptions",
98+
"//src/v/kafka/data/rpc",
99+
"//src/v/kafka/protocol:create_topics",
100+
"//src/v/kafka/server:topic_config_utils",
101+
"//src/v/model",
102+
"//src/v/pandaproxy:logger",
103+
"//src/v/security",
104+
],
105+
visibility = ["//visibility:public"],
106+
deps = [
107+
":transport",
108+
"//src/v/kafka/client",
109+
"//src/v/kafka/client:configuration",
110+
"@seastar",
111+
],
112+
)
113+
82114
redpanda_cc_library(
83115
name = "core",
84116
srcs = [
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
/*
2+
* Copyright 2025 Redpanda Data, Inc.
3+
*
4+
* Use of this software is governed by the Business Source License
5+
* included in the file licenses/BSL.md
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
#include "pandaproxy/schema_registry/kafka_client_transport.h"
13+
14+
#include "cluster/cluster_link/frontend.h"
15+
#include "cluster/controller.h"
16+
#include "cluster/ephemeral_credential_frontend.h"
17+
#include "cluster/members_table.h"
18+
#include "cluster/security_frontend.h"
19+
#include "config/broker_authn_endpoint.h"
20+
#include "config/configuration.h"
21+
#include "kafka/client/client.h"
22+
#include "kafka/client/client_fetch_batch_reader.h"
23+
#include "kafka/client/config_utils.h"
24+
#include "kafka/client/exceptions.h"
25+
#include "kafka/data/rpc/deps.h"
26+
#include "kafka/protocol/create_topics.h"
27+
#include "kafka/server/handlers/topics/types.h"
28+
#include "model/namespace.h"
29+
#include "pandaproxy/logger.h"
30+
#include "pandaproxy/schema_registry/exceptions.h"
31+
#include "security/acl.h"
32+
#include "security/credential_store.h"
33+
#include "security/ephemeral_credential_store.h"
34+
35+
#include <seastar/core/coroutine.hh>
36+
#include <seastar/coroutine/parallel_for_each.hh>
37+
38+
#include <chrono>
39+
40+
using namespace std::chrono_literals;
41+
42+
namespace pandaproxy::schema_registry {
43+
44+
namespace {
45+
46+
ss::future<> create_acls(cluster::security_frontend& security_fe) {
47+
std::vector<security::acl_binding> principal_acl_binding{
48+
security::acl_binding{
49+
security::resource_pattern{
50+
security::resource_type::topic,
51+
model::schema_registry_internal_tp.topic,
52+
security::pattern_type::literal},
53+
security::acl_entry{
54+
security::schema_registry_principal,
55+
security::acl_host::wildcard_host(),
56+
security::acl_operation::all,
57+
security::acl_permission::allow}}};
58+
59+
auto err_vec = co_await security_fe.create_acls(principal_acl_binding, 5s);
60+
auto it = std::find_if(err_vec.begin(), err_vec.end(), [](const auto& err) {
61+
return err != cluster::errc::success;
62+
});
63+
64+
if (it != err_vec.end()) {
65+
vlog(
66+
srlog.warn,
67+
"Failed to create ACLs for {}, err {} - {}",
68+
security::schema_registry_principal,
69+
*it,
70+
cluster::make_error_code(*it).message());
71+
} else {
72+
vlog(
73+
srlog.debug,
74+
"Successfully created ACLs for {}",
75+
security::schema_registry_principal);
76+
}
77+
}
78+
79+
} // namespace
80+
81+
kafka_client_transport::kafka_client_transport(
82+
kafka::client::configuration& client_cfg,
83+
cluster::controller& controller,
84+
std::unique_ptr<kafka::data::rpc::topic_creator> topic_creator)
85+
: _client_config(client_cfg)
86+
, _controller(controller)
87+
, _client(
88+
std::make_unique<kafka::client::client>(
89+
config::to_yaml(_client_config, config::redact_secrets::no),
90+
[this](std::exception_ptr ex) {
91+
return mitigate_error(std::move(ex));
92+
}))
93+
, _topic_creator(std::move(topic_creator)) {}
94+
95+
kafka_client_transport::~kafka_client_transport() = default;
96+
97+
ss::future<> kafka_client_transport::stop() { co_await _client->stop(); }
98+
99+
ss::future<produce_result>
100+
kafka_client_transport::produce(model::record_batch batch) {
101+
try {
102+
auto res = co_await _client->produce_record_batch(
103+
model::schema_registry_internal_tp, std::move(batch));
104+
if (res.error_code != kafka::error_code::none) {
105+
throw kafka::exception(
106+
res.error_code, res.error_message.value_or(""));
107+
}
108+
co_return produce_result{.base_offset = res.base_offset};
109+
} catch (const kafka::client::partition_error& ex) {
110+
if (
111+
ex.error == kafka::error_code::unknown_topic_or_partition
112+
&& ex.tp.topic == model::schema_registry_internal_tp.topic) {
113+
throw exception(
114+
kafka::error_code::unknown_server_error,
115+
"_schemas topic does not exist");
116+
}
117+
throw;
118+
}
119+
}
120+
121+
ss::future<model::offset> kafka_client_transport::get_high_watermark() {
122+
try {
123+
auto offsets = co_await _client->list_offsets(
124+
model::schema_registry_internal_tp);
125+
if (
126+
offsets.data.topics.size() != 1
127+
|| offsets.data.topics[0].partitions.size() != 1) {
128+
throw kafka::exception(
129+
kafka::error_code::unknown_server_error,
130+
"Malformed ListOffsets Kafka response for internal topic");
131+
}
132+
co_return offsets.data.topics[0].partitions[0].offset;
133+
} catch (const kafka::client::partition_error& ex) {
134+
if (
135+
ex.error == kafka::error_code::unknown_topic_or_partition
136+
&& ex.tp.topic == model::schema_registry_internal_tp.topic) {
137+
throw exception(
138+
kafka::error_code::unknown_server_error,
139+
"_schemas topic does not exist");
140+
}
141+
throw;
142+
}
143+
}
144+
145+
ss::future<> kafka_client_transport::consume_range(
146+
model::offset start,
147+
model::offset end,
148+
ss::noncopyable_function<ss::future<>(model::record_batch)> consumer) {
149+
try {
150+
struct batch_consumer {
151+
ss::noncopyable_function<ss::future<>(model::record_batch)> fn;
152+
ss::future<ss::stop_iteration>
153+
operator()(model::record_batch batch) {
154+
return fn(std::move(batch)).then([] {
155+
return ss::stop_iteration::no;
156+
});
157+
}
158+
void end_of_stream() {}
159+
};
160+
auto rdr = kafka::client::make_client_fetch_batch_reader(
161+
*_client, model::schema_registry_internal_tp, start, end);
162+
co_await std::move(rdr).consume(
163+
batch_consumer{std::move(consumer)}, model::no_timeout);
164+
} catch (const kafka::client::partition_error& ex) {
165+
if (
166+
ex.error == kafka::error_code::unknown_topic_or_partition
167+
&& ex.tp.topic == model::schema_registry_internal_tp.topic) {
168+
throw exception(
169+
kafka::error_code::unknown_server_error,
170+
"_schemas topic does not exist");
171+
}
172+
throw;
173+
}
174+
}
175+
176+
ss::future<> kafka_client_transport::configure() {
177+
auto sasl_config = co_await kafka::client::create_client_credentials(
178+
_controller, _client_config, security::schema_registry_principal);
179+
_client->set_credentials(std::move(sasl_config));
180+
181+
const auto& store = _controller.get_ephemeral_credential_store().local();
182+
_has_ephemeral_credentials = store.has(
183+
store.find(security::schema_registry_principal));
184+
185+
if (_has_ephemeral_credentials) {
186+
vlog(srlog.info, "[configure] Creating ACLs for ephemeral credentials");
187+
co_await create_acls(_controller.get_security_frontend().local());
188+
}
189+
}
190+
191+
ss::future<> kafka_client_transport::mitigate_error(std::exception_ptr eptr) {
192+
vlog(srlog.warn, "mitigate_error: {}", eptr);
193+
return ss::make_exception_future<>(eptr)
194+
.handle_exception_type(
195+
[this, eptr](const kafka::client::broker_error& ex) {
196+
if (
197+
ex.error == kafka::error_code::sasl_authentication_failed
198+
&& _has_ephemeral_credentials) {
199+
return inform(ex.node_id).then([this]() {
200+
// This fully mitigates, don't rethrow.
201+
return _client->connect();
202+
});
203+
}
204+
205+
// Rethrow unhandled exceptions
206+
return ss::make_exception_future<>(eptr);
207+
})
208+
.handle_exception_type(
209+
[this, eptr](const kafka::client::partition_error& ex) {
210+
if (
211+
(ex.error == kafka::error_code::topic_authorization_failed
212+
|| ex.error == kafka::error_code::unknown_topic_or_partition)
213+
&& _has_ephemeral_credentials) {
214+
vlog(
215+
srlog.info,
216+
"Creating ACLs to mitigate partition error: {}",
217+
ex);
218+
return create_acls(_controller.get_security_frontend().local())
219+
.then([this]() { return _client->update_metadata(); });
220+
}
221+
222+
return ss::make_exception_future<>(eptr);
223+
})
224+
.handle_exception_type(
225+
[this, eptr](const kafka::client::topic_error& ex) {
226+
if (
227+
(ex.error == kafka::error_code::topic_authorization_failed
228+
|| ex.error == kafka::error_code::unknown_topic_or_partition)
229+
&& _has_ephemeral_credentials) {
230+
vlog(
231+
srlog.info,
232+
"Creating ACLs to mitigate partition error: {}",
233+
ex);
234+
return create_acls(_controller.get_security_frontend().local())
235+
.then([this]() { return _client->update_metadata(); });
236+
}
237+
238+
return ss::make_exception_future<>(eptr);
239+
});
240+
}
241+
242+
ss::future<> kafka_client_transport::inform(model::node_id id) {
243+
vlog(srlog.trace, "inform: {}", id);
244+
245+
// Inform a particular node
246+
if (id != kafka::client::unknown_node_id) {
247+
return do_inform(id);
248+
}
249+
250+
// Inform all nodes
251+
return seastar::parallel_for_each(
252+
_controller.get_members_table().local().node_ids(),
253+
[this](model::node_id id) { return do_inform(id); });
254+
}
255+
256+
ss::future<> kafka_client_transport::do_inform(model::node_id id) {
257+
auto& fe = _controller.get_ephemeral_credential_frontend().local();
258+
auto ec = co_await fe.inform(id, security::schema_registry_principal);
259+
vlog(srlog.info, "Informed: broker: {}, ec: {}", id, ec);
260+
}
261+
262+
ss::future<> kafka_client_transport::validate_topic_creation_authorization(
263+
int16_t replication_factor) {
264+
kafka::metadata_request req;
265+
req.data.topics = {kafka::metadata_request_topic{
266+
.name = model::schema_registry_internal_tp.topic}};
267+
req.data.include_topic_authorized_operations = true;
268+
auto resp = co_await _client->fetch_metadata(std::move(req));
269+
vlog(srlog.trace, "Validating topic creation authorization");
270+
// If authz is not enabled on the cluster, then no need to validate
271+
// authn/authz
272+
if (!config::kafka_authz_enabled()) {
273+
co_return;
274+
}
275+
276+
// If the client is not configured with a SCRAM user, it will be using
277+
// ephemeral credentials which are assumed to work
278+
if (!kafka::client::is_scram_configured(_client_config)) {
279+
co_return;
280+
}
281+
282+
kafka::creatable_topic ct{
283+
.name{model::schema_registry_internal_tp.topic},
284+
.num_partitions = 1,
285+
.replication_factor = replication_factor,
286+
};
287+
288+
auto res = co_await _client->create_topic(
289+
std::move(ct), kafka::client::client::validate_only_t::yes);
290+
291+
if (res.data.topics.size() != 1) {
292+
throw kafka::exception(
293+
kafka::error_code::unknown_server_error,
294+
"Malformed CreateTopics Kafka response for internal topic");
295+
}
296+
297+
const auto& topic_res = res.data.topics[0];
298+
if (
299+
topic_res.error_code == kafka::error_code::none
300+
|| topic_res.error_code == kafka::error_code::topic_already_exists
301+
|| (topic_res.error_code == kafka::error_code::topic_authorization_failed && shadow_linking_active())) {
302+
// if shadow linking is active, then the user must be a superuser to
303+
// create the topic via the Kafka API. To continue with normal
304+
// operations, we will assume the user is authorized to create the
305+
// topic.
306+
vlog(srlog.trace, "User is properly authorized");
307+
co_return;
308+
}
309+
throw kafka::exception(
310+
topic_res.error_code,
311+
fmt::format(
312+
"User is not authorized to create internal schema registry topic "
313+
"'{}'",
314+
model::schema_registry_internal_tp.topic));
315+
}
316+
317+
ss::future<cluster::errc> kafka_client_transport::create_topic(
318+
model::topic_namespace_view tp_ns,
319+
int32_t partition_count,
320+
cluster::topic_properties properties,
321+
std::optional<int16_t> replication_factor) {
322+
co_await validate_topic_creation_authorization(
323+
replication_factor.value_or(_controller.internal_topic_replication()));
324+
co_return co_await _topic_creator->create_topic(
325+
tp_ns, partition_count, std::move(properties), replication_factor);
326+
}
327+
328+
bool kafka_client_transport::has_ephemeral_credentials() const {
329+
return _has_ephemeral_credentials;
330+
}
331+
332+
bool kafka_client_transport::shadow_linking_active() const {
333+
const auto& clfe = _controller.get_cluster_link_frontend().local();
334+
return clfe.cluster_linking_enabled() && clfe.cluster_link_active();
335+
}
336+
337+
} // namespace pandaproxy::schema_registry

0 commit comments

Comments
 (0)