Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions services/api_server/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (m *mockStore) SetStatusByBlockHash(context.Context, string, models.Status)
}
func (m *mockStore) InsertBUMP(context.Context, string, uint64, []byte) error { return nil }
func (m *mockStore) GetBUMP(context.Context, string) (uint64, []byte, error) { return 0, nil, nil }
func (m *mockStore) SetMinedByTxIDs(context.Context, string, []string) ([]*models.TransactionStatus, error) {
func (m *mockStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ([]*models.TransactionStatus, error) {
return nil, nil
}

Expand Down Expand Up @@ -398,8 +398,7 @@ func TestHandleCallback_UnknownTxid_NoPhantomRow(t *testing.T) {
}
body := mustMarshalJSON(t, payload)

req := httptest.NewRequestWithContext(t.Context(), http.MethodPost, "/api/v1/merkle-service/callback", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
req := authedCallbackRequest(t, body)
w := httptest.NewRecorder()

router.ServeHTTP(w, req)
Expand Down
52 changes: 35 additions & 17 deletions services/bump_builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,35 @@ func (b *Builder) publishStatus(ctx context.Context, status *models.TransactionS
}
}

// markMinedAndPublish moves the txids to MINED and fans the resulting status
// updates out to the events Publisher. blockHeight is required so each
// published status carries the block-height anchor that downstream SSE /
// webhook / BUMP-dedup consumers depend on (issue #87 / F-029). If a backend
// regresses and returns a status with BlockHeight == 0, the publish path
// repairs it from the compound BUMP's height before fanning out so a
// half-applied revert can never reintroduce the original bug.
func (b *Builder) markMinedAndPublish(ctx context.Context, logger *zap.Logger, blockHash string, blockHeight uint64, txids []string) {
mined, err := b.store.SetMinedByTxIDs(ctx, blockHash, blockHeight, txids)
if err != nil {
logger.Error("failed to set mined status", zap.Error(err))
return
}
logger.Info("set transactions to MINED",
zap.Int("count", len(mined)),
zap.Uint64("block_height", blockHeight),
)
// SetMinedByTxIDs returns full status objects only for the rows it
// actually updated — silently skipping txids without an existing record.
// Publish the rich rows directly so SSE clients receive blockHash /
// blockHeight / merklePath in their status updates.
for _, st := range mined {
if st.BlockHeight == 0 {
st.BlockHeight = blockHeight
}
b.publishStatus(ctx, st)
}
}

func (b *Builder) Name() string { return "bump-builder" }

func (b *Builder) Start(ctx context.Context) error {
Expand Down Expand Up @@ -208,24 +237,13 @@ func (b *Builder) handleMessage(ctx context.Context, msg *kafka.Message) error {
return fmt.Errorf("storing BUMP: %w", err)
}

// 6. Set tracked transactions to MINED
// 6. Set tracked transactions to MINED.
// blockHeight is threaded through here (and asserted on the returned
// statuses below) because downstream SSE/webhook consumers and the
// dedup path in BUMP-build rely on the height to anchor each MINED
// status to a specific block — a zero/missing height triggered F-029.
if len(txids) > 0 {
mined, err := b.store.SetMinedByTxIDs(ctx, blockHash, txids)
if err != nil {
logger.Error("failed to set mined status", zap.Error(err))
} else {
logger.Info("set transactions to MINED",
zap.Int("count", len(mined)),
)
// SetMinedByTxIDs returns full status objects only for the rows
// it actually updated — silently skipping txids without an
// existing record. Publish the rich rows directly so SSE
// clients receive blockHash / blockHeight / merklePath in their
// status updates.
for _, st := range mined {
b.publishStatus(ctx, st)
}
}
b.markMinedAndPublish(ctx, logger, blockHash, blockHeight, txids)
}

// 7. Prune STUMPs
Expand Down
237 changes: 229 additions & 8 deletions services/bump_builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/bsv-blockchain/arcade/teranode"
)

// testBlockHash is a synthetic block hash reused across the table-style tests
// in this file. Lifted to a constant to satisfy goconst now that the
// regression tests for issue #87 push the literal count past the threshold.
const testBlockHash = "aabbccdd00000000000000000000000000000000000000000000000000000000"

// --- Mock Store ---

type mockStore struct {
Expand All @@ -43,8 +48,9 @@ type mockStore struct {
}

type minedCall struct {
blockHash string
txids []string
blockHash string
blockHeight uint64
txids []string
}

func newMockStore() *mockStore {
Expand Down Expand Up @@ -85,20 +91,21 @@ func (m *mockStore) InsertBUMP(_ context.Context, blockHash string, _ uint64, bu
return nil
}

func (m *mockStore) SetMinedByTxIDs(_ context.Context, blockHash string, txids []string) ([]*models.TransactionStatus, error) {
func (m *mockStore) SetMinedByTxIDs(_ context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) {
m.mu.Lock()
defer m.mu.Unlock()
if m.setMinedErr != nil {
return nil, m.setMinedErr
}
m.minedCalls = append(m.minedCalls, minedCall{blockHash, txids})
m.minedCalls = append(m.minedCalls, minedCall{blockHash, blockHeight, txids})
var statuses []*models.TransactionStatus
for _, txid := range txids {
statuses = append(statuses, &models.TransactionStatus{
TxID: txid,
Status: models.StatusMined,
BlockHash: blockHash,
Timestamp: time.Now(),
TxID: txid,
Status: models.StatusMined,
BlockHash: blockHash,
BlockHeight: blockHeight,
Timestamp: time.Now(),
})
}
return statuses, nil
Expand Down Expand Up @@ -757,3 +764,217 @@ func searchStr(s, substr string) bool {
}
return false
}

// --- Publisher mock + block-height regression tests for issue #87 / F-029 ---

// recordingPublisher captures every TransactionStatus the builder publishes
// downstream so tests can assert on what SSE / webhook subscribers would see.
type recordingPublisher struct {
mu sync.Mutex
published []*models.TransactionStatus
}

func (p *recordingPublisher) Publish(_ context.Context, status *models.TransactionStatus) error {
p.mu.Lock()
defer p.mu.Unlock()
// Copy so later mutation by the builder (or the store) cannot retroactively
// repair a height we want to assert was missing at publish time.
cp := *status
p.published = append(p.published, &cp)
return nil
}

func (p *recordingPublisher) Subscribe(context.Context) (<-chan *models.TransactionStatus, error) {
return nil, errors.New("recordingPublisher: Subscribe not used in tests")
}

func (p *recordingPublisher) Close() error { return nil }

func (p *recordingPublisher) snapshot() []*models.TransactionStatus {
p.mu.Lock()
defer p.mu.Unlock()
out := make([]*models.TransactionStatus, len(p.published))
copy(out, p.published)
return out
}

// makeMinimalSTUMPAtHeight builds a single-leaf STUMP for txidHex at the given
// block height. Uses go-sdk's MerklePath.Bytes() so the height is encoded as a
// proper BRC-74 varint (the hand-rolled makeMinimalSTUMP only supports heights
// < 0xfd because it writes a single raw byte).
func makeMinimalSTUMPAtHeight(t *testing.T, txidHex string, blockHeight uint32) []byte {
t.Helper()
txHash := mustHash(t, txidHex)
isTxid := true
mp := transaction.NewMerklePath(blockHeight, [][]*transaction.PathElement{
{{Offset: 0, Hash: &txHash, Txid: &isTxid}},
})
return mp.Bytes()
}

// TestBuilder_HandleMessage_PublishesMinedStatusWithBlockHeight is the
// regression test for issue #87 / F-029: the builder must thread the
// compound BUMP's block height all the way through SetMinedByTxIDs and
// onto the TransactionStatus that gets published. A previous code path
// dropped the height before publish, leaving downstream SSE/webhook
// consumers with BlockHash but BlockHeight=0.
func TestBuilder_HandleMessage_PublishesMinedStatusWithBlockHeight(t *testing.T) {
const wantHeight uint32 = 850123
ms := newMockStore()
pub := &recordingPublisher{}

blockHash := testBlockHash
txidHex := "1111111111111111111111111111111111111111111111111111111111111111"

stumpData := makeMinimalSTUMPAtHeight(t, txidHex, wantHeight)
ms.addStump(blockHash, 0, stumpData)

subtreeHash := mustHash(t, txidHex)
root := expectedCompoundRoot(t,
[]*models.Stump{{BlockHash: blockHash, SubtreeIndex: 0, StumpData: stumpData}},
[]chainhash.Hash{subtreeHash}, nil)
datahub := newDatahubServer(root, []chainhash.Hash{subtreeHash})
defer datahub.Close()

b := newTestBuilder(ms, datahub.URL)
b.publisher = pub

if err := b.handleMessage(context.Background(), makeBlockProcessedMsg(blockHash)); err != nil {
t.Fatalf("handleMessage: %v", err)
}

// 1) The store call itself must receive the height — without this, no
// backend can persist it even if the publish path were correct.
ms.mu.Lock()
if len(ms.minedCalls) != 1 {
ms.mu.Unlock()
t.Fatalf("expected 1 SetMinedByTxIDs call, got %d", len(ms.minedCalls))
}
got := ms.minedCalls[0]
ms.mu.Unlock()
if got.blockHeight != uint64(wantHeight) {
t.Errorf("SetMinedByTxIDs got blockHeight=%d, want %d", got.blockHeight, wantHeight)
}
if got.blockHash != blockHash {
t.Errorf("SetMinedByTxIDs got blockHash=%q, want %q", got.blockHash, blockHash)
}

// 2) The published TransactionStatus must carry both fields. This is the
// contract SSE / webhook subscribers see; F-029 was that BlockHeight
// was zero here even though BlockHash was set.
emitted := pub.snapshot()
if len(emitted) != 1 {
t.Fatalf("expected 1 published status, got %d", len(emitted))
}
st := emitted[0]
if st.Status != models.StatusMined {
t.Errorf("published status = %q, want MINED", st.Status)
}
if st.BlockHash != blockHash {
t.Errorf("published BlockHash=%q, want %q", st.BlockHash, blockHash)
}
if st.BlockHeight != uint64(wantHeight) {
t.Errorf("published BlockHeight=%d, want %d", st.BlockHeight, wantHeight)
}
}

// TestBuilder_HandleMessage_PublishedHeightIsNeverZero is the narrow
// regression guard: anyone refactoring SetMinedByTxIDs or the publish
// loop and zero-valuing BlockHeight will fail this test even if every
// other assertion happens to pass (e.g. a future test that compares
// published statuses to mock-returned statuses without checking height).
func TestBuilder_HandleMessage_PublishedHeightIsNeverZero(t *testing.T) {
ms := newMockStore()
pub := &recordingPublisher{}

blockHash := testBlockHash
txidHex := "1111111111111111111111111111111111111111111111111111111111111111"

// Use the existing 0x01 minimal STUMP — it encodes blockHeight=1, which is
// non-zero, so any code path that zero-values the height will be caught.
stumpData := makeMinimalSTUMP(txidHex)
ms.addStump(blockHash, 0, stumpData)

subtreeHash := mustHash(t, txidHex)
root := expectedCompoundRoot(t,
[]*models.Stump{{BlockHash: blockHash, SubtreeIndex: 0, StumpData: stumpData}},
[]chainhash.Hash{subtreeHash}, nil)
datahub := newDatahubServer(root, []chainhash.Hash{subtreeHash})
defer datahub.Close()

b := newTestBuilder(ms, datahub.URL)
b.publisher = pub

if err := b.handleMessage(context.Background(), makeBlockProcessedMsg(blockHash)); err != nil {
t.Fatalf("handleMessage: %v", err)
}

emitted := pub.snapshot()
if len(emitted) == 0 {
t.Fatal("expected at least one published status")
}
for i, st := range emitted {
if st.BlockHeight == 0 {
t.Errorf("published status %d has BlockHeight=0; F-029 regression: %+v", i, st)
}
}
}

// TestBuilder_HandleMessage_DefensivelyRestoresHeightIfStoreDropsIt
// asserts the safety net the builder added on top of SetMinedByTxIDs:
// if a backend regresses and forgets to populate BlockHeight on its
// returned status, the publish path repairs it from the compound BUMP's
// height before fanning out. This guards against a partial revert that
// undoes only the store change.
func TestBuilder_HandleMessage_DefensivelyRestoresHeightIfStoreDropsIt(t *testing.T) {
ms := newMockStore()
pub := &recordingPublisher{}

blockHash := testBlockHash
txidHex := "1111111111111111111111111111111111111111111111111111111111111111"

stumpData := makeMinimalSTUMP(txidHex)
ms.addStump(blockHash, 0, stumpData)

subtreeHash := mustHash(t, txidHex)
root := expectedCompoundRoot(t,
[]*models.Stump{{BlockHash: blockHash, SubtreeIndex: 0, StumpData: stumpData}},
[]chainhash.Hash{subtreeHash}, nil)
datahub := newDatahubServer(root, []chainhash.Hash{subtreeHash})
defer datahub.Close()

b := newTestBuilder(ms, datahub.URL)
b.publisher = pub

// Wrap the mock so SetMinedByTxIDs returns BlockHeight=0 — simulating a
// backend that regresses on the persistence side. The builder must still
// publish a non-zero height by falling back to the compound's height.
heightDroppingStore := &heightDroppingMockStore{mockStore: ms}
b.store = heightDroppingStore

if err := b.handleMessage(context.Background(), makeBlockProcessedMsg(blockHash)); err != nil {
t.Fatalf("handleMessage: %v", err)
}

emitted := pub.snapshot()
if len(emitted) != 1 {
t.Fatalf("expected 1 published status, got %d", len(emitted))
}
if emitted[0].BlockHeight == 0 {
t.Errorf("publish path failed to defensively restore BlockHeight: %+v", emitted[0])
}
}

// heightDroppingMockStore wraps mockStore and zeroes BlockHeight on every
// returned status to simulate a buggy backend.
type heightDroppingMockStore struct {
*mockStore
}

func (h *heightDroppingMockStore) SetMinedByTxIDs(ctx context.Context, blockHash string, blockHeight uint64, txids []string) ([]*models.TransactionStatus, error) {
statuses, err := h.mockStore.SetMinedByTxIDs(ctx, blockHash, blockHeight, txids)
for _, s := range statuses {
s.BlockHeight = 0
}
return statuses, err
}
2 changes: 1 addition & 1 deletion services/webhook/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (s *fakeStore) SetStatusByBlockHash(context.Context, string, models.Status)
}
func (s *fakeStore) InsertBUMP(context.Context, string, uint64, []byte) error { return nil }
func (s *fakeStore) GetBUMP(context.Context, string) (uint64, []byte, error) { return 0, nil, nil }
func (s *fakeStore) SetMinedByTxIDs(context.Context, string, []string) ([]*models.TransactionStatus, error) {
func (s *fakeStore) SetMinedByTxIDs(context.Context, string, uint64, []string) ([]*models.TransactionStatus, error) {
return nil, nil
}
func (s *fakeStore) InsertSubmission(context.Context, *models.Submission) error { return nil }
Expand Down
Loading
Loading