From f568353a8783d7dd9e39373589fc817b93d7ae35 Mon Sep 17 00:00:00 2001 From: David Case Date: Tue, 7 Apr 2026 12:15:32 -0400 Subject: [PATCH] fix: broadcast context and status code improvements - Detach broadcast goroutines from request context using WithoutCancel so client disconnect doesn't cancel in-flight broadcasts - Use shared broadcast context with 15s timeout per fan-out operation, cancelled only when all goroutines complete - Set StatusCode on TransactionStatus at construction in submitToTeranodeSync: pass through teranode HTTP codes, 503 for network failures, 504 for timeouts - Simplify collection loop to range over buffered channel --- service/embedded/embedded.go | 140 ++++++++++++++++------------------- 1 file changed, 63 insertions(+), 77 deletions(-) diff --git a/service/embedded/embedded.go b/service/embedded/embedded.go index 78cbe09..1f6f289 100644 --- a/service/embedded/embedded.go +++ b/service/embedded/embedded.go @@ -211,8 +211,7 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo slog.Duration("timeout", 15*time.Second), ) resultCh := make(chan *models.TransactionStatus, len(endpoints)) - submitCtx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() + broadcastCtx, broadcastCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second) broadcastStart := time.Now() var wg sync.WaitGroup @@ -221,7 +220,7 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo go func(ep string) { defer wg.Done() epStart := time.Now() - status := e.submitToTeranodeSync(submitCtx, ep, tx.Bytes(), txid) + status := e.submitToTeranodeSync(broadcastCtx, ep, tx.Bytes(), txid) if status != nil { e.logger.Debug("endpoint responded", slog.String("txid", txid), @@ -230,80 +229,62 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo slog.Duration("elapsed", time.Since(epStart)), ) } - select { - case resultCh <- status: - case <-submitCtx.Done(): - return - } + resultCh <- status }(endpoint) } - // Close channel when all goroutines complete + // Clean up when all goroutines complete go func() { wg.Wait() + broadcastCancel() close(resultCh) }() // Collect results: - // - Success (ACCEPTED/SENT): return immediately + // - Success (ACCEPTED/SENT): return immediately, goroutines continue propagating // - Rejected (4xx): return immediately, tx is invalid // - Service error (5xx): wait for other endpoints - // - All service errors / timeout: return last error + // - All failed: return last error var lastError *models.TransactionStatus - for { - select { - case status, ok := <-resultCh: - if !ok { - // All endpoints responded, none succeeded - if lastError == nil { - lastError, _ = e.store.GetStatus(ctx, txid) - } - e.logger.Debug("broadcast failed on all endpoints", - slog.String("txid", txid), - slog.String("status", string(lastError.Status)), - slog.Duration("elapsed", time.Since(broadcastStart)), - ) - e.applyBroadcastResult(ctx, txid, lastError) - return lastError, nil - } - if status == nil { - continue - } - switch status.Status { - case models.StatusAcceptedByNetwork, models.StatusSentToNetwork: - e.logger.Debug("broadcast complete", - slog.String("txid", txid), - slog.String("status", string(status.Status)), - slog.Duration("elapsed", time.Since(broadcastStart)), - ) - e.applyBroadcastResult(ctx, txid, status) - return status, nil - case models.StatusRejected: - e.logger.Debug("transaction rejected by network", - slog.String("txid", txid), - slog.Duration("elapsed", time.Since(broadcastStart)), - ) - e.applyBroadcastResult(ctx, txid, status) - return status, nil - case models.StatusServiceError: - lastError = status - case models.StatusUnknown, models.StatusReceived, models.StatusSeenOnNetwork, - models.StatusDoubleSpendAttempted, models.StatusMined, models.StatusImmutable: - lastError = status - } - case <-submitCtx.Done(): - if lastError == nil { - lastError, _ = e.store.GetStatus(ctx, txid) - } - e.logger.Warn("broadcast timeout", + for status := range resultCh { + if status == nil { + continue + } + switch status.Status { + case models.StatusAcceptedByNetwork, models.StatusSentToNetwork: + e.logger.Debug("broadcast complete", slog.String("txid", txid), - slog.Int("endpoints", len(endpoints)), + slog.String("status", string(status.Status)), slog.Duration("elapsed", time.Since(broadcastStart)), ) - e.applyBroadcastResult(ctx, txid, lastError) - return lastError, nil + e.applyBroadcastResult(ctx, txid, status) + return status, nil + case models.StatusRejected: + e.logger.Debug("transaction rejected by network", + slog.String("txid", txid), + slog.Duration("elapsed", time.Since(broadcastStart)), + ) + e.applyBroadcastResult(ctx, txid, status) + return status, nil + case models.StatusServiceError: + lastError = status + case models.StatusUnknown, models.StatusReceived, models.StatusSeenOnNetwork, + models.StatusDoubleSpendAttempted, models.StatusMined, models.StatusImmutable: + lastError = status } } + + // All endpoints responded, none succeeded + if lastError == nil { + lastError, _ = e.store.GetStatus(ctx, txid) + } + e.logger.Debug("broadcast failed on all endpoints", + slog.String("txid", txid), + slog.String("status", string(lastError.Status)), + slog.Duration("elapsed", time.Since(broadcastStart)), + ) + e.applyBroadcastResult(ctx, txid, lastError) + return lastError, nil } // SubmitTransactions submits multiple transactions for broadcast. @@ -410,10 +391,6 @@ func (e *Embedded) SubmitTransactions(ctx context.Context, rawTxs [][]byte, opts txInfos = append(txInfos, txInfo{tx: tx, rawTx: rawTx, txid: txid, isNew: isNew, status: existingStatus}) } - // Submit all to teranode synchronously with timeout - submitCtx, cancel := context.WithTimeout(ctx, 15*time.Second) - defer cancel() - var responses []*models.TransactionStatus for _, info := range txInfos { // Skip rebroadcast if already confirmed on network or rejected @@ -433,22 +410,20 @@ func (e *Embedded) SubmitTransactions(ctx context.Context, rawTxs [][]byte, opts endpoints := e.teranodeClient.GetEndpoints() resultCh := make(chan *models.TransactionStatus, len(endpoints)) + broadcastCtx, broadcastCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second) var wg sync.WaitGroup for _, endpoint := range endpoints { wg.Add(1) go func(ep string) { defer wg.Done() - status := e.submitToTeranodeSync(submitCtx, ep, rawTx, info.txid) - select { - case resultCh <- status: - case <-submitCtx.Done(): - return - } + status := e.submitToTeranodeSync(broadcastCtx, ep, rawTx, info.txid) + resultCh <- status }(endpoint) } go func() { wg.Wait() + broadcastCancel() close(resultCh) }() @@ -557,6 +532,15 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra if statusCode >= 400 && statusCode < 500 { status = models.StatusRejected } + // Map missing HTTP status to appropriate code + switch { + case statusCode != 0: + // pass through actual HTTP status + case ctx.Err() != nil: + statusCode = http.StatusGatewayTimeout + default: + statusCode = http.StatusServiceUnavailable + } e.logger.Debug("endpoint broadcast failed", slog.String("txid", txid), slog.String("endpoint", endpoint), @@ -565,10 +549,11 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra slog.String("error", err.Error()), ) return &models.TransactionStatus{ - TxID: txid, - Status: status, - Timestamp: time.Now(), - ExtraInfo: err.Error(), + TxID: txid, + Status: status, + StatusCode: statusCode, + Timestamp: time.Now(), + ExtraInfo: err.Error(), } } @@ -588,8 +573,9 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra } return &models.TransactionStatus{ - TxID: txid, - Status: txStatus, - Timestamp: time.Now(), + TxID: txid, + Status: txStatus, + StatusCode: http.StatusOK, + Timestamp: time.Now(), } }