diff --git a/services/api_server/handlers_test.go b/services/api_server/handlers_test.go index f5ceb92..768a8cc 100644 --- a/services/api_server/handlers_test.go +++ b/services/api_server/handlers_test.go @@ -85,7 +85,7 @@ func (m *mockStore) SetStatusByBlockHash(context.Context, string, models.Status) } func (m *mockStore) InsertBUMP(context.Context, string, uint64, []byte) error { return nil } func (m *mockStore) GetBUMP(context.Context, string) (uint64, []byte, error) { return 0, nil, nil } -func (m *mockStore) SetMinedByTxIDs(context.Context, string, []string) ([]*models.TransactionStatus, error) { +func (m *mockStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ([]*models.TransactionStatus, error) { return nil, nil } @@ -398,8 +398,7 @@ func TestHandleCallback_UnknownTxid_NoPhantomRow(t *testing.T) { } body := mustMarshalJSON(t, payload) - req := httptest.NewRequestWithContext(t.Context(), http.MethodPost, "/api/v1/merkle-service/callback", bytes.NewReader(body)) - req.Header.Set("Content-Type", "application/json") + req := authedCallbackRequest(t, body) w := httptest.NewRecorder() router.ServeHTTP(w, req) diff --git a/services/bump_builder/builder.go b/services/bump_builder/builder.go index aa49c9c..6039e72 100644 --- a/services/bump_builder/builder.go +++ b/services/bump_builder/builder.go @@ -63,6 +63,35 @@ func (b *Builder) publishStatus(ctx context.Context, status *models.TransactionS } } +// markMinedAndPublish moves the txids to MINED and fans the resulting status +// updates out to the events Publisher. blockHeight is required so each +// published status carries the block-height anchor that downstream SSE / +// webhook / BUMP-dedup consumers depend on (issue #87 / F-029). If a backend +// regresses and returns a status with BlockHeight == 0, the publish path +// repairs it from the compound BUMP's height before fanning out so a +// half-applied revert can never reintroduce the original bug. +func (b *Builder) markMinedAndPublish(ctx context.Context, logger *zap.Logger, blockHash string, blockHeight uint64, txids []string) { + mined, err := b.store.SetMinedByTxIDs(ctx, blockHash, blockHeight, txids) + if err != nil { + logger.Error("failed to set mined status", zap.Error(err)) + return + } + logger.Info("set transactions to MINED", + zap.Int("count", len(mined)), + zap.Uint64("block_height", blockHeight), + ) + // SetMinedByTxIDs returns full status objects only for the rows it + // actually updated — silently skipping txids without an existing record. + // Publish the rich rows directly so SSE clients receive blockHash / + // blockHeight / merklePath in their status updates. + for _, st := range mined { + if st.BlockHeight == 0 { + st.BlockHeight = blockHeight + } + b.publishStatus(ctx, st) + } +} + func (b *Builder) Name() string { return "bump-builder" } func (b *Builder) Start(ctx context.Context) error { @@ -208,24 +237,13 @@ func (b *Builder) handleMessage(ctx context.Context, msg *kafka.Message) error { return fmt.Errorf("storing BUMP: %w", err) } - // 6. Set tracked transactions to MINED + // 6. Set tracked transactions to MINED. + // blockHeight is threaded through here (and asserted on the returned + // statuses below) because downstream SSE/webhook consumers and the + // dedup path in BUMP-build rely on the height to anchor each MINED + // status to a specific block — a zero/missing height triggered F-029. if len(txids) > 0 { - mined, err := b.store.SetMinedByTxIDs(ctx, blockHash, txids) - if err != nil { - logger.Error("failed to set mined status", zap.Error(err)) - } else { - logger.Info("set transactions to MINED", - zap.Int("count", len(mined)), - ) - // SetMinedByTxIDs returns full status objects only for the rows - // it actually updated — silently skipping txids without an - // existing record. Publish the rich rows directly so SSE - // clients receive blockHash / blockHeight / merklePath in their - // status updates. - for _, st := range mined { - b.publishStatus(ctx, st) - } - } + b.markMinedAndPublish(ctx, logger, blockHash, blockHeight, txids) } // 7. Prune STUMPs diff --git a/services/bump_builder/builder_test.go b/services/bump_builder/builder_test.go index 015db4d..519f380 100644 --- a/services/bump_builder/builder_test.go +++ b/services/bump_builder/builder_test.go @@ -25,6 +25,11 @@ import ( "github.com/bsv-blockchain/arcade/teranode" ) +// testBlockHash is a synthetic block hash reused across the table-style tests +// in this file. Lifted to a constant to satisfy goconst now that the +// regression tests for issue #87 push the literal count past the threshold. +const testBlockHash = "aabbccdd00000000000000000000000000000000000000000000000000000000" + // --- Mock Store --- type mockStore struct { @@ -43,8 +48,9 @@ type mockStore struct { } type minedCall struct { - blockHash string - txids []string + blockHash string + blockHeight uint64 + txids []string } func newMockStore() *mockStore { @@ -85,20 +91,21 @@ func (m *mockStore) InsertBUMP(_ context.Context, blockHash string, _ uint64, bu return nil } -func (m *mockStore) SetMinedByTxIDs(_ context.Context, blockHash string, txids []string) ([]*models.TransactionStatus, error) { +func (m *mockStore) SetMinedByTxIDs(_ context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) { m.mu.Lock() defer m.mu.Unlock() if m.setMinedErr != nil { return nil, m.setMinedErr } - m.minedCalls = append(m.minedCalls, minedCall{blockHash, txids}) + m.minedCalls = append(m.minedCalls, minedCall{blockHash, blockHeight, txids}) var statuses []*models.TransactionStatus for _, txid := range txids { statuses = append(statuses, &models.TransactionStatus{ - TxID: txid, - Status: models.StatusMined, - BlockHash: blockHash, - Timestamp: time.Now(), + TxID: txid, + Status: models.StatusMined, + BlockHash: blockHash, + BlockHeight: blockHeight, + Timestamp: time.Now(), }) } return statuses, nil @@ -757,3 +764,217 @@ func searchStr(s, substr string) bool { } return false } + +// --- Publisher mock + block-height regression tests for issue #87 / F-029 --- + +// recordingPublisher captures every TransactionStatus the builder publishes +// downstream so tests can assert on what SSE / webhook subscribers would see. +type recordingPublisher struct { + mu sync.Mutex + published []*models.TransactionStatus +} + +func (p *recordingPublisher) Publish(_ context.Context, status *models.TransactionStatus) error { + p.mu.Lock() + defer p.mu.Unlock() + // Copy so later mutation by the builder (or the store) cannot retroactively + // repair a height we want to assert was missing at publish time. + cp := *status + p.published = append(p.published, &cp) + return nil +} + +func (p *recordingPublisher) Subscribe(context.Context) (<-chan *models.TransactionStatus, error) { + return nil, errors.New("recordingPublisher: Subscribe not used in tests") +} + +func (p *recordingPublisher) Close() error { return nil } + +func (p *recordingPublisher) snapshot() []*models.TransactionStatus { + p.mu.Lock() + defer p.mu.Unlock() + out := make([]*models.TransactionStatus, len(p.published)) + copy(out, p.published) + return out +} + +// makeMinimalSTUMPAtHeight builds a single-leaf STUMP for txidHex at the given +// block height. Uses go-sdk's MerklePath.Bytes() so the height is encoded as a +// proper BRC-74 varint (the hand-rolled makeMinimalSTUMP only supports heights +// < 0xfd because it writes a single raw byte). +func makeMinimalSTUMPAtHeight(t *testing.T, txidHex string, blockHeight uint32) []byte { + t.Helper() + txHash := mustHash(t, txidHex) + isTxid := true + mp := transaction.NewMerklePath(blockHeight, [][]*transaction.PathElement{ + {{Offset: 0, Hash: &txHash, Txid: &isTxid}}, + }) + return mp.Bytes() +} + +// TestBuilder_HandleMessage_PublishesMinedStatusWithBlockHeight is the +// regression test for issue #87 / F-029: the builder must thread the +// compound BUMP's block height all the way through SetMinedByTxIDs and +// onto the TransactionStatus that gets published. A previous code path +// dropped the height before publish, leaving downstream SSE/webhook +// consumers with BlockHash but BlockHeight=0. +func TestBuilder_HandleMessage_PublishesMinedStatusWithBlockHeight(t *testing.T) { + const wantHeight uint32 = 850123 + ms := newMockStore() + pub := &recordingPublisher{} + + blockHash := testBlockHash + txidHex := "1111111111111111111111111111111111111111111111111111111111111111" + + stumpData := makeMinimalSTUMPAtHeight(t, txidHex, wantHeight) + ms.addStump(blockHash, 0, stumpData) + + subtreeHash := mustHash(t, txidHex) + root := expectedCompoundRoot(t, + []*models.Stump{{BlockHash: blockHash, SubtreeIndex: 0, StumpData: stumpData}}, + []chainhash.Hash{subtreeHash}, nil) + datahub := newDatahubServer(root, []chainhash.Hash{subtreeHash}) + defer datahub.Close() + + b := newTestBuilder(ms, datahub.URL) + b.publisher = pub + + if err := b.handleMessage(context.Background(), makeBlockProcessedMsg(blockHash)); err != nil { + t.Fatalf("handleMessage: %v", err) + } + + // 1) The store call itself must receive the height — without this, no + // backend can persist it even if the publish path were correct. + ms.mu.Lock() + if len(ms.minedCalls) != 1 { + ms.mu.Unlock() + t.Fatalf("expected 1 SetMinedByTxIDs call, got %d", len(ms.minedCalls)) + } + got := ms.minedCalls[0] + ms.mu.Unlock() + if got.blockHeight != uint64(wantHeight) { + t.Errorf("SetMinedByTxIDs got blockHeight=%d, want %d", got.blockHeight, wantHeight) + } + if got.blockHash != blockHash { + t.Errorf("SetMinedByTxIDs got blockHash=%q, want %q", got.blockHash, blockHash) + } + + // 2) The published TransactionStatus must carry both fields. This is the + // contract SSE / webhook subscribers see; F-029 was that BlockHeight + // was zero here even though BlockHash was set. + emitted := pub.snapshot() + if len(emitted) != 1 { + t.Fatalf("expected 1 published status, got %d", len(emitted)) + } + st := emitted[0] + if st.Status != models.StatusMined { + t.Errorf("published status = %q, want MINED", st.Status) + } + if st.BlockHash != blockHash { + t.Errorf("published BlockHash=%q, want %q", st.BlockHash, blockHash) + } + if st.BlockHeight != uint64(wantHeight) { + t.Errorf("published BlockHeight=%d, want %d", st.BlockHeight, wantHeight) + } +} + +// TestBuilder_HandleMessage_PublishedHeightIsNeverZero is the narrow +// regression guard: anyone refactoring SetMinedByTxIDs or the publish +// loop and zero-valuing BlockHeight will fail this test even if every +// other assertion happens to pass (e.g. a future test that compares +// published statuses to mock-returned statuses without checking height). +func TestBuilder_HandleMessage_PublishedHeightIsNeverZero(t *testing.T) { + ms := newMockStore() + pub := &recordingPublisher{} + + blockHash := testBlockHash + txidHex := "1111111111111111111111111111111111111111111111111111111111111111" + + // Use the existing 0x01 minimal STUMP — it encodes blockHeight=1, which is + // non-zero, so any code path that zero-values the height will be caught. + stumpData := makeMinimalSTUMP(txidHex) + ms.addStump(blockHash, 0, stumpData) + + subtreeHash := mustHash(t, txidHex) + root := expectedCompoundRoot(t, + []*models.Stump{{BlockHash: blockHash, SubtreeIndex: 0, StumpData: stumpData}}, + []chainhash.Hash{subtreeHash}, nil) + datahub := newDatahubServer(root, []chainhash.Hash{subtreeHash}) + defer datahub.Close() + + b := newTestBuilder(ms, datahub.URL) + b.publisher = pub + + if err := b.handleMessage(context.Background(), makeBlockProcessedMsg(blockHash)); err != nil { + t.Fatalf("handleMessage: %v", err) + } + + emitted := pub.snapshot() + if len(emitted) == 0 { + t.Fatal("expected at least one published status") + } + for i, st := range emitted { + if st.BlockHeight == 0 { + t.Errorf("published status %d has BlockHeight=0; F-029 regression: %+v", i, st) + } + } +} + +// TestBuilder_HandleMessage_DefensivelyRestoresHeightIfStoreDropsIt +// asserts the safety net the builder added on top of SetMinedByTxIDs: +// if a backend regresses and forgets to populate BlockHeight on its +// returned status, the publish path repairs it from the compound BUMP's +// height before fanning out. This guards against a partial revert that +// undoes only the store change. +func TestBuilder_HandleMessage_DefensivelyRestoresHeightIfStoreDropsIt(t *testing.T) { + ms := newMockStore() + pub := &recordingPublisher{} + + blockHash := testBlockHash + txidHex := "1111111111111111111111111111111111111111111111111111111111111111" + + stumpData := makeMinimalSTUMP(txidHex) + ms.addStump(blockHash, 0, stumpData) + + subtreeHash := mustHash(t, txidHex) + root := expectedCompoundRoot(t, + []*models.Stump{{BlockHash: blockHash, SubtreeIndex: 0, StumpData: stumpData}}, + []chainhash.Hash{subtreeHash}, nil) + datahub := newDatahubServer(root, []chainhash.Hash{subtreeHash}) + defer datahub.Close() + + b := newTestBuilder(ms, datahub.URL) + b.publisher = pub + + // Wrap the mock so SetMinedByTxIDs returns BlockHeight=0 — simulating a + // backend that regresses on the persistence side. The builder must still + // publish a non-zero height by falling back to the compound's height. + heightDroppingStore := &heightDroppingMockStore{mockStore: ms} + b.store = heightDroppingStore + + if err := b.handleMessage(context.Background(), makeBlockProcessedMsg(blockHash)); err != nil { + t.Fatalf("handleMessage: %v", err) + } + + emitted := pub.snapshot() + if len(emitted) != 1 { + t.Fatalf("expected 1 published status, got %d", len(emitted)) + } + if emitted[0].BlockHeight == 0 { + t.Errorf("publish path failed to defensively restore BlockHeight: %+v", emitted[0]) + } +} + +// heightDroppingMockStore wraps mockStore and zeroes BlockHeight on every +// returned status to simulate a buggy backend. +type heightDroppingMockStore struct { + *mockStore +} + +func (h *heightDroppingMockStore) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) { + statuses, err := h.mockStore.SetMinedByTxIDs(ctx, blockHash, blockHeight, txids) + for _, s := range statuses { + s.BlockHeight = 0 + } + return statuses, err +} diff --git a/services/webhook/service_test.go b/services/webhook/service_test.go index 5a2b8c9..82b95b0 100644 --- a/services/webhook/service_test.go +++ b/services/webhook/service_test.go @@ -94,7 +94,7 @@ func (s *fakeStore) SetStatusByBlockHash(context.Context, string, models.Status) } func (s *fakeStore) InsertBUMP(context.Context, string, uint64, []byte) error { return nil } func (s *fakeStore) GetBUMP(context.Context, string) (uint64, []byte, error) { return 0, nil, nil } -func (s *fakeStore) SetMinedByTxIDs(context.Context, string, []string) ([]*models.TransactionStatus, error) { +func (s *fakeStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ([]*models.TransactionStatus, error) { return nil, nil } func (s *fakeStore) InsertSubmission(context.Context, *models.Submission) error { return nil } diff --git a/store/aerospike/aerospike.go b/store/aerospike/aerospike.go index b38e758..cfeeca1 100644 --- a/store/aerospike/aerospike.go +++ b/store/aerospike/aerospike.go @@ -694,7 +694,11 @@ func (s *Store) ClearRetryState(ctx context.Context, txid string, finalStatus mo return nil } -func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []string) ([]*models.TransactionStatus, error) { +// SetMinedByTxIDs writes a MINED status batch keyed by blockHash + blockHeight. +// blockHeight is persisted as the block_height bin and echoed back on each +// returned TransactionStatus so SSE/webhook consumers always see the height +// alongside the hash (issue #87 / F-029). +func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) { now := time.Now() var statuses []*models.TransactionStatus @@ -720,6 +724,7 @@ func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []s ops := []*aero.Operation{ aero.PutOp(aero.NewBin("status", string(models.StatusMined))), aero.PutOp(aero.NewBin("block_hash", blockHash)), + aero.PutOp(aero.NewBin("block_height", int(blockHeight))), //nolint:gosec // block height fits in int on 64-bit platforms aero.PutOp(aero.NewBin("timestamp", now.UnixMilli())), } records[j] = aero.NewBatchWrite(bwp, key, ops...) @@ -732,10 +737,11 @@ func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []s for j, txid := range batch { if records[j] != nil && records[j].BatchRec().Err == nil { statuses = append(statuses, &models.TransactionStatus{ - TxID: txid, - Status: models.StatusMined, - BlockHash: blockHash, - Timestamp: now, + TxID: txid, + Status: models.StatusMined, + BlockHash: blockHash, + BlockHeight: blockHeight, + Timestamp: now, }) } } diff --git a/store/pebble/pebble.go b/store/pebble/pebble.go index 0751019..7b6b51b 100644 --- a/store/pebble/pebble.go +++ b/store/pebble/pebble.go @@ -788,8 +788,10 @@ func (s *Store) ClearRetryState(ctx context.Context, txid string, finalStatus mo } // SetMinedByTxIDs updates only rows that already exist — matching the -// Aerospike contract where absent txids are silently skipped. -func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []string) ([]*models.TransactionStatus, error) { +// Aerospike contract where absent txids are silently skipped. blockHeight is +// persisted on each row and echoed back on the returned status so SSE/webhook +// consumers see the same height that anchors the BUMP (issue #87 / F-029). +func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) { if err := ctx.Err(); err != nil { return nil, err } @@ -815,6 +817,7 @@ func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []s updated := *existing updated.Status = string(models.StatusMined) updated.BlockHash = blockHash + updated.BlockHeight = blockHeight updated.TimestampUnixNs = now.UnixNano() payload, err := json.Marshal(updated) @@ -839,10 +842,11 @@ func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []s } out = append(out, &models.TransactionStatus{ - TxID: txid, - Status: models.StatusMined, - BlockHash: blockHash, - Timestamp: now, + TxID: txid, + Status: models.StatusMined, + BlockHash: blockHash, + BlockHeight: blockHeight, + Timestamp: now, }) } return out, nil diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index ef82f23..3c9d837 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -687,19 +687,22 @@ WHERE txid=$1` } // SetMinedByTxIDs updates only rows that already exist. UPDATE ... WHERE txid -// = ANY($2) is a single round-trip for the batch, and RETURNING lets us emit -// one status object per affected row without a second read. -func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, txids []string) ([]*models.TransactionStatus, error) { +// = ANY($5) is a single round-trip for the batch, and RETURNING lets us emit +// one status object per affected row without a second read. blockHeight is +// persisted alongside blockHash and echoed back on each returned status so +// downstream SSE/webhook consumers always see the height that anchors the +// MINED transition (issue #87 / F-029). +func (s *Store) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) { if len(txids) == 0 { return nil, nil } now := time.Now() const q = ` UPDATE transactions -SET status=$1, block_hash=$2, timestamp_at=$3 -WHERE txid = ANY($4) +SET status=$1, block_hash=$2, block_height=$3, timestamp_at=$4 +WHERE txid = ANY($5) RETURNING txid` - rows, err := s.pool.Query(ctx, q, string(models.StatusMined), blockHash, now, txids) + rows, err := s.pool.Query(ctx, q, string(models.StatusMined), blockHash, int64(blockHeight), now, txids) //nolint:gosec // block height fits in int64 if err != nil { return nil, fmt.Errorf("set mined: %w", err) } @@ -711,10 +714,11 @@ RETURNING txid` return out, err } out = append(out, &models.TransactionStatus{ - TxID: txid, - Status: models.StatusMined, - BlockHash: blockHash, - Timestamp: now, + TxID: txid, + Status: models.StatusMined, + BlockHash: blockHash, + BlockHeight: blockHeight, + Timestamp: now, }) } return out, rows.Err() diff --git a/store/store.go b/store/store.go index c9560b3..e3f5b3d 100644 --- a/store/store.go +++ b/store/store.go @@ -109,11 +109,17 @@ type Store interface { // GetBUMP retrieves the compound BUMP for a block. GetBUMP(ctx context.Context, blockHash string) (blockHeight uint64, bumpData []byte, err error) - // SetMinedByTxIDs marks transactions as mined for a given block hash and tx list. + // SetMinedByTxIDs marks transactions as mined for a given block (hash + height) + // and tx list. blockHeight is required: downstream consumers (SSE, webhooks, + // BUMP-build dedup) rely on the height to anchor each MINED status to a + // specific block, and a zero/missing height has historically caused dropped + // updates and BUMP-build re-work (see issue #87 / F-029). Implementations + // must persist both blockHash and blockHeight on each updated row, and the + // returned TransactionStatus values MUST carry BlockHeight populated. // Implementations must only update records that already exist in the store; // txids with no existing record should be silently skipped (not created). // Returns full status objects only for the transactions that were actually updated. - SetMinedByTxIDs(ctx context.Context, blockHash string, txids []string) ([]*models.TransactionStatus, error) + SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) // InsertSubmission creates a new submission record InsertSubmission(ctx context.Context, sub *models.Submission) error