diff --git a/src/v/base/oncore.h b/src/v/base/oncore.h index 0109d9daeeb3c..5d1a745597429 100644 --- a/src/v/base/oncore.h +++ b/src/v/base/oncore.h @@ -48,3 +48,11 @@ class oncore final { do { \ expression_in_debug_mode((member).assert_shard_source_location()); \ } while (0) + +/// An oncore that debug-asserts same-shard in its destructor. +/// Embed as a member to catch cross-shard destruction in debug builds. +// NOLINTNEXTLINE(cppcoreguidelines-special-member-functions,hicpp-special-member-functions) +struct oncore_auto final { + ~oncore_auto() noexcept { oncore_debug_verify(_oncore); } + oncore _oncore; +}; diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 2dd40c0909a3c..4e6441fce8418 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -19,6 +19,7 @@ #include "kafka/server/handlers/details/security.h" #include "kafka/server/handlers/handler_probe.h" #include "kafka/server/logger.h" +#include "kafka/server/response.h" #include "net/connection.h" #include "net/server_probe.h" #include "proto/redpanda/core/admin/v2/kafka_connections.proto.h" @@ -49,8 +50,6 @@ namespace kafka { -using response_ptr = ss::foreign_ptr>; - using closed_connections_t = std::deque>; diff --git a/src/v/kafka/server/protocol_utils.cc b/src/v/kafka/server/protocol_utils.cc index 706fcb765cee4..06cc13c2af01e 100644 --- a/src/v/kafka/server/protocol_utils.cc +++ b/src/v/kafka/server/protocol_utils.cc @@ -155,7 +155,8 @@ ss::scattered_message response_as_scattered(response_ptr response) { msg.append_static(src, sz); return ss::stop_iteration::no; }); - // MUST be the foreign ptr not the iobuf + // The response must outlive the scattered message since the message + // references the iobuf fragments directly via append_static. msg.on_delete([response = std::move(response)] {}); return msg; } diff --git a/src/v/kafka/server/response.h b/src/v/kafka/server/response.h index d13d4c0b473ce..8efa7f975ff26 100644 --- a/src/v/kafka/server/response.h +++ b/src/v/kafka/server/response.h @@ -11,6 +11,7 @@ #pragma once +#include "base/oncore.h" #include "base/seastarx.h" #include "bytes/iobuf.h" #include "kafka/protocol/types.h" @@ -63,9 +64,10 @@ class response { std::optional _tags; iobuf _buf; protocol::encoder _writer; + oncore_auto _oncore; }; -using response_ptr = ss::foreign_ptr>; +using response_ptr = std::unique_ptr; struct process_result_stages { process_result_stages(