Skip to content

Commit 4ac2423

Browse files
authored
Add SSE support (#50)
* Add SSE support * Add cors * Fix linting
1 parent d0a3a39 commit 4ac2423

21 files changed

Lines changed: 1815 additions & 40 deletions

File tree

cmd/arcade/main.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"go.uber.org/zap/zapcore"
1515

1616
"github.com/bsv-blockchain/arcade/config"
17+
"github.com/bsv-blockchain/arcade/events"
1718
"github.com/bsv-blockchain/arcade/kafka"
1819
"github.com/bsv-blockchain/arcade/merkleservice"
1920
"github.com/bsv-blockchain/arcade/services"
@@ -22,6 +23,7 @@ import (
2223
"github.com/bsv-blockchain/arcade/services/p2p_client"
2324
"github.com/bsv-blockchain/arcade/services/propagation"
2425
"github.com/bsv-blockchain/arcade/services/tx_validator"
26+
"github.com/bsv-blockchain/arcade/services/webhook"
2527
"github.com/bsv-blockchain/arcade/store"
2628
storefactory "github.com/bsv-blockchain/arcade/store/factory"
2729
"github.com/bsv-blockchain/arcade/teranode"
@@ -129,7 +131,16 @@ func run(cmd *cobra.Command, _ []string) error {
129131

130132
txVal := validator.NewValidator(nil, nil) // Default policy, no chain tracker yet
131133

132-
svcs := buildServices(cfg, logger, producer, st, leaser, txTracker, teranodeClient, merkleClient, txVal)
134+
// One process-wide events.Publisher routes status updates from every
135+
// service that mutates state (validator, propagation, bump-builder,
136+
// api-server) onto a shared Kafka topic. The api-server SSE handler and
137+
// the webhook delivery service consume from that topic, so the
138+
// transaction-status fan-out works whether the deployment is monolithic
139+
// (mode=all) or split across pods.
140+
publisher := events.NewKafkaPublisher(producer, logger)
141+
defer func() { _ = publisher.Close() }()
142+
143+
svcs := buildServices(cfg, logger, producer, publisher, st, leaser, txTracker, teranodeClient, merkleClient, txVal)
133144

134145
ctx, cancel := context.WithCancel(context.Background())
135146
defer cancel()
@@ -188,6 +199,7 @@ func buildServices(
188199
cfg *config.Config,
189200
logger *zap.Logger,
190201
producer *kafka.Producer,
202+
publisher events.Publisher,
191203
st store.Store,
192204
leaser store.Leaser,
193205
txTracker *store.TxTracker,
@@ -202,16 +214,22 @@ func buildServices(
202214
}
203215

204216
if shouldRun("api-server") {
205-
svcs = append(svcs, api_server.New(cfg, logger, producer, st, txTracker, teranodeClient))
217+
svcs = append(svcs, api_server.New(cfg, logger, producer, publisher, st, txTracker, teranodeClient))
206218
}
207219
if shouldRun("bump-builder") {
208-
svcs = append(svcs, bump_builder.New(cfg, logger, producer, st, teranodeClient))
220+
svcs = append(svcs, bump_builder.New(cfg, logger, producer, publisher, st, teranodeClient))
209221
}
210222
if shouldRun("tx-validator") {
211-
svcs = append(svcs, tx_validator.New(cfg, logger, producer, st, txTracker, txVal))
223+
svcs = append(svcs, tx_validator.New(cfg, logger, producer, publisher, st, txTracker, txVal))
212224
}
213225
if shouldRun("propagation") {
214-
svcs = append(svcs, propagation.New(cfg, logger, producer, st, leaser, teranodeClient, merkleClient))
226+
svcs = append(svcs, propagation.New(cfg, logger, producer, publisher, st, leaser, teranodeClient, merkleClient))
227+
}
228+
// Webhook delivery runs alongside api-server by default (so a single-binary
229+
// deployment ships callbacks without extra config) but can be split into
230+
// its own pod by setting mode=webhook.
231+
if shouldRun("api-server") || shouldRun("webhook") {
232+
svcs = append(svcs, webhook.New(cfg.Webhook, logger, publisher, st))
215233
}
216234
// p2p_client is its own service; it's needed both by mode=propagation
217235
// (where it feeds the local teranode.Client directly) and by mode=p2p-client

config/config.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ type Config struct {
108108
Propagation PropagationConfig `mapstructure:"propagation"`
109109
TxValidator TxValidatorConfig `mapstructure:"tx_validator"`
110110
BumpBuilder BumpBuilderConfig `mapstructure:"bump_builder"`
111+
Webhook WebhookConfig `mapstructure:"webhook"`
111112
// ChaintracksServer gates whether the embedded go-chaintracks HTTP API
112113
// runs alongside api-server. Default is on so the refactor is a drop-in
113114
// replacement for the original single-binary arcade.
@@ -277,6 +278,29 @@ type BumpBuilderConfig struct {
277278
GraceWindowMs int `mapstructure:"grace_window_ms"`
278279
}
279280

281+
// WebhookConfig tunes the HTTP webhook delivery service. The service
// subscribes to status updates and POSTs them to each submission's
// CallbackURL; failures are retried with exponential backoff persisted via
// the store's UpdateDeliveryStatus.
//
// All fields are loaded through mapstructure under the "webhook" key; the
// defaults quoted below are the ones registered in setDefaults.
type WebhookConfig struct {
	// MaxRetries caps how many times a failed POST is re-attempted before
	// the submission is given up on. Mirrors arc's default of 10.
	MaxRetries int `mapstructure:"max_retries"`

	// ExpirationMinutes bounds the total wall-clock lifetime of a webhook
	// delivery, in minutes. Past this point the service stops retrying even
	// if MaxRetries hasn't been hit. Defaults to 24 hours (1440).
	ExpirationMinutes int `mapstructure:"expiration_minutes"`

	// InitialBackoffMs is the first retry delay in milliseconds; subsequent
	// retries double it (capped by MaxBackoffMs). Defaults to 5s (5000),
	// matching arc.
	InitialBackoffMs int `mapstructure:"initial_backoff_ms"`

	// MaxBackoffMs caps, in milliseconds, how long backoff can grow between
	// retries. Defaults to 5 minutes (300000).
	MaxBackoffMs int `mapstructure:"max_backoff_ms"`

	// HTTPTimeoutMs caps, in milliseconds, how long a single POST attempt
	// may run. Defaults to 10s (10000) — webhook receivers should ack fast
	// or risk being timed out.
	HTTPTimeoutMs int `mapstructure:"http_timeout_ms"`
}
303+
280304
// TxValidatorConfig tunes the parallel batch validation pipeline. Parallelism
281305
// caps how many transactions are parsed and validated concurrently inside a
282306
// single flush window — bounded so a huge in-flight batch can't open more
@@ -409,6 +433,12 @@ func setDefaults() {
409433
viper.SetDefault("p2p.enable_mdns", false)
410434
viper.SetDefault("p2p.allow_private_urls", false)
411435

436+
viper.SetDefault("webhook.max_retries", 10)
437+
viper.SetDefault("webhook.expiration_minutes", 60*24)
438+
viper.SetDefault("webhook.initial_backoff_ms", 5000)
439+
viper.SetDefault("webhook.max_backoff_ms", 300000)
440+
viper.SetDefault("webhook.http_timeout_ms", 10000)
441+
412442
viper.SetDefault("storage_path", "~/.arcade")
413443
viper.SetDefault("chaintracks_server.enabled", true)
414444
// Delegate chaintracks-library defaults (mode, network, bootstrap, p2p, …)

events/events.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Package events provides a publish/subscribe abstraction for transaction
2+
// status updates. Mutations to TransactionStatus that originate in the
3+
// validator, propagation, bump-builder, and api-server services flow through
4+
// a Publisher so the SSE handler and webhook delivery service can fan them
5+
// out to clients without depending on which pod the mutation came from.
6+
//
7+
// The default backend is Kafka — every Subscribe call mints a unique consumer
8+
// group so each subscriber receives every event (mirroring the in-memory
9+
// broadcast semantics of the old arcade's monolithic Publisher).
10+
package events
11+
12+
import (
13+
"context"
14+
15+
"github.com/bsv-blockchain/arcade/models"
16+
)
17+
18+
// Publisher is the contract every status-update fan-out backend implements.
19+
// Publish must be safe for concurrent use across goroutines. Subscribe
20+
// returns a channel the caller drains until ctx is canceled; on cancellation,
21+
// the channel is closed and any backend resources released.
22+
type Publisher interface {
23+
Publish(ctx context.Context, status *models.TransactionStatus) error
24+
Subscribe(ctx context.Context) (<-chan *models.TransactionStatus, error)
25+
Close() error
26+
}

events/kafka_publisher.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package events
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"encoding/hex"
7+
"encoding/json"
8+
"fmt"
9+
"sync"
10+
11+
"go.uber.org/zap"
12+
13+
"github.com/bsv-blockchain/arcade/kafka"
14+
"github.com/bsv-blockchain/arcade/models"
15+
)
16+
17+
// KafkaPublisher fans transaction status updates through a Kafka topic so
18+
// services running in different processes share a view of every status
19+
// change. Publish JSON-encodes the TransactionStatus and sends it under
20+
// kafka.TopicStatusUpdate keyed by txid (so updates for the same tx land on
21+
// the same partition in real Kafka). Subscribe spins up a dedicated
22+
// consumer group with a random ID — every caller gets every message,
23+
// regardless of how many other subscribers are running.
24+
type KafkaPublisher struct {
25+
producer *kafka.Producer
26+
logger *zap.Logger
27+
28+
mu sync.Mutex
29+
closed bool
30+
subs []*kafkaSubscription
31+
}
32+
33+
// NewKafkaPublisher wraps a kafka.Producer. The producer's underlying broker
34+
// is also used for Subscribe, so a single Publisher serves both publishing
35+
// and subscribing.
36+
func NewKafkaPublisher(producer *kafka.Producer, logger *zap.Logger) *KafkaPublisher {
37+
return &KafkaPublisher{
38+
producer: producer,
39+
logger: logger.Named("events.kafka"),
40+
}
41+
}
42+
43+
// Publish serializes status to JSON and sends it on TopicStatusUpdate. Errors
44+
// are returned to the caller; the call site decides whether to log-and-continue
45+
// (the default for status mutations) or propagate.
46+
//
47+
// The kafka.Producer.Send signature does not take a context — the underlying
48+
// broker uses an internal background context for at-most-once produce; we
49+
// honor cancellation by short-circuiting before the call.
50+
func (p *KafkaPublisher) Publish(ctx context.Context, status *models.TransactionStatus) error {
51+
if status == nil {
52+
return fmt.Errorf("nil status")
53+
}
54+
if err := ctx.Err(); err != nil {
55+
return err
56+
}
57+
return p.producer.Send(kafka.TopicStatusUpdate, status.TxID, status) //nolint:contextcheck // kafka.Producer.Send doesn't take a context; ctx already checked above
58+
}
59+
60+
// Subscribe joins a fresh consumer group on TopicStatusUpdate and returns a
61+
// channel that yields decoded TransactionStatus values until ctx is canceled.
62+
// The unique groupID guarantees this subscriber sees every message — useful
63+
// when multiple subscribers (SSE manager + webhook service) coexist in the
64+
// same process or across pods.
65+
func (p *KafkaPublisher) Subscribe(ctx context.Context) (<-chan *models.TransactionStatus, error) {
66+
p.mu.Lock()
67+
if p.closed {
68+
p.mu.Unlock()
69+
return nil, fmt.Errorf("publisher closed")
70+
}
71+
p.mu.Unlock()
72+
73+
groupID, err := uniqueGroupID()
74+
if err != nil {
75+
return nil, fmt.Errorf("generating group id: %w", err)
76+
}
77+
78+
out := make(chan *models.TransactionStatus, 256)
79+
80+
cg, err := kafka.NewConsumerGroup(kafka.ConsumerConfig{
81+
Broker: p.producer.Broker(),
82+
GroupID: groupID,
83+
Topics: []string{kafka.TopicStatusUpdate},
84+
Handler: func(ctx context.Context, msg *kafka.Message) error {
85+
var status models.TransactionStatus
86+
if jsonErr := json.Unmarshal(msg.Value, &status); jsonErr != nil {
87+
p.logger.Warn("dropping malformed status update", zap.Error(jsonErr))
88+
return nil
89+
}
90+
select {
91+
case out <- &status:
92+
case <-ctx.Done():
93+
return nil
94+
default:
95+
// Slow consumer — drop rather than block the broker. Matches the
96+
// old arcade's non-blocking fan-out semantics; SSE clients
97+
// recover via Last-Event-ID catchup on reconnect.
98+
p.logger.Warn("subscriber channel full, dropping update",
99+
zap.String("txid", status.TxID),
100+
zap.String("status", string(status.Status)),
101+
)
102+
}
103+
return nil
104+
},
105+
Producer: p.producer,
106+
Logger: p.logger,
107+
})
108+
if err != nil {
109+
return nil, fmt.Errorf("creating consumer group: %w", err)
110+
}
111+
112+
sub := &kafkaSubscription{cg: cg, out: out}
113+
114+
p.mu.Lock()
115+
p.subs = append(p.subs, sub)
116+
p.mu.Unlock()
117+
118+
go func() {
119+
// Run blocks until ctx is canceled or the broker closes.
120+
if err := cg.Run(ctx); err != nil {
121+
p.logger.Warn("consumer group exited with error", zap.Error(err))
122+
}
123+
_ = cg.Close()
124+
close(out)
125+
}()
126+
127+
return out, nil
128+
}
129+
130+
// Close stops the publisher. Existing subscriptions are released through
131+
// their context cancellation; Close does not close subscriber channels
132+
// directly because the consumer goroutine owns the close.
133+
func (p *KafkaPublisher) Close() error {
134+
p.mu.Lock()
135+
defer p.mu.Unlock()
136+
if p.closed {
137+
return nil
138+
}
139+
p.closed = true
140+
for _, s := range p.subs {
141+
_ = s.cg.Close()
142+
}
143+
p.subs = nil
144+
return nil
145+
}
146+
147+
type kafkaSubscription struct {
148+
cg *kafka.ConsumerGroup
149+
out chan *models.TransactionStatus
150+
}
151+
152+
// uniqueGroupID returns a per-call group identifier. Used so each Subscribe
// gets its own consumer group, which in turn guarantees every subscriber
// sees every message (the broker fans out across distinct groups).
//
// The identifier is "arcade-events-" followed by 24 lowercase hex characters
// (12 bytes read from crypto/rand). The only error is a failure to read
// random bytes.
func uniqueGroupID() (string, error) {
	var b [12]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	return "arcade-events-" + hex.EncodeToString(b[:]), nil
}

0 commit comments

Comments
 (0)