Skip to content

Commit 64a6b6a

Browse files
committed
kafka/server: remove foreign_ptr from response_ptr
The response object is always created and destroyed on the connection's shard. The foreign_ptr wrapper was a historical artifact or oversight, since in general a response may contain data from many shards so a single-response level foreign pointer doesn't work. Today cross shard deletion is handled by things _inside_ the response, e.g., by foreign_record_batch_readers nested somewhere inside the response. The respond() path in request_context always serializes into fresh local buffers, so the response iobuf never contains foreign fragments. Replace foreign_ptr with a plain unique_ptr and add an oncore_auto member to response to assert same-shard destruction in debug builds.
1 parent 88245d7 commit 64a6b6a

3 files changed

Lines changed: 6 additions & 4 deletions

File tree

src/v/kafka/server/connection_context.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "kafka/server/handlers/details/security.h"
2020
#include "kafka/server/handlers/handler_probe.h"
2121
#include "kafka/server/logger.h"
22+
#include "kafka/server/response.h"
2223
#include "net/connection.h"
2324
#include "net/server_probe.h"
2425
#include "proto/redpanda/core/admin/v2/kafka_connections.proto.h"
@@ -49,8 +50,6 @@
4950

5051
namespace kafka {
5152

52-
using response_ptr = ss::foreign_ptr<std::unique_ptr<response>>;
53-
5453
using closed_connections_t
5554
= std::deque<ss::lw_shared_ptr<const proto::admin::kafka_connection>>;
5655

src/v/kafka/server/protocol_utils.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ ss::scattered_message<char> response_as_scattered(response_ptr response) {
155155
msg.append_static(src, sz);
156156
return ss::stop_iteration::no;
157157
});
158-
// MUST be the foreign ptr not the iobuf
158+
// The response must outlive the scattered message since the message
159+
// references the iobuf fragments directly via append_static.
159160
msg.on_delete([response = std::move(response)] {});
160161
return msg;
161162
}

src/v/kafka/server/response.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#pragma once
1313

14+
#include "base/oncore.h"
1415
#include "base/seastarx.h"
1516
#include "bytes/iobuf.h"
1617
#include "kafka/protocol/types.h"
@@ -63,9 +64,10 @@ class response {
6364
std::optional<tagged_fields> _tags;
6465
iobuf _buf;
6566
protocol::encoder _writer;
67+
[[no_unique_address]] oncore_auto _oncore;
6668
};
6769

68-
using response_ptr = ss::foreign_ptr<std::unique_ptr<response>>;
70+
using response_ptr = std::unique_ptr<response>;
6971

7072
struct process_result_stages {
7173
process_result_stages(

0 commit comments

Comments
 (0)