Skip to content

Commit 6878804

Browse files
authored
Merge pull request #498 from MDA2AV/grpc-stream
grpc streams
2 parents 88da1ba + 07cf622 commit 6878804

18 files changed

Lines changed: 529 additions & 14 deletions

File tree

frameworks/aspnet-grpc/Protos/benchmark.proto

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,30 @@ syntax = "proto3";
33
package benchmark;
44

55
service BenchmarkService {
6+
// Unary — baseline
67
rpc GetSum (SumRequest) returns (SumReply);
8+
9+
// Server streaming — 1 request, server emits `count` replies
10+
rpc StreamSum (StreamRequest) returns (stream SumReply);
11+
12+
// Client streaming — N requests streamed in, 1 reply with the total
13+
rpc CollectSum (stream SumRequest) returns (SumReply);
14+
15+
// Bidirectional streaming — echo: 1 reply per request over a persistent stream
16+
rpc EchoSum (stream SumRequest) returns (stream SumReply);
717
}
818

919
message SumRequest {
1020
int32 a = 1;
1121
int32 b = 2;
1222
}
1323

24+
message StreamRequest {
25+
int32 a = 1;
26+
int32 b = 2;
27+
int32 count = 3;
28+
}
29+
1430
message SumReply {
1531
int32 result = 1;
1632
}

frameworks/aspnet-grpc/Services/BenchmarkServiceImpl.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,43 @@ public override Task<SumReply> GetSum(SumRequest request, ServerCallContext cont
99
{
1010
return Task.FromResult(new SumReply { Result = request.A + request.B });
1111
}
12+
13+
// Server streaming — emit `count` replies for one request.
14+
public override async Task StreamSum(
15+
StreamRequest request,
16+
IServerStreamWriter<SumReply> responseStream,
17+
ServerCallContext context)
18+
{
19+
var sum = request.A + request.B;
20+
var count = request.Count <= 0 ? 1 : request.Count;
21+
for (var i = 0; i < count; i++)
22+
{
23+
await responseStream.WriteAsync(new SumReply { Result = sum + i });
24+
}
25+
}
26+
27+
// Client streaming — aggregate every incoming request into one final total.
28+
public override async Task<SumReply> CollectSum(
29+
IAsyncStreamReader<SumRequest> requestStream,
30+
ServerCallContext context)
31+
{
32+
var total = 0;
33+
await foreach (var req in requestStream.ReadAllAsync())
34+
{
35+
total += req.A + req.B;
36+
}
37+
return new SumReply { Result = total };
38+
}
39+
40+
// Bidirectional streaming — one reply per incoming request on a persistent stream.
41+
public override async Task EchoSum(
42+
IAsyncStreamReader<SumRequest> requestStream,
43+
IServerStreamWriter<SumReply> responseStream,
44+
ServerCallContext context)
45+
{
46+
await foreach (var req in requestStream.ReadAllAsync())
47+
{
48+
await responseStream.WriteAsync(new SumReply { Result = req.A + req.B });
49+
}
50+
}
1251
}

frameworks/aspnet-grpc/meta.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
"enabled": true,
99
"tests": [
1010
"unary-grpc",
11-
"unary-grpc-tls"
11+
"unary-grpc-tls",
12+
"stream-grpc",
13+
"stream-grpc-tls"
1214
],
1315
"maintainers": []
1416
}

requests/benchmark.proto

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Canonical benchmark.proto used by ghz for gRPC streaming tests.
2+
// Each gRPC framework must implement the RPCs it subscribes to in its own
3+
// service; the shapes must match this file exactly.
4+
5+
syntax = "proto3";
6+
7+
package benchmark;
8+
9+
service BenchmarkService {
10+
// Unary — 1 request, 1 reply. Used by unary-grpc (h2load).
11+
rpc GetSum (SumRequest) returns (SumReply);
12+
13+
// Server streaming — 1 request, server emits `count` replies. Used by
14+
// stream-grpc / stream-grpc-tls (ghz).
15+
rpc StreamSum (StreamRequest) returns (stream SumReply);
16+
17+
// Client streaming — N requests streamed in, 1 reply with the total.
18+
rpc CollectSum (stream SumRequest) returns (SumReply);
19+
20+
// Bidirectional streaming — echo: 1 reply per request over a persistent stream.
21+
rpc EchoSum (stream SumRequest) returns (stream SumReply);
22+
}
23+
24+
message SumRequest {
25+
int32 a = 1;
26+
int32 b = 2;
27+
}
28+
29+
message StreamRequest {
30+
int32 a = 1;
31+
int32 b = 2;
32+
int32 count = 3;
33+
}
34+
35+
message SumReply {
36+
int32 result = 1;
37+
}

scripts/benchmark.sh

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,13 @@ declare -A PROFILES=(
5757
[static-h3]="1|0|0-31,64-95|64|static-h3"
5858
[unary-grpc]="1|0|0-31,64-95|256,1024|grpc"
5959
[unary-grpc-tls]="1|0|0-31,64-95|256,1024|grpc-tls"
60+
[stream-grpc]="1|0|0-31,64-95|64|grpc-stream"
61+
[stream-grpc-tls]="1|0|0-31,64-95|64|grpc-stream-tls"
6062
[gateway-64]="1|0|0-31,64-95|256,1024|gateway-64"
6163
[echo-ws]="1|0|0-31,64-95|512,4096,16384|ws-echo"
6264
[async-db]="1|0|0-31,64-95|1024|async-db"
6365
)
64-
PROFILE_ORDER=(baseline pipelined limited-conn json json-comp json-tls upload api-4 api-16 static async-db baseline-h2 static-h2 baseline-h3 static-h3 gateway-64 unary-grpc unary-grpc-tls echo-ws)
66+
PROFILE_ORDER=(baseline pipelined limited-conn json json-comp json-tls upload api-4 api-16 static async-db baseline-h2 static-h2 baseline-h3 static-h3 gateway-64 unary-grpc unary-grpc-tls stream-grpc stream-grpc-tls echo-ws)
6567

6668
# Parse flags
6769
SAVE_RESULTS=false
@@ -564,15 +566,19 @@ for profile in "${profiles_to_run[@]}"; do
564566

565567
# Wait for server
566568
echo "[wait] Waiting for server..."
567-
if [ "$endpoint" = "grpc" ] || [ "$endpoint" = "grpc-tls" ]; then
568-
PROTO_FILE=$(find "$ROOT_DIR/frameworks/$FRAMEWORK" -name 'benchmark.proto' -type f | head -1)
569-
if [ "$endpoint" = "grpc-tls" ]; then
569+
if [ "$endpoint" = "grpc" ] || [ "$endpoint" = "grpc-tls" ] || \
570+
[ "$endpoint" = "grpc-stream" ] || [ "$endpoint" = "grpc-stream-tls" ]; then
571+
PROTO_FILE="$REQUESTS_DIR/benchmark.proto"
572+
[ -f "$PROTO_FILE" ] || PROTO_FILE=$(find "$ROOT_DIR/frameworks/$FRAMEWORK" -name 'benchmark.proto' -type f | head -1)
573+
if [[ "$endpoint" == *-tls ]]; then
570574
local_grpc_check="localhost:$H2PORT"
575+
local_grpc_flag="--skipTLS"
571576
else
572577
local_grpc_check="localhost:$PORT"
578+
local_grpc_flag="--insecure"
573579
fi
574580
for i in $(seq 1 30); do
575-
if $GHZ --insecure --proto "$PROTO_FILE" \
581+
if $GHZ $local_grpc_flag --proto "$PROTO_FILE" \
576582
--call benchmark.BenchmarkService/GetSum \
577583
-d '{"a":1,"b":2}' -c 1 -n 1 \
578584
"$local_grpc_check" >/dev/null 2>&1; then
@@ -626,6 +632,7 @@ for profile in "${profiles_to_run[@]}"; do
626632
USE_H2LOAD=false
627633
USE_OHA=false
628634
USE_WRK=false
635+
USE_GHZ=false
629636
if [ "$endpoint" = "ws-echo" ]; then
630637
gc_args=("http://localhost:$PORT/ws"
631638
--ws
@@ -646,6 +653,28 @@ for profile in "${profiles_to_run[@]}"; do
646653
-H 'content-type: application/grpc'
647654
-H 'te: trailers'
648655
-c "$CONNS" -m 100 -t "$H2THREADS" -D "$DURATION")
656+
elif [ "$endpoint" = "grpc-stream" ] || [ "$endpoint" = "grpc-stream-tls" ]; then
657+
USE_GHZ=true
658+
# 4 streams multiplexed per TCP connection with count=5000. Empirically
659+
# the optimal clean shape under TLS: ~8.6M msgs/sec with <2% error rate
660+
# and ~145ms average latency. Denser ratios (8:1) push the error rate
661+
# to 10% under TLS without meaningfully raising throughput.
662+
ghz_workers=$((CONNS * 4))
663+
ghz_msgs_per_call=5000
664+
if [ "$endpoint" = "grpc-stream-tls" ]; then
665+
ghz_target="localhost:$H2PORT"
666+
ghz_tls_flag=--skipTLS
667+
else
668+
ghz_target="localhost:$PORT"
669+
ghz_tls_flag=--insecure
670+
fi
671+
gc_args=("$GHZ" "$ghz_tls_flag"
672+
--proto "$REQUESTS_DIR/benchmark.proto"
673+
--call benchmark.BenchmarkService/StreamSum
674+
-d "{\"a\":1,\"b\":2,\"count\":$ghz_msgs_per_call}"
675+
--connections "$CONNS" -c "$ghz_workers"
676+
-z "$DURATION"
677+
"$ghz_target")
649678
elif [ "$endpoint" = "static-h3" ]; then
650679
USE_H2LOAD=true
651680
gc_args=("${H2LOAD_H3_CMD[@]}" --alpn-list=h3
@@ -717,10 +746,24 @@ for profile in "${profiles_to_run[@]}"; do
717746
--recv-buf 512
718747
-c "$CONNS" -t "$THREADS" -d "$DURATION" -p "$pipeline")
719748
fi
720-
if [ "$USE_H2LOAD" = "false" ] && [ "$USE_WRK" = "false" ] && [ "$req_per_conn" -gt 0 ] 2>/dev/null; then
749+
if [ "$USE_H2LOAD" = "false" ] && [ "$USE_WRK" = "false" ] && [ "$USE_GHZ" = "false" ] && [ "$req_per_conn" -gt 0 ] 2>/dev/null; then
721750
gc_args+=(-r "$req_per_conn")
722751
fi
723752

753+
# Warm-up for ghz: Kestrel (and most gRPC servers) need a few seconds of
754+
# traffic before their thread pool and accept loop are fully hot. Without
755+
# this, the first measured run can burst 128 connections into a cold
756+
# accept backlog and see "connection refused" errors on most of them.
757+
if [ "$USE_GHZ" = "true" ]; then
758+
echo "[warmup] ghz 2s"
759+
taskset -c "$GCANNON_CPUS" "$GHZ" "$ghz_tls_flag" \
760+
--proto "$REQUESTS_DIR/benchmark.proto" \
761+
--call benchmark.BenchmarkService/StreamSum \
762+
-d "{\"a\":1,\"b\":2,\"count\":$ghz_msgs_per_call}" \
763+
--connections "$CONNS" -c "$ghz_workers" \
764+
-z 2s "$ghz_target" >/dev/null 2>&1 || true
765+
fi
766+
724767
# Best-of-N runs
725768
best_rps=0
726769
best_output=""
@@ -759,6 +802,8 @@ for profile in "${profiles_to_run[@]}"; do
759802
rm -f "$oha_out"
760803
elif [ "$USE_H2LOAD" = "true" ]; then
761804
output=$(timeout 45 taskset -c "$GCANNON_CPUS" "${gc_args[@]}" 2>&1) || true
805+
elif [ "$USE_GHZ" = "true" ]; then
806+
output=$(timeout 45 taskset -c "$GCANNON_CPUS" "${gc_args[@]}" 2>&1) || true
762807
elif [ "$GCANNON_MODE" = "native" ]; then
763808
output=$(timeout 45 taskset -c "$GCANNON_CPUS" \
764809
env LD_LIBRARY_PATH=/usr/lib "$GCANNON" "${gc_args[@]}" 2>&1) || true
@@ -793,6 +838,20 @@ for profile in "${profiles_to_run[@]}"; do
793838
# h2load: "finished in 5.00s, 123456.78 req/s, 45.67MB/s"
794839
rps_int=$(echo "$output" | grep -oP 'finished in [\d.]+s, \K[\d.]+' | cut -d. -f1 || echo "0")
795840
rps_int=${rps_int:-0}
841+
elif [ "$USE_GHZ" = "true" ]; then
842+
# ghz's "Requests/sec" counts *all* attempts including errors, so
843+
# runs where the server rejected connections (connect refused during
844+
# ramp-up) get a misleadingly high headline number. Use only OK
845+
# responses divided by the configured duration, then multiply by
846+
# msgs_per_call to get real successful messages/sec.
847+
ok_count=$(echo "$output" | grep -oP '\[OK\]\s+\K\d+' | head -1 || echo "0")
848+
dur_s=${DURATION%s}
849+
if [ -z "$ok_count" ] || [ -z "$dur_s" ] || [ "$dur_s" = "0" ]; then
850+
rps_int=0
851+
else
852+
rps_int=$(python3 -c "print(int(($ok_count / $dur_s) * ${ghz_msgs_per_call:-1}))" 2>/dev/null || echo "0")
853+
fi
854+
rps_int=${rps_int:-0}
796855
else
797856
duration_secs=$(echo "$output" | grep -oP '(?:requests|frames sent) in ([\d.]+)s' | grep -oP '[\d.]+' || echo "1")
798857
if [ "$endpoint" = "caching" ]; then
@@ -847,6 +906,12 @@ for profile in "${profiles_to_run[@]}"; do
847906
fi
848907
reconnects="0"
849908
bandwidth=$(echo "$best_output" | grep -oP 'finished in [\d.]+s, [\d.]+ req/s, \K[\d.]+[KMGT]?B/s' || echo "0")
909+
elif [ "$USE_GHZ" = "true" ]; then
910+
# ghz: "Average: 12.34 ms", "Slowest: 56.78 ms" — per-call latency (not per-message).
911+
avg_lat=$(echo "$best_output" | awk '/^\s*Average:/{print $2 $3; exit}')
912+
p99_lat=$(echo "$best_output" | awk '/^\s*Slowest:/{print $2 $3; exit}')
913+
reconnects="0"
914+
bandwidth="0"
850915
else
851916
avg_lat=$(echo "$best_output" | grep "Latency" | head -1 | awk '{print $2}')
852917
p99_lat=$(echo "$best_output" | grep "Latency" | head -1 | awk '{print $5}')
@@ -870,6 +935,12 @@ for profile in "${profiles_to_run[@]}"; do
870935
status_3xx=$(echo "$best_output" | grep -oP '\d+(?= 3xx)' || echo "0")
871936
status_4xx=$(echo "$best_output" | grep -oP '\d+(?= 4xx)' || echo "0")
872937
status_5xx=$(echo "$best_output" | grep -oP '\d+(?= 5xx)' || echo "0")
938+
elif [ "$USE_GHZ" = "true" ]; then
939+
# ghz prints "[OK] N responses" for successful RPC calls. We surface this
940+
# as status_2xx so it feeds into the same run_ok bookkeeping as other tools.
941+
status_2xx=$(echo "$best_output" | grep -oP '\[OK\]\s+\K\d+' | head -1 || echo "0")
942+
status_3xx=0; status_4xx=0
943+
status_5xx=$(echo "$best_output" | grep -oP '\[Unavailable\]\s+\K\d+' | head -1 || echo "0")
873944
else
874945
if [ "$endpoint" = "ws-echo" ]; then
875946
status_2xx=$(echo "$best_output" | grep -oP 'WS frames:\s+\K\d+' || echo "0")
@@ -884,7 +955,7 @@ for profile in "${profiles_to_run[@]}"; do
884955

885956
# Compute input bandwidth from raw template sizes × RPS
886957
input_bw=""
887-
if [ "$USE_H2LOAD" = "false" ] && [ "$USE_OHA" = "false" ] && [ "$USE_WRK" = "false" ]; then
958+
if [ "$USE_H2LOAD" = "false" ] && [ "$USE_OHA" = "false" ] && [ "$USE_WRK" = "false" ] && [ "$USE_GHZ" = "false" ]; then
888959
raw_arg=""
889960
prev_was_raw=false
890961
for arg in "${gc_args[@]}"; do
@@ -912,7 +983,7 @@ else: print(f'{bps}B/s')
912983

913984
# Parse per-template response counts (gcannon mixed/multi-template output)
914985
tpl_json=""
915-
if [ "$USE_H2LOAD" = "false" ] && [ "$USE_OHA" = "false" ]; then
986+
if [ "$USE_H2LOAD" = "false" ] && [ "$USE_OHA" = "false" ] && [ "$USE_GHZ" = "false" ]; then
916987
tpl_line=$(echo "$best_output" | grep -oP 'Per-template-ok: \K.*' || echo "")
917988
if [ -n "$tpl_line" ] && ([ "$endpoint" = "api-4" ] || [ "$endpoint" = "api-16" ]); then
918989
# API-4 templates: get×3, json-get×3, async-db×2

0 commit comments

Comments
 (0)