diff --git a/src/v/pandaproxy/schema_registry/BUILD b/src/v/pandaproxy/schema_registry/BUILD index cb21018dbe960..f6b6d59d91a65 100644 --- a/src/v/pandaproxy/schema_registry/BUILD +++ b/src/v/pandaproxy/schema_registry/BUILD @@ -66,6 +66,53 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "transport", + hdrs = [ + "transport.h", + ], + visibility = ["//visibility:public"], + deps = [ + "//src/v/cluster:errc", + "//src/v/cluster:topic_configuration", + "//src/v/model", + "@seastar", + ], +) + +redpanda_cc_library( + name = "kafka_client_transport", + srcs = [ + "kafka_client_transport.cc", + ], + hdrs = [ + "kafka_client_transport.h", + ], + implementation_deps = [ + ":exceptions", + "//src/v/cluster", + "//src/v/cluster:ephemeral_credential_frontend", + "//src/v/cluster:members_table", + "//src/v/config", + "//src/v/kafka/client:config_utils", + "//src/v/kafka/client:exceptions", + "//src/v/kafka/data/rpc", + "//src/v/kafka/protocol:create_topics", + "//src/v/kafka/server:topic_config_utils", + "//src/v/model", + "//src/v/pandaproxy:logger", + "//src/v/security", + ], + visibility = ["//visibility:public"], + deps = [ + ":transport", + "//src/v/cluster:fwd", + "//src/v/kafka/client", + "//src/v/kafka/client:configuration", + "@seastar", + ], +) + redpanda_cc_library( name = "core", srcs = [ @@ -99,32 +146,19 @@ redpanda_cc_library( "util.h", ], implementation_deps = [ - "//src/v/bytes:iobuf_parser", "//src/v/bytes:streambuf", "//src/v/cluster", "//src/v/container:chunked_hash_map", - "//src/v/container:chunked_vector", "//src/v/container:json", "//src/v/hashing:jump_consistent", "//src/v/hashing:xx", - "//src/v/json", "//src/v/kafka/protocol", - "//src/v/metrics", - "//src/v/model", - "//src/v/model:batch_compression", "//src/v/pandaproxy:core", - "//src/v/pandaproxy:logger", - "//src/v/random:time_jitter", "//src/v/schema/protobuf:confluent_type_cc_proto", "//src/v/schema/protobuf:google_type_cc_proto", "//src/v/ssx:future_util", - "//src/v/ssx:semaphore", - "//src/v/ssx:sformat", - "//src/v/storage:record_batch_builder", - "//src/v/strings:string_switch", "//src/v/utils:base64", "//src/v/utils:named_type", - "//src/v/utils:retry", "//src/v/utils:to_string", "@abseil-cpp//absl/algorithm:container", "@abseil-cpp//absl/container:btree", @@ -137,7 +171,6 @@ redpanda_cc_library( "@boost//:algorithm", "@boost//:graph", "@boost//:math", - "@boost//:multi_index", "@boost//:outcome", "@boost//:range", "@fmt", @@ -155,16 +188,27 @@ redpanda_cc_library( ":config", ":rjson", ":subject_name_strategy", + ":transport", ":types", "//src/v/base", + "//src/v/bytes:iobuf_parser", "//src/v/config", "//src/v/config:startup_config", - "//src/v/kafka/client", - "//src/v/kafka/client:config_utils", - "//src/v/kafka/client:configuration", - "//src/v/kafka/client:exceptions", + "//src/v/container:chunked_vector", + "//src/v/json", + "//src/v/metrics", + "//src/v/model", + "//src/v/model:batch_compression", "//src/v/pandaproxy:json", + "//src/v/pandaproxy:logger", "//src/v/pandaproxy:parsing", + "//src/v/random:time_jitter", + "//src/v/ssx:semaphore", + "//src/v/ssx:sformat", + "//src/v/storage:record_batch_builder", + "//src/v/strings:string_switch", + "//src/v/utils:retry", + "@boost//:multi_index", "@seastar", ], ) @@ -203,6 +247,7 @@ redpanda_cc_library( ], implementation_deps = [ ":core", + ":kafka_client_transport", "//src/v/base", "//src/v/bytes:iobuf_parser", "//src/v/cluster", @@ -221,9 +266,7 @@ redpanda_cc_library( "//src/v/container:chunked_vector", "//src/v/container:json", "//src/v/json", - "//src/v/kafka/client:config_utils", "//src/v/kafka/client:configuration", - "//src/v/kafka/client:exceptions", "//src/v/kafka/data/rpc", "//src/v/kafka/protocol", "//src/v/kafka/protocol:create_topics", @@ -260,6 +303,7 @@ redpanda_cc_library( ":config", ":rjson", ":subject_name_strategy", + ":transport", ":types", "//src/v/cluster:client_quota_backend", "//src/v/cluster:cluster_link_table", @@ -271,7 +315,6 @@ redpanda_cc_library( "//src/v/cluster:plugin_table", "//src/v/cluster:rpc_utils", "//src/v/config:startup_config", - "//src/v/kafka/client", "//src/v/utils:variant", ], ) @@ -301,6 +344,7 @@ redpanda_cc_library( visibility = ["//visibility:public"], deps = [ ":server", + ":transport", "//src/v/config:startup_config", "//src/v/pandaproxy:core", ], diff --git a/src/v/pandaproxy/schema_registry/api.cc b/src/v/pandaproxy/schema_registry/api.cc index f9ddb17532a96..915b6e74ed98f 100644 --- a/src/v/pandaproxy/schema_registry/api.cc +++ b/src/v/pandaproxy/schema_registry/api.cc @@ -16,6 +16,7 @@ #include "kafka/data/rpc/deps.h" #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/configuration.h" +#include "pandaproxy/schema_registry/kafka_client_transport.h" #include "pandaproxy/schema_registry/schema_id_cache.h" #include "pandaproxy/schema_registry/service.h" #include "pandaproxy/schema_registry/sharded_store.h" @@ -83,38 +84,36 @@ ss::future<> api::start() { return config::shard_local_cfg() .kafka_schema_id_validation_cache_capacity.bind(); })); - co_await _client.start( - config::to_yaml(_client_cfg, config::redact_secrets::no), - [this](std::exception_ptr ex) { - return _service.local().mitigate_error(ex); - }); + co_await _transport.start( + std::ref(_client_cfg), + std::ref(*_controller), + ss::sharded_parameter([this]() { + return kafka::data::rpc::topic_creator::make_default( + _controller.get()); + })); co_await _sequencer.start( _node_id, _sg, - std::ref(_client), + ss::sharded_parameter([this] { return std::ref(_transport.local()); }), std::ref(*_store), ss::sharded_parameter([this] { return std::make_unique(_controller); })); co_await _service.start( config::to_yaml(_cfg, config::redact_secrets::no), - config::to_yaml(_client_cfg, config::redact_secrets::no), _sg, _max_memory, - std::ref(_client), + ss::sharded_parameter([this] { return std::ref(_transport.local()); }), std::ref(*_store), std::ref(_sequencer), ss::sharded_parameter([this]() { return kafka::data::rpc::topic_metadata_cache::make_default( _metadata_cache); }), - ss::sharded_parameter([this]() { - return kafka::data::rpc::topic_creator::make_default( - _controller.get()); - }), std::ref(_controller), std::ref(_audit_mgr)); + co_await _transport.invoke_on_all(&kafka_client_transport::configure); co_await _service.invoke_on_all(&service::start); if (ss::this_shard_id() == 0) { @@ -143,10 +142,10 @@ ss::future<> api::stop() { // Reset gate to support api restart _metrics_gate = ss::gate{}; } - co_await _client.invoke_on_all(&kafka::client::client::stop); + co_await _transport.invoke_on_all(&kafka_client_transport::stop); co_await _service.stop(); co_await _sequencer.stop(); - co_await _client.stop(); + co_await _transport.stop(); co_await _schema_id_cache.stop(); co_await _schema_id_validation_probe.stop(); if (_store) { @@ -171,7 +170,7 @@ const kafka::client::configuration& api::get_client_config() const { } bool api::has_ephemeral_credentials() const { - return _service.local().has_ephemeral_credentials(); + return _transport.local().has_ephemeral_credentials(); } ss::future<> api::contribute_metrics( diff --git a/src/v/pandaproxy/schema_registry/api.h b/src/v/pandaproxy/schema_registry/api.h index 1cc36f95e62e1..5626cbf17ec81 100644 --- a/src/v/pandaproxy/schema_registry/api.h +++ b/src/v/pandaproxy/schema_registry/api.h @@ -13,7 +13,7 @@ #include "base/seastarx.h" #include "cluster/metrics_reporter.h" -#include "kafka/client/fwd.h" +#include "kafka/client/configuration.h" #include "model/metadata.h" #include "pandaproxy/schema_registry/fwd.h" #include "security/fwd.h" @@ -73,7 +73,7 @@ class api { ss::sharded* _metadata_cache; std::unique_ptr& _controller; - ss::sharded _client; + ss::sharded _transport; std::unique_ptr _store; ss::sharded _schema_id_validation_probe; ss::sharded _schema_id_cache; diff --git a/src/v/pandaproxy/schema_registry/fwd.h b/src/v/pandaproxy/schema_registry/fwd.h index b73d44dfb35f0..3b5f4cf11e5c8 100644 --- a/src/v/pandaproxy/schema_registry/fwd.h +++ b/src/v/pandaproxy/schema_registry/fwd.h @@ -22,5 +22,7 @@ class seq_writer; class service; class sharded_store; class store; +class transport; +class kafka_client_transport; } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/kafka_client_transport.cc b/src/v/pandaproxy/schema_registry/kafka_client_transport.cc new file mode 100644 index 0000000000000..c1f23f91b8624 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/kafka_client_transport.cc @@ -0,0 +1,334 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "pandaproxy/schema_registry/kafka_client_transport.h" + +#include "cluster/cluster_link/frontend.h" +#include "cluster/controller.h" +#include "cluster/ephemeral_credential_frontend.h" +#include "cluster/members_table.h" +#include "cluster/security_frontend.h" +#include "config/broker_authn_endpoint.h" +#include "config/configuration.h" +#include "kafka/client/client.h" +#include "kafka/client/client_fetch_batch_reader.h" +#include "kafka/client/config_utils.h" +#include "kafka/client/exceptions.h" +#include "kafka/data/rpc/deps.h" +#include "kafka/protocol/create_topics.h" +#include "kafka/server/handlers/topics/types.h" +#include "model/namespace.h" +#include "pandaproxy/logger.h" +#include "pandaproxy/schema_registry/exceptions.h" +#include "security/acl.h" +#include "security/credential_store.h" +#include "security/ephemeral_credential_store.h" + +#include +#include + +#include + +using namespace std::chrono_literals; + +namespace pandaproxy::schema_registry { + +namespace { + +ss::future<> create_acls(cluster::security_frontend& security_fe) { + std::vector principal_acl_binding{ + security::acl_binding{ + security::resource_pattern{ + security::resource_type::topic, + model::schema_registry_internal_tp.topic, + security::pattern_type::literal}, + security::acl_entry{ + security::schema_registry_principal, + security::acl_host::wildcard_host(), + security::acl_operation::all, + security::acl_permission::allow}}}; + + auto err_vec = co_await security_fe.create_acls(principal_acl_binding, 5s); + auto it = std::find_if(err_vec.begin(), err_vec.end(), [](const auto& err) { + return err != cluster::errc::success; + }); + + if (it != err_vec.end()) { + vlog( + srlog.warn, + "Failed to create ACLs for {}, err {} - {}", + security::schema_registry_principal, + *it, + cluster::make_error_code(*it).message()); + } else { + vlog( + srlog.debug, + "Successfully created ACLs for {}", + security::schema_registry_principal); + } +} + +[[noreturn]] void rethrow_partition_error(std::exception_ptr ep) { + try { + std::rethrow_exception(std::move(ep)); + } catch (kafka::client::partition_error& ex) { + if ( + ex.error == kafka::error_code::unknown_topic_or_partition + && ex.tp.topic == model::schema_registry_internal_tp.topic) { + throw exception( + kafka::error_code::unknown_server_error, + "_schemas topic does not exist"); + } + throw; + } +} + +} // namespace + +kafka_client_transport::kafka_client_transport( + kafka::client::configuration& client_cfg, + cluster::controller& controller, + std::unique_ptr topic_creator) + : _client_config(client_cfg) + , _controller(controller) + , _client( + std::make_unique( + config::to_yaml(_client_config, config::redact_secrets::no), + [this](std::exception_ptr ex) { + return mitigate_error(std::move(ex)); + })) + , _topic_creator(std::move(topic_creator)) {} + +kafka_client_transport::~kafka_client_transport() = default; + +ss::future<> kafka_client_transport::stop() { + _as.request_abort(); + co_await _client->stop(); +} + +ss::future +kafka_client_transport::produce(model::record_batch batch) { + auto res_f = co_await ss::coroutine::as_future( + _client->produce_record_batch( + model::schema_registry_internal_tp, std::move(batch))); + if (res_f.failed()) { + rethrow_partition_error(res_f.get_exception()); + } + auto res = std::move(res_f).get(); + if (res.error_code != kafka::error_code::none) { + throw kafka::exception(res.error_code, res.error_message.value_or("")); + } + co_return produce_result{.base_offset = res.base_offset}; +} + +ss::future kafka_client_transport::get_high_watermark() { + auto offsets_f = co_await ss::coroutine::as_future( + _client->list_offsets(model::schema_registry_internal_tp)); + if (offsets_f.failed()) { + rethrow_partition_error(offsets_f.get_exception()); + } + auto offsets = std::move(offsets_f).get(); + if ( + offsets.data.topics.size() != 1 + || offsets.data.topics[0].partitions.size() != 1) { + throw kafka::exception( + kafka::error_code::unknown_server_error, + "Malformed ListOffsets Kafka response for internal topic"); + } + co_return offsets.data.topics[0].partitions[0].offset; +} + +ss::future<> kafka_client_transport::consume_range( + model::offset start, + model::offset end, + ss::noncopyable_function(model::record_batch)> + consumer) { + struct batch_consumer { + ss::noncopyable_function( + model::record_batch)> + fn; + ss::future operator()(model::record_batch batch) { + return fn(std::move(batch)); + } + void end_of_stream() {} + }; + auto rdr = kafka::client::make_client_fetch_batch_reader( + *_client, model::schema_registry_internal_tp, start, end); + auto fut = co_await ss::coroutine::as_future( + std::move(rdr).consume( + batch_consumer{std::move(consumer)}, model::no_timeout)); + if (fut.failed()) { + rethrow_partition_error(fut.get_exception()); + } +} + +ss::future<> kafka_client_transport::configure() { + auto sasl_config = co_await kafka::client::create_client_credentials( + _controller, _client_config, security::schema_registry_principal); + _client->set_credentials(std::move(sasl_config)); + + const auto& store = _controller.get_ephemeral_credential_store().local(); + _has_ephemeral_credentials = store.has( + store.find(security::schema_registry_principal)); + + if (_has_ephemeral_credentials) { + vlog(srlog.info, "[configure] Creating ACLs for ephemeral credentials"); + co_await create_acls(_controller.get_security_frontend().local()); + } +} + +ss::future<> kafka_client_transport::mitigate_error(std::exception_ptr eptr) { + if (_as.abort_requested()) { + // Return so that the client doesn't try to mitigate. + return ss::now(); + } + vlog(srlog.warn, "mitigate_error: {}", eptr); + return ss::make_exception_future<>(eptr) + .handle_exception_type( + [this, eptr](const kafka::client::broker_error& ex) { + if ( + ex.error == kafka::error_code::sasl_authentication_failed + && _has_ephemeral_credentials) { + return inform(ex.node_id).then([this]() { + // This fully mitigates, don't rethrow. + return _client->connect(); + }); + } + + // Rethrow unhandled exceptions + return ss::make_exception_future<>(eptr); + }) + .handle_exception_type( + [this, eptr](const kafka::client::partition_error& ex) { + if ( + (ex.error == kafka::error_code::topic_authorization_failed + || ex.error == kafka::error_code::unknown_topic_or_partition) + && _has_ephemeral_credentials) { + vlog( + srlog.info, + "Creating ACLs to mitigate partition error: {}", + ex); + return create_acls(_controller.get_security_frontend().local()) + .then([this]() { return _client->update_metadata(); }); + } + + return ss::make_exception_future<>(eptr); + }) + .handle_exception_type([this, + eptr](const kafka::client::topic_error& ex) { + if ( + (ex.error == kafka::error_code::topic_authorization_failed + || ex.error == kafka::error_code::unknown_topic_or_partition) + && _has_ephemeral_credentials) { + vlog(srlog.info, "Creating ACLs to mitigate topic error: {}", ex); + return create_acls(_controller.get_security_frontend().local()) + .then([this]() { return _client->update_metadata(); }); + } + + return ss::make_exception_future<>(eptr); + }); +} + +ss::future<> kafka_client_transport::inform(model::node_id id) { + vlog(srlog.trace, "inform: {}", id); + + // Inform a particular node + if (id != kafka::client::unknown_node_id) { + co_await do_inform(id); + } else { + // Inform all nodes + co_await ss::coroutine::parallel_for_each( + _controller.get_members_table().local().node_ids(), + [this](model::node_id id) { return do_inform(id); }); + } +} + +ss::future<> kafka_client_transport::do_inform(model::node_id id) { + auto& fe = _controller.get_ephemeral_credential_frontend().local(); + auto ec = co_await fe.inform(id, security::schema_registry_principal); + vlog(srlog.info, "Informed: broker: {}, ec: {}", id, ec); +} + +ss::future<> kafka_client_transport::validate_topic_creation_authorization( + int16_t replication_factor) { + kafka::metadata_request req; + req.data.topics = {kafka::metadata_request_topic{ + .name = model::schema_registry_internal_tp.topic}}; + req.data.include_topic_authorized_operations = true; + auto resp = co_await _client->fetch_metadata(std::move(req)); + vlog(srlog.trace, "Validating topic creation authorization"); + // If authz is not enabled on the cluster, then no need to validate + // authn/authz + if (!config::kafka_authz_enabled()) { + co_return; + } + + // If the client is not configured with a SCRAM user, it will be using + // ephemeral credentials which are assumed to work + if (!kafka::client::is_scram_configured(_client_config)) { + co_return; + } + + kafka::creatable_topic ct{ + .name{model::schema_registry_internal_tp.topic}, + .num_partitions = 1, + .replication_factor = replication_factor, + }; + + auto res = co_await _client->create_topic( + std::move(ct), kafka::client::client::validate_only_t::yes); + + if (res.data.topics.size() != 1) { + throw kafka::exception( + kafka::error_code::unknown_server_error, + "Malformed CreateTopics Kafka response for internal topic"); + } + + const auto& topic_res = res.data.topics[0]; + if ( + topic_res.error_code == kafka::error_code::none + || topic_res.error_code == kafka::error_code::topic_already_exists + || (topic_res.error_code == kafka::error_code::topic_authorization_failed && shadow_linking_active())) { + // if shadow linking is active, then the user must be a superuser to + // create the topic via the Kafka API. To continue with normal + // operations, we will assume the user is authorized to create the + // topic. + vlog(srlog.trace, "User is properly authorized"); + co_return; + } + throw kafka::exception( + topic_res.error_code, + fmt::format( + "User is not authorized to create internal schema registry topic " + "'{}'", + model::schema_registry_internal_tp.topic)); +} + +ss::future kafka_client_transport::create_topic( + model::topic_namespace_view tp_ns, + int32_t partition_count, + cluster::topic_properties properties, + int16_t replication_factor) { + co_await validate_topic_creation_authorization(replication_factor); + co_return co_await _topic_creator->create_topic( + tp_ns, partition_count, std::move(properties), replication_factor); +} + +bool kafka_client_transport::has_ephemeral_credentials() const { + return _has_ephemeral_credentials; +} + +bool kafka_client_transport::shadow_linking_active() const { + const auto& clfe = _controller.get_cluster_link_frontend().local(); + return clfe.cluster_linking_enabled() && clfe.cluster_link_active(); +} + +} // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/kafka_client_transport.h b/src/v/pandaproxy/schema_registry/kafka_client_transport.h new file mode 100644 index 0000000000000..d7872b2dd999f --- /dev/null +++ b/src/v/pandaproxy/schema_registry/kafka_client_transport.h @@ -0,0 +1,74 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/fwd.h" +#include "kafka/client/configuration.h" +#include "kafka/client/fwd.h" +#include "pandaproxy/schema_registry/transport.h" + +#include +#include + +#include + +namespace kafka::data::rpc { +class topic_creator; +} // namespace kafka::data::rpc + +namespace pandaproxy::schema_registry { + +/// Transport implementation that wraps kafka::client::client for schema +/// registry internal topic I/O. This is the legacy/fallback transport. +class kafka_client_transport final : public transport { +public: + kafka_client_transport( + kafka::client::configuration& client_config, + cluster::controller& controller, + std::unique_ptr topic_creator); + ~kafka_client_transport() final; + + ss::future<> stop() final; + + ss::future produce(model::record_batch batch) override; + ss::future get_high_watermark() override; + ss::future<> consume_range( + model::offset start, + model::offset end, + ss::noncopyable_function< + ss::future(model::record_batch)> consumer) override; + + ss::future<> configure(); + ss::future create_topic( + model::topic_namespace_view, + int32_t partition_count, + cluster::topic_properties, + int16_t replication_factor) override; + bool has_ephemeral_credentials() const; + +private: + ss::future<> mitigate_error(std::exception_ptr eptr); + ss::future<> inform(model::node_id); + ss::future<> do_inform(model::node_id); + ss::future<> + validate_topic_creation_authorization(int16_t replication_factor); + bool shadow_linking_active() const; + + kafka::client::configuration& _client_config; + cluster::controller& _controller; + std::unique_ptr _client; + std::unique_ptr _topic_creator; + bool _has_ephemeral_credentials{false}; + ss::abort_source _as; +}; + +} // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/seq_writer.cc b/src/v/pandaproxy/schema_registry/seq_writer.cc index 1d44c494061f2..22f50a2e1c697 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.cc +++ b/src/v/pandaproxy/schema_registry/seq_writer.cc @@ -11,7 +11,6 @@ #include "base/vassert.h" #include "base/vlog.h" -#include "kafka/client/client_fetch_batch_reader.h" #include "model/namespace.h" #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/error.h" @@ -19,6 +18,7 @@ #include "pandaproxy/schema_registry/exceptions.h" #include "pandaproxy/schema_registry/sharded_store.h" #include "pandaproxy/schema_registry/storage.h" +#include "pandaproxy/schema_registry/transport.h" #include "pandaproxy/schema_registry/types.h" #include "storage/record_batch_builder.h" @@ -106,17 +106,7 @@ struct batch_builder : public storage::record_batch_builder { /// a REST API endpoint that requires global knowledge of latest /// data (i.e. any listings) ss::future<> seq_writer::read_sync() { - auto offsets = co_await _client.local().list_offsets( - model::schema_registry_internal_tp); - if ( - offsets.data.topics.size() != 1 - || offsets.data.topics[0].partitions.size() != 1) { - throw kafka::exception( - kafka::error_code::unknown_server_error, - "Malformed ListOffsets Kafka response for internal topic"); - } - - auto max_offset = offsets.data.topics[0].partitions[0].offset; + auto max_offset = co_await _transport->get_high_watermark(); co_await wait_for(max_offset - model::offset{1}); co_await _store.process_marked_schemas(); } @@ -145,14 +135,10 @@ ss::future<> seq_writer::wait_for(model::offset offset) { "wait_for dirty! Reading {}..{}", seq._loaded_offset, offset); - - return kafka::client::make_client_fetch_batch_reader( - seq._client.local(), - model::schema_registry_internal_tp, - seq._loaded_offset + model::offset{1}, - offset + model::offset{1}) - .consume( - consume_to_store{seq._store, seq}, model::no_timeout); + return seq._transport->consume_range( + seq._loaded_offset + model::offset{1}, + offset + model::offset{1}, + consume_to_store{seq._store, seq}); } else { vlog(srlog.trace, "wait_for clean (offset {})", offset); return ss::make_ready_future<>(); @@ -174,29 +160,25 @@ ss::future seq_writer::produce_and_apply( write_at.value_or(batch.base_offset()) == batch.base_offset(), "Set the base_offset to the expected write_at"); - kafka::partition_produce_response res - = co_await _client.local().produce_record_batch( - model::schema_registry_internal_tp, batch.copy()); - - if (res.error_code != kafka::error_code::none) { - throw kafka::exception(res.error_code, res.error_message.value_or("")); - } + auto result = co_await _transport->produce(batch.copy()); - auto success = write_at.value_or(res.base_offset) == res.base_offset; + auto success = write_at.value_or(result.base_offset) == result.base_offset; if (success) { vlog( - srlog.debug, "seq_writer: Successful write at {}", res.base_offset); + srlog.debug, + "seq_writer: Successful write at {}", + result.base_offset); co_await consume_to_store(_store, *this)(std::move(batch)); co_await _store.process_marked_schemas(); } else { vlog( srlog.debug, "seq_writer: Failed write at {} (wrote at {})", - write_at, - res.base_offset); + write_at.value_or(model::offset{-1}), + result.base_offset); } co_return success; -}; +} ss::future<> seq_writer::advance_offset(model::offset offset) { auto remote = [offset](seq_writer& s) { s.advance_offset_inner(offset); }; diff --git a/src/v/pandaproxy/schema_registry/seq_writer.h b/src/v/pandaproxy/schema_registry/seq_writer.h index 6b0b6b50143af..8199112003b27 100644 --- a/src/v/pandaproxy/schema_registry/seq_writer.h +++ b/src/v/pandaproxy/schema_registry/seq_writer.h @@ -10,12 +10,12 @@ #pragma once #include "base/outcome.h" -#include "kafka/client/client.h" #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/exceptions.h" #include "pandaproxy/schema_registry/sharded_store.h" +#include "pandaproxy/schema_registry/transport.h" #include "pandaproxy/schema_registry/types.h" #include "random/simple_time_jitter.h" #include "ssx/semaphore.h" @@ -48,11 +48,11 @@ class seq_writer final : public ss::peering_sharded_service { seq_writer( model::node_id node_id, ss::smp_service_group smp_group, - ss::sharded& client, + transport& transport, sharded_store& store, std::unique_ptr state_checker) : _smp_opts(ss::smp_submit_to_options{smp_group}) - , _client(client) + , _transport(&transport) , _store(store) , _node_id(node_id) , _state_checker(std::move(state_checker)) {} @@ -92,7 +92,7 @@ class seq_writer final : public ss::peering_sharded_service { private: ss::smp_submit_to_options _smp_opts; - ss::sharded& _client; + transport* _transport; sharded_store& _store; model::node_id _node_id; diff --git a/src/v/pandaproxy/schema_registry/service.cc b/src/v/pandaproxy/schema_registry/service.cc index b993efaabaffc..792521f8517b6 100644 --- a/src/v/pandaproxy/schema_registry/service.cc +++ b/src/v/pandaproxy/schema_registry/service.cc @@ -11,17 +11,10 @@ #include "cluster/cluster_link/frontend.h" #include "cluster/controller.h" -#include "cluster/ephemeral_credential_frontend.h" -#include "cluster/members_table.h" -#include "cluster/security_frontend.h" -#include "config/broker_authn_endpoint.h" #include "config/configuration.h" -#include "kafka/client/client_fetch_batch_reader.h" -#include "kafka/client/config_utils.h" -#include "kafka/client/exceptions.h" #include "kafka/data/rpc/deps.h" #include "kafka/protocol/errors.h" -#include "kafka/protocol/list_offset.h" +#include "kafka/protocol/exceptions.h" #include "kafka/server/handlers/topics/types.h" #include "model/fundamental.h" #include "model/namespace.h" @@ -31,6 +24,7 @@ #include "pandaproxy/schema_registry/auth.h" #include "pandaproxy/schema_registry/configuration.h" #include "pandaproxy/schema_registry/context_router.h" +#include "pandaproxy/schema_registry/exceptions.h" #include "pandaproxy/schema_registry/handlers.h" #include "pandaproxy/schema_registry/storage.h" #include "pandaproxy/schema_registry/types.h" @@ -38,16 +32,15 @@ #include "security/acl.h" #include "security/audit/audit_log_manager.h" #include "security/authorizer.h" -#include "security/ephemeral_credential_store.h" #include "security/request_auth.h" #include "ssx/semaphore.h" #include "utils/tristate.h" #include #include +#include #include #include -#include #include @@ -80,33 +73,22 @@ class wrap { co_await _os(); auto guard = _g.hold(); - try { - co_return co_await ss::visit( - _h, - [&](const auth::regular_function_handler& h) { - vassert( - !auth_result.has_value(), - "Authorization must not be deferred for non-deferred " - "endpoints"); - return h(std::move(rq), std::move(rp)); - }, - [&](const auth::deferred_function_handler& h) { - return h( - std::move(rq), - std::move(rp), - std::move(auth_result), - _operation_name); - }); - } catch (const kafka::client::partition_error& ex) { - if ( - ex.error == kafka::error_code::unknown_topic_or_partition - && ex.tp.topic == model::schema_registry_internal_tp.topic) { - throw exception( - kafka::error_code::unknown_server_error, - "_schemas topic does not exist"); - } - throw; - } + co_return co_await ss::visit( + _h, + [&](const auth::regular_function_handler& h) { + vassert( + !auth_result.has_value(), + "Authorization must not be deferred for non-deferred " + "endpoints"); + return h(std::move(rq), std::move(rp)); + }, + [&](const auth::deferred_function_handler& h) { + return h( + std::move(rq), + std::move(rp), + std::move(auth_result), + _operation_name); + }); } private: @@ -620,142 +602,60 @@ ss::future<> service::do_start() { std::current_exception()); throw; } - co_await container().invoke_on_all(_ctx.smp_sg, [](service& s) { - s._is_started = true; - return ss::this_shard_id() == seq_writer::reader_shard - ? s.fetch_internal_topic() - : ss::now(); - }); -} - -ss::future<> create_acls(cluster::security_frontend& security_fe) { - std::vector princpal_acl_binding{ - security::acl_binding{ - security::resource_pattern{ - security::resource_type::topic, - model::schema_registry_internal_tp.topic, - security::pattern_type::literal}, - security::acl_entry{ - security::schema_registry_principal, - security::acl_host::wildcard_host(), - security::acl_operation::all, - security::acl_permission::allow}}}; - - auto err_vec = co_await security_fe.create_acls(princpal_acl_binding, 5s); - auto it = std::find_if(err_vec.begin(), err_vec.end(), [](const auto& err) { - return err != cluster::errc::success; - }); - - if (it != err_vec.end()) { - vlog( - srlog.warn, - "Failed to create ACLs for {}, err {} - {}", - security::schema_registry_principal, - *it, - cluster::make_error_code(*it).message()); - } else { - vlog( - srlog.debug, - "Successfully created ACLs for {}", - security::schema_registry_principal); - } -} - -ss::future<> service::configure() { - auto sasl_config = co_await kafka::client::create_client_credentials( - *_controller, _client_config, security::schema_registry_principal); - co_await _client.invoke_on_all( - [sasl_config = std::move(sasl_config)](kafka::client::client& c) { - c.set_credentials(sasl_config); - }); - - const auto& store = _controller->get_ephemeral_credential_store().local(); - bool has_ephemeral_credentials = store.has( - store.find(security::schema_registry_principal)); co_await container().invoke_on_all( - _ctx.smp_sg, [has_ephemeral_credentials](service& s) { - s._has_ephemeral_credentials = has_ephemeral_credentials; + _ctx.smp_sg, [](this auto, service& s) -> ss::future<> { + s._is_started = true; + if (ss::this_shard_id() != seq_writer::reader_shard) { + co_return; + } + + using namespace std::chrono_literals; + + // create_internal_topic returns once the controller commits + // the topic, but the metadata cache and partition leadership + // are established asynchronously. Retry transient errors in + // that window with exponential backoff (100ms..5s, ~26s total). + constexpr int max_attempts = 10; + constexpr auto max_backoff = 5000ms; + auto backoff = 100ms; + for (int attempts = 0;; ++attempts) { + auto fut = co_await ss::coroutine::as_future( + s.fetch_internal_topic()); + if (!fut.failed()) { + co_return; + } + auto eptr = fut.get_exception(); + if (attempts >= max_attempts) { + std::rethrow_exception(eptr); + } + try { + std::rethrow_exception(eptr); + } catch (const kafka::exception_base& e) { + if (!kafka::is_retriable(e.error)) { + throw; + } + } catch (const exception& e) { + // kafka_client_transport wraps "topic missing" as + // unknown_server_error via schema_registry::exception. + // treat this as retriable and rethrow anything else. + if (e.code() != kafka::error_code::unknown_server_error) { + throw; + } + } + vlog( + srlog.info, + "Retriable error encountered while initializing the schemas " + "topic: {}. Retrying in {}ms", + eptr, + backoff.count()); + try { + co_await ss::sleep_abortable(backoff, s._as); + } catch (const ss::sleep_aborted&) { + std::rethrow_exception(eptr); + } + backoff = std::min(backoff * 2, max_backoff); + } }); - - if (_has_ephemeral_credentials) { - vlog(srlog.info, "[configure] Creating ACLs for ephemeral credentials"); - co_await create_acls(_controller->get_security_frontend().local()); - } -} - -ss::future<> service::mitigate_error(std::exception_ptr eptr) { - if (_gate.is_closed()) { - // Return so that the client doesn't try to mitigate. - return ss::now(); - } - vlog(srlog.warn, "mitigate_error: {}", eptr); - return ss::make_exception_future<>(eptr) - .handle_exception_type( - [this, eptr](const kafka::client::broker_error& ex) { - if ( - ex.error == kafka::error_code::sasl_authentication_failed - && _has_ephemeral_credentials) { - return inform(ex.node_id).then([this]() { - // This fully mitigates, don't rethrow. - return _client.local().connect(); - }); - } - - // Rethrow unhandled exceptions - return ss::make_exception_future<>(eptr); - }) - .handle_exception_type( - [this, eptr](const kafka::client::partition_error& ex) { - if ( - (ex.error == kafka::error_code::topic_authorization_failed - || ex.error == kafka::error_code::unknown_topic_or_partition) - && _has_ephemeral_credentials) { - vlog( - srlog.info, - "Creating ACLs to mitigate partition error: {}", - ex); - return create_acls(_controller->get_security_frontend().local()) - .then([this]() { return _client.local().update_metadata(); }); - } - - return ss::make_exception_future<>(eptr); - }) - .handle_exception_type( - [this, eptr](const kafka::client::topic_error& ex) { - if ( - (ex.error == kafka::error_code::topic_authorization_failed - || ex.error == kafka::error_code::unknown_topic_or_partition) - && _has_ephemeral_credentials) { - vlog( - srlog.info, - "Creating ACLs to mitigate partition error: {}", - ex); - return create_acls(_controller->get_security_frontend().local()) - .then([this]() { return _client.local().update_metadata(); }); - } - - return ss::make_exception_future<>(eptr); - }); -} - -ss::future<> service::inform(model::node_id id) { - vlog(srlog.trace, "inform: {}", id); - - // Inform a particular node - if (id != kafka::client::unknown_node_id) { - return do_inform(id); - } - - // Inform all nodes - return seastar::parallel_for_each( - _controller->get_members_table().local().node_ids(), - [this](model::node_id id) { return do_inform(id); }); -} - -ss::future<> service::do_inform(model::node_id id) { - auto& fe = _controller->get_ephemeral_credential_frontend().local(); - auto ec = co_await fe.inform(id, security::schema_registry_principal); - vlog(srlog.info, "Informed: broker: {}, ec: {}", id, ec); } ss::future<> service::create_internal_topic() { @@ -765,15 +665,16 @@ ss::future<> service::create_internal_topic() { vlog(srlog.debug, "Schema registry: found internal topic"); co_return; } - co_await validate_topic_creation_authorization(); - // If shadow linking is active and a link is actively mirroring the schema - // registry topic, then we will not create the topic and we will throw an - // error. This is so the oneshot doesn't become 'completed'. + + // If shadow linking is active and a link is actively mirroring the + // schema registry topic, then we will not create the topic and we will + // throw an error. This is so the oneshot doesn't become 'completed'. if (active_sr_mirroring()) { throw std::runtime_error( "Shadow Linking actively mirroring schema " "registry topic. Topic will not be created"); } + // Use the default topic replica count, unless our specific setting // for the schema registry chooses to override it. int16_t replication_factor @@ -806,12 +707,13 @@ ss::future<> service::create_internal_topic() { vlog( srlog.debug, - "Schema registry: attempting to create internal topic (replication={}, " + "Schema registry: attempting to create internal topic " + "(replication={}, " "properties={})", replication_factor, base_topic_config.properties); - auto res = co_await _topic_creator->create_topic( + auto res = co_await _transport->create_topic( {model::kafka_namespace, model::schema_registry_internal_tp.topic}, 1, std::move(base_topic_config.properties), @@ -835,25 +737,11 @@ ss::future<> service::fetch_internal_topic() { // TODO: should check the replication_factor of the topic is // what our config calls for - auto offset_res = co_await _client.local().list_offsets( - model::schema_registry_internal_tp); - if ( - offset_res.data.topics.size() != 1 - || offset_res.data.topics[0].partitions.size() != 1) { - throw kafka::exception( - kafka::error_code::unknown_server_error, - "Malformed ListOffsets Kafka response for internal topic"); - } - - auto max_offset = offset_res.data.topics[0].partitions[0].offset; + auto max_offset = co_await _transport->get_high_watermark(); vlog(srlog.debug, "Schema registry: _schemas max_offset: {}", max_offset); - co_await kafka::client::make_client_fetch_batch_reader( - _client.local(), - model::schema_registry_internal_tp, - model::offset{0}, - max_offset) - .consume(consume_to_store{_store, writer()}, model::no_timeout); + co_await _transport->consume_range( + model::offset{0}, max_offset, consume_to_store{_store, writer()}); // If a schema failed to be compiled, it will be marked. We attempt to // reprocess them once now that the whole topic has been read, in case @@ -861,90 +749,23 @@ ss::future<> service::fetch_internal_topic() { co_await _store.process_marked_schemas(); } -ss::future<> service::validate_topic_creation_authorization() { - kafka::metadata_request req; - req.data.topics = {kafka::metadata_request_topic{ - .name = model::schema_registry_internal_tp.topic}}; - req.data.include_topic_authorized_operations = true; - auto resp = co_await _client.local().fetch_metadata(std::move(req)); - vlog(srlog.trace, "Validating topic creation authorization"); - // If authz is not enabled on the cluster, then no need to validate - // authn/authz - if (!config::kafka_authz_enabled()) { - co_return; - } - - // If the client is not configured with a SCRAM user, it will be using - // ephemeral credentials which are assumed to work - if (!kafka::client::is_scram_configured(_client_config)) { - co_return; - } - - int16_t replication_factor - = _config.schema_registry_replication_factor().value_or( - _controller->internal_topic_replication()); - - kafka::creatable_topic ct{ - .name{model::schema_registry_internal_tp.topic}, - .num_partitions = 1, - .replication_factor = replication_factor, - }; - - auto res = co_await _client.local().create_topic( - std::move(ct), kafka::client::client::validate_only_t::yes); - - if (res.data.topics.size() != 1) { - throw kafka::exception( - kafka::error_code::unknown_server_error, - "Malformed CreateTopics Kafka response for internal topic"); - } - - const auto& topic_res = res.data.topics[0]; - if ( - topic_res.error_code == kafka::error_code::none - || topic_res.error_code == kafka::error_code::topic_already_exists - || (topic_res.error_code == kafka::error_code::topic_authorization_failed && shadow_linking_active())) { - // if shadow linking is active, then the user must be a superuser to - // create the topic via the Kafka API. To continue with normal - // operations, we will assume the user is authorized to create the - // topic. - vlog(srlog.trace, "User is properly authorized"); - co_return; - } - throw kafka::exception( - topic_res.error_code, - fmt::format( - "User is not authorized to create internal schema registry topic " - "'{}'", - model::schema_registry_internal_tp.topic)); -} - bool service::active_sr_mirroring() const { return _controller->get_cluster_link_frontend() .local() .schema_registry_shadowing_active(); } -bool service::shadow_linking_active() const { - const auto& clfe = _controller->get_cluster_link_frontend().local(); - - return clfe.cluster_linking_enabled() && clfe.cluster_link_active(); -} - service::service( const YAML::Node& config, - const YAML::Node& client_config, ss::smp_service_group smp_sg, size_t max_memory, - ss::sharded& client, + schema_registry::transport& transport, sharded_store& store, ss::sharded& sequencer, std::unique_ptr topic_metadata_cache, - std::unique_ptr topic_creator, std::unique_ptr& controller, ss::sharded& audit_mgr) : _config(config) - , _client_config(client_config) , _mem_sem(max_memory, "pproxy/schema-svc") , _inflight_sem( config::shard_local_cfg() @@ -952,7 +773,7 @@ service::service( , _inflight_config_binding( config::shard_local_cfg() .max_in_flight_schema_registry_requests_per_shard.bind()) - , _client(client) + , _transport(&transport) , _ctx{{{}, max_memory, _mem_sem, _inflight_config_binding(), _inflight_sem, {}, smp_sg}, *this} , _server( "schema_registry", // server_name @@ -967,7 +788,6 @@ service::service( , _store(store) , _writer(sequencer) , _topic_metadata_cache(std::move(topic_metadata_cache)) - , _topic_creator(std::move(topic_creator)) , _controller(controller) , _audit_mgr(audit_mgr) , _ensure_started{[this]() { return do_start(); }} @@ -983,7 +803,6 @@ service::service( } ss::future<> service::start() { - co_await configure(); static std::vector not_advertised{}; _server.routes(get_schema_registry_routes(_gate, _ensure_started)); co_return co_await _server.start( @@ -993,6 +812,7 @@ ss::future<> service::start() { } ss::future<> service::stop() { + _as.request_abort(); co_await _gate.close(); co_await _server.stop(); } diff --git a/src/v/pandaproxy/schema_registry/service.h b/src/v/pandaproxy/schema_registry/service.h index 9776768fa5630..55871b237b5c3 100644 --- a/src/v/pandaproxy/schema_registry/service.h +++ b/src/v/pandaproxy/schema_registry/service.h @@ -12,16 +12,17 @@ #pragma once #include "base/seastarx.h" -#include "kafka/client/client.h" #include "pandaproxy/schema_registry/configuration.h" #include "pandaproxy/schema_registry/seq_writer.h" #include "pandaproxy/schema_registry/sharded_store.h" +#include "pandaproxy/schema_registry/transport.h" #include "pandaproxy/server.h" #include "pandaproxy/util.h" #include "security/fwd.h" #include "security/request_auth.h" #include "utils/adjustable_semaphore.h" +#include #include #include #include @@ -33,7 +34,6 @@ namespace cluster { class controller; } namespace kafka::data::rpc { -class topic_creator; class topic_metadata_cache; } // namespace kafka::data::rpc namespace pandaproxy::schema_registry { @@ -42,15 +42,13 @@ class service : public ss::peering_sharded_service { public: service( const YAML::Node& config, - const YAML::Node& client_config, ss::smp_service_group smp_sg, size_t max_memory, - ss::sharded& client, + transport& transport, sharded_store& store, ss::sharded& sequencer, std::unique_ptr topic_metadata_cache, - std::unique_ptr topic_creator, std::unique_ptr&, ss::sharded& audit_mgr); @@ -58,12 +56,10 @@ class service : public ss::peering_sharded_service { ss::future<> stop(); configuration& config(); - ss::sharded& client() { return _client; } seq_writer& writer() { return _writer.local(); } sharded_store& schema_store() { return _store; } request_authenticator& authenticator() { return _auth; } security::authorizer& authorizor(); - ss::future<> mitigate_error(std::exception_ptr); ss::future<> ensure_started() { return _ensure_started(); } security::audit::audit_log_manager& audit_mgr() { return _audit_mgr.local(); @@ -71,40 +67,29 @@ class service : public ss::peering_sharded_service { std::unique_ptr& controller() { return _controller; } - bool has_ephemeral_credentials() const { - return _has_ephemeral_credentials; - } - private: ss::future<> do_start(); - ss::future<> configure(); - ss::future<> inform(model::node_id); - ss::future<> do_inform(model::node_id); ss::future<> create_internal_topic(); ss::future<> fetch_internal_topic(); - ss::future<> validate_topic_creation_authorization(); bool active_sr_mirroring() const; - bool shadow_linking_active() const; configuration _config; - kafka::client::configuration _client_config; ssx::semaphore _mem_sem; adjustable_semaphore _inflight_sem; config::binding _inflight_config_binding; ss::gate _gate; - ss::sharded& _client; + transport* _transport; ctx_server::context_t _ctx; ctx_server _server; sharded_store& _store; ss::sharded& _writer; std::unique_ptr _topic_metadata_cache; - std::unique_ptr _topic_creator; std::unique_ptr& _controller; ss::sharded& _audit_mgr; + ss::abort_source _as; one_shot _ensure_started; request_authenticator _auth; - bool _has_ephemeral_credentials{false}; bool _is_started{false}; }; diff --git a/src/v/pandaproxy/schema_registry/test/BUILD b/src/v/pandaproxy/schema_registry/test/BUILD index 626beeea996c4..8430a8029a458 100644 --- a/src/v/pandaproxy/schema_registry/test/BUILD +++ b/src/v/pandaproxy/schema_registry/test/BUILD @@ -201,6 +201,7 @@ redpanda_test_cc_library( visibility = ["//visibility:public"], deps = [ "//src/v/pandaproxy/schema_registry:core", + "//src/v/pandaproxy/schema_registry:transport", ], ) diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc b/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc index aa8803161943f..a6b4ad8effb66 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc @@ -79,23 +79,17 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store_3rdparty) { auto fixture = pandaproxy::schema_registry::test_utils::store_fixture{}; auto& s = fixture.store(); - // This kafka client will not be used by the sequencer + // This transport will not be used by the sequencer // (which itself is only instantiated to receive consume_to_store's - // offset updates), is just needed for constructor; - ss::sharded dummy_kafka_client; - dummy_kafka_client - .start( - to_yaml(kafka::client::configuration{}, config::redact_secrets::no)) - .get(); - auto stop_kafka_client = ss::defer( - [&dummy_kafka_client]() { dummy_kafka_client.stop().get(); }); + // offset updates), is just needed for constructor. + noop_transport dummy_transport; ss::sharded seq; seq .start( model::node_id{0}, ss::default_smp_service_group(), - std::reference_wrapper(dummy_kafka_client), + std::ref(dummy_transport), std::reference_wrapper(s), ss::sharded_parameter( [] { return std::make_unique(); })) diff --git a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc index 437789479a7fb..48ef8207fc8cd 100644 --- a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc +++ b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc @@ -93,23 +93,17 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { s.start(pps::is_mutable::yes, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); - // This kafka client will not be used by the sequencer + // This transport will not be used by the sequencer // (which itself is only instantiated to receive consume_to_store's - // offset updates), is just needed for constructor; - ss::sharded dummy_kafka_client; - dummy_kafka_client - .start( - to_yaml(kafka::client::configuration{}, config::redact_secrets::no)) - .get(); - auto stop_kafka_client = ss::defer( - [&dummy_kafka_client]() { dummy_kafka_client.stop().get(); }); + // offset updates), is just needed for constructor. + noop_transport dummy_transport; ss::sharded seq; seq .start( model::node_id{0}, ss::default_smp_service_group(), - std::reference_wrapper(dummy_kafka_client), + std::ref(dummy_transport), std::reference_wrapper(s), ss::sharded_parameter( [] { return std::make_unique(); })) @@ -220,23 +214,17 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store_after_compaction) { s.start(pps::is_mutable::no, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); - // This kafka client will not be used by the sequencer + // This transport will not be used by the sequencer // (which itself is only instantiated to receive consume_to_store's - // offset updates), is just needed for constructor; - ss::sharded dummy_kafka_client; - dummy_kafka_client - .start( - to_yaml(kafka::client::configuration{}, config::redact_secrets::no)) - .get(); - auto stop_kafka_client = ss::defer( - [&dummy_kafka_client]() { dummy_kafka_client.stop().get(); }); + // offset updates), is just needed for constructor. + noop_transport dummy_transport; ss::sharded seq; seq .start( model::node_id{0}, ss::default_smp_service_group(), - std::reference_wrapper(dummy_kafka_client), + std::ref(dummy_transport), std::reference_wrapper(s), ss::sharded_parameter( [] { return std::make_unique(); })) @@ -280,23 +268,17 @@ SEASTAR_THREAD_TEST_CASE(test_writes_disabled) { s.start(pps::is_mutable::no, ss::default_smp_service_group()).get(); auto stop_store = ss::defer([&s]() { s.stop().get(); }); - // This kafka client will not be used by the sequencer + // This transport will not be used by the sequencer // (which itself is only instantiated to receive consume_to_store's - // offset updates), is just needed for constructor; - ss::sharded dummy_kafka_client; - dummy_kafka_client - .start( - to_yaml(kafka::client::configuration{}, config::redact_secrets::no)) - .get(); - auto stop_kafka_client = ss::defer( - [&dummy_kafka_client]() { dummy_kafka_client.stop().get(); }); + // offset updates), is just needed for constructor. + noop_transport dummy_transport; ss::sharded seq; seq .start( model::node_id{0}, ss::default_smp_service_group(), - std::reference_wrapper(dummy_kafka_client), + std::ref(dummy_transport), std::reference_wrapper(s), ss::sharded_parameter([] { return std::make_unique( diff --git a/src/v/pandaproxy/schema_registry/test/utils.h b/src/v/pandaproxy/schema_registry/test/utils.h index c226e710ff622..ccc45fae7a2c6 100644 --- a/src/v/pandaproxy/schema_registry/test/utils.h +++ b/src/v/pandaproxy/schema_registry/test/utils.h @@ -9,6 +9,9 @@ #pragma once #include "pandaproxy/schema_registry/seq_writer.h" +#include "pandaproxy/schema_registry/transport.h" + +#include class sequence_state_checker_test : public pandaproxy::schema_registry::sequence_state_checker { @@ -21,3 +24,34 @@ class sequence_state_checker_test private: writes_disabled_t _wd; }; + +/// No-op transport used in tests where seq_writer is only instantiated to +/// receive consume_to_store offset updates. +class noop_transport final : public pandaproxy::schema_registry::transport { +public: + ss::future<> stop() final { return ss::now(); } + ss::future + produce(model::record_batch) override { + throw std::runtime_error("noop_transport::produce not implemented"); + } + ss::future get_high_watermark() override { + throw std::runtime_error( + "noop_transport::get_high_watermark not implemented"); + } + ss::future<> consume_range( + model::offset, + model::offset, + ss::noncopyable_function< + ss::future(model::record_batch)>) override { + throw std::runtime_error( + "noop_transport::consume_range not implemented"); + } + ss::future create_topic( + model::topic_namespace_view, + int32_t, + cluster::topic_properties, + int16_t) final { + throw std::runtime_error( + "noop_transport::create_topic not implemented"); + } +}; diff --git a/src/v/pandaproxy/schema_registry/transport.h b/src/v/pandaproxy/schema_registry/transport.h new file mode 100644 index 0000000000000..cc2b348543c62 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/transport.h @@ -0,0 +1,69 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "cluster/errc.h" +#include "cluster/topic_properties.h" +#include "model/fundamental.h" +#include "model/record.h" + +#include +#include +#include +#include + +namespace pandaproxy::schema_registry { + +/// Result of a produce operation — includes offset for collision detection. +struct produce_result { + model::offset base_offset; +}; + +/// Abstract transport for schema registry's internal topic I/O. +/// +/// Currently the only implementation wraps kafka::client. A kafka::data::rpc +/// based transport (no auth overhead) is planned. +class transport { +public: + transport() = default; + virtual ~transport() = default; + transport(const transport&) = delete; + transport& operator=(const transport&) = delete; + transport(transport&&) = delete; + transport& operator=(transport&&) = delete; + + virtual ss::future<> stop() = 0; + + /// Produce a batch to the _schemas topic. Returns the base_offset. + virtual ss::future produce(model::record_batch batch) = 0; + + /// Get the high watermark (next offset) for the _schemas topic. + virtual ss::future get_high_watermark() = 0; + + /// Consume batches from [start, end) on the _schemas topic. + /// Calls consumer(batch) for each batch. Handles pagination internally. + /// Returning stop_iteration::yes halts consumption early. + virtual ss::future<> consume_range( + model::offset start, + model::offset end, + ss::noncopyable_function< + ss::future(model::record_batch)> consumer) = 0; + + /// Create the internal schema registry topic. + virtual ss::future create_topic( + model::topic_namespace_view, + int32_t partition_count, + cluster::topic_properties, + int16_t replication_factor) = 0; +}; + +} // namespace pandaproxy::schema_registry