Skip to content

Commit 70c3707

Browse files
authored
fix: broadcast fan-out waits for first success instead of first result (#42)
* fix: broadcast fan-out waits for first success instead of first result When multiple teranode endpoints are configured, the broadcast now returns on the first successful response (ACCEPTED/SENT). Rejections from one endpoint no longer short-circuit if another endpoint may accept the transaction. Status is only persisted and published once after the final result is determined. * feat: distinguish SERVICE_ERROR from REJECTED in broadcast results 4xx responses from teranode are genuine rejections (invalid tx) and return immediately. 5xx responses are service errors — other endpoints may still accept the transaction. Fan-out only waits on service errors, not rejections. * fix: parse coinbase from block data to fix merkle proof construction fetchBlockSubtrees was downloading the full block binary from the datahub but only extracting subtree hashes, discarding the coinbase transaction. The coinbase txid is needed to replace the all-FF placeholder at subtree index 0 — without it, every hash on the left side of the merkle tree is computed incorrectly, producing invalid proofs. Renamed fetchBlockSubtrees to fetchBlock, now returns a blockData struct with both SubtreeHashes and CoinbaseTxID. processBlockTransactions uses the coinbase from block data directly instead of relying on the P2P BlockMessage.Coinbase field, which may not always be set (e.g. during catch-up processing). Removed parseCoinbaseTxID as it's no longer needed. * fix: address linter issues from broadcast and coinbase changes * refactor: replace nolint:exhaustive with explicit status cases * chore: bump go-jose to v4.1.4 and grpc to v1.80.0 to fix CVE-2026-34986
1 parent d0ca1b4 commit 70c3707

6 files changed

Lines changed: 163 additions & 104 deletions

File tree

arcade.go

Lines changed: 42 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package arcade
33

44
import (
55
"context"
6-
"encoding/hex"
76
"errors"
87
"fmt"
98
"io"
@@ -37,6 +36,7 @@ var (
3736
errChaintracksNoTip = errors.New("chaintracks has no tip")
3837
errUnexpectedStatusCode = errors.New("unexpected status code")
3938
errInvalidHashSize = errors.New("invalid hash size")
39+
errMissingCoinbaseTxID = errors.New("block data missing coinbase txid")
4040
)
4141

4242
// Config holds configuration for Arcade
@@ -335,18 +335,24 @@ func (a *Arcade) processBlockMessage(ctx context.Context, blockMsg teranode.Bloc
335335
}
336336

337337
func (a *Arcade) processBlockTransactions(ctx context.Context, blockMsg teranode.BlockMessage) error { //nolint:gocyclo // complex business logic for merkle path construction
338-
subtreeHashes, err := a.fetchBlockSubtreeHashes(ctx, blockMsg.DataHubURL, blockMsg.Hash)
338+
block, err := a.fetchBlockData(ctx, blockMsg.DataHubURL, blockMsg.Hash)
339339
if err != nil {
340-
return fmt.Errorf("failed to fetch subtree hashes: %w", err)
340+
return fmt.Errorf("failed to fetch block data: %w", err)
341341
}
342342

343-
numSubtrees := len(subtreeHashes)
343+
numSubtrees := len(block.SubtreeHashes)
344344
if numSubtrees == 0 {
345345
a.logger.Debug("block has no subtrees",
346346
slog.String("hash", blockMsg.Hash))
347347
return nil
348348
}
349349

350+
if block.CoinbaseTxID == nil {
351+
a.logger.Error("block data missing coinbase txid, merkle proofs will be incorrect",
352+
slog.String("hash", blockMsg.Hash))
353+
return fmt.Errorf("%w: %s", errMissingCoinbaseTxID, blockMsg.Hash)
354+
}
355+
350356
a.logger.Debug("processing block transactions",
351357
slog.String("hash", blockMsg.Hash),
352358
slog.Int("subtrees", numSubtrees),
@@ -356,7 +362,7 @@ func (a *Arcade) processBlockTransactions(ctx context.Context, blockMsg teranode
356362
totalTxsScanned := 0
357363
totalMatched := 0
358364

359-
for subtreeIdx, subtreeHash := range subtreeHashes {
365+
for subtreeIdx, subtreeHash := range block.SubtreeHashes {
360366
txHashes, err := a.fetchSubtreeHashes(ctx, blockMsg.DataHubURL, subtreeHash.String())
361367
if err != nil {
362368
a.logger.Error("failed to fetch subtree txids",
@@ -367,12 +373,10 @@ func (a *Arcade) processBlockTransactions(ctx context.Context, blockMsg teranode
367373

368374
totalTxsScanned += len(txHashes)
369375

370-
// Subtree 0 contains a coinbase placeholder that must be replaced with the
371-
// real coinbase txid. The subtree 0 root will be computed by ComputeMissingHashes.
376+
// Subtree 0 contains a coinbase placeholder that must be replaced
377+
// with the real coinbase txid from the block data.
372378
if subtreeIdx == 0 && len(txHashes) > 0 {
373-
if coinbaseTxID := a.parseCoinbaseTxID(blockMsg); coinbaseTxID != nil {
374-
txHashes[0] = *coinbaseTxID
375-
}
379+
txHashes[0] = *block.CoinbaseTxID
376380
}
377381

378382
tracked := a.txTracker.FilterTrackedHashes(txHashes)
@@ -395,7 +399,7 @@ func (a *Arcade) processBlockTransactions(ctx context.Context, blockMsg teranode
395399
continue
396400
}
397401

398-
a.buildMerklePathsForSubtree(ctx, blockMsg, subtreeIdx, subtreeRootLayer, subtreeHashes, txHashes, tracked)
402+
a.buildMerklePathsForSubtree(ctx, blockMsg, subtreeIdx, subtreeRootLayer, block.SubtreeHashes, txHashes, tracked)
399403
}
400404

401405
a.logger.Debug("block transaction scan complete",
@@ -407,32 +411,6 @@ func (a *Arcade) processBlockTransactions(ctx context.Context, blockMsg teranode
407411
return nil
408412
}
409413

410-
// parseCoinbaseTxID extracts the txid from the coinbase transaction in a block message.
411-
// Returns nil if the coinbase cannot be parsed.
412-
func (a *Arcade) parseCoinbaseTxID(blockMsg teranode.BlockMessage) *chainhash.Hash {
413-
if blockMsg.Coinbase == "" {
414-
return nil
415-
}
416-
417-
coinbaseBytes, err := hex.DecodeString(blockMsg.Coinbase)
418-
if err != nil {
419-
a.logger.Error("failed to decode coinbase hex",
420-
slog.String("blockHash", blockMsg.Hash),
421-
slog.String("error", err.Error()))
422-
return nil
423-
}
424-
425-
tx, err := transaction.NewTransactionFromBytes(coinbaseBytes)
426-
if err != nil {
427-
a.logger.Error("failed to parse coinbase transaction",
428-
slog.String("blockHash", blockMsg.Hash),
429-
slog.String("error", err.Error()))
430-
return nil
431-
}
432-
433-
return tx.TxID()
434-
}
435-
436414
//nolint:gocyclo // complex business logic for merkle path construction
437415
func (a *Arcade) buildMerklePathsForSubtree(
438416
ctx context.Context,
@@ -939,11 +917,11 @@ func (a *Arcade) processRejectedTxMessage(ctx context.Context, rejectedMsg teran
939917

940918
// HTTP fetching methods
941919

942-
func (a *Arcade) fetchBlockSubtreeHashes(ctx context.Context, dataHubURL, blockHash string) ([]chainhash.Hash, error) {
920+
func (a *Arcade) fetchBlockData(ctx context.Context, dataHubURL, blockHash string) (*blockData, error) {
943921
url := fmt.Sprintf("%s/block/%s", strings.TrimSuffix(dataHubURL, "/"), blockHash)
944-
hashes, err := a.fetchBlockSubtrees(ctx, url)
922+
data, err := a.fetchBlock(ctx, url)
945923
if err == nil {
946-
return hashes, nil
924+
return data, nil
947925
}
948926

949927
// Try fallback URLs
@@ -952,27 +930,32 @@ func (a *Arcade) fetchBlockSubtreeHashes(ctx context.Context, dataHubURL, blockH
952930
continue // Skip if same as original
953931
}
954932
url = fmt.Sprintf("%s/block/%s", strings.TrimSuffix(fallbackURL, "/"), blockHash)
955-
hashes, fallbackErr := a.fetchBlockSubtrees(ctx, url)
933+
data, fallbackErr := a.fetchBlock(ctx, url)
956934
if fallbackErr == nil {
957-
return hashes, nil
935+
return data, nil
958936
}
959937
}
960938

961939
return nil, err // Return original error
962940
}
963941

964-
// fetchBlockSubtrees fetches a block from teranode and extracts subtree hashes from the binary format.
942+
// blockData holds the parsed result of a datahub block response.
943+
type blockData struct {
944+
SubtreeHashes []chainhash.Hash
945+
CoinbaseTxID *chainhash.Hash
946+
}
947+
948+
// fetchBlock fetches a block from a datahub and parses subtree hashes and the coinbase txid.
949+
//
965950
// Teranode block binary format:
966951
// - Block header (80 bytes)
967952
// - Transaction count (varint)
968953
// - Size in bytes (varint)
969954
// - Subtree count (varint)
970955
// - Subtree hashes (32 bytes each)
971-
// - Coinbase transaction
956+
// - Coinbase transaction (raw tx bytes)
972957
// - Height (varint)
973-
//
974-
//nolint:gocyclo // complex business logic for parsing merkle tree data
975-
func (a *Arcade) fetchBlockSubtrees(ctx context.Context, url string) ([]chainhash.Hash, error) {
958+
func (a *Arcade) fetchBlock(ctx context.Context, url string) (*blockData, error) { //nolint:gocyclo // sequential binary parsing
976959
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
977960
if err != nil {
978961
return nil, fmt.Errorf("failed to create request: %w", err)
@@ -1029,7 +1012,18 @@ func (a *Arcade) fetchBlockSubtrees(ctx context.Context, url string) ([]chainhas
10291012
hashes = append(hashes, *hash)
10301013
}
10311014

1032-
return hashes, nil
1015+
result := &blockData{SubtreeHashes: hashes}
1016+
1017+
// Parse coinbase tx from the remaining stream.
1018+
// The stream contains: coinbase tx bytes followed by a height varint.
1019+
// ReadFrom reads exactly the tx and stops, leaving the height unread.
1020+
coinbaseTx := &transaction.Transaction{}
1021+
if _, err := coinbaseTx.ReadFrom(resp.Body); err != nil {
1022+
return nil, fmt.Errorf("failed to read coinbase tx: %w", err)
1023+
}
1024+
result.CoinbaseTxID = coinbaseTx.TxID()
1025+
1026+
return result, nil
10331027
}
10341028

10351029
func (a *Arcade) fetchSubtreeHashes(ctx context.Context, dataHubURL, subtreeHash string) ([]chainhash.Hash, error) {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ require (
247247
gonum.org/v1/gonum v0.17.0 // indirect
248248
google.golang.org/genproto/googleapis/api v0.0.0-20260330182312-d5a96adf58d8 // indirect
249249
google.golang.org/genproto/googleapis/rpc v0.0.0-20260330182312-d5a96adf58d8 // indirect
250-
google.golang.org/grpc v1.79.3 // indirect
250+
google.golang.org/grpc v1.80.0 // indirect
251251
google.golang.org/protobuf v1.36.11 // indirect
252252
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
253253
gopkg.in/inf.v0 v0.9.1 // indirect

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,8 +1196,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
11961196
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
11971197
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
11981198
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
1199-
google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE=
1200-
google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
1199+
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
1200+
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
12011201
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
12021202
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
12031203
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

handlers/waitfor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ func statusReached(current, target models.Status) bool {
130130
order := map[models.Status]int{
131131
models.StatusUnknown: 0,
132132
models.StatusReceived: 1,
133+
models.StatusServiceError: 1, // transient — may still succeed
133134
models.StatusSentToNetwork: 2,
134135
models.StatusAcceptedByNetwork: 3,
135136
models.StatusSeenOnNetwork: 4,

models/transaction.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,10 @@ const (
6666
StatusSeenOnNetwork = Status("SEEN_ON_NETWORK")
6767
// StatusDoubleSpendAttempted indicates a double spend was attempted.
6868
StatusDoubleSpendAttempted = Status("DOUBLE_SPEND_ATTEMPTED")
69-
// StatusRejected indicates the transaction was rejected.
69+
// StatusRejected indicates the transaction was rejected by the network (invalid tx).
7070
StatusRejected = Status("REJECTED")
71+
// StatusServiceError indicates a broadcast service failure (not a tx rejection).
72+
StatusServiceError = Status("SERVICE_ERROR")
7173
// StatusMined indicates the transaction was mined.
7274
StatusMined = Status("MINED")
7375
// StatusImmutable indicates the transaction is immutable.
@@ -84,6 +86,8 @@ func (s Status) DisallowedPreviousStatuses() []Status {
8486
return []Status{StatusSentToNetwork, StatusAcceptedByNetwork, StatusSeenOnNetwork, StatusRejected, StatusDoubleSpendAttempted, StatusMined}
8587
case StatusAcceptedByNetwork:
8688
return []Status{StatusAcceptedByNetwork, StatusSeenOnNetwork, StatusRejected, StatusDoubleSpendAttempted, StatusMined}
89+
case StatusServiceError:
90+
return []Status{StatusAcceptedByNetwork, StatusSeenOnNetwork, StatusMined, StatusImmutable}
8791
case StatusSeenOnNetwork, StatusRejected, StatusDoubleSpendAttempted, StatusMined, StatusImmutable:
8892
return []Status{}
8993
default:

0 commit comments

Comments
 (0)