From 88245d71f2f56f07e8c8684cbf93375e8df5bdc6 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 1 Apr 2026 15:53:43 -0300 Subject: [PATCH 1/2] base: add oncore_auto helper An oncore that debug-asserts same-shard in its destructor. Embed as a member to catch cross-shard destruction in debug builds. --- src/v/base/oncore.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/v/base/oncore.h b/src/v/base/oncore.h index 0109d9daeeb3c..5d1a745597429 100644 --- a/src/v/base/oncore.h +++ b/src/v/base/oncore.h @@ -48,3 +48,11 @@ class oncore final { do { \ expression_in_debug_mode((member).assert_shard_source_location()); \ } while (0) + +/// An oncore that debug-asserts same-shard in its destructor. +/// Embed as a member to catch cross-shard destruction in debug builds. +// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions,hicpp-special-member-functions) +struct oncore_auto final { + ~oncore_auto() noexcept { oncore_debug_verify(_oncore); } + oncore _oncore; +}; From f3a9454502622d7528e9bd073d28e76b3e2851f7 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Wed, 1 Apr 2026 16:30:09 -0300 Subject: [PATCH 2/2] kafka/server: remove foreign_ptr from response_ptr The response object is always created and destroyed on the connection's shard. The foreign_ptr wrapper was a historical artifact or oversight, since in general a response may contain data from many shards so a single-response level foreign pointer doesn't work. Today cross shard deletion is handled by things _inside_ the response, e.g., by foreign_record_batch_readers nested somewhere inside the response. The respond() path in request_context always serializes into fresh local buffers, so the response iobuf never contains foreign fragments. Replace foreign_ptr with a plain unique_ptr and add an oncore_auto member to response to assert same-shard destruction in debug builds. --- src/v/kafka/server/connection_context.h | 3 +-- src/v/kafka/server/protocol_utils.cc | 3 ++- src/v/kafka/server/response.h | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 2dd40c0909a3c..4e6441fce8418 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -19,6 +19,7 @@ #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/handler_probe.h" #include "kafka/server/logger.h" +#include "kafka/server/response.h" #include "net/connection.h" #include "net/server_probe.h" #include "proto/redpanda/core/admin/v2/kafka_connections.proto.h" @@ -49,8 +50,6 @@ namespace kafka { -using response_ptr = ss::foreign_ptr>; - using closed_connections_t = std::deque>; diff --git a/src/v/kafka/server/protocol_utils.cc b/src/v/kafka/server/protocol_utils.cc index 706fcb765cee4..06cc13c2af01e 100644 --- a/src/v/kafka/server/protocol_utils.cc +++ b/src/v/kafka/server/protocol_utils.cc @@ -155,7 +155,8 @@ ss::scattered_message response_as_scattered(response_ptr response) { msg.append_static(src, sz); return ss::stop_iteration::no; }); - // MUST be the foreign ptr not the iobuf + // The response must outlive the scattered message since the message + // references the iobuf fragments directly via append_static. msg.on_delete([response = std::move(response)] {}); return msg; } diff --git a/src/v/kafka/server/response.h b/src/v/kafka/server/response.h index d13d4c0b473ce..8efa7f975ff26 100644 --- a/src/v/kafka/server/response.h +++ b/src/v/kafka/server/response.h @@ -11,6 +11,7 @@ #pragma once +#include "base/oncore.h" #include "base/seastarx.h" #include "bytes/iobuf.h" #include "kafka/protocol/types.h" @@ -63,9 +64,10 @@ class response { std::optional _tags; iobuf _buf; protocol::encoder _writer; + oncore_auto _oncore; }; -using response_ptr = ss::foreign_ptr>; +using response_ptr = std::unique_ptr; struct process_result_stages { process_result_stages(