Skip to content

[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface#30254

Open
oleiman wants to merge 6 commits intodevfrom
sr/noticket/kafka-transport
Open

[CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface#30254
oleiman wants to merge 6 commits intodevfrom
sr/noticket/kafka-transport

Conversation

@oleiman
Copy link
Copy Markdown
Member

@oleiman oleiman commented Apr 22, 2026

Introduces a pluggable transport interface for schema registry's internal
_schemas topic I/O, replacing the hardcoded kafka::client dependency with kafka_client_transport.

Bugs discovered during development

  • seq_writer::delete_subject_impermanent - retry after write collision
    could 404 because the subject's version list was no longer available from
    the store after a concurrent soft-delete. The code as written looked like it
    meant to return the deleted versions of a soft-deleted subject, so we do
    that instead.

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v26.1.x
  • v25.3.x
  • v25.2.x

Release Notes

  • none

@oleiman oleiman self-assigned this Apr 22, 2026
@oleiman oleiman added the claude-review Adding this label to a PR will trigger a workflow to review the code using claude. label Apr 22, 2026
@oleiman oleiman requested a review from Copilot April 22, 2026 23:37
@oleiman oleiman mentioned this pull request Apr 22, 2026
7 tasks
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors Schema Registry’s internal _schemas topic I/O to go through a new transport abstraction, enabling alternative backends (e.g., RPC-based) while keeping existing Kafka-client behavior available via kafka_client_transport.

Changes:

  • Introduce pandaproxy::schema_registry::transport and factor internal topic operations (produce/consume/HWM/auth/mitigation) behind it.
  • Add kafka_client_transport implementation and wire api, service, and seq_writer to use the transport pointer.
  • Update tests to use a noop_transport and add a collision-simulation transport to exercise delete retry behavior.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/v/pandaproxy/schema_registry/transport.h Adds the new transport interface used by SR components.
src/v/pandaproxy/schema_registry/kafka_client_transport.h Declares Kafka-client-backed transport implementation.
src/v/pandaproxy/schema_registry/kafka_client_transport.cc Implements Kafka-client-backed transport behaviors.
src/v/pandaproxy/schema_registry/service.h Switches SR service dependency from kafka client to transport.
src/v/pandaproxy/schema_registry/service.cc Routes internal topic init/load through transport and adds retry logic.
src/v/pandaproxy/schema_registry/seq_writer.h Switches sequencer dependency from kafka client to transport; adds delete retry cache state.
src/v/pandaproxy/schema_registry/seq_writer.cc Uses transport for produce/consume/HWM and adds delete collision retry behavior.
src/v/pandaproxy/schema_registry/api.h Replaces sharded Kafka client with sharded Kafka transport.
src/v/pandaproxy/schema_registry/api.cc Wires up transport lifecycle and passes transport pointers to service/sequencer.
src/v/pandaproxy/schema_registry/fwd.h Adds forward declarations for new transport types.
src/v/pandaproxy/schema_registry/BUILD Adds new transport and kafka_client_transport Bazel targets and updates deps.
src/v/pandaproxy/schema_registry/test/utils.h Adds noop_transport for tests that don’t use topic I/O.
src/v/pandaproxy/schema_registry/test/consume_to_store.cc Updates tests to use noop_transport; adds collision transport test.
src/v/pandaproxy/schema_registry/test/compatibility_3rdparty.cc Updates test to use noop_transport instead of dummy Kafka client.
src/v/pandaproxy/schema_registry/test/BUILD Updates test deps to include new transport targets.

Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 0e4a6d3 to 4502f20 Compare April 23, 2026 04:01
@oleiman oleiman changed the title Schema Registry: Factor kafka client usage into new transport interface [CORE-16135]: Schema Registry: Factor kafka client usage into new transport interface Apr 23, 2026
@oleiman oleiman requested a review from Copilot April 23, 2026 04:10
@oleiman oleiman added claude-review Adding this label to a PR will trigger a workflow to review the code using claude. and removed claude-review Adding this label to a PR will trigger a workflow to review the code using claude. labels Apr 23, 2026
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.

Comments suppressed due to low confidence (1)

src/v/pandaproxy/schema_registry/seq_writer.cc:620

  • _delete_versions_cache is populated before several operations that can throw (e.g., is_referenced, tombstone lookups). If an exception is thrown after caching, the cache persists beyond this delete attempt and a later call for the same subject (when it is already soft-deleted) can incorrectly return this stale cached version list. Consider clearing the cache on any exception path after it is set (e.g., a scope guard / try-catch that resets _delete_versions_cache before rethrowing), or delaying cache population until after all non-retriable validations have passed.
    // Cache versions for potential retry — after a subject-level soft
    // delete all versions are marked deleted and the pre-delete list
    // cannot be reconstructed from the store. Tagged with subject so
    // stale entries from a prior delete of a different subject are ignored.
    _delete_versions_cache.emplace(delete_version_cache{sub, versions.copy()});

    // Check that the subject is not referenced
    if (co_await _store.is_referenced(sub, std::nullopt)) {
        throw as_exception(has_references(sub, versions.back()));
    }

Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/seq_writer.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 4502f20 to 7c45444 Compare April 23, 2026 04:22
@oleiman oleiman added claude-review Adding this label to a PR will trigger a workflow to review the code using claude. and removed claude-review Adding this label to a PR will trigger a workflow to review the code using claude. labels Apr 23, 2026
Comment thread src/v/pandaproxy/schema_registry/seq_writer.cc Outdated
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 7c45444 to b73bcef Compare April 23, 2026 07:14
@oleiman oleiman marked this pull request as ready for review April 23, 2026 07:14
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from b73bcef to 23114c7 Compare April 23, 2026 07:45
@oleiman oleiman requested a review from Copilot April 23, 2026 07:48
@oleiman oleiman added claude-review Adding this label to a PR will trigger a workflow to review the code using claude. and removed claude-review Adding this label to a PR will trigger a workflow to review the code using claude. labels Apr 23, 2026
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc Outdated
Comment thread src/v/pandaproxy/schema_registry/seq_writer.h
Comment thread src/v/pandaproxy/schema_registry/kafka_client_transport.cc
Comment thread src/v/pandaproxy/schema_registry/transport.h
@claude
Copy link
Copy Markdown

claude Bot commented Apr 23, 2026

Review summary

Nice refactor — the transport abstraction is clean and the separation between service/seq_writer (transport consumers) and kafka_client_transport (the concrete implementation) is a clear improvement. The bundled do_delete_subject_impermanent fix is well-scoped and the new colliding_transport test exercises exactly the race described in the PR body, including the assertion that produce is only called once. The noop_transport utility also cuts real boilerplate out of the existing tests.

Bugs / correctness

  • Retry budget in service::do_start is 10 * 100ms = 1s, and the retry predicate keys off unknown_server_error (a catch-all). On a loaded cluster metadata propagation after create_topic can take longer than 1s, and non-metadata paths inside fetch_internal_topic (malformed ListOffsets response etc.) also surface as unknown_server_error and would be silently retried. Inline comment on service.cc.
  • _gate.is_closed() guard in mitigate_error dropped in the move to kafka_client_transport. Probably fine given the stop order in api::stop, but worth confirming the _client->stop() → "no more error-callback invocations" invariant. Inline comment on kafka_client_transport.cc.

Nits

  • Log message in the topic_error handler still says "partition error" — copy-paste from the partition_error branch just above. Inline suggestion.
  • transport* in seq_writer / service is always non-null (verified by vassert). A transport& would encode that in the type and drop the vassert.
  • The transport base interface currently mixes I/O (produce/consume/hwm) with auth/topic-management concerns (configure, has_ephemeral_credentials, create_topic). These are explicitly no-ops for the future RPC transport — splitting into two interfaces would avoid empty-stub methods when the RPC transport lands. Not blocking.

Design observations (no action needed)

  • Lambda coroutines in seq_writer::wait_for and service::fetch_internal_topic correctly use the C++23 this auto pattern to move captures into the coroutine frame — matches the guidance in CLAUDE.md.
  • The ss::sharded<kafka_client_transport> lives on the concrete type, but service/seq_writer hold the abstract transport*. That's a reasonable compromise — a future RPC transport will need its own sharded container, with the abstract pointer being the shared injection point. The api::stop() call _transport.invoke_on_all(&kafka_client_transport::stop) is concrete-typed as a result; likely fine, since transport::stop is already virtual.
  • Compared to the old path, configure() running per-shard is slightly more efficient now (previously each shard's service::configure did _client.invoke_on_all(set_credentials) credential propagations; now each shard's transport just sets its own local credential → N).

Overall looks good — main ask is to take a look at the retry budget / predicate in do_start and confirm the stop-race invariant for mitigate_error.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.

Comment thread src/v/pandaproxy/schema_registry/transport.h Outdated
Comment thread src/v/pandaproxy/schema_registry/service.cc
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch 2 times, most recently from 65cad38 to 01aa966 Compare April 23, 2026 21:09
@vbotbuildovich
Copy link
Copy Markdown
Collaborator

vbotbuildovich commented Apr 23, 2026

CI test results

test results on build#83603
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) ShadowLinkingReplicationTests test_with_restart {"storage_mode": "tiered_cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83603#019dbc38-a9ce-4b1f-8235-3bb62893af5a 28/31 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0199, p0=0.1192, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.4114, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_with_restart
test results on build#83618
test_status test_class test_method test_arguments test_kind job_url passed reason test_history
FLAKY(PASS) ShadowLinkingReplicationTests test_replication_basic {"shuffle_leadership": true, "source_cluster_spec": {"cluster_type": "redpanda"}, "storage_mode": "cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83618#019dbd8d-6fb9-4b80-b5d6-6fec5b2d2f30 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0000, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_basic
FLAKY(PASS) ShadowLinkingReplicationTests test_replication_with_failures {"storage_mode": "tiered_cloud"} integration https://buildkite.com/redpanda/redpanda/builds/83618#019dbd8d-6fb8-48d0-9286-b75a5867962a 19/21 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0020, p0=0.0386, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3917, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=ShadowLinkingReplicationTests&test_method=test_replication_with_failures
FLAKY(PASS) SimpleEndToEndTest test_relaxed_acks {"write_caching": false} integration https://buildkite.com/redpanda/redpanda/builds/83618#019dbd8e-ea63-44b0-a33f-dd9d47b96693 10/11 Test PASSES after retries.No significant increase in flaky rate(baseline=0.0023, p0=1.0000, reject_threshold=0.0100. adj_baseline=0.1000, p1=0.3487, trust_threshold=0.5000) https://redpanda.metabaseapp.com/dashboard/87-tests?tab=142-dt-individual-test-history&test_class=SimpleEndToEndTest&test_method=test_relaxed_acks

oleiman added 5 commits April 23, 2026 20:16
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
do_delete_subject_impermanent called get_versions(include_deleted::no)
before is_subject_deleted(). On retry after a write collision where
the subject was already soft-deleted by the winning writer,
get_versions throws subject_not_found, which propagates as HTTP 404
to the client.

Fix by checking is_subject_deleted first and returning the full
version list via include_deleted::yes on that branch.

Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
@oleiman oleiman force-pushed the sr/noticket/kafka-transport branch from 01aa966 to 709a3c7 Compare April 24, 2026 03:17
@oleiman oleiman marked this pull request as draft April 24, 2026 08:23
@oleiman oleiman marked this pull request as ready for review April 25, 2026 02:18
@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 25, 2026

@dotnwat @pgellert @nguyen-andrew - I think this one is pretty clean. if you want to see how it maps into the rpc stuff check out #30046

@oleiman
Copy link
Copy Markdown
Member Author

oleiman commented Apr 25, 2026

/ci-repeat 1
skip-redpanda-build
skip-units
skip-rebase
dt-repeat=5
tests/rptest/tests/schema_registry_test.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/build area/redpanda claude-review Adding this label to a PR will trigger a workflow to review the code using claude.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants