Skip to content

Commit 7dab540

Browse files
Integrate reprocess behavior and break out services (#140)
* Integrate reprocess behavior and break out services
* Potential fix for pull request finding 'CodeQL / Uncontrolled data used in path expression'
  Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
* perf improvements
* Fix linting errors
* Updates for SSE drops
* Potential fix for pull request finding 'CodeQL / Uncontrolled data used in path expression'
  Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
* ci: remove unused //nolint:gosec on context.WithCancel
  golangci-lint 2.12.2 in CI no longer flags context.WithCancel patterns where the cancel is stored for later invocation (gosec G115 / G505 relaxation), so the suppression at services/sse/manager.go:232 became unused under nolintlint. The local 2.11.4 still required it, masking the CI failure.
---------
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
1 parent 786659e commit 7dab540

71 files changed

Lines changed: 6336 additions & 1293 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

app/app.go

Lines changed: 135 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ package app
1111
import (
1212
"context"
1313
"fmt"
14+
"os"
15+
"path"
1416
"time"
1517

18+
chaintrackslib "github.com/bsv-blockchain/go-chaintracks/chaintracks"
19+
"github.com/bsv-blockchain/go-sdk/chainhash"
1620
"go.uber.org/zap"
1721

1822
"github.com/bsv-blockchain/arcade/config"
@@ -22,9 +26,12 @@ import (
2226
"github.com/bsv-blockchain/arcade/services"
2327
"github.com/bsv-blockchain/arcade/services/api_server"
2428
"github.com/bsv-blockchain/arcade/services/bump_builder"
29+
"github.com/bsv-blockchain/arcade/services/chaintracks_server"
2530
"github.com/bsv-blockchain/arcade/services/p2p_client"
2631
"github.com/bsv-blockchain/arcade/services/propagation"
32+
"github.com/bsv-blockchain/arcade/services/sse"
2733
"github.com/bsv-blockchain/arcade/services/tx_validator"
34+
"github.com/bsv-blockchain/arcade/services/watchdog"
2835
"github.com/bsv-blockchain/arcade/services/webhook"
2936
"github.com/bsv-blockchain/arcade/store"
3037
storefactory "github.com/bsv-blockchain/arcade/store/factory"
@@ -46,6 +53,11 @@ type Deps struct {
4653
TeranodeClient *teranode.Client
4754
MerkleClient *merkleservice.Client // nil when MerkleService.URL is unset
4855
Validator *validator.Validator
56+
// Chaintracks is the shared in-process header tracker. nil when
57+
// chaintracks_server is disabled (regtest, or explicit opt-out) — services
58+
// that consume it (chaintracks_server, bump-builder canonical-root
59+
// validation) must nil-guard.
60+
Chaintracks chaintrackslib.Chaintracks
4961
}
5062

5163
// Bootstrap creates every shared dependency the services rely on, in the
@@ -78,22 +90,28 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
7890
_ = producer.Close()
7991
return nil, nil, fmt.Errorf("creating store: %w", err)
8092
}
81-
if err := st.EnsureIndexes(); err != nil {
93+
if idxErr := st.EnsureIndexes(); idxErr != nil {
8294
_ = st.Close()
8395
_ = producer.Close()
84-
return nil, nil, fmt.Errorf("ensuring store indexes: %w", err)
96+
return nil, nil, fmt.Errorf("ensuring store indexes: %w", idxErr)
8597
}
8698

99+
// Align store batch-helper concurrency with config. Zero falls back to
100+
// runtime.NumCPU which matches validator parallelism — keeps DB write
101+
// fanout from being the bottleneck.
102+
store.SetBatchConcurrency(cfg.Store.BatchConcurrency)
103+
87104
txTracker := store.NewTxTracker()
88105

89106
teranodeClient := teranode.NewClient(cfg.DatahubURLs, cfg.Teranode.AuthToken, teranode.HealthConfig{
90-
FailureThreshold: cfg.Propagation.EndpointHealth.FailureThreshold,
91-
ProbeInterval: time.Duration(cfg.Propagation.EndpointHealth.ProbeIntervalMs) * time.Millisecond,
92-
ProbeTimeout: time.Duration(cfg.Propagation.EndpointHealth.ProbeTimeoutMs) * time.Millisecond,
93-
MinHealthyEndpoints: cfg.Propagation.EndpointHealth.MinHealthyEndpoints,
94-
RefreshInterval: time.Duration(cfg.Propagation.EndpointHealth.RefreshIntervalMs) * time.Millisecond,
95-
Source: endpointSource{st: st, network: cfg.Network},
96-
Logger: logger,
107+
FailureThreshold: cfg.Propagation.EndpointHealth.FailureThreshold,
108+
BroadcastFailureThreshold: cfg.Propagation.EndpointHealth.BroadcastFailureThreshold,
109+
ProbeInterval: time.Duration(cfg.Propagation.EndpointHealth.ProbeIntervalMs) * time.Millisecond,
110+
ProbeTimeout: time.Duration(cfg.Propagation.EndpointHealth.ProbeTimeoutMs) * time.Millisecond,
111+
MinHealthyEndpoints: cfg.Propagation.EndpointHealth.MinHealthyEndpoints,
112+
RefreshInterval: time.Duration(cfg.Propagation.EndpointHealth.RefreshIntervalMs) * time.Millisecond,
113+
Source: endpointSource{st: st, network: cfg.Network},
114+
Logger: logger,
97115
})
98116

99117
// Seed the registry with statically configured URLs so a freshly started
@@ -103,16 +121,16 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
103121
if len(cfg.DatahubURLs) > 0 {
104122
seedCtx, seedCancel := context.WithTimeout(ctx, 5*time.Second)
105123
for _, url := range cfg.DatahubURLs {
106-
if err := st.UpsertDatahubEndpoint(seedCtx, store.DatahubEndpoint{
124+
if seedErr := st.UpsertDatahubEndpoint(seedCtx, store.DatahubEndpoint{
107125
URL: url,
108126
Network: cfg.Network,
109127
Source: store.DatahubEndpointSourceConfigured,
110128
LastSeen: time.Now(),
111-
}); err != nil {
129+
}); seedErr != nil {
112130
logger.Warn(
113131
"failed to seed configured datahub url",
114132
zap.String("url", url),
115-
zap.Error(err),
133+
zap.Error(seedErr),
116134
)
117135
}
118136
}
@@ -131,6 +149,22 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
131149

132150
teranodeClient.Start(ctx)
133151

152+
// Construct chaintracks once at process startup so every consumer
153+
// (chaintracks_server, bump-builder canonical-root validation, watchdog
154+
// future use) shares one P2P subscription and header cache. Skipped when
155+
// chaintracks_server is disabled — that flag is already the operator's
156+
// process-wide "no chaintracks" switch (regtest force-disables it).
157+
var chainTracks chaintrackslib.Chaintracks
158+
if cfg.ChaintracksServer.Enabled {
159+
ct, ctErr := initChaintracks(ctx, cfg, logger)
160+
if ctErr != nil {
161+
_ = st.Close()
162+
_ = producer.Close()
163+
return nil, nil, fmt.Errorf("chaintracks init: %w", ctErr)
164+
}
165+
chainTracks = ct
166+
}
167+
134168
deps := &Deps{
135169
Cfg: cfg,
136170
Logger: logger,
@@ -143,6 +177,7 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
143177
TeranodeClient: teranodeClient,
144178
MerkleClient: merkleClient,
145179
Validator: txVal,
180+
Chaintracks: chainTracks,
146181
}
147182

148183
cleanup := func() {
@@ -154,6 +189,52 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
154189
return deps, cleanup, nil
155190
}
156191

192+
// initChaintracks brings up the embedded go-chaintracks instance shared
193+
// across the process. Caller gates the enabled-ness check; this function
194+
// always tries to construct and returns an error on failure.
195+
//
196+
// The construction logic mirrors what previously lived in
197+
// chaintracks_server.Service.initChaintracks. Moving it here lets bump-
198+
// builder use the same instance without depending on a service's
199+
// initialization timing or a duplicate P2P subscription.
200+
func initChaintracks(ctx context.Context, cfg *config.Config, logger *zap.Logger) (chaintrackslib.Chaintracks, error) {
201+
// Default chaintracks storage to <storage_path>/chaintracks/ so
202+
// operators only need to set a single storage root. Tilde expansion
203+
// happens in config.Load.
204+
if cfg.Chaintracks.StoragePath == "" {
205+
root := cfg.StoragePath
206+
if root == "" {
207+
root = "."
208+
}
209+
if err := os.MkdirAll(root, 0o750); err != nil {
210+
return nil, fmt.Errorf("creating storage directory %s: %w", root, err)
211+
}
212+
cfg.Chaintracks.StoragePath = path.Join(root, "chaintracks")
213+
}
214+
215+
// Thread the top-level network into chaintracks' embedded p2p config.
216+
// Without this go-chaintracks falls back to "main" silently. Chaintracks
217+
// needs the upstream-strict spelling ("main"/"test"/"teratestnet").
218+
_, defaultBootstrap := config.ResolveP2PNetwork(cfg.Network)
219+
cfg.Chaintracks.P2P.Network = config.ResolveChaintracksNetwork(cfg.Network)
220+
if len(cfg.Chaintracks.P2P.MsgBus.BootstrapPeers) == 0 {
221+
cfg.Chaintracks.P2P.MsgBus.BootstrapPeers = defaultBootstrap
222+
}
223+
224+
ct, err := cfg.Chaintracks.Initialize(ctx, "arcade", nil)
225+
if err != nil {
226+
return nil, fmt.Errorf("chaintracks initialize: %w", err)
227+
}
228+
229+
network, _ := ct.GetNetwork(ctx)
230+
logger.Info(
231+
"chaintracks initialized",
232+
zap.String("storage_path", cfg.Chaintracks.StoragePath),
233+
zap.String("network", network),
234+
)
235+
return ct, nil
236+
}
237+
157238
// BuildServices returns the services that should run for the configured mode.
158239
// Each service's lifetime is tied to the ctx passed to its Start method by
159240
// the caller — the supervisor in cmd/arcade or the test harness.
@@ -166,10 +247,37 @@ func BuildServices(d *Deps) []services.Service {
166247
}
167248

168249
if shouldRun("api-server") {
169-
svcs = append(svcs, api_server.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TxTracker, d.TeranodeClient))
250+
svcs = append(svcs, api_server.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TxTracker, d.TeranodeClient, d.MerkleClient))
170251
}
171252
if shouldRun("bump-builder") {
172-
svcs = append(svcs, bump_builder.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TeranodeClient))
253+
// chainHeader is nil when chaintracks is disabled — bump-builder
254+
// nil-guards and falls back to subtree-count-only validation.
255+
var chainHeader bump_builder.ChainHeaderReader
256+
if d.Chaintracks != nil {
257+
chainHeader = chaintracksHeaderReader{ct: d.Chaintracks}
258+
}
259+
svcs = append(svcs, bump_builder.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TeranodeClient, d.TxTracker, chainHeader))
260+
}
261+
if shouldRun("watchdog") && cfg.Watchdog.Enabled {
262+
if wd := watchdog.NewService(cfg, d.Logger, d.Store, d.Leaser, d.MerkleClient); wd != nil {
263+
svcs = append(svcs, wd)
264+
} else {
265+
d.Logger.Info("watchdog skipped: merkle_service.url or leaser not configured")
266+
}
267+
}
268+
if shouldRun("sse") {
269+
if ssvc := sse.New(cfg, d.Logger, d.Publisher, d.Store); ssvc != nil {
270+
svcs = append(svcs, ssvc)
271+
} else {
272+
d.Logger.Info("sse skipped: sse.enabled=false or publisher not configured")
273+
}
274+
}
275+
if shouldRun("chaintracks") {
276+
if ct := chaintracks_server.New(cfg, d.Logger, d.Store, d.Chaintracks); ct != nil {
277+
svcs = append(svcs, ct)
278+
} else {
279+
d.Logger.Info("chaintracks skipped: chaintracks_server.enabled=false (regtest force-disables this)")
280+
}
173281
}
174282
if shouldRun("tx-validator") {
175283
svcs = append(svcs, tx_validator.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TxTracker, d.Validator))
@@ -187,6 +295,19 @@ func BuildServices(d *Deps) []services.Service {
187295
return svcs
188296
}
189297

298+
// chaintracksHeaderReader adapts go-chaintracks's Chaintracks interface to
299+
// the narrower ChainHeaderReader contract bump-builder needs. The library
300+
// already exposes GetHeaderByHash with the same signature, so this is a
301+
// trivial passthrough; the wrapper exists only to keep arcade packages from
302+
// importing chaintracks types into their public APIs.
303+
type chaintracksHeaderReader struct {
304+
ct chaintrackslib.Chaintracks
305+
}
306+
307+
func (a chaintracksHeaderReader) GetHeaderByHash(ctx context.Context, hash *chainhash.Hash) (*chaintrackslib.BlockHeader, error) {
308+
return a.ct.GetHeaderByHash(ctx, hash)
309+
}
310+
190311
// endpointSource adapts store.Store to teranode.EndpointSource by extracting
191312
// just the URL list. network scopes the listing to the configured Bitcoin
192313
// network so a store shared across pods (or reused after a network change)

bump/datahub.go

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@ const DefaultMaxBlockBytes int64 = 1 * 1024 * 1024 * 1024 // 1 GiB
3232
// stops a hostile server from inflating arcade's log lines.
3333
const maxErrorBodyBytes int64 = 512
3434

35+
// BlockDataValidator is an optional response-acceptance predicate run after a
36+
// successful datahub fetch. Returning a non-nil error causes the fetch loop
37+
// to discard that peer's response (logging the reason into urlErrors) and try
38+
// the next URL — turning the otherwise-greedy "first 200 wins" into a
39+
// "first 200 that passes validation wins" policy.
40+
//
41+
// Validators are stateless and pure functions of the response; they receive
42+
// what was parsed from one peer and decide whether it's plausible. Callers
43+
// use this to reject obviously-truncated responses (a pruned peer returning
44+
// fewer subtrees than the STUMPs reference) and to cross-check the header
45+
// merkle root against an out-of-band canonical value (chaintracks).
46+
type BlockDataValidator func(subtreeHashes []chainhash.Hash, headerMerkleRoot *chainhash.Hash) error
47+
3548
// FetchBlockDataForBUMP fetches subtree hashes, coinbase BUMP, and the block's
3649
// header merkle root from the binary block endpoint, trying all DataHub URLs.
3750
// Each attempt emits a Debug-level log line so operators can see which URLs
@@ -48,13 +61,27 @@ const maxErrorBodyBytes int64 = 512
4861
// subtreeCount (varint) | subtreeHashes (N×32) | coinbaseTx (variable) |
4962
// blockHeight (varint) | coinbaseBUMPLen (varint) | coinbaseBUMP (variable)
5063
func FetchBlockDataForBUMP(ctx context.Context, datahubURLs []string, blockHash string, logger *zap.Logger) (subtreeHashes []chainhash.Hash, coinbaseBUMP []byte, headerMerkleRoot *chainhash.Hash, err error) {
51-
return FetchBlockDataForBUMPWithCap(ctx, datahubURLs, blockHash, DefaultMaxBlockBytes, logger)
64+
return FetchBlockDataForBUMPWithOptions(ctx, datahubURLs, blockHash, DefaultMaxBlockBytes, nil, logger)
5265
}
5366

5467
// FetchBlockDataForBUMPWithCap is the cap-aware variant of FetchBlockDataForBUMP.
5568
// maxBlockBytes <= 0 selects DefaultMaxBlockBytes — passing zero/negative does
5669
// not silently disable the protection.
70+
//
71+
// Retained for backwards compatibility with callers that don't need to plug
72+
// in a content validator. Equivalent to FetchBlockDataForBUMPWithOptions with
73+
// validator=nil.
5774
func FetchBlockDataForBUMPWithCap(ctx context.Context, datahubURLs []string, blockHash string, maxBlockBytes int64, logger *zap.Logger) (subtreeHashes []chainhash.Hash, coinbaseBUMP []byte, headerMerkleRoot *chainhash.Hash, err error) {
75+
return FetchBlockDataForBUMPWithOptions(ctx, datahubURLs, blockHash, maxBlockBytes, nil, logger)
76+
}
77+
78+
// FetchBlockDataForBUMPWithOptions adds an optional BlockDataValidator that
79+
// runs after each successful HTTP+parse. A non-nil error from the validator
80+
// is treated the same as a transport error: log it, append to urlErrors, and
81+
// try the next URL. This turns the loop into "first 200-that-passes-validation
82+
// wins" — defending against datahubs that return self-consistent-but-wrong
83+
// block representations (pruned peers, stale caches).
84+
func FetchBlockDataForBUMPWithOptions(ctx context.Context, datahubURLs []string, blockHash string, maxBlockBytes int64, validator BlockDataValidator, logger *zap.Logger) (subtreeHashes []chainhash.Hash, coinbaseBUMP []byte, headerMerkleRoot *chainhash.Hash, err error) {
5885
if logger == nil {
5986
logger = zap.NewNop()
6087
}
@@ -68,22 +95,58 @@ func FetchBlockDataForBUMPWithCap(ctx context.Context, datahubURLs []string, blo
6895
for i, dataHubURL := range datahubURLs {
6996
start := time.Now()
7097
hashes, cbBUMP, root, status, fetchErr := fetchBlockBinary(ctx, dataHubURL, blockHash, maxBlockBytes)
71-
logger.Debug("datahub fetch attempt",
98+
logger.Debug(
99+
"datahub fetch attempt",
72100
zap.Int("idx", i),
73101
zap.String("url", dataHubURL),
74102
zap.Int("status", status),
75103
zap.Duration("elapsed", time.Since(start)),
76104
zap.Int64("max_block_bytes", maxBlockBytes),
77105
zap.Error(fetchErr),
78106
)
79-
if fetchErr == nil {
80-
return hashes, cbBUMP, root, nil
107+
if fetchErr != nil {
108+
urlErrors = append(urlErrors, fmt.Sprintf("url[%d] %q: %v", i, dataHubURL, fetchErr))
109+
continue
110+
}
111+
if validator != nil {
112+
if vErr := validator(hashes, root); vErr != nil {
113+
logger.Warn(
114+
"datahub response rejected by validator",
115+
zap.Int("idx", i),
116+
zap.String("url", dataHubURL),
117+
zap.Int("subtree_count", len(hashes)),
118+
zap.Error(vErr),
119+
)
120+
urlErrors = append(urlErrors, fmt.Sprintf("url[%d] %q: validator rejected: %v", i, dataHubURL, vErr))
121+
continue
122+
}
81123
}
82-
urlErrors = append(urlErrors, fmt.Sprintf("url[%d] %q: %v", i, dataHubURL, fetchErr))
124+
return hashes, cbBUMP, root, nil
83125
}
84126
return nil, nil, nil, fmt.Errorf("all DataHub URLs failed for block %s:\n %s", blockHash, strings.Join(urlErrors, "\n "))
85127
}
86128

129+
// SubtreeCountValidator returns a BlockDataValidator that rejects responses
130+
// with fewer than minSubtrees subtree hashes. Pass max(stump.SubtreeIndex)+1
131+
// when STUMPs are already in hand — a peer whose response can't index every
132+
// STUMP we've collected is provably wrong, no expensive cryptographic check
133+
// required.
134+
//
135+
// minSubtrees <= 0 returns nil — no validation, which preserves the old
136+
// "first 200 wins" behavior for callers that haven't yet collected STUMPs.
137+
func SubtreeCountValidator(minSubtrees int) BlockDataValidator {
138+
if minSubtrees <= 0 {
139+
return nil
140+
}
141+
return func(subtreeHashes []chainhash.Hash, _ *chainhash.Hash) error {
142+
if len(subtreeHashes) < minSubtrees {
143+
return fmt.Errorf("subtree_count %d < required %d (STUMPs reference subtree indexes up to %d)",
144+
len(subtreeHashes), minSubtrees, minSubtrees-1)
145+
}
146+
return nil
147+
}
148+
}
149+
87150
// fetchBlockBinary fetches a block from the binary endpoint and parses
88151
// subtree hashes, coinbase BUMP, and the header merkle root from the response.
89152
// Returns the HTTP status code (0 on transport error before a response was

0 commit comments

Comments
 (0)