Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 63 additions & 77 deletions service/embedded/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo
slog.Duration("timeout", 15*time.Second),
)
resultCh := make(chan *models.TransactionStatus, len(endpoints))
submitCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
broadcastCtx, broadcastCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)

broadcastStart := time.Now()
var wg sync.WaitGroup
Expand All @@ -221,7 +220,7 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo
go func(ep string) {
defer wg.Done()
epStart := time.Now()
status := e.submitToTeranodeSync(submitCtx, ep, tx.Bytes(), txid)
status := e.submitToTeranodeSync(broadcastCtx, ep, tx.Bytes(), txid)
if status != nil {
e.logger.Debug("endpoint responded",
slog.String("txid", txid),
Expand All @@ -230,80 +229,62 @@ func (e *Embedded) SubmitTransaction(ctx context.Context, rawTx []byte, opts *mo
slog.Duration("elapsed", time.Since(epStart)),
)
}
select {
case resultCh <- status:
case <-submitCtx.Done():
return
}
resultCh <- status
}(endpoint)
}

// Close channel when all goroutines complete
// Clean up when all goroutines complete
go func() {
wg.Wait()
broadcastCancel()
close(resultCh)
}()

// Collect results:
// - Success (ACCEPTED/SENT): return immediately
// - Success (ACCEPTED/SENT): return immediately, goroutines continue propagating
// - Rejected (4xx): return immediately, tx is invalid
// - Service error (5xx): wait for other endpoints
// - All service errors / timeout: return last error
// - All failed: return last error
var lastError *models.TransactionStatus
for {
select {
case status, ok := <-resultCh:
if !ok {
// All endpoints responded, none succeeded
if lastError == nil {
lastError, _ = e.store.GetStatus(ctx, txid)
}
e.logger.Debug("broadcast failed on all endpoints",
slog.String("txid", txid),
slog.String("status", string(lastError.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, lastError)
return lastError, nil
}
if status == nil {
continue
}
switch status.Status {
case models.StatusAcceptedByNetwork, models.StatusSentToNetwork:
e.logger.Debug("broadcast complete",
slog.String("txid", txid),
slog.String("status", string(status.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusRejected:
e.logger.Debug("transaction rejected by network",
slog.String("txid", txid),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusServiceError:
lastError = status
case models.StatusUnknown, models.StatusReceived, models.StatusSeenOnNetwork,
models.StatusDoubleSpendAttempted, models.StatusMined, models.StatusImmutable:
lastError = status
}
case <-submitCtx.Done():
if lastError == nil {
lastError, _ = e.store.GetStatus(ctx, txid)
}
e.logger.Warn("broadcast timeout",
for status := range resultCh {
if status == nil {
continue
}
switch status.Status {
case models.StatusAcceptedByNetwork, models.StatusSentToNetwork:
e.logger.Debug("broadcast complete",
slog.String("txid", txid),
slog.Int("endpoints", len(endpoints)),
slog.String("status", string(status.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, lastError)
return lastError, nil
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusRejected:
e.logger.Debug("transaction rejected by network",
slog.String("txid", txid),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, status)
return status, nil
case models.StatusServiceError:
lastError = status
case models.StatusUnknown, models.StatusReceived, models.StatusSeenOnNetwork,
models.StatusDoubleSpendAttempted, models.StatusMined, models.StatusImmutable:
lastError = status
}
}

// All endpoints responded, none succeeded
if lastError == nil {
lastError, _ = e.store.GetStatus(ctx, txid)
}
e.logger.Debug("broadcast failed on all endpoints",
slog.String("txid", txid),
slog.String("status", string(lastError.Status)),
slog.Duration("elapsed", time.Since(broadcastStart)),
)
e.applyBroadcastResult(ctx, txid, lastError)
return lastError, nil
}

// SubmitTransactions submits multiple transactions for broadcast.
Expand Down Expand Up @@ -410,10 +391,6 @@ func (e *Embedded) SubmitTransactions(ctx context.Context, rawTxs [][]byte, opts
txInfos = append(txInfos, txInfo{tx: tx, rawTx: rawTx, txid: txid, isNew: isNew, status: existingStatus})
}

// Submit all to teranode synchronously with timeout
submitCtx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

var responses []*models.TransactionStatus
for _, info := range txInfos {
// Skip rebroadcast if already confirmed on network or rejected
Expand All @@ -433,22 +410,20 @@ func (e *Embedded) SubmitTransactions(ctx context.Context, rawTxs [][]byte, opts

endpoints := e.teranodeClient.GetEndpoints()
resultCh := make(chan *models.TransactionStatus, len(endpoints))
broadcastCtx, broadcastCancel := context.WithTimeout(context.WithoutCancel(ctx), 15*time.Second)
var wg sync.WaitGroup
for _, endpoint := range endpoints {
wg.Add(1)
go func(ep string) {
defer wg.Done()
status := e.submitToTeranodeSync(submitCtx, ep, rawTx, info.txid)
select {
case resultCh <- status:
case <-submitCtx.Done():
return
}
status := e.submitToTeranodeSync(broadcastCtx, ep, rawTx, info.txid)
resultCh <- status
}(endpoint)
}

go func() {
wg.Wait()
broadcastCancel()
close(resultCh)
}()

Expand Down Expand Up @@ -557,6 +532,15 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra
if statusCode >= 400 && statusCode < 500 {
status = models.StatusRejected
}
// Map missing HTTP status to appropriate code
switch {
case statusCode != 0:
// pass through actual HTTP status
case ctx.Err() != nil:
statusCode = http.StatusGatewayTimeout
default:
statusCode = http.StatusServiceUnavailable
}
e.logger.Debug("endpoint broadcast failed",
slog.String("txid", txid),
slog.String("endpoint", endpoint),
Expand All @@ -565,10 +549,11 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra
slog.String("error", err.Error()),
)
return &models.TransactionStatus{
TxID: txid,
Status: status,
Timestamp: time.Now(),
ExtraInfo: err.Error(),
TxID: txid,
Status: status,
StatusCode: statusCode,
Timestamp: time.Now(),
ExtraInfo: err.Error(),
}
}

Expand All @@ -588,8 +573,9 @@ func (e *Embedded) submitToTeranodeSync(ctx context.Context, endpoint string, ra
}

return &models.TransactionStatus{
TxID: txid,
Status: txStatus,
Timestamp: time.Now(),
TxID: txid,
Status: txStatus,
StatusCode: http.StatusOK,
Timestamp: time.Now(),
}
}
Loading