Skip to content

Commit 4e2ab45

Browse files
authored
Keep chaintracks isolated in microservice deployment (#156)
1 parent 44e18b8 commit 4e2ab45

7 files changed

Lines changed: 143 additions & 23 deletions

File tree

app/app.go

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,19 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
162162

163163
teranodeClient.Start(ctx)
164164

165-
// Construct chaintracks once at process startup so every consumer
166-
// (chaintracks_server, bump-builder canonical-root validation, watchdog
167-
// future use) shares one P2P subscription and header cache. Skipped when
168-
// chaintracks_server is disabled — that flag is already the operator's
169-
// process-wide "no chaintracks" switch (regtest force-disables it).
165+
// Construct chaintracks once at process startup so every in-process
166+
// consumer (chaintracks_server, bump-builder canonical-root validation)
167+
// shares one P2P subscription and header cache. Skipped when:
168+
// - cfg.ChaintracksServer.Enabled is false (operator-wide off switch;
169+
// regtest force-disables it via config.validate), or
170+
// - the configured mode never dereferences deps.Chaintracks. Without
171+
// this gate every microservice pod (api-server, sse, propagation, …)
172+
// would spin up an embedded ChainManager and poll the upstream node
173+
// even though nothing in that pod reads from it. In microservice
174+
// deployments the chaintracks pod runs embedded and bump-builder
175+
// points chaintracks.mode=remote at it; everything else skips here.
170176
var chainTracks chaintrackslib.Chaintracks
171-
if cfg.ChaintracksServer.Enabled {
177+
if cfg.ChaintracksServer.Enabled && modeNeedsChaintracks(cfg.Mode) {
172178
ct, ctErr := initChaintracks(ctx, cfg, logger)
173179
if ctErr != nil {
174180
_ = st.Close()
@@ -233,6 +239,25 @@ func Bootstrap(ctx context.Context, cfg *config.Config, logger *zap.Logger) (*De
233239
return deps, cleanup, nil
234240
}
235241

242+
// modeNeedsChaintracks reports whether the configured service mode constructs
243+
// at least one service that reads from deps.Chaintracks. Other modes
244+
// (api-server, sse, propagation, p2p-client, watchdog) never dereference it,
245+
// so initializing it would spin up an unused embedded ChainManager + upstream
246+
// header poller in every pod — which is exactly the wrong thing for
247+
// microservice deployments where the dedicated chaintracks pod owns the
248+
// header store and bump-builder reads via chaintracks.mode=remote.
249+
//
250+
// Keep this list in sync with BuildServices: any new mode that wires
251+
// deps.Chaintracks into a service must be added here.
252+
func modeNeedsChaintracks(mode string) bool {
253+
switch mode {
254+
case "all", "chaintracks", "bump-builder":
255+
return true
256+
default:
257+
return false
258+
}
259+
}
260+
236261
// initChaintracks brings up the embedded go-chaintracks instance shared
237262
// across the process. Caller gates the enabled-ness check; this function
238263
// always tries to construct and returns an error on failure.

app/app_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package app
2+
3+
import "testing"
4+
5+
func TestModeNeedsChaintracks(t *testing.T) {
6+
cases := []struct {
7+
mode string
8+
want bool
9+
}{
10+
{"all", true},
11+
{"chaintracks", true},
12+
{"bump-builder", true},
13+
{"api-server", false},
14+
{"sse", false},
15+
{"propagation", false},
16+
{"p2p-client", false},
17+
{"watchdog", false},
18+
{"", false},
19+
{"unknown-mode", false},
20+
}
21+
for _, tc := range cases {
22+
t.Run(tc.mode, func(t *testing.T) {
23+
if got := modeNeedsChaintracks(tc.mode); got != tc.want {
24+
t.Errorf("modeNeedsChaintracks(%q) = %v, want %v", tc.mode, got, tc.want)
25+
}
26+
})
27+
}
28+
}

config.example.yaml

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,8 +181,19 @@ chaintracks_server:
181181

182182
# go-chaintracks library config. See that library's docs for the full set.
183183
# Defaults are inherited from the library's SetDefaults — uncomment to override.
184+
#
185+
# In single-binary (mode=all) and the dedicated chaintracks pod, leave mode at
186+
# the default "embedded" so this process owns the header store and the P2P
187+
# subscription. In a microservice deployment every OTHER consumer pod
188+
# (bump-builder) should run mode="remote" with url pointed at the chaintracks
189+
# pod's HTTP endpoint — otherwise each pod spins up its own embedded
190+
# ChainManager and hits the upstream node N times. arcade additionally skips
191+
# the chaintracks library entirely in modes that don't read from it
192+
# (api-server, sse, propagation, p2p-client, watchdog), so those pods need no
193+
# chaintracks config at all.
184194
# chaintracks:
185-
# mode: embedded # or "remote" with url set to a running chaintracks
186-
# storage_path: "" # defaults to <storage_path>/chaintracks
187-
# bootstrap_url: ""
188-
# bootstrap_mode: api
195+
# mode: embedded # or "remote" with url set
196+
# url: "" # required when mode=remote, e.g. http://chaintracks:8083
197+
# storage_path: "" # embedded mode only; defaults to <storage_path>/chaintracks
198+
# bootstrap_url: "" # embedded mode only
199+
# bootstrap_mode: api # embedded mode only

deploy/bump-builder.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ spec:
3939
value: "aerospike-0.aerospike.arcade.svc.cluster.local:3000"
4040
- name: ARCADE_AEROSPIKE_NAMESPACE
4141
value: "arcade"
42+
# Talk to the chaintracks pod over HTTP instead of running an
43+
# embedded ChainManager + upstream header poller in this pod.
44+
# The chaintracks Service exposes /chaintracks/v1/* on 8083.
45+
- name: ARCADE_CHAINTRACKS_MODE
46+
value: "remote"
47+
- name: ARCADE_CHAINTRACKS_URL
48+
value: "http://chaintracks.arcade.svc.cluster.local:8083"
4249
readinessProbe:
4350
httpGet:
4451
path: /health

services/propagation/propagator.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ type Propagator struct {
4646
leaser store.Leaser
4747
teranodeClient *teranode.Client
4848
merkleClient *merkleservice.Client
49-
consumer *kafka.ConsumerGroup
49+
// consumer is written by Start (after kafka.NewConsumerGroup returns) and
50+
// read by Stop. atomic.Pointer makes that handoff race-free without
51+
// needing a mutex: in tests the harness can call Stop concurrently with
52+
// an in-flight Start that's still blocked initializing the consumer.
53+
consumer atomic.Pointer[kafka.ConsumerGroup]
5054

5155
maxPending int
5256
// admitCh, requeueCh, terminalCh, and drainCh feed runDispatcher's
@@ -470,7 +474,7 @@ func (p *Propagator) Start(ctx context.Context) error {
470474
if err != nil {
471475
return fmt.Errorf("creating consumer group: %w", err)
472476
}
473-
p.consumer = consumer
477+
p.consumer.Store(consumer)
474478

475479
// Spin up the persistent broadcast worker pool. Workers exit when
476480
// Stop() closes the job channel; the WaitGroup lets Stop() block until
@@ -547,8 +551,8 @@ func (p *Propagator) WaitForBatches() {
547551
func (p *Propagator) Stop() error {
548552
p.logger.Info("stopping propagation service")
549553
var consumerErr error
550-
if p.consumer != nil {
551-
consumerErr = p.consumer.Close()
554+
if c := p.consumer.Load(); c != nil {
555+
consumerErr = c.Close()
552556
}
553557
// Wait for in-flight processBatch goroutines to finish before tearing
554558
// down the broadcast worker pool. Otherwise an in-flight batch would

store/postgres/postgres.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -770,19 +770,31 @@ RETURNING t.txid, prev.status, prev.timestamp_at, prev.block_hash, prev.block_he
770770
txid string
771771
prevStatus string
772772
prevTimestamp time.Time
773-
prevBlockHash string
774-
prevHeight int64
773+
// transactions.block_hash / block_height are nullable for any
774+
// pre-MINED status (RECEIVED / SENT_TO_NETWORK / SEEN_*). The CTE
775+
// captures the pre-update row so these will be NULL whenever a tx
776+
// transitions to MINED for the first time. Scan via pointer so a
777+
// NULL doesn't fail the scan; an empty BlockHash on the returned
778+
// prev snapshot is the correct "no block yet" representation
779+
// (matches models.TransactionStatus zero values used elsewhere).
780+
prevBlockHash *string
781+
prevHeight *int64
775782
)
776783
if err := rows.Scan(&txid, &prevStatus, &prevTimestamp, &prevBlockHash, &prevHeight); err != nil {
777784
return prevs, out, err
778785
}
779-
prevs = append(prevs, &models.TransactionStatus{
780-
TxID: txid,
781-
Status: models.Status(prevStatus),
782-
Timestamp: prevTimestamp,
783-
BlockHash: prevBlockHash,
784-
BlockHeight: uint64(prevHeight), //nolint:gosec // value originated as uint64 in this column
785-
})
786+
prev := &models.TransactionStatus{
787+
TxID: txid,
788+
Status: models.Status(prevStatus),
789+
Timestamp: prevTimestamp,
790+
}
791+
if prevBlockHash != nil {
792+
prev.BlockHash = *prevBlockHash
793+
}
794+
if prevHeight != nil {
795+
prev.BlockHeight = uint64(*prevHeight) //nolint:gosec // value originated as uint64 in this column
796+
}
797+
prevs = append(prevs, prev)
786798
out = append(out, &models.TransactionStatus{
787799
TxID: txid,
788800
Status: models.StatusMined,

store/postgres/postgres_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,3 +1062,36 @@ func hashesOf(rows []*models.BlockProcessingStatus) []string {
10621062
}
10631063
return out
10641064
}
1065+
1066+
// Regression: a tx at any pre-MINED status has NULL block_hash / block_height,
1067+
// so the CTE in SetMinedByTxIDs returns NULLs for the previous-row snapshot.
1068+
// Scanning those NULLs into bare string/int64 used to error with
1069+
// "cannot scan NULL into *string", which then caused bump-builder to log
1070+
// "failed to set mined status" and drop the MINED transition entirely.
1071+
func TestSetMinedByTxIDs_HandlesNullPrevBlock(t *testing.T) {
1072+
s := newTestStore(t)
1073+
ctx := context.Background()
1074+
1075+
in := &models.TransactionStatus{TxID: "tx-prev-null", Status: models.StatusReceived}
1076+
if _, _, err := s.GetOrInsertStatus(ctx, in); err != nil {
1077+
t.Fatalf("seed: %v", err)
1078+
}
1079+
1080+
prevs, mined, err := s.SetMinedByTxIDs(ctx, "blockhashX", 42, []string{"tx-prev-null"})
1081+
if err != nil {
1082+
t.Fatalf("SetMinedByTxIDs: %v", err)
1083+
}
1084+
if len(prevs) != 1 || len(mined) != 1 {
1085+
t.Fatalf("prevs=%d mined=%d want 1/1", len(prevs), len(mined))
1086+
}
1087+
if prevs[0].Status != models.StatusReceived {
1088+
t.Errorf("prev status = %q, want RECEIVED", prevs[0].Status)
1089+
}
1090+
if prevs[0].BlockHash != "" || prevs[0].BlockHeight != 0 {
1091+
t.Errorf("prev block fields should be zero for pre-MINED row, got hash=%q height=%d",
1092+
prevs[0].BlockHash, prevs[0].BlockHeight)
1093+
}
1094+
if mined[0].BlockHash != "blockhashX" || mined[0].BlockHeight != 42 {
1095+
t.Errorf("mined snapshot mismatch: %+v", mined[0])
1096+
}
1097+
}

0 commit comments

Comments
 (0)