[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface#30254
[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface#30254
Conversation
There was a problem hiding this comment.
Pull request overview
Refactors Schema Registry’s internal _schemas topic I/O to go through a new transport abstraction, enabling alternative backends (e.g., RPC-based) while keeping existing Kafka-client behavior available via kafka_client_transport.
Changes:
- Introduce
pandaproxy::schema_registry::transportand factor internal topic operations (produce/consume/HWM/auth/mitigation) behind it. - Add
kafka_client_transportimplementation and wireapi,service, andseq_writerto use the transport pointer. - Update tests to use a
noop_transportand add a collision-simulation transport to exercise delete retry behavior.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/v/pandaproxy/schema_registry/transport.h | Adds the new transport interface used by SR components. |
| src/v/pandaproxy/schema_registry/kafka_client_transport.h | Declares Kafka-client-backed transport implementation. |
| src/v/pandaproxy/schema_registry/kafka_client_transport.cc | Implements Kafka-client-backed transport behaviors. |
| src/v/pandaproxy/schema_registry/service.h | Switches SR service dependency from kafka client to transport. |
| src/v/pandaproxy/schema_registry/service.cc | Routes internal topic init/load through transport and adds retry logic. |
| src/v/pandaproxy/schema_registry/seq_writer.h | Switches sequencer dependency from kafka client to transport; adds delete retry cache state. |
| src/v/pandaproxy/schema_registry/seq_writer.cc | Uses transport for produce/consume/HWM and adds delete collision retry behavior. |
| src/v/pandaproxy/schema_registry/api.h | Replaces sharded Kafka client with sharded Kafka transport. |
| src/v/pandaproxy/schema_registry/api.cc | Wires up transport lifecycle and passes transport pointers to service/sequencer. |
| src/v/pandaproxy/schema_registry/fwd.h | Adds forward declarations for new transport types. |
| src/v/pandaproxy/schema_registry/BUILD | Adds new transport and kafka_client_transport Bazel targets and updates deps. |
| src/v/pandaproxy/schema_registry/test/utils.h | Adds noop_transport for tests that don’t use topic I/O. |
| src/v/pandaproxy/schema_registry/test/consume_to_store.cc | Updates tests to use noop_transport; adds collision transport test. |
| src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc | Updates test to use noop_transport instead of dummy Kafka client. |
| src/v/pandaproxy/schema_registry/test/BUILD | Updates test deps to include new transport targets. |
0e4a6d3 to
4502f20
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/v/pandaproxy/schema_registry/seq_writer.cc:620
_delete_versions_cacheis populated before several operations that can throw (e.g.,is_referenced, tombstone lookups). If an exception is thrown after caching, the cache persists beyond this delete attempt and a later call for the same subject (when it is already soft-deleted) can incorrectly return this stale cached version list. Consider clearing the cache on any exception path after it is set (e.g., a scope guard / try-catch that resets_delete_versions_cachebefore rethrowing), or delaying cache population until after all non-retriable validations have passed.
// Cache versions for potential retry — after a subject-level soft
// delete all versions are marked deleted and the pre-delete list
// cannot be reconstructed from the store. Tagged with subject so
// stale entries from a prior delete of a different subject are ignored.
_delete_versions_cache.emplace(delete_version_cache{sub, versions.copy()});
// Check that the subject is not referenced
if (co_await _store.is_referenced(sub, std::nullopt)) {
throw as_exception(has_references(sub, versions.back()));
}
4502f20 to
7c45444
Compare
7c45444 to
b73bcef
Compare
b73bcef to
23114c7
Compare
Review summaryNice refactor — the Bugs / correctness
Nits
Design observations (no action needed)
Overall looks good — main ask is to take a look at the retry budget / predicate in |
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
65cad38 to
01aa966
Compare
CI test resultstest results on build#83603
test results on build#83618
|
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
do_delete_subject_impermanent called get_versions(include_deleted::no) before is_subject_deleted(). On retry after a write collision where the subject was already soft-deleted by the winning writer, get_versions throws subject_not_found, which propagates as HTTP 404 to the client. Fix by checking is_subject_deleted first and returning the full version list via include_deleted::yes on that branch. Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
01aa966 to
709a3c7
Compare
|
@dotnwat @pgellert @nguyen-andrew - I think this one is pretty clean. if you want to see how it maps into the rpc stuff check out #30046 |
|
/ci-repeat 1 |
Introduces a pluggable
transportinterface for schema registry's internal_schemastopic I/O, replacing the hardcodedkafka::clientdependency withkafka_client_transport.Bugs discovered during development
seq_writer::delete_subject_impermanent- retry after write collisioncould 404 because the subject's version list was no longer available from
the store after a concurrent soft-delete. The code as written looked like it
meant to return the deleted versions of a soft-deleted subject, so we do
that instead.
Backports Required
Release Notes