diff --git a/.github/workflows/add-issues-to-project.yml b/.github/workflows/add-issues-to-project.yml new file mode 100644 index 0000000..8cf3cd0 --- /dev/null +++ b/.github/workflows/add-issues-to-project.yml @@ -0,0 +1,51 @@ +# ------------------------------------------------------------------------------ +# Add Issues to Project Workflow +# +# Purpose: Automatically add every newly opened issue in this repository to the +# bsv-blockchain organization project board so triage and planning stay +# in sync without manual intervention. +# +# Triggers: Runs whenever an issue is opened in this repository. +# +# Maintainer: @mrz1836 +# ------------------------------------------------------------------------------ + +name: add-issues-to-project + +# -------------------------------------------------------------------- +# Trigger Configuration +# -------------------------------------------------------------------- +on: + issues: + types: + - opened + +# -------------------------------------------------------------------- +# Permissions +# -------------------------------------------------------------------- +permissions: read-all + +# -------------------------------------------------------------------- +# Concurrency Control +# -------------------------------------------------------------------- +concurrency: + group: ${{ github.workflow }}-${{ github.event.issue.number }} + cancel-in-progress: false + +jobs: + add-to-project: + name: Add issue to bsv-blockchain project #18 + runs-on: ubuntu-latest + permissions: + # The org-level project token is supplied via ADD_TO_PROJECT_PAT; no + # repository-scoped write permissions are required here. + contents: read + steps: + # -------------------------------------------------------------------- + # Add the newly opened issue to the configured organization project + # -------------------------------------------------------------------- + - name: Add issue to project + uses: actions/add-to-project@5afcf98fcd03f1c2f92c3c83f58ae24323cc57fd # v2.0.0 + with: + project-url: https://github.com/orgs/bsv-blockchain/projects/18 + github-token: ${{ secrets.ADD_TO_PROJECT_PAT }} diff --git a/config/config.go b/config/config.go index dc0bb04..8929072 100644 --- a/config/config.go +++ b/config/config.go @@ -323,9 +323,28 @@ type PropagationConfig struct { RegisterReplayOnStart *bool `mapstructure:"register_replay_on_start"` // RegisterReplayLookbackHours bounds how far back IterateStatusesSince // scans when replaying. Older txs are very likely terminal already - // (MINED/IMMUTABLE) and skipping them avoids walking months of history - // on every boot. Defaults to 168 (7 days). + // (MINED/IMMUTABLE), and a non-terminal tx older than this window has + // almost certainly stalled — re-registering it on every boot won't + // unstick it. Defaults to 24 (one day), tightened from the original + // 7 days after issue #145. RegisterReplayLookbackHours int `mapstructure:"register_replay_lookback_hours"` + // MerkleReplaySkipRecentMinutes lets the startup replay skip txs whose + // MerkleRegisteredAt is within this window. /watch is INSERT ... ON + // CONFLICT DO NOTHING on the merkle-service side and does not refresh + // expires_at, so re-registering a tx merkle-service already knows about + // is wasted work. Default 30 (matches merkle-service postMineTTLSec). + // Set to 0 to disable the skip and re-register every non-terminal tx — + // useful for forcing a full re-sync after a known merkle-service wipe. + // Issue #145. + MerkleReplaySkipRecentMinutes int `mapstructure:"merkle_replay_skip_recent_minutes"` + // MerkleReplayRPS caps the average requests-per-second the startup + // replay issues against merkle-service. Implemented as an inter-batch + // sleep proportional to batch size, so the actual rate hovers around + // the configured RPS rather than burst-then-stall. 0 disables + // throttling. Default 50 — a 24h replay over a 1.85M-row store would + // otherwise pin merkle-service at its postgres write ceiling for hours. + // Issue #145. + MerkleReplayRPS int `mapstructure:"merkle_replay_rps"` // MaxPending caps the in-memory pending-batch slice the propagation // consumer accumulates between flushes. Once full, new messages are // returned as errors from handleMessage so the Kafka consumer's @@ -665,7 +684,9 @@ func setDefaults() { // drops its registration state and arcade silently stops receiving // callbacks for everything in-flight. viper.SetDefault("propagation.register_replay_on_start", true) - viper.SetDefault("propagation.register_replay_lookback_hours", 168) + viper.SetDefault("propagation.register_replay_lookback_hours", 24) + viper.SetDefault("propagation.merkle_replay_skip_recent_minutes", 30) + viper.SetDefault("propagation.merkle_replay_rps", 50) viper.SetDefault("network", NetworkMainnet) diff --git a/models/transaction.go b/models/transaction.go index 621576f..edb2c6b 100644 --- a/models/transaction.go +++ b/models/transaction.go @@ -62,6 +62,12 @@ type TransactionStatus struct { RetryCount int `json:"retryCount,omitempty"` NextRetryAt time.Time `json:"nextRetryAt,omitempty"` CreatedAt time.Time `json:"-"` + // MerkleRegisteredAt is the wall-clock time of the most recent + // successful merkle-service /watch registration for this txid. Zero + // value means "never registered" (or registered before this field was + // added). The startup replay loop reads this to skip rows registered + // within MerkleReplaySkipRecentMinutes — see issue #145. + MerkleRegisteredAt time.Time `json:"merkleRegisteredAt,omitempty"` } // Status represents the various states a transaction can be in diff --git a/services/api_server/handlers_test.go b/services/api_server/handlers_test.go index b9312b2..edb33fb 100644 --- a/services/api_server/handlers_test.go +++ b/services/api_server/handlers_test.go @@ -90,6 +90,10 @@ func (m *mockStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ( return nil, nil } +func (m *mockStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.Time) error { + return nil +} + func (m *mockStore) InsertSubmission(_ context.Context, sub *models.Submission) error { m.mu.Lock() defer m.mu.Unlock() diff --git a/services/propagation/propagator.go b/services/propagation/propagator.go index 49af74e..a02059f 100644 --- a/services/propagation/propagator.go +++ b/services/propagation/propagator.go @@ -383,10 +383,12 @@ func (p *Propagator) registerBatch(ctx context.Context, batch []propagationMsg) metrics.PropagationMerkleRegisterDuration.Observe(time.Since(start).Seconds()) registered = make([]propagationMsg, 0, len(batch)) + successTxIDs := make([]string, 0, len(batch)) var failedCount int for i, err := range errs { if err == nil { registered = append(registered, batch[i]) + successTxIDs = append(successTxIDs, batch[i].TXID) continue } failedCount++ @@ -412,6 +414,22 @@ func (p *Propagator) registerBatch(ctx context.Context, batch []propagationMsg) default: metrics.PropagationMerkleRegisterBatchOutcomeTotal.WithLabelValues("partial").Inc() } + // Stamp merkle_registered_at on every tx we successfully registered. The + // startup replay loop reads this and skips rows registered within + // MerkleReplaySkipRecentMinutes — without it, every restart re-walks the + // whole watchlist regardless of whether merkle-service already has it + // (issue #145). A failure here must not block broadcast: the mark is a + // hint, not part of the F-024 invariant. Worst case a missed mark causes + // one redundant /watch on the next replay. + if len(successTxIDs) > 0 { + if err := p.store.MarkMerkleRegisteredByTxIDs(ctx, successTxIDs, time.Now()); err != nil { + p.logger.Warn( + "mark merkle-registered failed", + zap.Int("count", len(successTxIDs)), + zap.Error(err), + ) + } + } return registered } diff --git a/services/propagation/propagator_test.go b/services/propagation/propagator_test.go index 427d80c..304f8ae 100644 --- a/services/propagation/propagator_test.go +++ b/services/propagation/propagator_test.go @@ -70,6 +70,13 @@ type mockStore struct { cleared []clearedCall // replayRows drives IterateStatusesSince for merkle-replay tests. replayRows []*models.TransactionStatus + // merkleMarks records every MarkMerkleRegisteredByTxIDs call as one + // slice per call. Lets tests assert how many flushes happened and + // which txids landed in each. + merkleMarks [][]string + // markErr forces MarkMerkleRegisteredByTxIDs to return this error. + // Used by tests that verify a mark failure doesn't block broadcast. + markErr error } type clearedCall struct { @@ -136,6 +143,53 @@ func (m *mockStore) GetReadyRetries(_ context.Context, now time.Time, limit int) return out, nil } +func (m *mockStore) MarkMerkleRegisteredByTxIDs(_ context.Context, txids []string, ts time.Time) error { + m.mu.Lock() + defer m.mu.Unlock() + if m.markErr != nil { + return m.markErr + } + cp := append([]string(nil), txids...) + m.merkleMarks = append(m.merkleMarks, cp) + // Also stamp the replayRows so successive IterateStatusesSince calls + // observe the marker — lets replay tests verify the round-trip. + marked := make(map[string]struct{}, len(txids)) + for _, t := range txids { + marked[t] = struct{}{} + } + for _, r := range m.replayRows { + if _, ok := marked[r.TxID]; ok { + r.MerkleRegisteredAt = ts + } + } + return nil +} + +func (m *mockStore) markCount() int { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.merkleMarks) +} + +func (m *mockStore) lastMark() []string { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.merkleMarks) == 0 { + return nil + } + return m.merkleMarks[len(m.merkleMarks)-1] +} + +func (m *mockStore) allMarks() []string { + m.mu.Lock() + defer m.mu.Unlock() + var out []string + for _, batch := range m.merkleMarks { + out = append(out, batch...) + } + return out +} + func (m *mockStore) ClearRetryState(_ context.Context, txid string, finalStatus models.Status, extraInfo string) error { m.mu.Lock() defer m.mu.Unlock() @@ -150,11 +204,17 @@ func (m *mockStore) ClearRetryState(_ context.Context, txid string, finalStatus return nil } -func (m *mockStore) IterateStatusesSince(_ context.Context, _ time.Time, fn func(*models.TransactionStatus) error) error { +func (m *mockStore) IterateStatusesSince(_ context.Context, since time.Time, fn func(*models.TransactionStatus) error) error { m.mu.Lock() rows := append([]*models.TransactionStatus(nil), m.replayRows...) m.mu.Unlock() for _, r := range rows { + // Honor the lookback filter so replay tests can pin behavior that + // depends on it. Rows with a zero Timestamp are always returned — + // matches existing tests that don't bother setting one. + if !r.Timestamp.IsZero() && r.Timestamp.Before(since) { + continue + } if err := fn(r); err != nil { return err } @@ -652,6 +712,185 @@ func TestRunMerkleReplay_DisabledByConfig(t *testing.T) { } } +// replayPropagator builds a Propagator wired to the supplied merkle server, +// with all the knobs replay tests care about pre-populated. Keeps the +// per-test setup boilerplate small. +func replayPropagator(t *testing.T, ms *mockStore, merkleURL string, configure func(*config.Config)) *Propagator { + t.Helper() + cfg := &config.Config{CallbackURL: "http://arcade/cb", CallbackToken: "tok"} + cfg.Propagation.MerkleConcurrency = 4 + cfg.Propagation.RegisterReplayLookbackHours = 24 + enabled := true + cfg.Propagation.RegisterReplayOnStart = &enabled + if configure != nil { + configure(cfg) + } + mc := merkleservice.NewClient(merkleURL, "auth", 5*time.Second) + return New(cfg, zap.NewNop(), nil, nil, ms, nil, nil, mc) +} + +// TestRunMerkleReplay_SkipsRecentlyRegistered pins the issue #145 fix: +// rows whose MerkleRegisteredAt is within MerkleReplaySkipRecentMinutes +// don't need re-registration (merkle-service still has them, and POST +// /watch wouldn't refresh expires_at anyway). +func TestRunMerkleReplay_SkipsRecentlyRegistered(t *testing.T) { + log := &eventLog{} + ms := newMockStore() + now := time.Now() + ms.replayRows = []*models.TransactionStatus{ + {TxID: "tx-stale-1", Status: models.StatusReceived, MerkleRegisteredAt: now.Add(-2 * time.Hour)}, + {TxID: "tx-recent-1", Status: models.StatusReceived, MerkleRegisteredAt: now.Add(-5 * time.Minute)}, + {TxID: "tx-stale-2", Status: models.StatusSeenOnNetwork, MerkleRegisteredAt: now.Add(-2 * time.Hour)}, + {TxID: "tx-recent-2", Status: models.StatusSeenOnNetwork, MerkleRegisteredAt: now.Add(-5 * time.Minute)}, + } + + merkleSrv := newMerkleServer(log, http.StatusOK) + defer merkleSrv.Close() + + p := replayPropagator(t, ms, merkleSrv.URL, func(cfg *config.Config) { + cfg.Propagation.MerkleReplaySkipRecentMinutes = 30 + }) + p.runMerkleReplay(context.Background()) + + if got := log.count("register:"); got != 2 { + t.Errorf("registered=%d want 2 (only stale rows)", got) + } + for _, skip := range []string{"tx-recent-1", "tx-recent-2"} { + for _, ev := range log.all() { + if strings.Contains(ev, skip) { + t.Errorf("event %q should have been skipped (recently registered)", ev) + } + } + } +} + +// TestRunMerkleReplay_SkipDisabled verifies that +// MerkleReplaySkipRecentMinutes=0 forces a full re-register regardless +// of recency — the escape hatch operators need after a known +// merkle-service wipe. +func TestRunMerkleReplay_SkipDisabled(t *testing.T) { + log := &eventLog{} + ms := newMockStore() + now := time.Now() + ms.replayRows = []*models.TransactionStatus{ + {TxID: "tx-1", Status: models.StatusReceived, MerkleRegisteredAt: now.Add(-1 * time.Minute)}, + {TxID: "tx-2", Status: models.StatusReceived, MerkleRegisteredAt: now.Add(-30 * time.Second)}, + } + + merkleSrv := newMerkleServer(log, http.StatusOK) + defer merkleSrv.Close() + + p := replayPropagator(t, ms, merkleSrv.URL, func(cfg *config.Config) { + cfg.Propagation.MerkleReplaySkipRecentMinutes = 0 + }) + p.runMerkleReplay(context.Background()) + + if got := log.count("register:"); got != 2 { + t.Errorf("registered=%d want 2 (skip disabled — every row re-registers)", got) + } +} + +// TestRunMerkleReplay_LookbackDefault24h pins the lookback default +// change. Rows older than 24h are filtered out by IterateStatusesSince; +// only the recent rows make it into the replay scan. +func TestRunMerkleReplay_LookbackDefault24h(t *testing.T) { + log := &eventLog{} + ms := newMockStore() + now := time.Now() + ms.replayRows = []*models.TransactionStatus{ + {TxID: "tx-recent", Status: models.StatusReceived, Timestamp: now.Add(-12 * time.Hour)}, + {TxID: "tx-old", Status: models.StatusReceived, Timestamp: now.Add(-5 * 24 * time.Hour)}, + } + + merkleSrv := newMerkleServer(log, http.StatusOK) + defer merkleSrv.Close() + + p := replayPropagator(t, ms, merkleSrv.URL, func(cfg *config.Config) { + cfg.Propagation.RegisterReplayLookbackHours = 0 // fall back to defaultReplayLookback (24h) + cfg.Propagation.MerkleReplaySkipRecentMinutes = 0 + }) + p.runMerkleReplay(context.Background()) + + if got := log.count("register:"); got != 1 { + t.Errorf("registered=%d want 1 (only the 12h-old row is in lookback)", got) + } + for _, ev := range log.all() { + if strings.Contains(ev, "tx-old") { + t.Errorf("event %q: tx-old is 5 days old, must be excluded by 24h default lookback", ev) + } + } +} + +// TestRunMerkleReplay_RateLimit verifies the throttle: with RPS=10 and +// batch size 1000, a 30-row replay falls into one batch and pays +// ~3s of inter-batch sleep before flushing. (The first flush is also +// throttled in the current implementation since we sleep before each +// non-empty flush.) Wall-time floor with generous CI-slack. +func TestRunMerkleReplay_RateLimit(t *testing.T) { + log := &eventLog{} + ms := newMockStore() + rows := make([]*models.TransactionStatus, 30) + for i := range rows { + rows[i] = &models.TransactionStatus{TxID: fmt.Sprintf("tx-%d", i), Status: models.StatusReceived} + } + ms.replayRows = rows + + merkleSrv := newMerkleServer(log, http.StatusOK) + defer merkleSrv.Close() + + p := replayPropagator(t, ms, merkleSrv.URL, func(cfg *config.Config) { + cfg.Propagation.MerkleReplayRPS = 10 + cfg.Propagation.MerkleReplaySkipRecentMinutes = 0 + }) + + start := time.Now() + p.runMerkleReplay(context.Background()) + elapsed := time.Since(start) + + if got := log.count("register:"); got != 30 { + t.Errorf("registered=%d want 30", got) + } + // 30 rows / 10 rps = 3s nominal. Accept ≥ 2.5s to absorb scheduling jitter. + if elapsed < 2500*time.Millisecond { + t.Errorf("elapsed=%v want ≥ 2.5s with RPS=10 over 30 rows", elapsed) + } +} + +// TestRunMerkleReplay_MarksSuccessfulFlush pins the round-trip: +// replay's successful flush() must stamp merkle_registered_at on the +// rows it sent, so the NEXT replay skips them. +func TestRunMerkleReplay_MarksSuccessfulFlush(t *testing.T) { + log := &eventLog{} + ms := newMockStore() + ms.replayRows = []*models.TransactionStatus{ + {TxID: "tx-1", Status: models.StatusReceived}, + {TxID: "tx-2", Status: models.StatusReceived}, + {TxID: "tx-3", Status: models.StatusReceived}, + } + + merkleSrv := newMerkleServer(log, http.StatusOK) + defer merkleSrv.Close() + + p := replayPropagator(t, ms, merkleSrv.URL, func(cfg *config.Config) { + cfg.Propagation.MerkleReplaySkipRecentMinutes = 0 // not relevant here, just keep behavior explicit + cfg.Propagation.MerkleReplayRPS = 0 // no throttle so the test stays fast + }) + p.runMerkleReplay(context.Background()) + + if ms.markCount() != 1 { + t.Errorf("expected 1 mark batch (one successful flush), got %d", ms.markCount()) + } + got := map[string]bool{} + for _, txid := range ms.lastMark() { + got[txid] = true + } + for _, want := range []string{"tx-1", "tx-2", "tx-3"} { + if !got[want] { + t.Errorf("expected %s in last mark, got %v", want, ms.lastMark()) + } + } +} + // Test 7: Batch of 100 — all registered then broadcast in single call func TestProcessBatch_100Transactions(t *testing.T) { var registerCount atomic.Int32 @@ -976,6 +1215,83 @@ func TestRegisterBatch_Metric_AllFailed(t *testing.T) { } } +// TestRegisterBatch_MarksSuccessesOnly is the issue #145 hook: every txid +// that successfully /watch-registers must get a merkle_registered_at stamp +// on its row so the next startup replay can skip it. Failed txids must NOT +// be marked — their row is PENDING_RETRY and the reaper will re-register. +func TestRegisterBatch_MarksSuccessesOnly(t *testing.T) { + merkleSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var req struct { + TxID string `json:"txid"` + } + _ = json.NewDecoder(r.Body).Decode(&req) + if req.TxID == "tx-bad" { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + })) + defer merkleSrv.Close() + teranodeSrv := newTeranodeServer(&eventLog{}, http.StatusOK) + defer teranodeSrv.Close() + + ms := newMockStore() + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + + for _, txid := range []string{"tx-good-a", "tx-bad", "tx-good-b"} { + if err := p.handleMessage(context.Background(), consumerMsg(makePropMsg(txid))); err != nil { + t.Fatalf("handleMessage %s: %v", txid, err) + } + } + if err := p.flushBatch(context.Background()); err != nil { + t.Fatalf("flushBatch: %v", err) + } + + marks := ms.allMarks() + if len(marks) != 2 { + t.Fatalf("expected 2 marks (only good txs), got %d: %v", len(marks), marks) + } + got := map[string]bool{} + for _, m := range marks { + got[m] = true + } + if !got["tx-good-a"] || !got["tx-good-b"] { + t.Errorf("expected tx-good-a and tx-good-b marked, got %v", got) + } + if got["tx-bad"] { + t.Errorf("tx-bad must NOT be marked — it failed registration") + } +} + +// TestRegisterBatch_MarkStoreFailure_DoesNotBlockBroadcast pins the +// "best-effort" contract on the mark hook: the marker is a replay-skip +// hint, not part of F-024. If MarkMerkleRegisteredByTxIDs returns an +// error, broadcast must still happen — worst case the next replay +// re-registers one extra time. +func TestRegisterBatch_MarkStoreFailure_DoesNotBlockBroadcast(t *testing.T) { + merkleSrv := newMerkleServer(&eventLog{}, http.StatusOK) + defer merkleSrv.Close() + + broadcastLog := &eventLog{} + teranodeSrv := newTeranodeServer(broadcastLog, http.StatusOK) + defer teranodeSrv.Close() + + ms := newMockStore() + ms.markErr = errors.New("store down") + p := newPropagator(merkleSrv.URL, teranodeSrv.URL, ms) + + if err := handleAndFlush(t, p, makePropMsg("tx-1")); err != nil { + t.Fatalf("handleAndFlush: %v", err) + } + + if broadcastLog.count("broadcast") != 1 { + t.Errorf("broadcast should still fire on mark failure, got %d", broadcastLog.count("broadcast")) + } + if u := ms.lastUpdateForTxid("tx-1"); u == nil || u.Status != models.StatusAcceptedByNetwork { + t.Errorf("tx-1: expected ACCEPTED_BY_NETWORK, got %+v", u) + } +} + // F-024 durability: a registration failure creates a PENDING_RETRY row so // the reaper picks the tx back up on its cadence — registration retries are // no longer the Kafka consumer's responsibility (which used to be coupled to diff --git a/services/propagation/replay.go b/services/propagation/replay.go index 4241df6..ff8e759 100644 --- a/services/propagation/replay.go +++ b/services/propagation/replay.go @@ -12,10 +12,22 @@ import ( ) // defaultReplayLookback is the IterateStatusesSince window used when the -// operator hasn't pinned register_replay_lookback_hours. 7 days covers the -// typical confirmation horizon (deep reorgs + watchdog recency_depth) while -// keeping startup work bounded for accounts with months of history. -const defaultReplayLookback = 7 * 24 * time.Hour +// operator hasn't pinned register_replay_lookback_hours. 24h covers the +// confirmation horizon and watchdog recency window while keeping startup +// work bounded — a non-terminal tx older than this is almost certainly +// stuck, and re-registering it on every restart won't unstick it (issue +// #145, was 7 days). +const defaultReplayLookback = 24 * time.Hour + +// defaultReplaySkipRecent is the merkle_registered_at recency window used +// when MerkleReplaySkipRecentMinutes isn't set. Matches merkle-service's +// postMineTTLSec (1800s = 30min): if we registered within this window, +// merkle-service almost certainly still has the row. +const defaultReplaySkipRecent = 30 * time.Minute + +// defaultReplayRPS caps the average requests-per-second the replay loop +// issues against merkle-service when MerkleReplayRPS isn't set. +const defaultReplayRPS = 50 // runMerkleReplay re-registers every non-terminal tx in the store with // merkle-service /watch. Runs once on startup and exits. @@ -51,6 +63,23 @@ func (p *Propagator) runMerkleReplay(ctx context.Context) { if concurrency <= 0 { concurrency = 10 } + // skipRecent: rows registered within this window are skipped. 0 disables + // the skip — useful for forcing a full re-sync after a known + // merkle-service wipe (issue #145). + var skipRecent time.Duration + switch m := p.cfg.Propagation.MerkleReplaySkipRecentMinutes; { + case m < 0: + skipRecent = defaultReplaySkipRecent + case m == 0: + skipRecent = 0 + default: + skipRecent = time.Duration(m) * time.Minute + } + // rps: average rate cap on RegisterBatch calls. 0 disables throttling. + rps := p.cfg.Propagation.MerkleReplayRPS + if rps < 0 { + rps = defaultReplayRPS + } // batchSize bounds the in-memory accumulator before each RegisterBatch // round. Small enough that a stalled merkle-service doesn't pin tens of // MB of strings while we wait; large enough that the per-batch fixed @@ -62,15 +91,33 @@ func (p *Propagator) runMerkleReplay(ctx context.Context) { "merkle-service replay starting", zap.Time("since", since), zap.Int("concurrency", concurrency), + zap.Duration("skip_recent", skipRecent), + zap.Int("rps", rps), ) - var scanned, queued, failures int + var scanned, queued, failures, skippedRecent int + var throttled time.Duration batch := make([]merkleservice.Registration, 0, batchSize) + now := time.Now() flush := func() { if len(batch) == 0 { return } + // Average-rate throttle: sleep proportional to batch size so the + // long-run RPS converges on the configured cap. Simpler than a + // token bucket and good enough for boot-time catch-up. + if rps > 0 { + delay := time.Duration(float64(len(batch))/float64(rps)) * time.Second + if delay > 0 { + throttled += delay + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + } + } if err := p.merkleClient.RegisterBatch(ctx, batch, concurrency); err != nil { failures += len(batch) p.logger.Warn( @@ -78,6 +125,22 @@ func (p *Propagator) runMerkleReplay(ctx context.Context) { zap.Int("batch_size", len(batch)), zap.Error(err), ) + batch = batch[:0] + return + } + // On success, stamp merkle_registered_at so future replays can skip + // these rows. RegisterBatch is fail-fast, so reaching here means + // every entry succeeded — mark them all in one round-trip. + txids := make([]string, len(batch)) + for i := range batch { + txids[i] = batch[i].TxID + } + if err := p.store.MarkMerkleRegisteredByTxIDs(ctx, txids, time.Now()); err != nil { + p.logger.Warn( + "merkle-service replay mark failed", + zap.Int("count", len(txids)), + zap.Error(err), + ) } batch = batch[:0] } @@ -93,6 +156,13 @@ func (p *Propagator) runMerkleReplay(ctx context.Context) { if status.TxID == "" { return nil } + // Skip rows we registered recently — merkle-service almost certainly + // still has them, and POST /watch doesn't refresh expires_at anyway + // (issue #145). skipRecent == 0 disables. + if skipRecent > 0 && !status.MerkleRegisteredAt.IsZero() && now.Sub(status.MerkleRegisteredAt) < skipRecent { + skippedRecent++ + return nil + } batch = append(batch, merkleservice.Registration{ TxID: status.TxID, CallbackURL: p.cfg.CallbackURL, @@ -126,6 +196,8 @@ func (p *Propagator) runMerkleReplay(ctx context.Context) { zap.Duration("elapsed", time.Since(start)), zap.Int("scanned", scanned), zap.Int("queued", queued), + zap.Int("skipped_recent", skippedRecent), zap.Int("failures", failures), + zap.Duration("throttled", throttled), ) } diff --git a/services/webhook/service_test.go b/services/webhook/service_test.go index ef13ba3..4af29e6 100644 --- a/services/webhook/service_test.go +++ b/services/webhook/service_test.go @@ -100,6 +100,10 @@ func (s *fakeStore) GetBUMP(context.Context, string) (uint64, []byte, error) { func (s *fakeStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ([]*models.TransactionStatus, error) { return nil, nil } + +func (s *fakeStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.Time) error { + return nil +} func (s *fakeStore) InsertSubmission(context.Context, *models.Submission) error { return nil } func (s *fakeStore) GetSubmissionsByToken(context.Context, string) ([]*models.Submission, error) { return nil, nil diff --git a/store/aerospike/aerospike.go b/store/aerospike/aerospike.go index 97e1406..c9c6c9d 100644 --- a/store/aerospike/aerospike.go +++ b/store/aerospike/aerospike.go @@ -367,6 +367,9 @@ func (s *Store) UpdateStatus(ctx context.Context, status *models.TransactionStat if len(status.MerklePath) > 0 { bins["merkle_path"] = []byte(status.MerklePath) } + if !status.MerkleRegisteredAt.IsZero() { + bins["merkle_registered_at"] = status.MerkleRegisteredAt.UnixMilli() + } // Enforce the status lattice: refuse to overwrite a terminal status with a // later, lower-priority update (e.g. a stray SEEN_ON_NETWORK callback after @@ -751,6 +754,45 @@ func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeig return statuses, nil } +// MarkMerkleRegisteredByTxIDs writes merkle_registered_at = ts.UnixMilli() on +// every existing transaction record in the txid list. Unknown txids are +// silently skipped via UPDATE_ONLY (matching SetMinedByTxIDs). Used by the +// startup replay loop to skip rows registered recently (issue #145). +func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if len(txids) == 0 { + return nil + } + bwp := aero.NewBatchWritePolicy() + bwp.RecordExistsAction = aero.UPDATE_ONLY + + for i := 0; i < len(txids); i += s.batchSize { + if err := ctx.Err(); err != nil { + return err + } + end := i + s.batchSize + if end > len(txids) { + end = len(txids) + } + batch := txids[i:end] + + records := make([]aero.BatchRecordIfc, len(batch)) + for j, txid := range batch { + key, err := s.key(setTransactions, txid) + if err != nil { + continue + } + records[j] = aero.NewBatchWrite(bwp, key, + aero.PutOp(aero.NewBin("merkle_registered_at", ts.UnixMilli())), + ) + } + + if err := s.client.BatchOperate(s.batchPolicy(ctx), records); err != nil { + return fmt.Errorf("batch mark merkle registered: %w", err) + } + } + return nil +} + // --- BUMP Operations --- // // BUMPs are stored as a manifest record at primary key plus N @@ -1718,6 +1760,11 @@ func recordToStatus(rec *aero.Record, txid string) *models.TransactionStatus { status.CreatedAt = time.UnixMilli(int64(ms)) } } + if v, ok := rec.Bins["merkle_registered_at"]; ok { + if ms, ok := v.(int); ok { + status.MerkleRegisteredAt = time.UnixMilli(int64(ms)) + } + } return status } diff --git a/store/aerospike/ctx_test.go b/store/aerospike/ctx_test.go index c2db271..0486add 100644 --- a/store/aerospike/ctx_test.go +++ b/store/aerospike/ctx_test.go @@ -151,3 +151,73 @@ func TestUpdateStatus_ExistingTxidStillWorks(t *testing.T) { t.Fatalf("expected SEEN_ON_NETWORK, got %+v", got) } } + +// TestMarkMerkleRegisteredByTxIDs_UpdatesExistingRows pins the issue #145 +// fix: every successful merkle-service register must stamp the row so the +// next startup replay can skip it. +func TestMarkMerkleRegisteredByTxIDs_UpdatesExistingRows(t *testing.T) { + s := integrationStore(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stamp := time.Now().Format("150405.000000000") + txids := []string{"mr-1-" + stamp, "mr-2-" + stamp, "mr-3-" + stamp} + for _, txid := range txids { + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: txid, Status: models.StatusReceived, + }); err != nil { + t.Fatalf("seed %s: %v", txid, err) + } + } + + ts := time.Now().Add(-5 * time.Minute).Truncate(time.Millisecond) + if err := s.MarkMerkleRegisteredByTxIDs(ctx, txids, ts); err != nil { + t.Fatalf("MarkMerkleRegisteredByTxIDs: %v", err) + } + + for _, txid := range txids { + got, err := s.GetStatus(ctx, txid) + if err != nil { + t.Fatalf("GetStatus %s: %v", txid, err) + } + if got == nil { + t.Fatalf("%s: missing after mark", txid) + } + if !got.MerkleRegisteredAt.Equal(ts) { + t.Errorf("%s: MerkleRegisteredAt=%v want %v", txid, got.MerkleRegisteredAt, ts) + } + } +} + +// TestMarkMerkleRegisteredByTxIDs_SkipsUnknownTxIDs verifies the +// UPDATE_ONLY contract: passing a txid with no existing row is a silent +// no-op rather than creating a phantom row (matching SetMinedByTxIDs). +func TestMarkMerkleRegisteredByTxIDs_SkipsUnknownTxIDs(t *testing.T) { + s := integrationStore(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + stamp := time.Now().Format("150405.000000000") + known := "mr-known-" + stamp + ghosts := []string{"mr-ghost-a-" + stamp, "mr-ghost-b-" + stamp} + + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: known, Status: models.StatusReceived, + }); err != nil { + t.Fatalf("seed: %v", err) + } + + all := append([]string{known}, ghosts...) + if err := s.MarkMerkleRegisteredByTxIDs(ctx, all, time.Now()); err != nil { + t.Fatalf("mark: %v", err) + } + + if got, _ := s.GetStatus(ctx, known); got == nil || got.MerkleRegisteredAt.IsZero() { + t.Errorf("known: MerkleRegisteredAt should be set, got %+v", got) + } + for _, txid := range ghosts { + if got, _ := s.GetStatus(ctx, txid); got != nil { + t.Errorf("%s: unknown txid should not have created a row", txid) + } + } +} diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index 0ab815f..25cd52d 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -57,19 +57,20 @@ type Store struct { // unix-nanoseconds to dodge JSON time-zone drift and to keep retry index // keys byte-for-byte consistent. type storedStatus struct { - TxID string `json:"txid"` - Status string `json:"status"` - StatusCode int `json:"status_code,omitempty"` - BlockHash string `json:"block_hash,omitempty"` - BlockHeight uint64 `json:"block_height,omitempty"` - MerklePath []byte `json:"merkle_path,omitempty"` - ExtraInfo string `json:"extra_info,omitempty"` - CompetingTxs []string `json:"competing_txs,omitempty"` - RawTx []byte `json:"raw_tx,omitempty"` - RetryCount int `json:"retry_count,omitempty"` - TimestampUnixNs int64 `json:"ts"` - CreatedUnixNs int64 `json:"created_at,omitempty"` - NextRetryUnixNs int64 `json:"next_retry_at,omitempty"` + TxID string `json:"txid"` + Status string `json:"status"` + StatusCode int `json:"status_code,omitempty"` + BlockHash string `json:"block_hash,omitempty"` + BlockHeight uint64 `json:"block_height,omitempty"` + MerklePath []byte `json:"merkle_path,omitempty"` + ExtraInfo string `json:"extra_info,omitempty"` + CompetingTxs []string `json:"competing_txs,omitempty"` + RawTx []byte `json:"raw_tx,omitempty"` + RetryCount int `json:"retry_count,omitempty"` + TimestampUnixNs int64 `json:"ts"` + CreatedUnixNs int64 `json:"created_at,omitempty"` + NextRetryUnixNs int64 `json:"next_retry_at,omitempty"` + MerkleRegisteredUnixNs int64 `json:"merkle_registered_at,omitempty"` } func (s storedStatus) toModel() *models.TransactionStatus { @@ -94,6 +95,9 @@ func (s storedStatus) toModel() *models.TransactionStatus { if s.NextRetryUnixNs != 0 { out.NextRetryAt = time.Unix(0, s.NextRetryUnixNs) } + if s.MerkleRegisteredUnixNs != 0 { + out.MerkleRegisteredAt = time.Unix(0, s.MerkleRegisteredUnixNs) + } return out } @@ -119,6 +123,9 @@ func fromModel(m *models.TransactionStatus) storedStatus { if !m.NextRetryAt.IsZero() { out.NextRetryUnixNs = m.NextRetryAt.UnixNano() } + if !m.MerkleRegisteredAt.IsZero() { + out.MerkleRegisteredUnixNs = m.MerkleRegisteredAt.UnixNano() + } return out } @@ -429,6 +436,9 @@ func mergeStatus(existing *storedStatus, update *models.TransactionStatus) store if !update.NextRetryAt.IsZero() { out.NextRetryUnixNs = update.NextRetryAt.UnixNano() } + if !update.MerkleRegisteredAt.IsZero() { + out.MerkleRegisteredUnixNs = update.MerkleRegisteredAt.UnixNano() + } return out } @@ -853,6 +863,51 @@ func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeig return out, nil } +// MarkMerkleRegisteredByTxIDs stamps merkle_registered_at on every existing row +// in the txid list. Unknown txids are silently no-ops (matching SetMinedByTxIDs). +// Per-shard locking matches the rest of the status writers; an UpdateStatus +// racing with this write may see either ordering but never lose other fields +// since mergeStatus only ever overwrites populated fields. Issue #145. +func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if err := ctx.Err(); err != nil { + return err + } + tsNs := ts.UnixNano() + for _, txid := range txids { + if err := ctx.Err(); err != nil { + return err + } + mu := s.shardFor(txid) + mu.Lock() + + existing, err := s.readStoredStatus(txid) + if err != nil { + mu.Unlock() + return err + } + if existing == nil { + mu.Unlock() + continue + } + + updated := *existing + updated.MerkleRegisteredUnixNs = tsNs + + payload, err := json.Marshal(updated) + if err != nil { + mu.Unlock() + return err + } + // No index churn: merkle_registered_at isn't indexed. + err = s.db.Set(txKey(txid), payload, s.writeOpts) + mu.Unlock() + if err != nil { + return err + } + } + return nil +} + // --- BUMP / STUMP --- func (s *Store) InsertBUMP(ctx context.Context, blockHash string, blockHeight uint64, bumpData []byte) error { diff --git a/store/pebble/pebble_test.go b/store/pebble/pebble_test.go index 49011e2..4e1e903 100644 --- a/store/pebble/pebble_test.go +++ b/store/pebble/pebble_test.go @@ -414,6 +414,96 @@ func TestSetStatusByBlockHash_UpdatesAllInBlock(t *testing.T) { } } +func TestMarkMerkleRegisteredByTxIDs_UpdatesExistingRows(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + txids := []string{"mr-1", "mr-2", "mr-3"} + for _, txid := range txids { + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: txid, Status: models.StatusReceived, Timestamp: time.Now(), + }); err != nil { + t.Fatalf("seed %s: %v", txid, err) + } + } + + ts := time.Now().Add(-5 * time.Minute) + if err := s.MarkMerkleRegisteredByTxIDs(ctx, txids, ts); err != nil { + t.Fatalf("mark: %v", err) + } + + for _, txid := range txids { + got, err := s.GetStatus(ctx, txid) + if err != nil { + t.Fatalf("GetStatus %s: %v", txid, err) + } + if got == nil { + t.Fatalf("%s: missing after mark", txid) + } + if !got.MerkleRegisteredAt.Equal(ts) { + t.Errorf("%s: MerkleRegisteredAt=%v want %v", txid, got.MerkleRegisteredAt, ts) + } + } +} + +func TestMarkMerkleRegisteredByTxIDs_SkipsUnknownTxIDs(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: "known", Status: models.StatusReceived, Timestamp: time.Now(), + }); err != nil { + t.Fatal(err) + } + + if err := s.MarkMerkleRegisteredByTxIDs(ctx, []string{"known", "ghost-a", "ghost-b"}, time.Now()); err != nil { + t.Fatalf("mark: %v", err) + } + + got, _ := s.GetStatus(ctx, "known") + if got == nil || got.MerkleRegisteredAt.IsZero() { + t.Errorf("known: MerkleRegisteredAt should be set, got %+v", got) + } + for _, txid := range []string{"ghost-a", "ghost-b"} { + got, _ := s.GetStatus(ctx, txid) + if got != nil { + t.Errorf("%s: unknown txid should not have created a row", txid) + } + } +} + +func TestMarkMerkleRegisteredByTxIDs_RoundTripsThroughIterate(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: "iter-1", Status: models.StatusReceived, Timestamp: time.Now(), + }); err != nil { + t.Fatal(err) + } + + ts := time.Now() + if err := s.MarkMerkleRegisteredByTxIDs(ctx, []string{"iter-1"}, ts); err != nil { + t.Fatal(err) + } + + var seen *models.TransactionStatus + if err := s.IterateStatusesSince(ctx, time.Now().Add(-time.Hour), func(st *models.TransactionStatus) error { + if st.TxID == "iter-1" { + seen = st + } + return nil + }); err != nil { + t.Fatal(err) + } + if seen == nil { + t.Fatalf("iter-1 not seen") + } + if !seen.MerkleRegisteredAt.Equal(ts) { + t.Errorf("MerkleRegisteredAt=%v want %v", seen.MerkleRegisteredAt, ts) + } +} + func TestSubmissions_InsertAndQueryByTxID(t *testing.T) { s := newTestStore(t) ctx := context.Background() diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index 0dfad7e..01784c8 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -135,14 +135,18 @@ func (s *Store) GetOrInsertStatus(ctx context.Context, status *models.Transactio const q = ` INSERT INTO transactions (txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, - next_retry_at, timestamp_at, created_at) -VALUES ($1,$2,NULLIF($3,0),NULLIF($4,''),NULLIF($5,0),$6,NULLIF($7,''),$8,$9,$10,$11,$12,$13) + next_retry_at, timestamp_at, created_at, merkle_registered_at) +VALUES ($1,$2,NULLIF($3,0),NULLIF($4,''),NULLIF($5,0),$6,NULLIF($7,''),$8,$9,$10,$11,$12,$13,$14) ON CONFLICT (txid) DO NOTHING` var nextRetry any if !status.NextRetryAt.IsZero() { nextRetry = status.NextRetryAt } + var merkleRegisteredAt any + if !status.MerkleRegisteredAt.IsZero() { + merkleRegisteredAt = status.MerkleRegisteredAt + } tag, err := s.pool.Exec( ctx, q, @@ -150,7 +154,7 @@ ON CONFLICT (txid) DO NOTHING` status.BlockHash, int64(status.BlockHeight), /* #nosec G115 */ []byte(status.MerklePath), status.ExtraInfo, competing, []byte(status.RawTx), status.RetryCount, - nextRetry, status.Timestamp, status.CreatedAt, + nextRetry, status.Timestamp, status.CreatedAt, merkleRegisteredAt, ) if err != nil { return nil, false, fmt.Errorf("insert tx %s: %w", status.TxID, err) @@ -169,7 +173,7 @@ ON CONFLICT (txid) DO NOTHING` // columnsPerInsertRow is how many placeholders one row in the multi-row // INSERT VALUES list consumes. Matches the column list in the static SQL // fragment built by BatchGetOrInsertStatus. -const columnsPerInsertRow = 13 +const columnsPerInsertRow = 14 // BatchGetOrInsertStatus is the multi-row form of GetOrInsertStatus. It uses // the "xmax = 0" trick: ON CONFLICT DO UPDATE SET txid = excluded.txid is a @@ -227,13 +231,17 @@ func (s *Store) BatchGetOrInsertStatus(ctx context.Context, statuses []*models.T if !st.NextRetryAt.IsZero() { nextRetry = st.NextRetryAt } + var merkleRegisteredAt any + if !st.MerkleRegisteredAt.IsZero() { + merkleRegisteredAt = st.MerkleRegisteredAt + } args = append( args, st.TxID, string(statusVal), st.StatusCode, st.BlockHash, int64(st.BlockHeight), /* #nosec G115 */ []byte(st.MerklePath), st.ExtraInfo, competing, []byte(st.RawTx), st.RetryCount, - nextRetry, ts, now, + nextRetry, ts, now, merkleRegisteredAt, ) } @@ -247,21 +255,21 @@ func (s *Store) BatchGetOrInsertStatus(ctx context.Context, statuses []*models.T base := i * columnsPerInsertRow fmt.Fprintf( &values, - "($%d,$%d,NULLIF($%d,0),NULLIF($%d,''),NULLIF($%d,0),$%d,NULLIF($%d,''),$%d,$%d,$%d,$%d,$%d,$%d)", + "($%d,$%d,NULLIF($%d,0),NULLIF($%d,''),NULLIF($%d,0),$%d,NULLIF($%d,''),$%d,$%d,$%d,$%d,$%d,$%d,$%d)", base+1, base+2, base+3, base+4, base+5, base+6, base+7, - base+8, base+9, base+10, base+11, base+12, base+13, + base+8, base+9, base+10, base+11, base+12, base+13, base+14, ) } q := ` INSERT INTO transactions (txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, - next_retry_at, timestamp_at, created_at) + next_retry_at, timestamp_at, created_at, merkle_registered_at) VALUES ` + values.String() + ` ON CONFLICT (txid) DO UPDATE SET txid = transactions.txid RETURNING txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at, (xmax = 0) AS inserted` + timestamp_at, created_at, merkle_registered_at, (xmax = 0) AS inserted` rows, err := s.pool.Query(ctx, q, args...) if err != nil { @@ -506,7 +514,7 @@ func (s *Store) GetStatus(ctx context.Context, txid string) (*models.Transaction const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at + timestamp_at, created_at, merkle_registered_at FROM transactions WHERE txid = $1` row := s.pool.QueryRow(ctx, q, txid) st, err := scanStatus(row) @@ -524,7 +532,7 @@ func (s *Store) GetStatusesSince(ctx context.Context, since time.Time) ([]*model const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at + timestamp_at, created_at, merkle_registered_at FROM transactions WHERE timestamp_at >= $1 ORDER BY timestamp_at DESC` rows, err := s.pool.Query(ctx, q, since) @@ -551,7 +559,7 @@ func (s *Store) IterateStatusesSince(ctx context.Context, since time.Time, fn fu const q = ` SELECT txid, status, status_code, block_hash, block_height, merkle_path, extra_info, competing_txs, raw_tx, retry_count, next_retry_at, - timestamp_at, created_at + timestamp_at, created_at, merkle_registered_at FROM transactions WHERE timestamp_at >= $1 ORDER BY timestamp_at DESC` rows, err := s.pool.Query(ctx, q, since) @@ -730,6 +738,22 @@ RETURNING txid` return out, rows.Err() } +// MarkMerkleRegisteredByTxIDs stamps merkle_registered_at = $1 on every existing +// row whose txid is in $2. Unknown txids are silently no-ops, matching the +// SetMinedByTxIDs contract. The startup replay loop calls this after each +// successful /watch batch so future replays can skip recently-registered rows +// (issue #145). +func (s *Store) MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, ts time.Time) error { + if len(txids) == 0 { + return nil + } + const q = `UPDATE transactions SET merkle_registered_at = $1 WHERE txid = ANY($2)` + if _, err := s.pool.Exec(ctx, q, ts, txids); err != nil { + return fmt.Errorf("mark merkle registered: %w", err) + } + return nil +} + // --- BUMP / STUMP --- func (s *Store) InsertBUMP(ctx context.Context, blockHash string, blockHeight uint64, bumpData []byte) error { @@ -1124,23 +1148,24 @@ type rowScanner interface { // the single-row paths stays identical to the existing scanStatus. func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, error) { var ( - st models.TransactionStatus - statusCode *int - blockHash *string - blockHeight *int64 - merklePath []byte - extraInfo *string - competingTxs []byte - rawTx []byte - nextRetry *time.Time - inserted bool + st models.TransactionStatus + statusCode *int + blockHash *string + blockHeight *int64 + merklePath []byte + extraInfo *string + competingTxs []byte + rawTx []byte + nextRetry *time.Time + merkleRegisteredAt *time.Time + inserted bool ) if err := row.Scan( &st.TxID, &st.Status, &statusCode, &blockHash, &blockHeight, &merklePath, &extraInfo, &competingTxs, &rawTx, &st.RetryCount, &nextRetry, - &st.Timestamp, &st.CreatedAt, &inserted, + &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, &inserted, ); err != nil { return nil, false, err } @@ -1168,27 +1193,31 @@ func scanStatusWithInserted(row rowScanner) (*models.TransactionStatus, bool, er if nextRetry != nil { st.NextRetryAt = *nextRetry } + if merkleRegisteredAt != nil { + st.MerkleRegisteredAt = *merkleRegisteredAt + } return &st, inserted, nil } func scanStatus(row rowScanner) (*models.TransactionStatus, error) { var ( - st models.TransactionStatus - statusCode *int - blockHash *string - blockHeight *int64 - merklePath []byte - extraInfo *string - competingTxs []byte - rawTx []byte - nextRetry *time.Time + st models.TransactionStatus + statusCode *int + blockHash *string + blockHeight *int64 + merklePath []byte + extraInfo *string + competingTxs []byte + rawTx []byte + nextRetry *time.Time + merkleRegisteredAt *time.Time ) if err := row.Scan( &st.TxID, &st.Status, &statusCode, &blockHash, &blockHeight, &merklePath, &extraInfo, &competingTxs, &rawTx, &st.RetryCount, &nextRetry, - &st.Timestamp, &st.CreatedAt, + &st.Timestamp, &st.CreatedAt, &merkleRegisteredAt, ); err != nil { return nil, err } @@ -1216,6 +1245,9 @@ func scanStatus(row rowScanner) (*models.TransactionStatus, error) { if nextRetry != nil { st.NextRetryAt = *nextRetry } + if merkleRegisteredAt != nil { + st.MerkleRegisteredAt = *merkleRegisteredAt + } return &st, nil } diff --git a/store/postgres/postgres_test.go b/store/postgres/postgres_test.go index 34147f0..5026f2f 100644 --- a/store/postgres/postgres_test.go +++ b/store/postgres/postgres_test.go @@ -299,6 +299,97 @@ func TestSetStatusByBlockHash_UpdatesAllInBlock(t *testing.T) { } } +func TestMarkMerkleRegisteredByTxIDs_UpdatesExistingRows(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + txids := []string{"mr-1", "mr-2", "mr-3"} + for _, txid := range txids { + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: txid, Status: models.StatusReceived, Timestamp: time.Now(), + }); err != nil { + t.Fatalf("seed %s: %v", txid, err) + } + } + + ts := time.Now().Add(-5 * time.Minute).UTC().Round(time.Microsecond) + if err := s.MarkMerkleRegisteredByTxIDs(ctx, txids, ts); err != nil { + t.Fatalf("MarkMerkleRegisteredByTxIDs: %v", err) + } + + for _, txid := range txids { + got, err := s.GetStatus(ctx, txid) + if err != nil { + t.Fatalf("GetStatus %s: %v", txid, err) + } + if got == nil { + t.Fatalf("%s: status missing after mark", txid) + } + if delta := got.MerkleRegisteredAt.Sub(ts).Abs(); delta > time.Millisecond { + t.Errorf("%s: MerkleRegisteredAt=%v want ~%v (delta %v)", txid, got.MerkleRegisteredAt, ts, delta) + } + } +} + +func TestMarkMerkleRegisteredByTxIDs_SkipsUnknownTxIDs(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: "known", Status: models.StatusReceived, Timestamp: time.Now(), + }); err != nil { + t.Fatal(err) + } + + ts := time.Now() + if err := s.MarkMerkleRegisteredByTxIDs(ctx, []string{"known", "unknown-a", "unknown-b"}, ts); err != nil { + t.Fatalf("mark: %v", err) + } + + got, _ := s.GetStatus(ctx, "known") + if got == nil || got.MerkleRegisteredAt.IsZero() { + t.Errorf("known: MerkleRegisteredAt should be set, got %+v", got) + } + for _, txid := range []string{"unknown-a", "unknown-b"} { + got, _ := s.GetStatus(ctx, txid) + if got != nil { + t.Errorf("%s: unknown txid should not have created a row, got %+v", txid, got) + } + } +} + +func TestMarkMerkleRegisteredByTxIDs_RoundTripsThroughIterate(t *testing.T) { + s := newTestStore(t) + ctx := context.Background() + + if _, _, err := s.GetOrInsertStatus(ctx, &models.TransactionStatus{ + TxID: "iter-1", Status: models.StatusReceived, Timestamp: time.Now(), + }); err != nil { + t.Fatal(err) + } + + ts := time.Now().UTC().Round(time.Microsecond) + if err := s.MarkMerkleRegisteredByTxIDs(ctx, []string{"iter-1"}, ts); err != nil { + t.Fatal(err) + } + + var seen *models.TransactionStatus + if err := s.IterateStatusesSince(ctx, time.Now().Add(-time.Hour), func(st *models.TransactionStatus) error { + if st.TxID == "iter-1" { + seen = st + } + return nil + }); err != nil { + t.Fatalf("IterateStatusesSince: %v", err) + } + if seen == nil { + t.Fatalf("row not seen in iterate") + } + if delta := seen.MerkleRegisteredAt.Sub(ts).Abs(); delta > time.Millisecond { + t.Errorf("MerkleRegisteredAt=%v want ~%v", seen.MerkleRegisteredAt, ts) + } +} + func TestSubmissions_InsertAndQueryByTxID(t *testing.T) { s := newTestStore(t) ctx := context.Background() diff --git a/store/postgres/schema.sql b/store/postgres/schema.sql index 5c7849a..451c17c 100644 --- a/store/postgres/schema.sql +++ b/store/postgres/schema.sql @@ -2,21 +2,27 @@ -- Store.EnsureIndexes() via pgx.Exec; safe to run repeatedly. CREATE TABLE IF NOT EXISTS transactions ( - txid TEXT PRIMARY KEY, - status TEXT NOT NULL, - status_code INT, - block_hash TEXT, - block_height BIGINT, - merkle_path BYTEA, - extra_info TEXT, - competing_txs JSONB, - raw_tx BYTEA, - retry_count INT NOT NULL DEFAULT 0, - next_retry_at TIMESTAMPTZ, - timestamp_at TIMESTAMPTZ NOT NULL, - created_at TIMESTAMPTZ NOT NULL + txid TEXT PRIMARY KEY, + status TEXT NOT NULL, + status_code INT, + block_hash TEXT, + block_height BIGINT, + merkle_path BYTEA, + extra_info TEXT, + competing_txs JSONB, + raw_tx BYTEA, + retry_count INT NOT NULL DEFAULT 0, + next_retry_at TIMESTAMPTZ, + timestamp_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ NOT NULL, + merkle_registered_at TIMESTAMPTZ ); +-- Idempotent column add for stores created before merkle_registered_at was +-- introduced. Existing rows keep NULL until the next successful /watch call +-- repopulates the marker — see issue #145. +ALTER TABLE transactions ADD COLUMN IF NOT EXISTS merkle_registered_at TIMESTAMPTZ; + CREATE INDEX IF NOT EXISTS idx_tx_status ON transactions(status); CREATE INDEX IF NOT EXISTS idx_tx_block_hash ON transactions(block_hash); CREATE INDEX IF NOT EXISTS idx_tx_updated ON transactions(timestamp_at); diff --git a/store/store.go b/store/store.go index b286f53..54c65f8 100644 --- a/store/store.go +++ b/store/store.go @@ -221,6 +221,13 @@ type Store interface { // row stops showing up in ready-retry queries. ClearRetryState(ctx context.Context, txid string, finalStatus models.Status, extraInfo string) error + // MarkMerkleRegisteredByTxIDs records that the given txids have been + // successfully registered with merkle-service at ts. Unknown txids are + // silently skipped (matching SetMinedByTxIDs semantics). Used by the + // startup replay loop to skip rows it already registered recently — see + // issue #145. + MarkMerkleRegisteredByTxIDs(ctx context.Context, txids []string, ts time.Time) error + // EnsureIndexes creates any required secondary indexes for query operations. EnsureIndexes() error