Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 65 additions & 21 deletions src/v/pandaproxy/schema_registry/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,53 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "transport",
hdrs = [
"transport.h",
],
visibility = ["//visibility:public"],
deps = [
"//src/v/cluster:errc",
"//src/v/cluster:topic_configuration",
"//src/v/model",
"@seastar",
],
)

redpanda_cc_library(
name = "kafka_client_transport",
srcs = [
"kafka_client_transport.cc",
],
hdrs = [
"kafka_client_transport.h",
],
implementation_deps = [
":exceptions",
"//src/v/cluster",
Comment thread
oleiman marked this conversation as resolved.
"//src/v/cluster:ephemeral_credential_frontend",
"//src/v/cluster:members_table",
"//src/v/config",
"//src/v/kafka/client:config_utils",
"//src/v/kafka/client:exceptions",
"//src/v/kafka/data/rpc",
"//src/v/kafka/protocol:create_topics",
"//src/v/kafka/server:topic_config_utils",
"//src/v/model",
"//src/v/pandaproxy:logger",
"//src/v/security",
],
visibility = ["//visibility:public"],
deps = [
":transport",
"//src/v/cluster:fwd",
"//src/v/kafka/client",
"//src/v/kafka/client:configuration",
"@seastar",
],
)

redpanda_cc_library(
name = "core",
srcs = [
Expand Down Expand Up @@ -99,32 +146,19 @@ redpanda_cc_library(
"util.h",
],
implementation_deps = [
"//src/v/bytes:iobuf_parser",
"//src/v/bytes:streambuf",
"//src/v/cluster",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/container:json",
"//src/v/hashing:jump_consistent",
"//src/v/hashing:xx",
"//src/v/json",
"//src/v/kafka/protocol",
"//src/v/metrics",
"//src/v/model",
"//src/v/model:batch_compression",
"//src/v/pandaproxy:core",
"//src/v/pandaproxy:logger",
"//src/v/random:time_jitter",
"//src/v/schema/protobuf:confluent_type_cc_proto",
"//src/v/schema/protobuf:google_type_cc_proto",
"//src/v/ssx:future_util",
"//src/v/ssx:semaphore",
"//src/v/ssx:sformat",
"//src/v/storage:record_batch_builder",
"//src/v/strings:string_switch",
"//src/v/utils:base64",
"//src/v/utils:named_type",
"//src/v/utils:retry",
"//src/v/utils:to_string",
"@abseil-cpp//absl/algorithm:container",
"@abseil-cpp//absl/container:btree",
Expand All @@ -137,7 +171,6 @@ redpanda_cc_library(
"@boost//:algorithm",
"@boost//:graph",
"@boost//:math",
"@boost//:multi_index",
"@boost//:outcome",
"@boost//:range",
"@fmt",
Expand All @@ -155,16 +188,27 @@ redpanda_cc_library(
":config",
":rjson",
":subject_name_strategy",
":transport",
":types",
"//src/v/base",
"//src/v/bytes:iobuf_parser",
"//src/v/config",
"//src/v/config:startup_config",
"//src/v/kafka/client",
"//src/v/kafka/client:config_utils",
"//src/v/kafka/client:configuration",
"//src/v/kafka/client:exceptions",
"//src/v/container:chunked_vector",
"//src/v/json",
"//src/v/metrics",
"//src/v/model",
"//src/v/model:batch_compression",
"//src/v/pandaproxy:json",
"//src/v/pandaproxy:logger",
"//src/v/pandaproxy:parsing",
"//src/v/random:time_jitter",
"//src/v/ssx:semaphore",
"//src/v/ssx:sformat",
"//src/v/storage:record_batch_builder",
"//src/v/strings:string_switch",
"//src/v/utils:retry",
"@boost//:multi_index",
"@seastar",
],
)
Expand Down Expand Up @@ -203,6 +247,7 @@ redpanda_cc_library(
],
implementation_deps = [
":core",
":kafka_client_transport",
"//src/v/base",
"//src/v/bytes:iobuf_parser",
"//src/v/cluster",
Expand All @@ -221,9 +266,7 @@ redpanda_cc_library(
"//src/v/container:chunked_vector",
"//src/v/container:json",
"//src/v/json",
"//src/v/kafka/client:config_utils",
"//src/v/kafka/client:configuration",
"//src/v/kafka/client:exceptions",
"//src/v/kafka/data/rpc",
"//src/v/kafka/protocol",
"//src/v/kafka/protocol:create_topics",
Expand Down Expand Up @@ -260,6 +303,7 @@ redpanda_cc_library(
":config",
":rjson",
":subject_name_strategy",
":transport",
":types",
"//src/v/cluster:client_quota_backend",
"//src/v/cluster:cluster_link_table",
Expand All @@ -271,7 +315,6 @@ redpanda_cc_library(
"//src/v/cluster:plugin_table",
"//src/v/cluster:rpc_utils",
"//src/v/config:startup_config",
"//src/v/kafka/client",
"//src/v/utils:variant",
],
)
Expand Down Expand Up @@ -301,6 +344,7 @@ redpanda_cc_library(
visibility = ["//visibility:public"],
deps = [
":server",
":transport",
"//src/v/config:startup_config",
"//src/v/pandaproxy:core",
],
Expand Down
29 changes: 14 additions & 15 deletions src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "kafka/data/rpc/deps.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/configuration.h"
#include "pandaproxy/schema_registry/kafka_client_transport.h"
#include "pandaproxy/schema_registry/schema_id_cache.h"
#include "pandaproxy/schema_registry/service.h"
#include "pandaproxy/schema_registry/sharded_store.h"
Expand Down Expand Up @@ -83,38 +84,36 @@ ss::future<> api::start() {
return config::shard_local_cfg()
.kafka_schema_id_validation_cache_capacity.bind();
}));
co_await _client.start(
config::to_yaml(_client_cfg, config::redact_secrets::no),
[this](std::exception_ptr ex) {
return _service.local().mitigate_error(ex);
});
co_await _transport.start(
std::ref(_client_cfg),
std::ref(*_controller),
ss::sharded_parameter([this]() {
return kafka::data::rpc::topic_creator::make_default(
_controller.get());
}));
co_await _sequencer.start(
_node_id,
_sg,
std::ref(_client),
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
std::ref(*_store),
ss::sharded_parameter([this] {
return std::make_unique<sequence_state_checker_impl>(_controller);
}));
co_await _service.start(
config::to_yaml(_cfg, config::redact_secrets::no),
config::to_yaml(_client_cfg, config::redact_secrets::no),
_sg,
_max_memory,
std::ref(_client),
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
std::ref(*_store),
std::ref(_sequencer),
ss::sharded_parameter([this]() {
return kafka::data::rpc::topic_metadata_cache::make_default(
_metadata_cache);
}),
ss::sharded_parameter([this]() {
return kafka::data::rpc::topic_creator::make_default(
_controller.get());
}),
std::ref(_controller),
std::ref(_audit_mgr));

co_await _transport.invoke_on_all(&kafka_client_transport::configure);
co_await _service.invoke_on_all(&service::start);

if (ss::this_shard_id() == 0) {
Expand Down Expand Up @@ -143,10 +142,10 @@ ss::future<> api::stop() {
// Reset gate to support api restart
_metrics_gate = ss::gate{};
}
co_await _client.invoke_on_all(&kafka::client::client::stop);
co_await _transport.invoke_on_all(&kafka_client_transport::stop);
co_await _service.stop();
co_await _sequencer.stop();
co_await _client.stop();
co_await _transport.stop();
co_await _schema_id_cache.stop();
co_await _schema_id_validation_probe.stop();
if (_store) {
Expand All @@ -171,7 +170,7 @@ const kafka::client::configuration& api::get_client_config() const {
}

bool api::has_ephemeral_credentials() const {
return _service.local().has_ephemeral_credentials();
return _transport.local().has_ephemeral_credentials();
}

ss::future<> api::contribute_metrics(
Expand Down
4 changes: 2 additions & 2 deletions src/v/pandaproxy/schema_registry/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

#include "base/seastarx.h"
#include "cluster/metrics_reporter.h"
#include "kafka/client/fwd.h"
#include "kafka/client/configuration.h"
#include "model/metadata.h"
#include "pandaproxy/schema_registry/fwd.h"
#include "security/fwd.h"
Expand Down Expand Up @@ -73,7 +73,7 @@ class api {
ss::sharded<cluster::metadata_cache>* _metadata_cache;
std::unique_ptr<cluster::controller>& _controller;

ss::sharded<kafka::client::client> _client;
ss::sharded<kafka_client_transport> _transport;
std::unique_ptr<pandaproxy::schema_registry::sharded_store> _store;
ss::sharded<schema_id_validation_probe> _schema_id_validation_probe;
ss::sharded<schema_id_cache> _schema_id_cache;
Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/schema_registry/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ class seq_writer;
class service;
class sharded_store;
class store;
class transport;
class kafka_client_transport;

} // namespace pandaproxy::schema_registry
Loading
Loading