
Commit 5b0ba8d

Authored by lupin012 and claude
rpc: compression with libdeflate (#20665)
Closes #17112

This PR replaces the existing single-path compress/gzip middleware in node/rpcstack.go with a dual-path implementation that distinguishes between streaming and non-streaming responses.

a) Non-streaming responses (standard JSON-RPC calls such as `eth_getBlockByNumber`):
- are now compressed in one shot using go-libdeflate, a lightweight CGo wrapper around ebiggers/libdeflate. Benchmarks show a ~1.75x speedup vs stdlib gzip on ~30 KB JSON payloads (4.6 ms vs 8.0 ms, 122 MB/s vs 70 MB/s); under high concurrency the advantage is larger due to lower CPU usage per request.
- Since the compressed size is known before writing, Content-Length is now set to the exact compressed size, avoiding unnecessary `Transfer-Encoding: chunked`.
- Responses smaller than 1 KB are sent uncompressed: at that size the CPU overhead of initialising the compressor outweighs the benefit, and for very small payloads the compressed output can exceed the input size due to gzip framing overhead.

b) Streaming responses (e.g. `debug_traceTransaction`, `trace_filter`) are detected via http.Flusher: when the RPC handler calls Flush() before writing, the middleware switches to stdlib compress/gzip in streaming mode, compressing trace data incrementally without buffering the full response.

Changes:
- node/rpcstack.go: new gzipResponseWriter with buffer/stream dual mode; four sync.Pool instances for a zero-allocation hot path
- rpc/http.go: injects http.Flusher into the request context via httpFlusherContextKey
- rpc/handler.go: streaming methods call flush before writing to activate streaming mode
- go.mod: adds github.com/erigontech/go-libdeflate v0.1.0

# 🚀 Performance Benchmarks: Gzip Optimization

---

<details>
<summary><b>1. Isolated Compression Benchmarks (libdeflate vs stdlib)</b></summary>

```diff
  Benchmark (gzip isolation)   Latency      Throughput   Mem Alloc
- BenchmarkStdlibGzip          8.0 ms/req    70 MB/s     66 KB
+ BenchmarkLibdeflateGzip      4.6 ms/req   122 MB/s     63 KB  (~2x faster)
```

Note: we observed a ~1.75x speedup on a single thread. Under high concurrency the advantage is even greater due to reduced CPU overhead.
</details>

<details>
<summary><b>2. eth_getBlockByNumber with txs (Old SW vs Main SW)</b></summary>

Old SW — unstable results with errors:

```diff
- [2. 1] qps: 2000 -> [R=97.74% max=9.103s error=503 Service Unavailable]
- [3. 3] qps: 2300 -> [R=98.50% max=5.841s error=503 Service Unavailable]
- [3. 4] qps: 2300 -> [R=99.80% max=5.634s error=503 Service Unavailable]
- [3. 5] qps: 2300 -> [R=98.25% max=5.877s error=503 Service Unavailable]
```

Main SW — stable results with a 100% success rate:

```diff
+ [2. 1] qps: 2000 -> [R=100.00% max=141.22ms]
+ [2. 5] qps: 2000 -> [R=100.00% max=157.7ms]
+ [3. 1] qps: 2300 -> [R=100.00% max=217.23ms]
+ [3. 5] qps: 2300 -> [R=100.00% max=224.174ms]
```
</details>

<details>
<summary><b>3. trace_block</b></summary>

| Metric | Old SW | Main SW | Speedup |
| :--- | :--- | :--- | :--- |
| **Throughput** | 375.9 r/s | **455.0 r/s** | **+21%** |
| **p50 Latency** | 114.4 ms | **96.9 ms** | **1.18x** |
| **p90 Latency** | 232.0 ms | **181.2 ms** | **1.28x** |
| **Mean Latency** | 132.4 ms | **109.5 ms** | **1.21x** |
</details>

<details>
<summary><b>4. Full RPC test suite over http, http-compressed and websocket transports</b></summary>

```
./run_all.sh -T http,http_comp,websocket
Run tests in parallel on localhost:8545/localhost:8551
Result directory: /home/simon/silkworm/tests/rpc-tests3/integration/results
Time: 2026-04-19 08:58:05.257624
Total round_trip time: 3:40:01.359044
Total marshalling time: 0:00:00.063308
Total unmarshalling time: 0:01:40.509521
No of json Diffs: 0
Test time-elapsed: 0:08:37.467460
Available tests: 1436
Available tested api: 112
Number of loop: 1
Number of executed tests: 4152
Number of NOT executed tests: 156
Number of success tests: 4152
Number of failed tests: 0
```
</details>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent d7b2a63 commit 5b0ba8d

6 files changed

Lines changed: 409 additions & 15 deletions


go.mod

Lines changed: 1 addition & 0 deletions
```diff
@@ -6,6 +6,7 @@ replace github.com/holiman/bloomfilter/v2 => github.com/AskAlexSharov/bloomfilte
 
 require (
 	github.com/erigontech/erigon-snapshot v1.3.1-0.20260402120223-7bb412bc89cd
+	github.com/erigontech/go-libdeflate v0.1.0
 	github.com/erigontech/mdbx-go v0.39.17
 	github.com/erigontech/secp256k1 v1.2.1-0.20260218182123-377cc1bd6410
 )
```

go.sum

Lines changed: 2 additions & 0 deletions
```diff
@@ -344,6 +344,8 @@ github.com/erigontech/fastkeccak v0.1.1-0.20260408010752-08e7b6602268 h1:NHl4+Dj
 github.com/erigontech/fastkeccak v0.1.1-0.20260408010752-08e7b6602268/go.mod h1:CwJFJVKFVWpQyKSfRrQyY56IqV16sR5xnQoFThGHiZA=
 github.com/erigontech/go-eth-kzg v0.0.0-20260401161010-070339460d07 h1:0VpFIthaJzGs3iwyrKCj67wptrNq7KKiLhilDCRPqwk=
 github.com/erigontech/go-eth-kzg v0.0.0-20260401161010-070339460d07/go.mod h1:J9/u5sWfznSObptgfa92Jq8rTswn6ahQWEuiLHOjCUI=
+github.com/erigontech/go-libdeflate v0.1.0 h1:WtKiuNpuAP/Qyq3mbPwFB0DoQikabtOeZFUgo9VGRsY=
+github.com/erigontech/go-libdeflate v0.1.0/go.mod h1:OCQBWlynNqNCYb1ckqNjY69H1LAhg2TyyfYZalO03BM=
 github.com/erigontech/mdbx-go v0.39.17 h1:+FQMaCuH/IB+t9HEXVDxTJaV5jvZTwC/6YgDHHKDDMo=
 github.com/erigontech/mdbx-go v0.39.17/go.mod h1:VTGwYzepS9Wg38jVfreOsSVlh73OBGPZluu7kHo6X6g=
 github.com/erigontech/secp256k1 v1.2.1-0.20260218182123-377cc1bd6410 h1:5YD7JJ5PaqOdjKA84lTDtQby9nbI4podqrkUhyIyFDw=
```

node/rpcstack.go

Lines changed: 172 additions & 15 deletions
```diff
@@ -20,6 +20,7 @@
 package node
 
 import (
+	"bytes"
 	"compress/gzip"
 	"context"
 	"errors"
@@ -28,10 +29,13 @@ import (
 	"net"
 	"net/http"
 	"slices"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 
+	libdeflate "github.com/erigontech/go-libdeflate"
+
 	"github.com/rs/cors"
 
 	"github.com/erigontech/erigon/common/log/v3"
@@ -508,25 +512,181 @@ func (h *virtualHostHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	http.Error(w, "invalid host specified", http.StatusForbidden)
 }
 
+// gzPoolBufCap is the maximum buffer capacity retained in pools to bound RSS growth.
+const gzPoolBufCap = 1 << 20
+
+// minGzipBodySize is the minimum response body size to compress. Responses
+// smaller than this are sent as-is: gzip framing overhead would exceed savings.
+const minGzipBodySize = 1024
+
 var gzPool = sync.Pool{
+	New: func() any { w, _ := gzip.NewWriterLevel(io.Discard, gzip.BestSpeed); return w },
+}
+
+var libdeflateWarnOnce sync.Once
+var libdeflateCompressWarnOnce sync.Once
+var libdeflateDisabled atomic.Bool
+
+var gzCompressorPool = sync.Pool{
 	New: func() any {
-		w := gzip.NewWriter(io.Discard)
-		return w
+		c, err := libdeflate.NewCompressor(libdeflate.DefaultCompression)
+		if err != nil {
+			libdeflateDisabled.Store(true)
+			libdeflateWarnOnce.Do(func() {
+				log.Warn("libdeflate unavailable, falling back to stdlib gzip", "err", err)
+			})
+			return nil
+		}
+		return c
 	},
 }
 
+var gzBufPool = sync.Pool{
+	New: func() any { return new(bytes.Buffer) },
+}
+
+var gzDstPool = sync.Pool{
+	New: func() any { return make([]byte, 0, 64*1024) },
+}
+
+func getBuf() *bytes.Buffer {
+	buf := gzBufPool.Get().(*bytes.Buffer)
+	buf.Reset()
+	return buf
+}
+
+func putBuf(buf *bytes.Buffer) {
+	if buf.Cap() <= gzPoolBufCap {
+		gzBufPool.Put(buf)
+	}
+}
+
+func putDst(dst []byte) {
+	if cap(dst) <= gzPoolBufCap {
+		gzDstPool.Put(dst)
+	}
+}
+
+func gzDstGrow(b []byte, wantLen int) []byte {
+	if cap(b) >= wantLen {
+		return b[:wantLen]
+	}
+	return make([]byte, wantLen, max(wantLen, 2*cap(b)))
+}
+
 type gzipResponseWriter struct {
-	io.Writer
+	buf    *bytes.Buffer
+	gzw    *gzip.Writer
+	status int
 	http.ResponseWriter
 }
 
 func (w *gzipResponseWriter) WriteHeader(status int) {
-	w.Header().Del("Content-Length")
-	w.ResponseWriter.WriteHeader(status)
+	if w.gzw != nil {
+		w.ResponseWriter.WriteHeader(status)
+	} else {
+		w.status = status
+	}
 }
 
 func (w *gzipResponseWriter) Write(b []byte) (int, error) {
-	return w.Writer.Write(b)
+	if w.gzw != nil {
+		return w.gzw.Write(b)
+	}
+	return w.buf.Write(b)
+}
+
+// Flush switches to streaming gzip on first call; subsequent calls flush incrementally.
+func (w *gzipResponseWriter) Flush() {
+	if w.gzw == nil {
+		w.ResponseWriter.Header().Set("Content-Encoding", "gzip")
+		w.ResponseWriter.Header().Del("Content-Length")
+		if w.status != 0 {
+			w.ResponseWriter.WriteHeader(w.status)
+		}
+		w.gzw = gzPool.Get().(*gzip.Writer)
+		w.gzw.Reset(w.ResponseWriter)
+		if w.buf.Len() > 0 {
+			_, _ = w.gzw.Write(w.buf.Bytes())
+			w.buf.Reset()
+		}
+	}
+	_ = w.gzw.Flush()
+	if f, ok := w.ResponseWriter.(http.Flusher); ok {
+		f.Flush()
+	}
+}
+
+func writeStdlibGzip(w http.ResponseWriter, src []byte, status int) {
+	gz := gzPool.Get().(*gzip.Writer)
+	defer gzPool.Put(gz)
+	gz.Reset(w)
+	w.Header().Set("Content-Encoding", "gzip")
+	w.Header().Del("Content-Length")
+	if status != 0 {
+		w.WriteHeader(status)
+	}
+	_, _ = gz.Write(src)
+	_ = gz.Close()
+}
+
+// compressLibdeflate tries to compress src with libdeflate and write the response.
+// Returns false if libdeflate is unavailable or compression fails; the caller
+// should then fall back to writeStdlibGzip.
+func compressLibdeflate(w http.ResponseWriter, src []byte, status int) bool {
+	if libdeflateDisabled.Load() {
+		return false
+	}
+	raw := gzCompressorPool.Get()
+	if raw == nil {
+		return false
+	}
+	c := raw.(*libdeflate.Compressor)
+	defer gzCompressorPool.Put(c)
+
+	dst := gzDstPool.Get().([]byte)
+	dst = gzDstGrow(dst, c.GzipCompressBound(len(src)))
+	defer putDst(dst)
+
+	n, err := c.CompressGzip(dst, src)
+	if err != nil {
+		libdeflateCompressWarnOnce.Do(func() {
+			log.Warn("libdeflate compression failed, falling back to stdlib gzip", "err", err)
+		})
+		return false
+	}
+
+	w.Header().Set("Content-Encoding", "gzip")
+	w.Header().Set("Content-Length", strconv.Itoa(n))
+	if status != 0 {
+		w.WriteHeader(status)
+	}
+	w.Write(dst[:n]) //nolint:errcheck
+	return true
+}
+
+func sendGzipResponse(w http.ResponseWriter, grw *gzipResponseWriter) {
+	defer putBuf(grw.buf)
+
+	if grw.gzw != nil {
+		defer gzPool.Put(grw.gzw)
+		defer grw.gzw.Close() //nolint:errcheck
+		return
+	}
+
+	src := grw.buf.Bytes()
+	if len(src) < minGzipBodySize {
+		w.Header().Set("Content-Length", strconv.Itoa(len(src)))
+		if grw.status != 0 {
+			w.WriteHeader(grw.status)
+		}
+		w.Write(src) //nolint:errcheck
+		return
+	}
+
+	if !compressLibdeflate(w, src, grw.status) {
+		writeStdlibGzip(w, src, grw.status)
+	}
 }
 
 func newGzipHandler(next http.Handler) http.Handler {
@@ -536,15 +696,12 @@ func newGzipHandler(next http.Handler) http.Handler {
 			return
 		}
 
-		w.Header().Set("Content-Encoding", "gzip")
-
-		gz := gzPool.Get().(*gzip.Writer)
-		defer gzPool.Put(gz)
-
-		gz.Reset(w)
-		defer gz.Close()
-
-		next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
+		grw := &gzipResponseWriter{buf: getBuf(), ResponseWriter: w}
+		// The hook activates streaming mode before the first write; absent when gzip
+		// is off so it cannot prematurely commit HTTP headers (e.g. 200 before 503).
+		r = r.WithContext(rpc.WithGzipStreamingHook(r.Context(), grw.Flush))
+		next.ServeHTTP(grw, r)
+		sendGzipResponse(w, grw)
 	})
 }
 
```
