Skip to content

Commit 4e6d1f3

Browse files
authored
Fix TTL for watch calls on replay (#147)
1 parent d5a2b2d commit 4e6d1f3

16 files changed

Lines changed: 958 additions & 68 deletions

File tree

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# ------------------------------------------------------------------------------
2+
# Add Issues to Project Workflow
3+
#
4+
# Purpose: Automatically add every newly opened issue in this repository to the
5+
# bsv-blockchain organization project board so triage and planning stay
6+
# in sync without manual intervention.
7+
#
8+
# Triggers: Runs whenever an issue is opened in this repository.
9+
#
10+
# Maintainer: @mrz1836
11+
# ------------------------------------------------------------------------------
12+
13+
name: add-issues-to-project
14+
15+
# --------------------------------------------------------------------
16+
# Trigger Configuration
17+
# --------------------------------------------------------------------
18+
on:
19+
issues:
20+
types:
21+
- opened
22+
23+
# --------------------------------------------------------------------
24+
# Permissions
25+
# --------------------------------------------------------------------
26+
permissions: read-all
27+
28+
# --------------------------------------------------------------------
29+
# Concurrency Control
30+
# --------------------------------------------------------------------
31+
concurrency:
32+
group: ${{ github.workflow }}-${{ github.event.issue.number }}
33+
cancel-in-progress: false
34+
35+
jobs:
36+
add-to-project:
37+
# Quoted: in a plain scalar, " #" begins a YAML comment and would drop "#18" from the job name.
name: "Add issue to bsv-blockchain project #18"
38+
runs-on: ubuntu-latest
39+
permissions:
40+
# The org-level project token is supplied via ADD_TO_PROJECT_PAT; no
41+
# repository-scoped write permissions are required here.
42+
contents: read
43+
steps:
44+
# --------------------------------------------------------------------
45+
# Add the newly opened issue to the configured organization project
46+
# --------------------------------------------------------------------
47+
- name: Add issue to project
48+
uses: actions/add-to-project@5afcf98fcd03f1c2f92c3c83f58ae24323cc57fd # v2.0.0
49+
with:
50+
project-url: https://github.com/orgs/bsv-blockchain/projects/18
51+
github-token: ${{ secrets.ADD_TO_PROJECT_PAT }}

config/config.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,28 @@ type PropagationConfig struct {
323323
RegisterReplayOnStart *bool `mapstructure:"register_replay_on_start"`
324324
// RegisterReplayLookbackHours bounds how far back IterateStatusesSince
325325
// scans when replaying. Older txs are very likely terminal already
326-
// (MINED/IMMUTABLE) and skipping them avoids walking months of history
327-
// on every boot. Defaults to 168 (7 days).
326+
// (MINED/IMMUTABLE), and a non-terminal tx older than this window has
327+
// almost certainly stalled — re-registering it on every boot won't
328+
// unstick it. Defaults to 24 (one day), tightened from the original
329+
// 7 days after issue #145.
328330
RegisterReplayLookbackHours int `mapstructure:"register_replay_lookback_hours"`
331+
// MerkleReplaySkipRecentMinutes lets the startup replay skip txs whose
332+
// MerkleRegisteredAt is within this window. /watch is INSERT ... ON
333+
// CONFLICT DO NOTHING on the merkle-service side and does not refresh
334+
// expires_at, so re-registering a tx merkle-service already knows about
335+
// is wasted work. Default 30 (matches merkle-service postMineTTLSec).
336+
// Set to 0 to disable the skip and re-register every non-terminal tx —
337+
// useful for forcing a full re-sync after a known merkle-service wipe.
338+
// Issue #145.
339+
MerkleReplaySkipRecentMinutes int `mapstructure:"merkle_replay_skip_recent_minutes"`
340+
// MerkleReplayRPS caps the average requests-per-second the startup
341+
// replay issues against merkle-service. Implemented as an inter-batch
342+
// sleep proportional to batch size, so the actual rate hovers around
343+
// the configured RPS rather than burst-then-stall. 0 disables
344+
// throttling. Default 50 — a 24h replay over a 1.85M-row store would
345+
// otherwise pin merkle-service at its postgres write ceiling for hours.
346+
// Issue #145.
347+
MerkleReplayRPS int `mapstructure:"merkle_replay_rps"`
329348
// MaxPending caps the in-memory pending-batch slice the propagation
330349
// consumer accumulates between flushes. Once full, new messages are
331350
// returned as errors from handleMessage so the Kafka consumer's
@@ -665,7 +684,9 @@ func setDefaults() {
665684
// drops its registration state and arcade silently stops receiving
666685
// callbacks for everything in-flight.
667686
viper.SetDefault("propagation.register_replay_on_start", true)
668-
viper.SetDefault("propagation.register_replay_lookback_hours", 168)
687+
viper.SetDefault("propagation.register_replay_lookback_hours", 24)
688+
viper.SetDefault("propagation.merkle_replay_skip_recent_minutes", 30)
689+
viper.SetDefault("propagation.merkle_replay_rps", 50)
669690

670691
viper.SetDefault("network", NetworkMainnet)
671692

models/transaction.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ type TransactionStatus struct {
6262
RetryCount int `json:"retryCount,omitempty"`
6363
NextRetryAt time.Time `json:"nextRetryAt,omitempty"`
6464
CreatedAt time.Time `json:"-"`
65+
// MerkleRegisteredAt is the wall-clock time of the most recent
66+
// successful merkle-service /watch registration for this txid. Zero
67+
// value means "never registered" (or registered before this field was
68+
// added). The startup replay loop reads this to skip rows registered
69+
// within MerkleReplaySkipRecentMinutes — see issue #145.
70+
MerkleRegisteredAt time.Time `json:"merkleRegisteredAt,omitempty"`
6571
}
6672

6773
// Status represents the various states a transaction can be in

services/api_server/handlers_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ func (m *mockStore) SetMinedByTxIDs(context.Context, string, uint64, []string) (
9090
return nil, nil
9191
}
9292

93+
// MarkMerkleRegisteredByTxIDs is the mockStore stub for the store method of
// the same name. It ignores all of its arguments and always returns nil.
func (m *mockStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.Time) error {
94+
return nil
95+
}
96+
9397
func (m *mockStore) InsertSubmission(_ context.Context, sub *models.Submission) error {
9498
m.mu.Lock()
9599
defer m.mu.Unlock()

services/propagation/propagator.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,12 @@ func (p *Propagator) registerBatch(ctx context.Context, batch []propagationMsg)
383383
metrics.PropagationMerkleRegisterDuration.Observe(time.Since(start).Seconds())
384384

385385
registered = make([]propagationMsg, 0, len(batch))
386+
successTxIDs := make([]string, 0, len(batch))
386387
var failedCount int
387388
for i, err := range errs {
388389
if err == nil {
389390
registered = append(registered, batch[i])
391+
successTxIDs = append(successTxIDs, batch[i].TXID)
390392
continue
391393
}
392394
failedCount++
@@ -412,6 +414,22 @@ func (p *Propagator) registerBatch(ctx context.Context, batch []propagationMsg)
412414
default:
413415
metrics.PropagationMerkleRegisterBatchOutcomeTotal.WithLabelValues("partial").Inc()
414416
}
417+
// Stamp merkle_registered_at on every tx we successfully registered. The
418+
// startup replay loop reads this and skips rows registered within
419+
// MerkleReplaySkipRecentMinutes — without it, every restart re-walks the
420+
// whole watchlist regardless of whether merkle-service already has it
421+
// (issue #145). A failure here must not block broadcast: the mark is a
422+
// hint, not part of the F-024 invariant. Worst case a missed mark causes
423+
// one redundant /watch on the next replay.
424+
if len(successTxIDs) > 0 {
425+
if err := p.store.MarkMerkleRegisteredByTxIDs(ctx, successTxIDs, time.Now()); err != nil {
426+
p.logger.Warn(
427+
"mark merkle-registered failed",
428+
zap.Int("count", len(successTxIDs)),
429+
zap.Error(err),
430+
)
431+
}
432+
}
415433
return registered
416434
}
417435

0 commit comments

Comments
 (0)