|
| 1 | +//go:build smoke |
| 2 | + |
| 3 | +package smoke |
| 4 | + |
| 5 | +import ( |
| 6 | + "bytes" |
| 7 | + "context" |
| 8 | + "fmt" |
| 9 | + "io" |
| 10 | + "math/rand" |
| 11 | + "net/http" |
| 12 | + "sync" |
| 13 | + "testing" |
| 14 | + "time" |
| 15 | + |
| 16 | + "github.com/bsv-blockchain/go-bt/v2" |
| 17 | +) |
| 18 | + |
| 19 | +// TestSmoke_ChainedTxBatchOrdering is the load-bearing smoke test for |
| 20 | +// arcade's propagation dispatcher's parent-child ordering guarantee. |
| 21 | +// |
| 22 | +// It builds a forest of ~10k chained txs (mixed depths 2–10), submits |
| 23 | +// them via concurrent POST /txs calls, then captures every batch arcade |
| 24 | +// sends to a fake teranode and asserts: |
| 25 | +// |
| 26 | +// 1. No batch ever contains both a parent and one of its children. |
| 27 | +// 2. For every parent→child edge, the parent's batch was sent BEFORE |
| 28 | +// the child's batch. |
| 29 | +// 3. Every tx broadcasts exactly once — no losses, no duplicates. |
| 30 | +// 4. No batch exceeds Propagation.TeranodeMaxBatchSize. |
| 31 | +// 5. The whole pipeline completes within the test window. |
| 32 | +// |
| 33 | +// (1) is the production invariant teranode's /txs endpoint relies on — |
| 34 | +// the dispatcher holds children in heldMsgs until every parent leaves |
| 35 | +// inFlight via ACCEPTED_BY_NETWORK. (2)–(4) are weaker corollaries that |
| 36 | +// also catch interesting regressions: an ordering inversion would |
| 37 | +// surface in (2), a requeue bug in (3), a chunking bug in (4). |
| 38 | +func TestSmoke_ChainedTxBatchOrdering(t *testing.T) { |
| 39 | + const ( |
| 40 | + totalTxs = 10_000 |
| 41 | + minDepth = 2 |
| 42 | + maxDepth = 10 |
| 43 | + submissionBatchSize = 100 |
| 44 | + concurrentSubmitters = 8 |
| 45 | + broadcastWaitTimeout = 60 * time.Second |
| 46 | + teranodeMaxBatchSize = 1024 // matches buildSmokeConfig |
| 47 | + pipelineWallBudget = 60 * time.Second |
| 48 | + ) |
| 49 | + |
| 50 | + recorder := newRecordingTeranode(t) |
| 51 | + rt := startArcadeSmoke(t, smokeOptions{TeranodeURL: recorder.URL()}) |
| 52 | + |
| 53 | + chains := BuildChains(ChainOpts{ |
| 54 | + TotalTxs: totalTxs, |
| 55 | + MinDepth: minDepth, |
| 56 | + MaxDepth: maxDepth, |
| 57 | + Seed: 1, // deterministic; bump to time.Now() for stochastic runs |
| 58 | + }) |
| 59 | + t.Logf("built %d chains, %d total txs", len(chains), countTxs(chains)) |
| 60 | + |
| 61 | + // Flatten into submission batches. Each chain lives entirely |
| 62 | + // inside one submission so the dispatcher's ordering guarantee |
| 63 | + // holds: a single SendBatch on a single-partition Kafka topic |
| 64 | + // preserves order within the batch, so parent always reaches the |
| 65 | + // dispatcher before child. Concurrent submitters race the |
| 66 | + // submissions themselves, but since chains don't span submissions |
| 67 | + // and different chains share no edges, no cross-submission race |
| 68 | + // can produce a child-before-parent on the wire. |
| 69 | + submissions := buildSubmissionBatches(chains, submissionBatchSize) |
| 70 | + |
| 71 | + start := time.Now() |
| 72 | + |
| 73 | + var wg sync.WaitGroup |
| 74 | + sem := make(chan struct{}, concurrentSubmitters) |
| 75 | + for i, batch := range submissions { |
| 76 | + wg.Add(1) |
| 77 | + sem <- struct{}{} |
| 78 | + go func(idx int, batch []*bt.Tx) { |
| 79 | + defer wg.Done() |
| 80 | + defer func() { <-sem }() |
| 81 | + if err := postTxs(rt, batch); err != nil { |
| 82 | + t.Errorf("submission %d: %v", idx, err) |
| 83 | + } |
| 84 | + }(i, batch) |
| 85 | + } |
| 86 | + wg.Wait() |
| 87 | + t.Logf("submitted %d batches in %s", len(submissions), time.Since(start)) |
| 88 | + |
| 89 | + if err := recorder.WaitForTxCount(countTxs(chains), broadcastWaitTimeout); err != nil { |
| 90 | + t.Fatalf("waiting for broadcast: %v", err) |
| 91 | + } |
| 92 | + elapsed := time.Since(start) |
| 93 | + t.Logf("pipeline drained in %s", elapsed) |
| 94 | + |
| 95 | + batches := recorder.Snapshot() |
| 96 | + t.Logf("recorded %d /txs batches", len(batches)) |
| 97 | + |
| 98 | + assertParentChildNeverCohabits(t, batches, chains) |
| 99 | + assertParentPrecedesChild(t, batches, chains) |
| 100 | + assertEveryTxBroadcastExactlyOnce(t, batches, chains) |
| 101 | + assertChunkSize(t, batches, teranodeMaxBatchSize) |
| 102 | + |
| 103 | + if elapsed > pipelineWallBudget { |
| 104 | + t.Errorf("pipeline took %s for %d txs (budget %s) — likely a perf regression", |
| 105 | + elapsed, countTxs(chains), pipelineWallBudget) |
| 106 | + } |
| 107 | +} |
| 108 | + |
| 109 | +// buildSubmissionBatches groups chains into HTTP submission batches such |
| 110 | +// that each chain lives entirely inside one submission. This is the |
| 111 | +// crucial invariant for the test: a SendBatch on the single-partition |
| 112 | +// Kafka topic preserves order, so parent reaches the dispatcher before |
| 113 | +// child for txs in the same submission. Different chains share no |
| 114 | +// edges, so the order in which submissions arrive at Kafka doesn't |
| 115 | +// matter — there's no cross-submission race that can produce a |
| 116 | +// child-before-parent on the wire. |
| 117 | +// |
| 118 | +// Chain order across submissions is shuffled (deterministically by |
| 119 | +// seed) so concurrent submitters still stress the dispatcher with txs |
| 120 | +// arriving in random order. |
| 121 | +func buildSubmissionBatches(chains [][]*bt.Tx, batchSize int) [][]*bt.Tx { |
| 122 | + order := make([]int, len(chains)) |
| 123 | + for i := range order { |
| 124 | + order[i] = i |
| 125 | + } |
| 126 | + rng := rand.New(rand.NewSource(42)) //nolint:gosec // deterministic test fixture |
| 127 | + rng.Shuffle(len(order), func(i, j int) { order[i], order[j] = order[j], order[i] }) |
| 128 | + |
| 129 | + batches := make([][]*bt.Tx, 0) |
| 130 | + cur := make([]*bt.Tx, 0, batchSize) |
| 131 | + for _, ci := range order { |
| 132 | + chain := chains[ci] |
| 133 | + // If adding the whole chain would push us past batchSize, |
| 134 | + // flush the current submission first. The chain is then |
| 135 | + // appended to the new (empty) submission. |
| 136 | + if len(cur)+len(chain) > batchSize && len(cur) > 0 { |
| 137 | + batches = append(batches, cur) |
| 138 | + cur = make([]*bt.Tx, 0, batchSize) |
| 139 | + } |
| 140 | + cur = append(cur, chain...) |
| 141 | + } |
| 142 | + if len(cur) > 0 { |
| 143 | + batches = append(batches, cur) |
| 144 | + } |
| 145 | + return batches |
| 146 | +} |
| 147 | + |
| 148 | +// postTxs POSTs a batch to arcade's /txs endpoint and returns nil on a |
| 149 | +// 2xx response. The body is concatenated raw tx bytes in BSV Extended |
| 150 | +// Format — arcade's validator (post PR #171) calls spv.Verify |
| 151 | +// unconditionally, which needs each input's PreviousTxScript + |
| 152 | +// PreviousTxSatoshis. Those bins live on the wire only in EF; sending |
| 153 | +// standard-format bytes is rejected with "'PreviousTx' not supplied". |
| 154 | +func postTxs(rt *arcadeRuntime, txs []*bt.Tx) error { |
| 155 | + var body bytes.Buffer |
| 156 | + for _, tx := range txs { |
| 157 | + body.Write(tx.ExtendedBytes()) |
| 158 | + } |
| 159 | + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) |
| 160 | + defer cancel() |
| 161 | + req, err := http.NewRequestWithContext(ctx, http.MethodPost, rt.baseURL+"/txs", &body) |
| 162 | + if err != nil { |
| 163 | + return fmt.Errorf("build request: %w", err) |
| 164 | + } |
| 165 | + req.Header.Set("Content-Type", "application/octet-stream") |
| 166 | + resp, err := http.DefaultClient.Do(req) |
| 167 | + if err != nil { |
| 168 | + return fmt.Errorf("post /txs: %w", err) |
| 169 | + } |
| 170 | + defer func() { _ = resp.Body.Close() }() |
| 171 | + if resp.StatusCode/100 != 2 { |
| 172 | + respBody, _ := io.ReadAll(resp.Body) |
| 173 | + return fmt.Errorf("/txs returned %d: %s", resp.StatusCode, respBody) |
| 174 | + } |
| 175 | + return nil |
| 176 | +} |
| 177 | + |
| 178 | +// countTxs sums total tx count across the forest. |
| 179 | +func countTxs(chains [][]*bt.Tx) int { |
| 180 | + n := 0 |
| 181 | + for _, c := range chains { |
| 182 | + n += len(c) |
| 183 | + } |
| 184 | + return n |
| 185 | +} |
| 186 | + |
| 187 | +// parentEdges builds a map child-txid → parent-txid from the chain |
| 188 | +// forest. Roots (chain[0]) are absent. |
| 189 | +func parentEdges(chains [][]*bt.Tx) map[string]string { |
| 190 | + parentOf := make(map[string]string) |
| 191 | + for _, c := range chains { |
| 192 | + for i := 1; i < len(c); i++ { |
| 193 | + parentOf[c[i].TxID()] = c[i-1].TxID() |
| 194 | + } |
| 195 | + } |
| 196 | + return parentOf |
| 197 | +} |
| 198 | + |
| 199 | +// assertParentChildNeverCohabits is the load-bearing assertion. For every |
| 200 | +// recorded batch, it builds the txid set S and checks that no tx in S has |
| 201 | +// a parent also in S. A failure prints the offending parent + child + the |
| 202 | +// batch sequence number so the operator can map the regression back to a |
| 203 | +// dispatcher state-machine bug. |
| 204 | +func assertParentChildNeverCohabits(t *testing.T, batches []BatchRecord, chains [][]*bt.Tx) { |
| 205 | + t.Helper() |
| 206 | + parentOf := parentEdges(chains) |
| 207 | + for _, b := range batches { |
| 208 | + members := make(map[string]struct{}, len(b.TxIDs)) |
| 209 | + for _, id := range b.TxIDs { |
| 210 | + members[id] = struct{}{} |
| 211 | + } |
| 212 | + for _, id := range b.TxIDs { |
| 213 | + parent, ok := parentOf[id] |
| 214 | + if !ok { |
| 215 | + continue |
| 216 | + } |
| 217 | + if _, parentInBatch := members[parent]; parentInBatch { |
| 218 | + t.Fatalf("batch seq=%d contains both parent %s and child %s — dispatcher parent-child invariant violated", |
| 219 | + b.Seq, parent, id) |
| 220 | + } |
| 221 | + } |
| 222 | + } |
| 223 | +} |
| 224 | + |
| 225 | +// assertParentPrecedesChild verifies that for every parent→child edge, |
| 226 | +// the parent's first-appearance batch precedes the child's first- |
| 227 | +// appearance batch. The dispatcher releases children only after the |
| 228 | +// parent terminalizes (ACCEPTED_BY_NETWORK), which by construction is |
| 229 | +// after the parent's batch has been broadcast. |
| 230 | +func assertParentPrecedesChild(t *testing.T, batches []BatchRecord, chains [][]*bt.Tx) { |
| 231 | + t.Helper() |
| 232 | + parentOf := parentEdges(chains) |
| 233 | + firstBatchOf := make(map[string]int, countTxs(chains)) |
| 234 | + for _, b := range batches { |
| 235 | + for _, id := range b.TxIDs { |
| 236 | + if _, exists := firstBatchOf[id]; !exists { |
| 237 | + firstBatchOf[id] = b.Seq |
| 238 | + } |
| 239 | + } |
| 240 | + } |
| 241 | + for child, parent := range parentOf { |
| 242 | + ps, pok := firstBatchOf[parent] |
| 243 | + cs, cok := firstBatchOf[child] |
| 244 | + if !pok || !cok { |
| 245 | + // Coverage gap is reported by assertEveryTxBroadcastExactlyOnce. |
| 246 | + continue |
| 247 | + } |
| 248 | + if ps >= cs { |
| 249 | + t.Errorf("child %s appeared in batch %d but parent %s appeared in batch %d (parent must precede child)", |
| 250 | + child, cs, parent, ps) |
| 251 | + } |
| 252 | + } |
| 253 | +} |
| 254 | + |
| 255 | +// assertEveryTxBroadcastExactlyOnce checks that every submitted tx is in |
| 256 | +// exactly one batch. Missing txs reveal a loss path (e.g., dispatcher |
| 257 | +// dropping a held tx on release); duplicate txs reveal a requeue bug. |
| 258 | +func assertEveryTxBroadcastExactlyOnce(t *testing.T, batches []BatchRecord, chains [][]*bt.Tx) { |
| 259 | + t.Helper() |
| 260 | + count := make(map[string]int, countTxs(chains)) |
| 261 | + for _, b := range batches { |
| 262 | + for _, id := range b.TxIDs { |
| 263 | + count[id]++ |
| 264 | + } |
| 265 | + } |
| 266 | + missing, duplicated := 0, 0 |
| 267 | + for _, c := range chains { |
| 268 | + for _, tx := range c { |
| 269 | + id := tx.TxID() |
| 270 | + switch count[id] { |
| 271 | + case 0: |
| 272 | + missing++ |
| 273 | + if missing <= 5 { |
| 274 | + t.Errorf("tx %s never broadcast", id) |
| 275 | + } |
| 276 | + case 1: |
| 277 | + // OK |
| 278 | + default: |
| 279 | + duplicated++ |
| 280 | + if duplicated <= 5 { |
| 281 | + t.Errorf("tx %s broadcast %d times (expected 1)", id, count[id]) |
| 282 | + } |
| 283 | + } |
| 284 | + } |
| 285 | + } |
| 286 | + if missing > 5 { |
| 287 | + t.Errorf("(plus %d more missing txs, not printed)", missing-5) |
| 288 | + } |
| 289 | + if duplicated > 5 { |
| 290 | + t.Errorf("(plus %d more duplicated txs, not printed)", duplicated-5) |
| 291 | + } |
| 292 | +} |
| 293 | + |
| 294 | +// assertChunkSize sanity-checks the broadcastInChunks layer: every |
| 295 | +// outbound batch fits the configured TeranodeMaxBatchSize. A regression |
| 296 | +// here would indicate the chunker is no longer slicing. |
| 297 | +func assertChunkSize(t *testing.T, batches []BatchRecord, maxBatchSize int) { |
| 298 | + t.Helper() |
| 299 | + for _, b := range batches { |
| 300 | + if len(b.TxIDs) > maxBatchSize { |
| 301 | + t.Errorf("batch seq=%d has %d txs, exceeds TeranodeMaxBatchSize=%d", |
| 302 | + b.Seq, len(b.TxIDs), maxBatchSize) |
| 303 | + } |
| 304 | + } |
| 305 | +} |
0 commit comments