Commit 7cb0d02

Use protobuf Arena for fanout response deserialization (valkey-io#1040)
Using protobuf Arena for GRPC fanout response deserialization

### Problem

`SearchIndexPartitionResponse` is heap-allocated in the gRPC client. Each response contains N `NeighborEntry` sub-messages, each individually malloc'd during deserialization and individually freed on destruction. At large result sets, this creates thousands of malloc/free calls per query per remote shard.

### Fix

Allocate `SearchIndexPartitionResponse` on a `google::protobuf::Arena`. All sub-messages use bulk arena allocation and are freed in one shot when the arena is destroyed.

### Benchmark

3-shard CME cluster, 10K docs, TEXT index, 500 concurrent clients querying one node (fanout to 2 remote shards).

| Workload | Before | After | Improvement |
|---|---|---|---|
| LIMIT 100 CONTENT | 2,496 req/s | 2,572 req/s | +3% throughput |
| LIMIT 500 CONTENT | 306 req/s | 510 req/s | **+67% throughput, -25% p50** |
| LIMIT 1000 CONTENT | 88 req/s | 272 req/s | **+209% throughput, -44% p50** |
| NOCONTENT LIMIT 100 | 2,943 req/s | 6,969 req/s | **+137% throughput, -65% p50** |

Signed-off-by: Karthik Subbarao <karthikrs2021@gmail.com>
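
### Illustration (not part of the commit)

The effect described under Problem and Fix can be reproduced without protobuf. The sketch below is only an analogy: it uses `std::pmr::monotonic_buffer_resource` from the C++17 standard library as a stand-in for `google::protobuf::Arena`, and long `std::pmr::string` values as a stand-in for `NeighborEntry` sub-messages. `CountingResource` and `CountUpstreamAllocations` are hypothetical names introduced here for illustration. The function counts how many requests reach the underlying heap when entries are allocated one by one versus through an arena.

```cpp
#include <cassert>
#include <cstddef>
#include <memory_resource>
#include <string>
#include <vector>

// Hypothetical helper: forwards to the default heap and counts each request.
class CountingResource : public std::pmr::memory_resource {
 public:
  std::size_t allocations() const { return allocations_; }

 private:
  void* do_allocate(std::size_t bytes, std::size_t align) override {
    ++allocations_;
    return std::pmr::new_delete_resource()->allocate(bytes, align);
  }
  void do_deallocate(void* p, std::size_t bytes, std::size_t align) override {
    std::pmr::new_delete_resource()->deallocate(p, bytes, align);
  }
  bool do_is_equal(
      const std::pmr::memory_resource& other) const noexcept override {
    return this == &other;
  }
  std::size_t allocations_ = 0;
};

// Builds `n` entries (stand-ins for sub-messages) and returns how many
// allocation requests reached the heap.
std::size_t CountUpstreamAllocations(std::size_t n, bool use_arena) {
  assert(n > 0);
  CountingResource heap;
  {
    // The arena takes large blocks from `heap` and hands out slices of them.
    std::pmr::monotonic_buffer_resource arena(64 * 1024, &heap);
    std::pmr::memory_resource* source = &heap;
    if (use_arena) source = &arena;

    std::pmr::vector<std::pmr::string> entries(source);
    entries.reserve(n);
    for (std::size_t i = 0; i < n; ++i) {
      // 64 characters is longer than typical small-string buffers, so each
      // entry needs its own allocation from `source`.
      entries.emplace_back(std::size_t{64}, 'x');
    }
  }  // entries, then the arena, are destroyed here; arena memory is freed in bulk
  return heap.allocations();
}
```

Calling `CountUpstreamAllocations(1000, false)` should report roughly one heap request per entry, while `CountUpstreamAllocations(1000, true)` should report only a handful of block-sized requests. The protobuf arena gets its benefit the same way, although its internals and block sizes differ from this stand-in.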
1 parent 1b85d1a commit 7cb0d02

1 file changed

Lines changed: 9 additions & 4 deletions

src/coordinator/client.cc

@@ -16,6 +16,7 @@
 #include "absl/strings/string_view.h"
 #include "absl/time/clock.h"
 #include "absl/time/time.h"
+#include "google/protobuf/arena.h"
 #include "grpc/grpc.h"
 #include "grpcpp/channel.h"
 #include "grpcpp/client_context.h"
@@ -142,11 +143,15 @@ void ClientImpl::SearchIndexPartition(
   struct SearchIndexPartitionArgs {
     ::grpc::ClientContext context;
     std::unique_ptr<SearchIndexPartitionRequest> request;
-    SearchIndexPartitionResponse response;
+    google::protobuf::Arena arena;
+    SearchIndexPartitionResponse* response;
     SearchIndexPartitionCallback callback;
     std::unique_ptr<vmsdk::StopWatch> latency_sample;
   };
   auto args = std::make_unique<SearchIndexPartitionArgs>();
+  args->response =
+      google::protobuf::Arena::Create<SearchIndexPartitionResponse>(
+          &args->arena);
   args->context.set_deadline(absl::ToChronoTime(
       absl::Now() + absl::Seconds(query_connection_timeout->GetValue())));
   args->callback = std::move(done);
@@ -156,20 +161,20 @@ void ClientImpl::SearchIndexPartition(
   Metrics::GetStats().coordinator_bytes_out.fetch_add(
       args_raw->request->ByteSizeLong(), std::memory_order_relaxed);
   stub_->async()->SearchIndexPartition(
-      &args_raw->context, args_raw->request.get(), &args_raw->response,
+      &args_raw->context, args_raw->request.get(), args_raw->response,
       // std::function is not move-only.
       [args_raw](grpc::Status s) mutable {
         GRPCSuspensionGuard guard(GRPCSuspender::Instance());
         auto args = std::unique_ptr<SearchIndexPartitionArgs>(args_raw);
-        args->callback(s, args->response);
+        args->callback(s, *args->response);
         if (s.ok()) {
           Metrics::GetStats()
               .coordinator_client_search_index_partition_success_cnt++;
           Metrics::GetStats()
               .coordinator_client_search_index_partition_success_latency
               .SubmitSample(std::move(args->latency_sample));
           Metrics::GetStats().coordinator_bytes_in.fetch_add(
-              args->response.ByteSizeLong(), std::memory_order_relaxed);
+              args->response->ByteSizeLong(), std::memory_order_relaxed);
         } else {
           Metrics::GetStats()
               .coordinator_client_search_index_partition_failure_cnt++;
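
Illustration (not part of the commit): the ownership pattern in this diff keeps the arena and a raw pointer to the arena-allocated response in one per-call struct, releases the struct to a raw pointer for the copyable completion closure, and re-adopts it with `std::unique_ptr` inside the callback so the arena and response are freed together. The sketch below mimics that shape with standard-library pieces only. `Response`, `CallArgs`, and `SimulateCall` are hypothetical names, `std::pmr::monotonic_buffer_resource` stands in for `google::protobuf::Arena`, and the asynchronous transport is simulated by calling the completion closure directly.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <memory_resource>
#include <new>
#include <vector>

// Hypothetical stand-in for the response message; its storage comes from
// the memory resource passed in.
struct Response {
  explicit Response(std::pmr::memory_resource* r) : ids(r) {}
  std::pmr::vector<int> ids;
};

// Per-call state: the arena and the object living inside it share one lifetime.
struct CallArgs {
  std::pmr::monotonic_buffer_resource arena;  // stand-in for protobuf Arena
  Response* response = nullptr;               // points into `arena`
  std::function<void(const Response&)> callback;

  ~CallArgs() {
    // Runs before `arena` is destroyed, so the object is still valid here.
    if (response != nullptr) response->~Response();
  }
};

// Simulates one call and returns the number of ids the callback observed.
std::size_t SimulateCall(std::size_t n) {
  auto args = std::make_unique<CallArgs>();
  void* mem = args->arena.allocate(sizeof(Response), alignof(Response));
  args->response = new (mem) Response(&args->arena);

  std::size_t seen = 0;
  args->callback = [&seen](const Response& r) { seen = r.ids.size(); };

  // std::function needs a copyable closure, so hand over a raw pointer
  // and re-adopt it when the call completes.
  CallArgs* args_raw = args.release();
  assert(args_raw->response != nullptr);
  std::function<void()> on_done = [args_raw]() {
    auto owned = std::unique_ptr<CallArgs>(args_raw);
    owned->callback(*owned->response);
  };  // `owned` frees the response and the arena together

  // Simulated transport: fill the response, then signal completion once.
  for (std::size_t i = 0; i < n; ++i) {
    args_raw->response->ids.push_back(static_cast<int>(i));
  }
  on_done();
  return seen;
}
```

As in the diff, the callback receives a reference to the response and must not keep it beyond the call, because the arena is destroyed as soon as the completion closure returns.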
