Skip to content

Commit 2ab4295

Browse files
authored
feat(blocklog): implement reth slow_block log parser (#140)
Parse reth's `reth::slow_block: Slow block` key=value log lines into the nested JSON structure expected by the block log collector and indexer. Strips ANSI escape codes that reth embeds in container output.
1 parent 090d915 commit 2ab4295

2 files changed

Lines changed: 302 additions & 6 deletions

File tree

pkg/blocklog/reth.go

Lines changed: 137 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,159 @@ package blocklog
22

33
import (
44
"encoding/json"
5+
"regexp"
6+
"strconv"
7+
"strings"
58

69
"github.com/ethpandaops/benchmarkoor/pkg/client"
710
)
811

9-
// rethParser is a stub parser for Reth client logs.
10-
// Returns nil, false until the log format is known.
12+
// rethLogPattern matches reth slow_block log lines (after ANSI stripping).
13+
// Format: <timestamp> WARN reth::slow_block: Slow block <key=value pairs>
14+
var rethLogPattern = regexp.MustCompile(
15+
`^\S+\s+WARN\s+reth::slow_block:\s+Slow block\s+(.+)$`,
16+
)
17+
18+
// ansiPattern matches ANSI escape sequences (colors, styles, etc.).
19+
var ansiPattern = regexp.MustCompile(`\x1b\[[0-9;]*m`)
20+
21+
// rethParser parses key=value pairs from Reth client slow_block logs.
1122
type rethParser struct{}
1223

13-
// NewRethParser creates a new Reth log parser (stub).
24+
// NewRethParser creates a new Reth log parser.
1425
func NewRethParser() Parser {
1526
return &rethParser{}
1627
}
1728

1829
// Ensure interface compliance.
1930
var _ Parser = (*rethParser)(nil)
2031

21-
// ParseLine is a stub that always returns nil, false.
22-
func (p *rethParser) ParseLine(_ string) (json.RawMessage, bool) {
23-
return nil, false
32+
// ParseLine extracts metrics from a Reth slow_block log line and
33+
// returns them as a nested JSON structure.
34+
func (p *rethParser) ParseLine(line string) (json.RawMessage, bool) {
35+
// Strip ANSI escape codes — reth logs include color/style sequences.
36+
line = ansiPattern.ReplaceAllString(line, "")
37+
38+
matches := rethLogPattern.FindStringSubmatch(line)
39+
if len(matches) < 2 {
40+
return nil, false
41+
}
42+
43+
kvPart := matches[1]
44+
result := map[string]any{
45+
"level": "warn",
46+
"msg": "Slow block",
47+
}
48+
49+
for _, token := range parseKVTokens(kvPart) {
50+
key, value, ok := parseKVPair(token)
51+
if !ok {
52+
continue
53+
}
54+
55+
setNested(result, strings.Split(key, "."), value)
56+
}
57+
58+
data, err := json.Marshal(result)
59+
if err != nil {
60+
return nil, false
61+
}
62+
63+
return json.RawMessage(data), true
2464
}
2565

2666
// ClientType returns the client type.
2767
func (p *rethParser) ClientType() client.ClientType {
2868
return client.ClientReth
2969
}
70+
71+
// parseKVTokens splits a key=value string into individual tokens,
72+
// handling quoted values that may contain spaces.
73+
func parseKVTokens(s string) []string {
74+
var tokens []string
75+
76+
var current strings.Builder
77+
78+
inQuote := false
79+
80+
for i := 0; i < len(s); i++ {
81+
ch := s[i]
82+
83+
switch {
84+
case ch == '"':
85+
inQuote = !inQuote
86+
current.WriteByte(ch)
87+
case ch == ' ' && !inQuote:
88+
if current.Len() > 0 {
89+
tokens = append(tokens, current.String())
90+
current.Reset()
91+
}
92+
default:
93+
current.WriteByte(ch)
94+
}
95+
}
96+
97+
if current.Len() > 0 {
98+
tokens = append(tokens, current.String())
99+
}
100+
101+
return tokens
102+
}
103+
104+
// parseKVPair splits a "key=value" token and parses the value into
105+
// the appropriate Go type (int64, float64, or string).
106+
func parseKVPair(token string) (string, any, bool) {
107+
key, raw, ok := strings.Cut(token, "=")
108+
if !ok {
109+
return "", nil, false
110+
}
111+
112+
// Strip surrounding quotes.
113+
raw = strings.Trim(raw, "\"")
114+
115+
return key, parseValue(raw), true
116+
}
117+
118+
// parseValue attempts to parse a string as int64, then float64,
119+
// falling back to string.
120+
func parseValue(s string) any {
121+
if i, err := strconv.ParseInt(s, 10, 64); err == nil {
122+
return i
123+
}
124+
125+
if f, err := strconv.ParseFloat(s, 64); err == nil {
126+
return f
127+
}
128+
129+
return s
130+
}
131+
132+
// setNested inserts a value into a nested map structure following
133+
// the given key path (e.g. ["block", "number"] → {"block": {"number": v}}).
134+
func setNested(m map[string]any, keys []string, value any) {
135+
for i, key := range keys {
136+
if i == len(keys)-1 {
137+
m[key] = value
138+
139+
return
140+
}
141+
142+
sub, ok := m[key]
143+
if !ok {
144+
child := make(map[string]any, 4)
145+
m[key] = child
146+
m = child
147+
148+
continue
149+
}
150+
151+
child, ok := sub.(map[string]any)
152+
if !ok {
153+
// Key already has a non-map value; overwrite with map.
154+
child = make(map[string]any, 4)
155+
m[key] = child
156+
}
157+
158+
m = child
159+
}
160+
}

pkg/blocklog/reth_test.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package blocklog
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestRethParser_ParseLine(t *testing.T) {
12+
parser := NewRethParser()
13+
14+
tests := []struct {
15+
name string
16+
line string
17+
wantOK bool
18+
// checkJSON is called when wantOK is true to verify the parsed output.
19+
checkJSON func(t *testing.T, data map[string]any)
20+
}{
21+
{
22+
name: "valid slow_block line with all fields",
23+
line: `2026-03-10T10:29:20.098444Z WARN reth::slow_block: Slow block block.number=1 block.hash=0xc957abc123 block.gas_used=100000000 block.tx_count=6 timing.execution_ms=91 timing.state_read_ms=5 timing.state_hash_ms=10 timing.commit_ms=3 timing.total_ms=109 throughput.mgas_per_sec="1091.46" state_reads.accounts=8 state_reads.storage_slots=12 state_reads.code=2 state_reads.code_bytes=1024 state_writes.accounts=4 state_writes.accounts_deleted=0 state_writes.storage_slots=6 state_writes.code=1 cache.account.hits=1 cache.account.misses=7 cache.storage.hits=3 cache.storage.misses=9 cache.code.hits=0 cache.code.misses=2`,
24+
wantOK: true,
25+
checkJSON: func(t *testing.T, data map[string]any) {
26+
t.Helper()
27+
28+
assert.Equal(t, "warn", data["level"])
29+
assert.Equal(t, "Slow block", data["msg"])
30+
31+
block := data["block"].(map[string]any)
32+
assert.Equal(t, float64(1), block["number"])
33+
assert.Equal(t, "0xc957abc123", block["hash"])
34+
assert.Equal(t, float64(100000000), block["gas_used"])
35+
assert.Equal(t, float64(6), block["tx_count"])
36+
37+
timing := data["timing"].(map[string]any)
38+
assert.Equal(t, float64(91), timing["execution_ms"])
39+
assert.Equal(t, float64(5), timing["state_read_ms"])
40+
assert.Equal(t, float64(10), timing["state_hash_ms"])
41+
assert.Equal(t, float64(3), timing["commit_ms"])
42+
assert.Equal(t, float64(109), timing["total_ms"])
43+
44+
throughput := data["throughput"].(map[string]any)
45+
assert.Equal(t, 1091.46, throughput["mgas_per_sec"])
46+
47+
stateReads := data["state_reads"].(map[string]any)
48+
assert.Equal(t, float64(8), stateReads["accounts"])
49+
assert.Equal(t, float64(12), stateReads["storage_slots"])
50+
assert.Equal(t, float64(2), stateReads["code"])
51+
assert.Equal(t, float64(1024), stateReads["code_bytes"])
52+
53+
stateWrites := data["state_writes"].(map[string]any)
54+
assert.Equal(t, float64(4), stateWrites["accounts"])
55+
assert.Equal(t, float64(0), stateWrites["accounts_deleted"])
56+
assert.Equal(t, float64(6), stateWrites["storage_slots"])
57+
assert.Equal(t, float64(1), stateWrites["code"])
58+
59+
cache := data["cache"].(map[string]any)
60+
account := cache["account"].(map[string]any)
61+
assert.Equal(t, float64(1), account["hits"])
62+
assert.Equal(t, float64(7), account["misses"])
63+
64+
storage := cache["storage"].(map[string]any)
65+
assert.Equal(t, float64(3), storage["hits"])
66+
assert.Equal(t, float64(9), storage["misses"])
67+
68+
code := cache["code"].(map[string]any)
69+
assert.Equal(t, float64(0), code["hits"])
70+
assert.Equal(t, float64(2), code["misses"])
71+
},
72+
},
73+
{
74+
name: "quoted float values parsed correctly",
75+
line: `2026-03-10T10:29:20.098444Z WARN reth::slow_block: Slow block throughput.mgas_per_sec="12.50" timing.execution_ms="91.3"`,
76+
wantOK: true,
77+
checkJSON: func(t *testing.T, data map[string]any) {
78+
t.Helper()
79+
80+
throughput := data["throughput"].(map[string]any)
81+
assert.Equal(t, 12.50, throughput["mgas_per_sec"])
82+
83+
timing := data["timing"].(map[string]any)
84+
assert.Equal(t, 91.3, timing["execution_ms"])
85+
},
86+
},
87+
{
88+
name: "non-slow-block reth log line",
89+
line: `2026-03-10T10:29:20.098444Z INFO reth::engine: Block received block.number=1`,
90+
wantOK: false,
91+
},
92+
{
93+
name: "empty line",
94+
line: "",
95+
wantOK: false,
96+
},
97+
{
98+
name: "random text",
99+
line: "some random log output that does not match",
100+
wantOK: false,
101+
},
102+
{
103+
name: "line with ANSI escape codes",
104+
line: "\x1b[2m2026-03-10T10:50:19.731231Z\x1b[0m \x1b[33m WARN\x1b[0m \x1b[2mreth::slow_block\x1b[0m\x1b[2m:\x1b[0m Slow block \x1b[3mblock.number\x1b[0m\x1b[2m=\x1b[0m1 \x1b[3mblock.hash\x1b[0m\x1b[2m=\x1b[0m0x9f566dc9f8beb533db8611872f4ed57847d147224b59586d2c86e1bf957b8809 \x1b[3mblock.gas_used\x1b[0m\x1b[2m=\x1b[0m184074778176 \x1b[3mblock.tx_count\x1b[0m\x1b[2m=\x1b[0m12339 \x1b[3mtiming.execution_ms\x1b[0m\x1b[2m=\x1b[0m2783 \x1b[3mthroughput.mgas_per_sec\x1b[0m\x1b[2m=\x1b[0m\"66126.09\"",
105+
wantOK: true,
106+
checkJSON: func(t *testing.T, data map[string]any) {
107+
t.Helper()
108+
109+
assert.Equal(t, "warn", data["level"])
110+
assert.Equal(t, "Slow block", data["msg"])
111+
112+
block := data["block"].(map[string]any)
113+
assert.Equal(t, float64(1), block["number"])
114+
assert.Equal(t, "0x9f566dc9f8beb533db8611872f4ed57847d147224b59586d2c86e1bf957b8809", block["hash"])
115+
assert.Equal(t, float64(184074778176), block["gas_used"])
116+
assert.Equal(t, float64(12339), block["tx_count"])
117+
118+
timing := data["timing"].(map[string]any)
119+
assert.Equal(t, float64(2783), timing["execution_ms"])
120+
121+
throughput := data["throughput"].(map[string]any)
122+
assert.Equal(t, 66126.09, throughput["mgas_per_sec"])
123+
},
124+
},
125+
{
126+
name: "extra unknown fields are preserved",
127+
line: `2026-03-10T10:29:20.098444Z WARN reth::slow_block: Slow block block.number=42 eip7702_delegations.set=5`,
128+
wantOK: true,
129+
checkJSON: func(t *testing.T, data map[string]any) {
130+
t.Helper()
131+
132+
block := data["block"].(map[string]any)
133+
assert.Equal(t, float64(42), block["number"])
134+
135+
eip := data["eip7702_delegations"].(map[string]any)
136+
assert.Equal(t, float64(5), eip["set"])
137+
},
138+
},
139+
}
140+
141+
for _, tt := range tests {
142+
t.Run(tt.name, func(t *testing.T) {
143+
result, ok := parser.ParseLine(tt.line)
144+
145+
assert.Equal(t, tt.wantOK, ok)
146+
147+
if tt.wantOK {
148+
require.NotNil(t, result)
149+
150+
var parsed map[string]any
151+
err := json.Unmarshal(result, &parsed)
152+
require.NoError(t, err)
153+
154+
tt.checkJSON(t, parsed)
155+
} else {
156+
assert.Nil(t, result)
157+
}
158+
})
159+
}
160+
}
161+
162+
func TestRethParser_ClientType(t *testing.T) {
163+
parser := NewRethParser()
164+
assert.Equal(t, "reth", string(parser.ClientType()))
165+
}

0 commit comments

Comments
 (0)