Skip to content

Commit 18e181e

Browse files
galt-trclaudegithub-advanced-security[bot]
authored
Add better processing behavior and tests (#134)
* feat(block-status): track header → BLOCK_PROCESSED → BUMP-built per block Adds a `block_processing` table populated by three writers — chaintracks header subscription, the merkle-service callback handler, and the bump-builder consumer — so we can answer "which blocks reached which milestone?" without scanning transactions or compound BUMPs. - Models: `BlockProcessingStatus` with active/orphaned status and pointer-time milestones so JSON cleanly distinguishes "not yet" from zero. - Store: 6 new methods (Upsert/MarkProcessed/MarkBUMPBuilt/MarkOrphaned/ Get/List) implemented across Postgres (column-level ON CONFLICT), Pebble (inverted-uint64 height index for descending scans), and Aerospike (per-bin Operate to avoid clobbering concurrent writers). Drops dead `setProcessedBlocks` helpers in Aerospike along the way. - Writers: chaintracks tip + reorg subscription in api-server, `MarkBlockProcessed` in `handleBlockProcessed` before Kafka publish (log-and-continue on store error), `MarkBlockBUMPBuilt` in bump-builder after `InsertBUMP`. - API: `GET /api/v1/blocks/processing-status` (paginated, descending height) and `GET /api/v1/blocks/processing-status/:blockHash`. - Tests: 8 Pebble + 7 Postgres backend tests + 7 api-server handler tests + 2 bump-builder integration tests; existing test mocks extended to satisfy the larger interface. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(app): hoist Bootstrap and BuildServices out of cmd/arcade `buildServices` and the dependency wiring around it lived as private helpers inside `cmd/arcade/main.go`, which meant test harnesses that want to boot arcade in-process either had to launch the binary or duplicate every kafka/store/teranode/merkle-client setup line. Moves both into a new top-level `app` package with two entry points: `app.Bootstrap(ctx, cfg, logger)` returns a `*Deps` plus a cleanup func (closing publisher → teranode client → store → producer in reverse order), and `app.BuildServices(deps)` returns the slice of services to run for the configured mode. `cmd/arcade/main.go` becomes a thin supervisor that calls the two and handles signals. No behavior change. The full default test suite passes; this lands ahead of the e2e harness which will reuse the same boot path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(e2e): add container layer for arcade ↔ merkle-service smoke tests Lays the foundation for a reusable end-to-end test harness that boots real backing services and runs against `ghcr.io/bsv-blockchain/ merkle-service:latest` instead of mocks. - Adds testcontainers-go (postgres + redpanda modules) to go.mod. - New `tests/e2e/harness` package, gated behind the `e2e` build tag so the default `go test ./...` run is unaffected. - `harness.New(t, ...Option)` brings up Postgres, Redpanda, and the merkle-service container on a shared user-defined bridge network. merkle-service is wired with `STORE_BACKEND=sql` (Postgres), `KAFKA_BROKERS` (Redpanda alias), `BLOB_STORE_URL=memory:`, and the SSRF/private-IP guards relaxed so callbacks back to `host.docker.internal` succeed. Wait strategy accepts both 200 (healthy) and 503 (degraded — peer count zero before libp2p host attaches) on `/health`. - Container teardown handles partial-start failures and runs via `t.Cleanup`. Helpers expose Postgres DSN / Kafka broker addresses and a `WaitForMerkleLogLine` poll that reads container stdout — used by later steps to assert deterministic events without poking at internal state. - `TestContainers_BootAndTearDown` skips when no container runtime is reachable (so devs without docker/podman aren't blocked) and otherwise takes ~16s warm to boot the stack and verify /health. Verified against rootless podman with `DOCKER_HOST=unix:///run/user/$(id -u)/podman/podman.sock`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(e2e): in-process libp2p host as bootstrap peer for merkle-service The smoke test needs to drive synthetic SubtreeMessage / BlockMessage announcements that merkle-service consumes via libp2p. Adds a `LibP2PHost` that wraps go-p2p-message-bus and: - Listens on a random TCP port and advertises `/dns4/host.docker.internal/tcp/<port>` so the merkle-service container can dial back via the host gateway. - Subscribes to the regtest block + subtree topics so gossipsub forms mesh links with merkle-service (publish-only peers don't get picked up by the mesh). A 250ms grace after Subscribe avoids a msgbus close-of-closed race when tests tear down quickly. - Exposes `BootstrapMultiaddr()` for feeding to merkle-service via `P2P_BOOTSTRAP_PEERS`, and `PublishBlock` / `PublishSubtree` that JSON-encode `teranode.BlockMessage` / `teranode.SubtreeMessage` and send on the right pubsub topic. `TestLibP2PHost_MerkleServicePeersWithHost` brings up the harness host plus the full container stack and asserts merkle-service logs `[CONNECTED] Topic peer <harness-peer-id>` within 90s. Using the container log line as the peering signal — rather than `msgbus.GetPeers()` — because msgbus's peerTracker only counts peers we've received messages from, which stays empty when merkle-service is silent. Round-trip peering verified end-to-end against rootless podman in ~5s warm. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(e2e): in-process datahub fake + synthetic block/subtree builder The smoke test needs to feed arcade and merkle-service the same block + subtree binaries that they would normally pull from a teranode datahub. Adds two pieces: - `harness.Datahub` is an httptest.Server bound on 0.0.0.0:auto-port exposing GET /block/<hash> and GET /subtree/<hash>. Tests Stage* payloads keyed by hash; the merkle-service container reaches the server via the host.docker.internal URL. - `txbuilder.go` mints synthetic transactions (using LockTime to vary txids), packs them into the concatenated-32-byte-hash subtree binary merkle-service expects, and constructs full model.Block.Bytes() payloads via teranode's `model` package. Bitcoin merkle-tree helpers (SubtreeRoot, MerkleRootFromCoinbaseAndSubtree) compose a header- valid layout for callers that need ValidateCompoundRoot to pass downstream. `TestDatahub_StageAndServe` round-trips a synthetic block through arcade's `bump.FetchBlockDataForBUMP`, which exercises the same parseBlockBinary path the bump-builder runs. This verifies our format is wire-compatible with both arcade (unchanged) and merkle-service (model.NewBlockFromBytes uses identical byte layout). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(e2e): in-process arcade runtime + poll helpers The harness now boots arcade via the same `app.Bootstrap` → `app.BuildServices` path the production binary uses, with config overridden in-memory: - regtest network (chaintracks auto-disabled) - pebble store on a t.TempDir() - in-process memory kafka broker - merkle_service.url = merkle-service container's host-mapped URL - callback_url = host.docker.internal so the container can reach back - datahub_urls seeded with the harness datahub fake - p2p bootstrap_peers = harness libp2p multiaddr (datahub_discovery off) - callback.allow_private_ips = true (RFC1918 loopback by definition) `StartArcade(t, opts)` returns an ArcadeRuntime exposing the bound port, host-side base URL, and merkle-service-reachable host URL. All services start in goroutines bound to the test context; t.Cleanup cancels and waits with a 15s bound. `poll.go` adds the helpers later steps lean on: - BroadcastTx: POST /tx with raw bytes; returns server-side txid. - GetTxStatus / WaitForMined: poll /tx/:txid until every supplied txid reaches MINED with a non-empty merklePath, or fail with the first stuck txid's last-observed status. `TestArcadeRuntime_BootsAndServesHealth` brings up the full stack — containers + libp2p + datahub + arcade — and asserts /health returns 200 with the harness datahub URL listed as healthy. Verified against rootless podman in ~5s warm. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(e2e): smoke test verifies tx → arcade → merkle-service /watch The first cross-service smoke scenario. It boots the full harness (Postgres + Redpanda + merkle-service container, plus in-process libp2p host, datahub fake, and arcade), submits a single transaction to arcade via POST /tx, and asserts the txid lands in merkle-service's registration store via GET /api/lookup/<txid>. That walks every piece of wiring this PR adds: - arcade's tx-validator parses + structurally validates the tx - propagation calls merkleClient.Register over HTTP - merkle-service inside its container reaches Postgres and persists the (txid, callbackUrl, callbackToken) entry Adds harness.BuildValidatableTxs which produces the minimum-valid transactions arcade's structural validator accepts (1 input + 1 output + non-data scripts + LockTime-driven txid uniqueness). TxBuilder unit tests confirm the validator path on these synthetic txs without any container involvement. Round-trips cleanly against rootless podman in ~5s warm. Full MINED status (which requires merkle-tree-valid block construction so arcade's ValidateCompoundRoot accepts the compound BUMP) is the next step — tracked as a gap in tests/e2e/MERKLE_SERVICE_GAPS.md in a follow-up commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * ci(e2e): wire smoke suite as required PR gate, document harness Adds .github/workflows/e2e-smoke.yml — pre-pulls the merkle-service image so registry latency stays out of the test's wait budgets, disables Ryuk (flaky on rootless container runtimes), and runs the full e2e suite under the e2e build tag with a 20-min hard timeout. Triggers on every PR and push to main; configured to be the required gate via repo branch-protection settings. tests/e2e/README.md documents the local-run recipe for both Docker and rootless podman (DOCKER_HOST socket override), the file layout, and how to add a new scenario. tests/e2e/MERKLE_SERVICE_GAPS.md captures six friction points the harness work surfaced — backend-import drift, mandatory external Kafka, private-IP guard defaults, health-endpoint signal aliasing, no deterministic block-replay, and missing private-network docs — each with a suggested fix and a note about how the harness works around it today. Worth filing upstream against /git/merkle-service. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Fix linting * Lint and test fixes * docs(e2e): document rootless podman + pasta networking limitation The harness now reaches the host from inside containers via the docker bridge gateway IP (auto-discovered) with host.docker.internal as a fallback. README updates to reflect the new flow: - Document the gateway-IP-first announce strategy and the host.docker.internal fallback. - Call out the rootless-podman + pasta limitation: when pasta runs with --no-map-gw (the security-conscious default on recent Fedora/Ubuntu), the host is not reachable from inside containers via either the gateway IP or host.docker.internal. Tests that require merkle-service to dial back into the harness will time out under this configuration. CI uses Docker so the required PR gate is unaffected. - Add a one-liner reachability check developers can run before trying the libp2p-peering tests locally. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Run go mod tidy * Potential fix for pull request finding 'CodeQL / Incorrect conversion between integer types' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
1 parent 9fb01f2 commit 18e181e

42 files changed

Lines changed: 4506 additions & 275 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/build.yml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,7 @@ jobs:
5858
name: Verify GoFortress gates passed
5959
runs-on: ubuntu-latest
6060
if: >-
61-
github.event_name != 'workflow_run' ||
62-
(github.event.workflow_run.conclusion == 'success' &&
61+
github.event_name != 'workflow_run' || (github.event.workflow_run.conclusion == 'success' &&
6362
github.event.workflow_run.event == 'push')
6463
steps:
6564
- name: Confirm upstream gates

.github/workflows/e2e-smoke.yml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
name: e2e-smoke
2+
3+
# ------------------------------------------------------------------------------------
4+
# arcade ↔ merkle-service end-to-end smoke
5+
# ------------------------------------------------------------------------------------
6+
# Boots Postgres, Redpanda, and ghcr.io/bsv-blockchain/merkle-service:latest via
7+
# testcontainers-go alongside an in-process arcade instance, then drives a
8+
# representative tx-flow scenario through both services.
9+
#
10+
# Required PR gate: every PR must show a green run before merging. The job
11+
# pre-pulls the merkle-service image so cold-start latency doesn't surface as
12+
# flaky timeouts on the wait strategies in tests/e2e/harness.
13+
# ------------------------------------------------------------------------------------
14+
15+
on:
16+
pull_request:
17+
push:
18+
branches: [main]
19+
workflow_dispatch:
20+
21+
permissions:
22+
contents: read
23+
24+
concurrency:
25+
group: e2e-smoke-${{ github.workflow }}-${{ github.ref }}
26+
cancel-in-progress: true
27+
28+
jobs:
29+
smoke:
30+
name: smoke
31+
runs-on: ubuntu-24.04
32+
timeout-minutes: 25
33+
steps:
34+
- name: Checkout
35+
uses: actions/checkout@v4
36+
37+
- name: Set up Go
38+
uses: actions/setup-go@v5
39+
with:
40+
go-version-file: go.mod
41+
cache: true
42+
43+
# Pre-pulling pins image-fetch time outside the test's wait strategies so
44+
# the harness's own timeouts only have to cover container startup, not
45+
# registry pulls. ghcr.io is unauthenticated for public images.
46+
- name: Pre-pull merkle-service image
47+
run: docker pull ghcr.io/bsv-blockchain/merkle-service:latest
48+
49+
- name: Run e2e smoke tests
50+
env:
51+
# testcontainers-go honors DOCKER_HOST when set; on the GitHub
52+
# ubuntu runner the docker socket lives at the default path so we
53+
# leave it unset.
54+
TESTCONTAINERS_RYUK_DISABLED: "true"
55+
run: go test -tags=e2e -timeout=20m -v ./tests/e2e/...

app/app.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Package app builds and wires the shared dependencies arcade's services
2+
// need (kafka broker + producer, store, teranode client, merkle-service
3+
// client, events publisher, validator) and turns them into a slice of
4+
// runnable services. cmd/arcade/main.go is a thin wrapper that loads
5+
// config, calls Bootstrap, then BuildServices, and supervises the lifecycle.
6+
//
7+
// Splitting this out of cmd/arcade lets the e2e test harness reuse the same
8+
// boot path without invoking the binary or duplicating wiring.
9+
package app
10+
11+
import (
12+
"context"
13+
"fmt"
14+
"time"
15+
16+
"go.uber.org/zap"
17+
18+
"github.com/bsv-blockchain/arcade/config"
19+
"github.com/bsv-blockchain/arcade/events"
20+
"github.com/bsv-blockchain/arcade/kafka"
21+
"github.com/bsv-blockchain/arcade/merkleservice"
22+
"github.com/bsv-blockchain/arcade/services"
23+
"github.com/bsv-blockchain/arcade/services/api_server"
24+
"github.com/bsv-blockchain/arcade/services/bump_builder"
25+
"github.com/bsv-blockchain/arcade/services/p2p_client"
26+
"github.com/bsv-blockchain/arcade/services/propagation"
27+
"github.com/bsv-blockchain/arcade/services/tx_validator"
28+
"github.com/bsv-blockchain/arcade/services/webhook"
29+
"github.com/bsv-blockchain/arcade/store"
30+
storefactory "github.com/bsv-blockchain/arcade/store/factory"
31+
"github.com/bsv-blockchain/arcade/teranode"
32+
"github.com/bsv-blockchain/arcade/validator"
33+
)
34+
35+
// Deps is the set of process-wide dependencies arcade's services share.
36+
// Bootstrap returns one of these fully wired; BuildServices consumes it.
37+
type Deps struct {
38+
Cfg *config.Config
39+
Logger *zap.Logger
40+
Broker kafka.Broker
41+
Producer *kafka.Producer
42+
Publisher events.Publisher
43+
Store store.Store
44+
Leaser store.Leaser
45+
TxTracker *store.TxTracker
46+
TeranodeClient *teranode.Client
47+
MerkleClient *merkleservice.Client // nil when MerkleService.URL is unset
48+
Validator *validator.Validator
49+
}
50+
51+
// Bootstrap creates every shared dependency the services rely on, in the
52+
// order they need to be created. The returned cleanup func closes them in
53+
// reverse order. teranodeClient.Start is wired to ctx, so its background
54+
// probes terminate when ctx is canceled.
55+
func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*Deps, func(), error) {
56+
logger.Info(
57+
"starting arcade",
58+
zap.String("mode", cfg.Mode),
59+
zap.String("kafka_backend", cfg.Kafka.Backend),
60+
zap.String("store_backend", cfg.Store.Backend),
61+
)
62+
63+
broker, err := kafka.NewBroker(cfg.Kafka)
64+
if err != nil {
65+
return nil, nil, fmt.Errorf("creating kafka broker: %w", err)
66+
}
67+
producer := kafka.NewProducer(broker)
68+
69+
if cfg.Kafka.MinPartitions > 1 {
70+
if pErr := kafka.CheckPartitions(broker, []string{kafka.TopicTransaction, kafka.TopicPropagation}, cfg.Kafka.MinPartitions, logger); pErr != nil {
71+
_ = producer.Close()
72+
return nil, nil, fmt.Errorf("kafka partition check: %w", pErr)
73+
}
74+
}
75+
76+
st, leaser, err := storefactory.New(cfg) //nolint:contextcheck // factory signature predates ctx; the Postgres backend honors ctx via its own internal pool dial, others (Pebble, Aerospike) connect synchronously
77+
if err != nil {
78+
_ = producer.Close()
79+
return nil, nil, fmt.Errorf("creating store: %w", err)
80+
}
81+
if err := st.EnsureIndexes(); err != nil {
82+
_ = st.Close()
83+
_ = producer.Close()
84+
return nil, nil, fmt.Errorf("ensuring store indexes: %w", err)
85+
}
86+
87+
txTracker := store.NewTxTracker()
88+
89+
teranodeClient := teranode.NewClient(cfg.DatahubURLs, cfg.Teranode.AuthToken, teranode.HealthConfig{
90+
FailureThreshold: cfg.Propagation.EndpointHealth.FailureThreshold,
91+
ProbeInterval: time.Duration(cfg.Propagation.EndpointHealth.ProbeIntervalMs) * time.Millisecond,
92+
ProbeTimeout: time.Duration(cfg.Propagation.EndpointHealth.ProbeTimeoutMs) * time.Millisecond,
93+
MinHealthyEndpoints: cfg.Propagation.EndpointHealth.MinHealthyEndpoints,
94+
RefreshInterval: time.Duration(cfg.Propagation.EndpointHealth.RefreshIntervalMs) * time.Millisecond,
95+
Source: endpointSource{st: st, network: cfg.Network},
96+
Logger: logger,
97+
})
98+
99+
// Seed the registry with statically configured URLs so a freshly started
100+
// pod (especially mode=p2p-client) always sees them via the same discovery
101+
// path. The upsert is idempotent and refreshes LastSeen on every restart,
102+
// so this also works as a heartbeat for the operator-defined seed list.
103+
if len(cfg.DatahubURLs) > 0 {
104+
seedCtx, seedCancel := context.WithTimeout(ctx, 5*time.Second)
105+
for _, url := range cfg.DatahubURLs {
106+
if err := st.UpsertDatahubEndpoint(seedCtx, store.DatahubEndpoint{
107+
URL: url,
108+
Network: cfg.Network,
109+
Source: store.DatahubEndpointSourceConfigured,
110+
LastSeen: time.Now(),
111+
}); err != nil {
112+
logger.Warn(
113+
"failed to seed configured datahub url",
114+
zap.String("url", url),
115+
zap.Error(err),
116+
)
117+
}
118+
}
119+
seedCancel()
120+
}
121+
122+
var merkleClient *merkleservice.Client
123+
if cfg.MerkleService.URL != "" {
124+
merkleClient = merkleservice.NewClient(cfg.MerkleService.URL, cfg.MerkleService.AuthToken, 0)
125+
merkleClient.SetLogger(logger.Named("merkle-client"))
126+
}
127+
128+
txVal := validator.NewValidator(nil, nil)
129+
130+
publisher := events.NewKafkaPublisher(producer, logger, cfg.Events.SubscriberBuffer)
131+
132+
teranodeClient.Start(ctx)
133+
134+
deps := &Deps{
135+
Cfg: cfg,
136+
Logger: logger,
137+
Broker: broker,
138+
Producer: producer,
139+
Publisher: publisher,
140+
Store: st,
141+
Leaser: leaser,
142+
TxTracker: txTracker,
143+
TeranodeClient: teranodeClient,
144+
MerkleClient: merkleClient,
145+
Validator: txVal,
146+
}
147+
148+
cleanup := func() {
149+
_ = publisher.Close()
150+
teranodeClient.Close()
151+
_ = st.Close()
152+
_ = producer.Close()
153+
}
154+
return deps, cleanup, nil
155+
}
156+
157+
// BuildServices returns the services that should run for the configured mode.
158+
// Each service's lifetime is tied to the ctx passed to its Start method by
159+
// the caller — the supervisor in cmd/arcade or the test harness.
160+
func BuildServices(d *Deps) []services.Service {
161+
cfg := d.Cfg
162+
var svcs []services.Service
163+
164+
shouldRun := func(name string) bool {
165+
return cfg.Mode == "all" || cfg.Mode == name
166+
}
167+
168+
if shouldRun("api-server") {
169+
svcs = append(svcs, api_server.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TxTracker, d.TeranodeClient))
170+
}
171+
if shouldRun("bump-builder") {
172+
svcs = append(svcs, bump_builder.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TeranodeClient))
173+
}
174+
if shouldRun("tx-validator") {
175+
svcs = append(svcs, tx_validator.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.TxTracker, d.Validator))
176+
}
177+
if shouldRun("propagation") {
178+
svcs = append(svcs, propagation.New(cfg, d.Logger, d.Producer, d.Publisher, d.Store, d.Leaser, d.TeranodeClient, d.MerkleClient))
179+
}
180+
if shouldRun("api-server") || shouldRun("webhook") {
181+
svcs = append(svcs, webhook.New(cfg.Webhook, cfg.Callback, d.Logger, d.Publisher, d.Store))
182+
}
183+
if shouldRun("propagation") || shouldRun("p2p-client") {
184+
svcs = append(svcs, p2p_client.New(cfg, d.Logger, d.Producer, d.TeranodeClient, d.Store))
185+
}
186+
187+
return svcs
188+
}
189+
190+
// endpointSource adapts store.Store to teranode.EndpointSource by extracting
191+
// just the URL list. network scopes the listing to the configured Bitcoin
192+
// network so a store shared across pods (or reused after a network change)
193+
// never replays peers from a different network.
194+
type endpointSource struct {
195+
st store.Store
196+
network string
197+
}
198+
199+
func (a endpointSource) ListEndpointURLs(ctx context.Context) ([]string, error) {
200+
eps, err := a.st.ListDatahubEndpoints(ctx, a.network)
201+
if err != nil {
202+
return nil, err
203+
}
204+
out := make([]string, 0, len(eps))
205+
for _, ep := range eps {
206+
out = append(out, ep.URL)
207+
}
208+
return out, nil
209+
}

0 commit comments

Comments
 (0)