Skip to content

Commit a31ed41

Browse files
authored
Merge pull request #60 from csfloat/fix/leader-election
Fix: Hold Leader Election
2 parents d3219c1 + 94cd342 commit a31ed41

10 files changed

Lines changed: 503 additions & 33 deletions

File tree

config/config.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ func load() Config {
110110
panic("csfloat ingestor configuration is required when enabled")
111111
}
112112
}
113-
114113
return cfg
115114
}
116115

domain/ingestors/ingestors.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package ingestors
2+
3+
type Ingestor interface {
4+
Start()
5+
Stop()
6+
}
7+
8+
type Manager interface {
9+
StartIngestors()
10+
StopIngestors()
11+
}
12+
13+
type IngestorType string
14+
15+
const (
16+
IngestorTypeCSFloat IngestorType = "csfloat"
17+
)

domain/leader/leader.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package leader
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type Elector interface {
9+
Run(ctx context.Context, lockKey uint32, period time.Duration, onWork func(ctx context.Context))
10+
}

domain/repository/factory.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ type Factory interface {
3030

3131
NewPrivateTransaction() PrivateTransaction
3232
RunInTransactionPrivate(fn func(PrivateTransaction) error) error
33+
PrivateDB() *gorm.DB
3334

3435
NewPublicTransaction() PublicTransaction
3536
RunInTransactionPublic(fn func(PublicTransaction) error) error
37+
PublicDB() *gorm.DB
3638
}

ingestors/csfloat/csfloat.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"hash/fnv"
78
"net/http"
89
"time"
910

1011
"reverse-watch/config"
1112
"reverse-watch/domain/dto"
13+
"reverse-watch/domain/ingestors"
14+
"reverse-watch/domain/leader"
1215
"reverse-watch/domain/models"
1316
"reverse-watch/domain/repository"
1417
"reverse-watch/errors"
@@ -21,18 +24,22 @@ type csfloatIngestor struct {
2124
log *zap.SugaredLogger
2225
cfg *config.Config
2326
factory repository.Factory
27+
elector leader.Elector
2428

2529
ctx context.Context
2630
cancel context.CancelFunc
2731
stopped chan struct{}
2832
}
2933

30-
func NewCSFloatIngestor(ctx context.Context, factory repository.Factory, cfg *config.Config, logger *zap.SugaredLogger) *csfloatIngestor {
34+
var _ ingestors.Ingestor = (*csfloatIngestor)(nil)
35+
36+
func NewCSFloatIngestor(ctx context.Context, factory repository.Factory, elector leader.Elector, cfg *config.Config, log *zap.SugaredLogger) ingestors.Ingestor {
3137
ingestorCtx, cancel := context.WithCancel(ctx)
3238
return &csfloatIngestor{
33-
log: logger,
39+
log: log,
3440
cfg: cfg,
3541
factory: factory,
42+
elector: elector,
3643
ctx: ingestorCtx,
3744
cancel: cancel,
3845
stopped: make(chan struct{}),
@@ -56,7 +63,7 @@ type errorResponse struct {
5663
}
5764

5865
// fetch reversal warnings from CSFloat
59-
func (i *csfloatIngestor) fetch(cursor *uint) ([]*slimWarning, *uint, error) {
66+
func (i *csfloatIngestor) fetch(ctx context.Context, cursor *uint) ([]*slimWarning, *uint, error) {
6067
// The day before Valve introduced trade reversals
6168
startTime := time.Date(2025, 7, 14, 0, 0, 0, 0, time.UTC)
6269
endTime := time.Now().Add(-5 * time.Minute)
@@ -71,7 +78,7 @@ func (i *csfloatIngestor) fetch(cursor *uint) ([]*slimWarning, *uint, error) {
7178
return nil, nil, err
7279
}
7380
r.Header.Set("X-Secret-Key", i.cfg.Ingestors.CSFloat.SecretKey)
74-
r = r.WithContext(i.ctx)
81+
r = r.WithContext(ctx)
7582

7683
client := &http.Client{}
7784
resp, err := client.Do(r)
@@ -120,7 +127,7 @@ func (i *csfloatIngestor) process(warnings []*slimWarning) error {
120127
return nil
121128
}
122129

123-
func (i *csfloatIngestor) sync() error {
130+
func (i *csfloatIngestor) sync(ctx context.Context) error {
124131
// Fetch the most recent reversals created by CSFloat
125132
reversals, err := i.factory.Reversal().List(&dto.ReversalListOptions{
126133
MarketplaceSlug: util.Ptr("csfloat"),
@@ -140,7 +147,13 @@ func (i *csfloatIngestor) sync() error {
140147
}
141148

142149
for {
143-
warnings, nextCursor, err := i.fetch(cursor)
150+
select {
151+
case <-ctx.Done():
152+
return ctx.Err()
153+
default:
154+
}
155+
156+
warnings, nextCursor, err := i.fetch(ctx, cursor)
144157
if err != nil {
145158
return fmt.Errorf("failed to fetch warnings with cursor %v: %v", cursor, err)
146159
}
@@ -161,26 +174,24 @@ func (i *csfloatIngestor) sync() error {
161174
}
162175

163176
func (i *csfloatIngestor) Start() {
164-
i.log.Info("Starting CSFloat ingestor")
177+
i.log.Info("starting csfloat ingestor")
165178
go func() {
166179
defer close(i.stopped)
167180

168-
for {
169-
if err := i.sync(); err != nil {
170-
i.log.Errorf("failed to sync reversals: %v", err)
171-
}
181+
h := fnv.New32a()
182+
h.Write([]byte("csfloat_ingestor_leader"))
183+
lockKey := h.Sum32()
172184

173-
select {
174-
case <-time.After(30 * time.Minute):
175-
case <-i.ctx.Done():
176-
return
185+
i.elector.Run(i.ctx, lockKey, 30*time.Minute, func(ctx context.Context) {
186+
if err := i.sync(ctx); err != nil {
187+
i.log.Errorf("failed to sync reversals: %v", err)
177188
}
178-
}
189+
})
179190
}()
180191
}
181192

182193
func (i *csfloatIngestor) Stop() {
183-
i.log.Infof("Stopping CSFloat ingestor")
194+
i.log.Infof("stopping csfloat ingestor")
184195
i.cancel()
185196
<-i.stopped
186197
}

ingestors/manager.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,36 @@ import (
44
"context"
55

66
"reverse-watch/config"
7+
"reverse-watch/domain/ingestors"
78
"reverse-watch/domain/repository"
89
"reverse-watch/ingestors/csfloat"
10+
"reverse-watch/leader"
911

1012
"go.uber.org/zap"
1113
)
1214

13-
type ingestor interface {
14-
Start()
15-
Stop()
16-
}
17-
1815
type manager struct {
1916
log *zap.SugaredLogger
20-
ingestors []ingestor
17+
ingestors map[ingestors.IngestorType]ingestors.Ingestor
2118

2219
ctx context.Context
2320
cancel context.CancelFunc
2421
}
2522

26-
func New(factory repository.Factory, cfg *config.Config, logger *zap.SugaredLogger) *manager {
23+
var _ ingestors.Manager = (*manager)(nil)
24+
25+
func New(factory repository.Factory, cfg *config.Config, log *zap.SugaredLogger) ingestors.Manager {
2726
ctx, cancel := context.WithCancel(context.Background())
2827
m := &manager{
29-
log: logger,
30-
ingestors: make([]ingestor, 0),
28+
log: log,
29+
ingestors: make(map[ingestors.IngestorType]ingestors.Ingestor),
3130
ctx: ctx,
3231
cancel: cancel,
3332
}
3433

34+
elector := leader.New(factory, log)
3535
if cfg.Ingestors.CSFloat.Enable {
36-
m.ingestors = append(m.ingestors, csfloat.NewCSFloatIngestor(ctx, factory, cfg, logger))
36+
m.ingestors[ingestors.IngestorTypeCSFloat] = csfloat.NewCSFloatIngestor(ctx, factory, elector, cfg, log)
3737
}
3838
return m
3939
}
@@ -44,7 +44,7 @@ func (m *manager) StartIngestors() {
4444
}
4545
}
4646

47-
func (m *manager) Stop() {
47+
func (m *manager) StopIngestors() {
4848
m.cancel()
4949
for _, ingestor := range m.ingestors {
5050
ingestor.Stop()

leader/leader.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package leader
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"time"
7+
8+
"reverse-watch/domain/leader"
9+
"reverse-watch/domain/repository"
10+
11+
"go.uber.org/zap"
12+
)
13+
14+
type elector struct {
15+
log *zap.SugaredLogger
16+
factory repository.Factory
17+
}
18+
19+
var _ leader.Elector = (*elector)(nil)
20+
21+
func New(factory repository.Factory, log *zap.SugaredLogger) leader.Elector {
22+
return &elector{
23+
log: log,
24+
factory: factory,
25+
}
26+
}
27+
28+
func (e *elector) tryAdvisoryLock(ctx context.Context, lockKey uint32) (*sql.Tx, bool) {
29+
db, err := e.factory.PublicDB().DB()
30+
if err != nil {
31+
e.log.Errorf("failed to get database connection: %v", err)
32+
return nil, false
33+
}
34+
35+
// Use background context so transaction isn't auto-cancelled.
36+
// Very unlikely, but we don't want another instance to acquire the lock and begin work before we have exited.
37+
tx, err := db.BeginTx(context.Background(), nil)
38+
if err != nil {
39+
e.log.Errorf("failed to begin transaction: %v", err)
40+
return nil, false
41+
}
42+
43+
var hasLock bool
44+
err = tx.QueryRowContext(ctx, "SELECT pg_try_advisory_xact_lock($1)", lockKey).Scan(&hasLock)
45+
if err != nil {
46+
tx.Rollback()
47+
e.log.Errorf("failed to acquire advisory lock: %v", err)
48+
return nil, false
49+
}
50+
51+
if !hasLock {
52+
tx.Rollback()
53+
e.log.Errorf("failed to acquire advisory lock")
54+
return nil, false
55+
}
56+
return tx, true
57+
}
58+
59+
// Run will run the elector in a loop, acquiring the leader lock and calling the onWork function when the leader is acquired.
60+
// If the leader lock is not acquired, it will wait for the next period and try again.
61+
// If the context is done, it will return.
62+
func (e *elector) Run(ctx context.Context, lockKey uint32, period time.Duration, onWork func(ctx context.Context)) {
63+
for {
64+
select {
65+
case <-ctx.Done():
66+
return
67+
default:
68+
}
69+
70+
func() {
71+
workCtx, workCancel := context.WithCancel(ctx)
72+
defer workCancel()
73+
defer waitUntilNextBoundary(ctx, period)
74+
tx, acquired := e.tryAdvisoryLock(workCtx, lockKey)
75+
if !acquired {
76+
return
77+
}
78+
defer tx.Rollback()
79+
80+
onWork(workCtx)
81+
}()
82+
83+
}
84+
}
85+
86+
func waitUntilNextBoundary(ctx context.Context, period time.Duration) {
87+
nextBoundary := time.Now().Truncate(period).Add(period)
88+
select {
89+
case <-ctx.Done():
90+
return
91+
case <-time.After(time.Until(nextBoundary)):
92+
}
93+
}

0 commit comments

Comments
 (0)