Skip to content

Commit d5a2b2d

Browse files
authored
Batch merkle service watch calls (#146)
* Batch merkle service watch calls * Fix linter * Fix linting on new golangci
1 parent 7dab540 commit d5a2b2d

4 files changed

Lines changed: 359 additions & 27 deletions

File tree

merkleservice/client_test.go

Lines changed: 191 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ import (
1313
"time"
1414
)
1515

16+
// testCallbackURL is a sentinel callback URL used across registration tests.
17+
// Centralized so goconst stays quiet — the URL itself is never dialed.
18+
const testCallbackURL = "http://cb"
19+
1620
func TestRegister(t *testing.T) {
1721
var gotBody map[string]string
1822
var gotAuth string
@@ -126,7 +130,7 @@ func TestRegisterBatch_AllSucceed(t *testing.T) {
126130
client := NewClient(server.URL, "", 0)
127131
regs := make([]Registration, 10)
128132
for i := range regs {
129-
regs[i] = Registration{TxID: "tx" + string(rune('0'+i)), CallbackURL: "http://cb"}
133+
regs[i] = Registration{TxID: "tx" + string(rune('0'+i)), CallbackURL: testCallbackURL}
130134
}
131135

132136
err := client.RegisterBatch(context.Background(), regs, 5)
@@ -153,7 +157,7 @@ func TestRegisterBatch_FailFast(t *testing.T) {
153157

154158
client := NewClient(server.URL, "", 0)
155159
regs := []Registration{
156-
{TxID: "fail-tx", CallbackURL: "http://cb"},
160+
{TxID: "fail-tx", CallbackURL: testCallbackURL},
157161
}
158162

159163
err := client.RegisterBatch(context.Background(), regs, 1)
@@ -184,7 +188,7 @@ func TestRegisterBatch_ConcurrencyBounded(t *testing.T) {
184188
client := NewClient(server.URL, "", 0)
185189
regs := make([]Registration, 50)
186190
for i := range regs {
187-
regs[i] = Registration{TxID: "tx", CallbackURL: "http://cb"}
191+
regs[i] = Registration{TxID: "tx", CallbackURL: testCallbackURL}
188192
}
189193

190194
err := client.RegisterBatch(context.Background(), regs, 3)
@@ -204,6 +208,186 @@ func TestRegisterBatch_EmptyReturnsNil(t *testing.T) {
204208
}
205209
}
206210

211+
func TestRegisterBatchWithResults_AllSucceed(t *testing.T) {
212+
var count atomic.Int32
213+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
214+
count.Add(1)
215+
w.WriteHeader(http.StatusOK)
216+
}))
217+
defer server.Close()
218+
219+
client := NewClient(server.URL, "", 0)
220+
regs := make([]Registration, 10)
221+
for i := range regs {
222+
regs[i] = Registration{TxID: "tx" + string(rune('0'+i)), CallbackURL: testCallbackURL}
223+
}
224+
225+
errs := client.RegisterBatchWithResults(context.Background(), regs, 5)
226+
if len(errs) != len(regs) {
227+
t.Fatalf("expected len(errs)=%d, got %d", len(regs), len(errs))
228+
}
229+
for i, e := range errs {
230+
if e != nil {
231+
t.Errorf("errs[%d]=%v want nil", i, e)
232+
}
233+
}
234+
if count.Load() != 10 {
235+
t.Errorf("expected 10 requests, got %d", count.Load())
236+
}
237+
}
238+
239+
// TestRegisterBatchWithResults_PartialFailure pins the key contract difference
240+
// versus RegisterBatch: this variant does NOT fail-fast — successes survive a
241+
// sibling's failure so the propagator can partition the batch and broadcast
242+
// the successes while routing failures to PENDING_RETRY.
243+
func TestRegisterBatchWithResults_PartialFailure(t *testing.T) {
244+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
245+
body, _ := io.ReadAll(r.Body)
246+
if strings.Contains(string(body), "fail") {
247+
w.WriteHeader(http.StatusInternalServerError)
248+
return
249+
}
250+
w.WriteHeader(http.StatusOK)
251+
}))
252+
defer server.Close()
253+
254+
client := NewClient(server.URL, "", 0)
255+
regs := []Registration{
256+
{TxID: "ok-1", CallbackURL: testCallbackURL},
257+
{TxID: "fail-1", CallbackURL: testCallbackURL},
258+
{TxID: "ok-2", CallbackURL: testCallbackURL},
259+
{TxID: "fail-2", CallbackURL: testCallbackURL},
260+
{TxID: "ok-3", CallbackURL: testCallbackURL},
261+
}
262+
263+
errs := client.RegisterBatchWithResults(context.Background(), regs, 3)
264+
if len(errs) != len(regs) {
265+
t.Fatalf("expected len(errs)=%d, got %d", len(regs), len(errs))
266+
}
267+
wantNil := []int{0, 2, 4}
268+
wantErr := []int{1, 3}
269+
for _, i := range wantNil {
270+
if errs[i] != nil {
271+
t.Errorf("errs[%d]=%v want nil (tx %s)", i, errs[i], regs[i].TxID)
272+
}
273+
}
274+
for _, i := range wantErr {
275+
if errs[i] == nil {
276+
t.Errorf("errs[%d]=nil want non-nil (tx %s)", i, regs[i].TxID)
277+
}
278+
}
279+
}
280+
281+
func TestRegisterBatchWithResults_AllFail(t *testing.T) {
282+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
283+
w.WriteHeader(http.StatusInternalServerError)
284+
}))
285+
defer server.Close()
286+
287+
client := NewClient(server.URL, "", 0)
288+
regs := make([]Registration, 4)
289+
for i := range regs {
290+
regs[i] = Registration{TxID: "tx", CallbackURL: testCallbackURL}
291+
}
292+
293+
errs := client.RegisterBatchWithResults(context.Background(), regs, 2)
294+
if len(errs) != len(regs) {
295+
t.Fatalf("expected len(errs)=%d, got %d", len(regs), len(errs))
296+
}
297+
for i, e := range errs {
298+
if e == nil {
299+
t.Errorf("errs[%d]=nil want non-nil", i)
300+
}
301+
}
302+
}
303+
304+
// TestRegisterBatchWithResults_ContextCanceled verifies that a cancellation
305+
// mid-batch still produces a per-index error slice the caller can partition —
306+
// the propagator relies on len(errs)==len(regs) to align failures with
307+
// pendingMsgs entries when routing to PENDING_RETRY.
308+
func TestRegisterBatchWithResults_ContextCanceled(t *testing.T) {
309+
release := make(chan struct{})
310+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
311+
select {
312+
case <-release:
313+
case <-r.Context().Done():
314+
}
315+
w.WriteHeader(http.StatusOK)
316+
}))
317+
defer server.Close()
318+
defer close(release)
319+
320+
client := NewClient(server.URL, "", 0)
321+
regs := make([]Registration, 8)
322+
for i := range regs {
323+
regs[i] = Registration{TxID: "tx", CallbackURL: testCallbackURL}
324+
}
325+
326+
ctx, cancel := context.WithCancel(context.Background())
327+
cancel()
328+
329+
errs := client.RegisterBatchWithResults(ctx, regs, 2)
330+
if len(errs) != len(regs) {
331+
t.Fatalf("expected len(errs)=%d, got %d", len(regs), len(errs))
332+
}
333+
var sawErr bool
334+
for _, e := range errs {
335+
if e != nil {
336+
sawErr = true
337+
break
338+
}
339+
}
340+
if !sawErr {
341+
t.Errorf("expected at least one non-nil error after context cancel, got all nil")
342+
}
343+
}
344+
345+
func TestRegisterBatchWithResults_EmptyReturnsNil(t *testing.T) {
346+
client := NewClient("http://unused", "", 0)
347+
errs := client.RegisterBatchWithResults(context.Background(), nil, 5)
348+
if errs != nil {
349+
t.Errorf("expected nil for empty batch, got %v", errs)
350+
}
351+
}
352+
353+
// TestRegisterBatchWithResults_ConcurrencyBounded covers the semaphore code
354+
// path at client.go:236-251, which is structurally different from
355+
// RegisterBatch's errgroup.SetLimit path and warrants its own assertion.
356+
func TestRegisterBatchWithResults_ConcurrencyBounded(t *testing.T) {
357+
var concurrent atomic.Int32
358+
var maxConcurrent atomic.Int32
359+
360+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
361+
cur := concurrent.Add(1)
362+
for {
363+
old := maxConcurrent.Load()
364+
if cur <= old || maxConcurrent.CompareAndSwap(old, cur) {
365+
break
366+
}
367+
}
368+
time.Sleep(5 * time.Millisecond)
369+
concurrent.Add(-1)
370+
w.WriteHeader(http.StatusOK)
371+
}))
372+
defer server.Close()
373+
374+
client := NewClient(server.URL, "", 0)
375+
regs := make([]Registration, 30)
376+
for i := range regs {
377+
regs[i] = Registration{TxID: "tx", CallbackURL: testCallbackURL}
378+
}
379+
380+
errs := client.RegisterBatchWithResults(context.Background(), regs, 3)
381+
for i, e := range errs {
382+
if e != nil {
383+
t.Errorf("errs[%d]=%v want nil", i, e)
384+
}
385+
}
386+
if maxConcurrent.Load() > 3 {
387+
t.Errorf("expected max concurrency <= 3, got %d", maxConcurrent.Load())
388+
}
389+
}
390+
207391
func TestReprocess_AcceptedReturnsNil(t *testing.T) {
208392
var gotPath, gotAuth string
209393
var gotBody map[string]string
@@ -248,7 +432,7 @@ func TestReprocess_OmitsEmptyCallbackToken(t *testing.T) {
248432
defer server.Close()
249433

250434
client := NewClient(server.URL, "", 0)
251-
if err := client.Reprocess(context.Background(), "blockhash123", "http://cb", ""); err != nil {
435+
if err := client.Reprocess(context.Background(), "blockhash123", testCallbackURL, ""); err != nil {
252436
t.Fatalf("unexpected error: %v", err)
253437
}
254438
if strings.Contains(string(rawBody), "callbackToken") {
@@ -264,7 +448,7 @@ func TestReprocess_4xxReturnsTypedFailure(t *testing.T) {
264448
defer server.Close()
265449

266450
client := NewClient(server.URL, "", 0)
267-
err := client.Reprocess(context.Background(), "blockhash123", "http://cb", "")
451+
err := client.Reprocess(context.Background(), "blockhash123", testCallbackURL, "")
268452
if err == nil {
269453
t.Fatal("expected error, got nil")
270454
}
@@ -287,7 +471,7 @@ func TestReprocess_5xxReturnsTypedFailure(t *testing.T) {
287471
defer server.Close()
288472

289473
client := NewClient(server.URL, "", 0)
290-
err := client.Reprocess(context.Background(), "blockhash123", "http://cb", "")
474+
err := client.Reprocess(context.Background(), "blockhash123", testCallbackURL, "")
291475
if err == nil {
292476
t.Fatal("expected error, got nil")
293477
}
@@ -310,7 +494,7 @@ func TestReprocess_ContextCanceled(t *testing.T) {
310494
client := NewClient(server.URL, "", 100*time.Millisecond)
311495
ctx, cancel := context.WithCancel(context.Background())
312496
cancel()
313-
err := client.Reprocess(ctx, "blockhash", "http://cb", "")
497+
err := client.Reprocess(ctx, "blockhash", testCallbackURL, "")
314498
if err == nil {
315499
t.Fatal("expected error from canceled context")
316500
}

metrics/metrics.go

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ import (
4141
"github.com/prometheus/client_golang/prometheus/promauto"
4242
)
4343

44+
// labelOutcome is the conventional label name for counters that partition
45+
// their measurements by a coarse success/failure-class enum. Centralized so
46+
// every metric uses the same label key (and goconst stays quiet).
47+
const labelOutcome = "outcome"
48+
4449
// Standard latency buckets for histograms measuring durations from very
4550
// short (DB lookup, validate) up to long (reaper tick, bump build).
4651
var latencyBuckets = []float64{
@@ -83,7 +88,7 @@ var TxValidatorFlushDuration = promauto.NewHistogramVec(prometheus.HistogramOpts
8388
Name: "arcade_tx_validator_flush_duration_seconds",
8489
Help: "Wall time of one tx_validator flush window, by outcome.",
8590
Buckets: latencyBuckets,
86-
}, []string{"outcome"}) // success, publish_failed
91+
}, []string{labelOutcome}) // success, publish_failed
8792

8893
// TxValidatorFlushSize captures how many txs each flush processed. Tracking the
8994
// distribution surfaces whether parallelism is being applied at all (a stuck
@@ -98,7 +103,7 @@ var TxValidatorFlushSize = promauto.NewHistogram(prometheus.HistogramOpts{
98103
var TxValidatorOutcomeTotal = promauto.NewCounterVec(prometheus.CounterOpts{
99104
Name: "arcade_tx_validator_outcome_total",
100105
Help: "Per-tx validation outcome counts.",
101-
}, []string{"outcome"}) // accepted, rejected, duplicate, parse_fail, store_error
106+
}, []string{labelOutcome}) // accepted, rejected, duplicate, parse_fail, store_error
102107

103108
// ---------------------------------------------------------------------------
104109
// propagation
@@ -147,7 +152,7 @@ var PropagationBroadcastDuration = promauto.NewHistogramVec(prometheus.Histogram
147152
var PropagationOutcomeTotal = promauto.NewCounterVec(prometheus.CounterOpts{
148153
Name: "arcade_propagation_outcome_total",
149154
Help: "Per-tx propagation outcome counts.",
150-
}, []string{"outcome"}) // accepted, rejected, retryable, no_verdict
155+
}, []string{labelOutcome}) // accepted, rejected, retryable, no_verdict
151156

152157
// PropagationChunkTotal counts how many chunk broadcasts were issued. Combined
153158
// with PropagationBatchSize this surfaces whether teranode_max_batch_size is
@@ -163,28 +168,41 @@ var PropagationChunkTotal = promauto.NewCounterVec(prometheus.CounterOpts{
163168
var PropagationInlineRetryTotal = promauto.NewCounterVec(prometheus.CounterOpts{
164169
Name: "arcade_propagation_inline_retry_total",
165170
Help: "Inline retry attempts on broadcastSingleToEndpoints.",
166-
}, []string{"outcome"}) // recovered, exhausted
171+
}, []string{labelOutcome}) // recovered, exhausted
167172

168-
// PropagationMerkleRegisterDuration measures the merkle-service per-message
169-
// registration round-trip. Slow merkle calls are a common bottleneck.
173+
// PropagationMerkleRegisterDuration measures the merkle-service registration
174+
// wall time for one flushBatch — a single bounded-concurrency fan-out over
175+
// every tx in the batch, observed once per batch. Slow merkle calls are a
176+
// common bottleneck; under burst ingest this histogram is the canonical
177+
// p50/p99 signal.
170178
var PropagationMerkleRegisterDuration = promauto.NewHistogram(prometheus.HistogramOpts{
171179
Name: "arcade_propagation_merkle_register_duration_seconds",
172-
Help: "Duration of merkle-service Register calls.",
180+
Help: "Wall time of one batched merkle-service registration fan-out.",
173181
Buckets: latencyBuckets,
174182
})
175183

176-
// PropagationMerkleRegisterFailures counts per-message merkle-service
177-
// registration failures by reason. Sustained values indicate the merkle
178-
// service is unhealthy — without this metric a registration outage was
179-
// previously masked by silent broadcast continuation. Reasons map to the
180-
// failure mode observed by handleMessage; today only "register_error" is
181-
// emitted, but the label is kept open so future error-class splits (e.g.
182-
// "timeout", "5xx", "auth") can be added without renaming the metric.
184+
// PropagationMerkleRegisterFailures counts per-tx merkle-service registration
185+
// failures by reason. Sustained values indicate the merkle service is
186+
// unhealthy — without this metric a registration outage was previously
187+
// masked by silent broadcast continuation. The label is kept open so future
188+
// error-class splits (e.g. "timeout", "5xx", "auth") can be added without
189+
// renaming the metric.
183190
var PropagationMerkleRegisterFailures = promauto.NewCounterVec(prometheus.CounterOpts{
184191
Name: "arcade_propagation_merkle_register_failures_total",
185-
Help: "Per-message merkle-service Register failures, by reason.",
192+
Help: "Per-tx merkle-service Register failures, by reason.",
186193
}, []string{"reason"})
187194

195+
// PropagationMerkleRegisterBatchOutcomeTotal counts each flushBatch's merkle
196+
// registration result. "fully_ok" = every tx registered, "partial" = some
197+
// succeeded and some routed to PENDING_RETRY, "all_failed" = nothing
198+
// broadcast this flush. Lets dashboards distinguish a one-off RTT blip
199+
// (single "partial" tick) from a sustained outage (rising "all_failed"
200+
// rate) — a signal the per-tx failure counter alone obscures.
201+
var PropagationMerkleRegisterBatchOutcomeTotal = promauto.NewCounterVec(prometheus.CounterOpts{
202+
Name: "arcade_propagation_merkle_register_batch_outcome_total",
203+
Help: "Per-batch merkle-service registration outcome.",
204+
}, []string{labelOutcome}) // fully_ok, partial, all_failed
205+
188206
// PropagationReaperLease is 1 when this pod holds the reaper lease, 0 otherwise.
189207
// In K8s, sum across pods should always equal 1 (or 0 during failover).
190208
var PropagationReaperLease = promauto.NewGauge(prometheus.GaugeOpts{
@@ -196,7 +214,7 @@ var PropagationReaperLease = promauto.NewGauge(prometheus.GaugeOpts{
196214
var PropagationReaperTickTotal = promauto.NewCounterVec(prometheus.CounterOpts{
197215
Name: "arcade_propagation_reaper_tick_total",
198216
Help: "Reaper tick outcomes.",
199-
}, []string{"outcome"}) // ran, skipped_no_leader, lease_error
217+
}, []string{labelOutcome}) // ran, skipped_no_leader, lease_error
200218

201219
// PropagationReaperReadyDepth is the count of PENDING_RETRY rows that the last
202220
// reaper tick observed as ready. Sustained high values indicate a struggling
@@ -216,7 +234,7 @@ var BumpBuilderBuildDuration = promauto.NewHistogramVec(prometheus.HistogramOpts
216234
Name: "arcade_bump_builder_build_duration_seconds",
217235
Help: "Time to build and persist one compound BUMP, by outcome.",
218236
Buckets: latencyBuckets,
219-
}, []string{"outcome"}) // success, no_stumps, fetch_failed, validation_failed, store_failed
237+
}, []string{labelOutcome}) // success, no_stumps, fetch_failed, validation_failed, store_failed
220238

221239
// BumpBuilderBlocksProcessedTotal counts BLOCK_PROCESSED messages handled.
222240
var BumpBuilderBlocksProcessedTotal = promauto.NewCounter(prometheus.CounterOpts{
@@ -291,7 +309,7 @@ var BumpBuilderUntrackedTxidsTotal = promauto.NewCounter(prometheus.CounterOpts{
291309
var WatchdogTickTotal = promauto.NewCounterVec(prometheus.CounterOpts{
292310
Name: "arcade_watchdog_tick_total",
293311
Help: "Watchdog tick outcomes.",
294-
}, []string{"outcome"}) // ran, skipped_no_leader, lease_error
312+
}, []string{labelOutcome}) // ran, skipped_no_leader, lease_error
295313

296314
// WatchdogStaleCount is the number of stale block_processing rows the last
297315
// tick observed (post-recency-window filter, pre-backoff filter).
@@ -304,7 +322,7 @@ var WatchdogStaleCount = promauto.NewGauge(prometheus.GaugeOpts{
304322
var WatchdogReprocessTotal = promauto.NewCounterVec(prometheus.CounterOpts{
305323
Name: "arcade_watchdog_reprocess_total",
306324
Help: "Watchdog /reprocess call outcomes.",
307-
}, []string{"outcome"}) // success, err_4xx, err_5xx, err_network
325+
}, []string{labelOutcome}) // success, err_4xx, err_5xx, err_network
308326

309327
// WatchdogBackoffDepth is the size of the in-memory attempts map.
310328
// Sustained growth implies blocks are persistently failing to recover —
@@ -475,7 +493,7 @@ var P2PNodeStatusMessagesTotal = promauto.NewCounter(prometheus.CounterOpts{
475493
var P2PEndpointDiscoveryTotal = promauto.NewCounterVec(prometheus.CounterOpts{
476494
Name: "arcade_p2p_endpoint_discovery_total",
477495
Help: "Datahub URL discovery outcomes from peer announcements.",
478-
}, []string{"outcome"}) // registered, duplicate, invalid, no_url
496+
}, []string{labelOutcome}) // registered, duplicate, invalid, no_url
479497

480498
// ObserveStatusClass returns the bucket label ("2xx", "3xx", "4xx", "5xx",
481499
// "transport_error") for a given HTTP status code. Used by HTTP-latency

0 commit comments

Comments
 (0)