Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions .github/workflows/add-issues-to-project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# ------------------------------------------------------------------------------
# Add Issues to Project Workflow
#
# Purpose: Automatically add every newly opened issue in this repository to the
# bsv-blockchain organization project board so triage and planning stay
# in sync without manual intervention.
#
# Triggers: Runs whenever an issue is opened in this repository.
#
# Maintainer: @mrz1836
# ------------------------------------------------------------------------------

name: add-issues-to-project

# --------------------------------------------------------------------
# Trigger Configuration
# --------------------------------------------------------------------
on:
issues:
types:
- opened

# --------------------------------------------------------------------
# Permissions
# --------------------------------------------------------------------
permissions: read-all

# --------------------------------------------------------------------
# Concurrency Control
# --------------------------------------------------------------------
concurrency:
group: ${{ github.workflow }}-${{ github.event.issue.number }}
cancel-in-progress: false

jobs:
add-to-project:
name: Add issue to bsv-blockchain project #18
runs-on: ubuntu-latest
permissions:
# The org-level project token is supplied via ADD_TO_PROJECT_PAT; no
# repository-scoped write permissions are required here.
contents: read
steps:
# --------------------------------------------------------------------
# Add the newly opened issue to the configured organization project
# --------------------------------------------------------------------
- name: Add issue to project
uses: actions/add-to-project@5afcf98fcd03f1c2f92c3c83f58ae24323cc57fd # v2.0.0
with:
project-url: https://github.com/orgs/bsv-blockchain/projects/18
github-token: ${{ secrets.ADD_TO_PROJECT_PAT }}
27 changes: 24 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,28 @@ type PropagationConfig struct {
RegisterReplayOnStart *bool `mapstructure:"register_replay_on_start"`
// RegisterReplayLookbackHours bounds how far back IterateStatusesSince
// scans when replaying. Older txs are very likely terminal already
// (MINED/IMMUTABLE) and skipping them avoids walking months of history
// on every boot. Defaults to 168 (7 days).
// (MINED/IMMUTABLE), and a non-terminal tx older than this window has
// almost certainly stalled — re-registering it on every boot won't
// unstick it. Defaults to 24 (one day), tightened from the original
// 7 days after issue #145.
RegisterReplayLookbackHours int `mapstructure:"register_replay_lookback_hours"`
// MerkleReplaySkipRecentMinutes lets the startup replay skip txs whose
// MerkleRegisteredAt is within this window. /watch is INSERT ... ON
// CONFLICT DO NOTHING on the merkle-service side and does not refresh
// expires_at, so re-registering a tx merkle-service already knows about
// is wasted work. Default 30 (matches merkle-service postMineTTLSec).
// Set to 0 to disable the skip and re-register every non-terminal tx —
// useful for forcing a full re-sync after a known merkle-service wipe.
// Issue #145.
MerkleReplaySkipRecentMinutes int `mapstructure:"merkle_replay_skip_recent_minutes"`
// MerkleReplayRPS caps the average requests-per-second the startup
// replay issues against merkle-service. Implemented as an inter-batch
// sleep proportional to batch size, so the actual rate hovers around
// the configured RPS rather than burst-then-stall. 0 disables
// throttling. Default 50 — a 24h replay over a 1.85M-row store would
// otherwise pin merkle-service at its postgres write ceiling for hours.
// Issue #145.
MerkleReplayRPS int `mapstructure:"merkle_replay_rps"`
// MaxPending caps the in-memory pending-batch slice the propagation
// consumer accumulates between flushes. Once full, new messages are
// returned as errors from handleMessage so the Kafka consumer's
Expand Down Expand Up @@ -665,7 +684,9 @@ func setDefaults() {
// drops its registration state and arcade silently stops receiving
// callbacks for everything in-flight.
viper.SetDefault("propagation.register_replay_on_start", true)
viper.SetDefault("propagation.register_replay_lookback_hours", 168)
viper.SetDefault("propagation.register_replay_lookback_hours", 24)
viper.SetDefault("propagation.merkle_replay_skip_recent_minutes", 30)
viper.SetDefault("propagation.merkle_replay_rps", 50)

viper.SetDefault("network", NetworkMainnet)

Expand Down
6 changes: 6 additions & 0 deletions models/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type TransactionStatus struct {
RetryCount int `json:"retryCount,omitempty"`
NextRetryAt time.Time `json:"nextRetryAt,omitempty"`
CreatedAt time.Time `json:"-"`
// MerkleRegisteredAt is the wall-clock time of the most recent
// successful merkle-service /watch registration for this txid. Zero
// value means "never registered" (or registered before this field was
// added). The startup replay loop reads this to skip rows registered
// within MerkleReplaySkipRecentMinutes — see issue #145.
MerkleRegisteredAt time.Time `json:"merkleRegisteredAt,omitempty"`
}

// Status represents the various states a transaction can be in
Expand Down
4 changes: 4 additions & 0 deletions services/api_server/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (m *mockStore) SetMinedByTxIDs(context.Context, string, uint64, []string) (
return nil, nil
}

func (m *mockStore) MarkMerkleRegisteredByTxIDs(context.Context, []string, time.Time) error {
return nil
}

func (m *mockStore) InsertSubmission(_ context.Context, sub *models.Submission) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions services/propagation/propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,12 @@ func (p *Propagator) registerBatch(ctx context.Context, batch []propagationMsg)
metrics.PropagationMerkleRegisterDuration.Observe(time.Since(start).Seconds())

registered = make([]propagationMsg, 0, len(batch))
successTxIDs := make([]string, 0, len(batch))
var failedCount int
for i, err := range errs {
if err == nil {
registered = append(registered, batch[i])
successTxIDs = append(successTxIDs, batch[i].TXID)
continue
}
failedCount++
Expand All @@ -412,6 +414,22 @@ func (p *Propagator) registerBatch(ctx context.Context, batch []propagationMsg)
default:
metrics.PropagationMerkleRegisterBatchOutcomeTotal.WithLabelValues("partial").Inc()
}
// Stamp merkle_registered_at on every tx we successfully registered. The
// startup replay loop reads this and skips rows registered within
// MerkleReplaySkipRecentMinutes — without it, every restart re-walks the
// whole watchlist regardless of whether merkle-service already has it
// (issue #145). A failure here must not block broadcast: the mark is a
// hint, not part of the F-024 invariant. Worst case a missed mark causes
// one redundant /watch on the next replay.
if len(successTxIDs) > 0 {
if err := p.store.MarkMerkleRegisteredByTxIDs(ctx, successTxIDs, time.Now()); err != nil {
p.logger.Warn(
"mark merkle-registered failed",
zap.Int("count", len(successTxIDs)),
zap.Error(err),
)
}
}
return registered
}

Expand Down
Loading
Loading