Skip to content

Commit 3dbfea9

Browse files
provider: default options (#1153)
* refactor(provider): default options * refactor config validation * connectivity options * buffered lints * dual provider options cleanup
1 parent 6f794d8 commit 3dbfea9

8 files changed

Lines changed: 114 additions & 108 deletions

File tree

provider/buffered/provider.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const (
2929

3030
var _ internal.Provider = (*SweepingProvider)(nil)
3131

32-
// buffered.SweepingProvider is a wrapper around a SweepingProvider buffering
32+
// SweepingProvider (buffered) is a wrapper around a SweepingProvider buffering
3333
// requests, to allow core operations to return instantly. Operations are
3434
// queued and processed asynchronously in batches for improved performance.
3535
type SweepingProvider struct {

provider/buffered/provider_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func TestQueueingMechanism(t *testing.T) {
129129
}
130130

131131
// Wait for operations to be processed by expecting 4 signals
132-
for i := 0; i < 4; i++ {
132+
for i := range 4 {
133133
select {
134134
case <-fake.processed:
135135
case <-time.After(time.Second):

provider/dual/options.go

Lines changed: 26 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"time"
77

8-
ds "github.com/ipfs/go-datastore"
98
"github.com/libp2p/go-libp2p-kad-dht/amino"
109
"github.com/libp2p/go-libp2p-kad-dht/dual"
1110
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
@@ -38,53 +37,44 @@ type config struct {
3837

3938
type Option func(opt *config) error
4039

41-
func (cfg *config) apply(opts ...Option) error {
42-
for i, o := range opts {
43-
if err := o(cfg); err != nil {
44-
return fmt.Errorf("dual dht provider option %d failed: %w", i, err)
40+
// getOpts creates a config and applies Options to it.
41+
func getOpts(opts []Option, d *dual.DHT) (config, error) {
42+
cfg := config{
43+
reprovideInterval: [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval},
44+
maxReprovideDelay: [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay},
45+
46+
offlineDelay: [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay},
47+
connectivityCheckOnlineInterval: [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval},
48+
49+
maxWorkers: [2]int{4, 4},
50+
dedicatedPeriodicWorkers: [2]int{2, 2},
51+
dedicatedBurstWorkers: [2]int{1, 1},
52+
maxProvideConnsPerWorker: [2]int{20, 20},
53+
}
54+
55+
// Apply options
56+
for i, opt := range opts {
57+
if err := opt(&cfg); err != nil {
58+
return config{}, fmt.Errorf("dual dht provider option %d failed: %w", i, err)
4559
}
4660
}
47-
return nil
48-
}
4961

50-
func (cfg *config) resolveDefaults(d *dual.DHT) {
62+
// Resolve defaults
5163
if cfg.msgSenders[lanID] == nil {
5264
cfg.msgSenders[lanID] = d.LAN.MessageSender()
5365
}
5466
if cfg.msgSenders[wanID] == nil {
5567
cfg.msgSenders[wanID] = d.WAN.MessageSender()
5668
}
57-
}
5869

59-
func (c *config) validate() error {
60-
if c.dedicatedPeriodicWorkers[lanID]+c.dedicatedBurstWorkers[lanID] > c.maxWorkers[lanID] {
61-
return errors.New("provider config: total dedicated workers exceed max workers")
70+
// Validate config
71+
if cfg.dedicatedPeriodicWorkers[lanID]+cfg.dedicatedBurstWorkers[lanID] > cfg.maxWorkers[lanID] {
72+
return config{}, errors.New("provider config: total dedicated workers exceed max workers")
6273
}
63-
if c.dedicatedPeriodicWorkers[wanID]+c.dedicatedBurstWorkers[wanID] > c.maxWorkers[wanID] {
64-
return errors.New("provider config: total dedicated workers exceed max workers")
74+
if cfg.dedicatedPeriodicWorkers[wanID]+cfg.dedicatedBurstWorkers[wanID] > cfg.maxWorkers[wanID] {
75+
return config{}, errors.New("provider config: total dedicated workers exceed max workers")
6576
}
66-
return nil
67-
}
68-
69-
var DefaultConfig = func(cfg *config) error {
70-
var err error
71-
cfg.keystore, err = keystore.NewKeystore(ds.NewMapDatastore())
72-
if err != nil {
73-
return err
74-
}
75-
76-
cfg.reprovideInterval = [2]time.Duration{amino.DefaultReprovideInterval, amino.DefaultReprovideInterval}
77-
cfg.maxReprovideDelay = [2]time.Duration{provider.DefaultMaxReprovideDelay, provider.DefaultMaxReprovideDelay}
78-
79-
cfg.offlineDelay = [2]time.Duration{provider.DefaultOfflineDelay, provider.DefaultOfflineDelay}
80-
cfg.connectivityCheckOnlineInterval = [2]time.Duration{provider.DefaultConnectivityCheckOnlineInterval, provider.DefaultConnectivityCheckOnlineInterval}
81-
82-
cfg.maxWorkers = [2]int{4, 4}
83-
cfg.dedicatedPeriodicWorkers = [2]int{2, 2}
84-
cfg.dedicatedBurstWorkers = [2]int{1, 1}
85-
cfg.maxProvideConnsPerWorker = [2]int{20, 20}
86-
87-
return nil
77+
return cfg, nil
8878
}
8979

9080
func WithKeystore(ks keystore.Keystore) Option {

provider/dual/provider.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77

88
"github.com/ipfs/go-cid"
9+
"github.com/ipfs/go-datastore"
910
dht "github.com/libp2p/go-libp2p-kad-dht"
1011
"github.com/libp2p/go-libp2p-kad-dht/dual"
1112
"github.com/libp2p/go-libp2p-kad-dht/provider"
@@ -23,6 +24,8 @@ type SweepingProvider struct {
2324
LAN *provider.SweepingProvider
2425
WAN *provider.SweepingProvider
2526
keystore keystore.Keystore
27+
28+
cleanupFuncs []func() error
2629
}
2730

2831
// New creates a new SweepingProvider that manages provides and reprovides for
@@ -32,15 +35,19 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
3235
return nil, errors.New("cannot create sweeping provider for nil dual DHT")
3336
}
3437

35-
var cfg config
36-
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
38+
cfg, err := getOpts(opts, d)
3739
if err != nil {
3840
return nil, err
3941
}
40-
cfg.resolveDefaults(d)
41-
err = cfg.validate()
42-
if err != nil {
43-
return nil, err
42+
var cleanupFuncs []func() error
43+
if cfg.keystore == nil {
44+
ds := datastore.NewMapDatastore()
45+
cfg.keystore, err = keystore.NewKeystore(ds)
46+
if err != nil {
47+
ds.Close()
48+
return nil, fmt.Errorf("couldn't create a keystore: %w", err)
49+
}
50+
cleanupFuncs = []func() error{ds.Close, cfg.keystore.Close, func() error { return cfg.keystore.Empty(context.Background()) }}
4451
}
4552

4653
sweepingProviders := make([]*provider.SweepingProvider, 2)
@@ -74,10 +81,11 @@ func New(d *dual.DHT, opts ...Option) (*SweepingProvider, error) {
7481
}
7582

7683
return &SweepingProvider{
77-
dht: d,
78-
LAN: sweepingProviders[0],
79-
WAN: sweepingProviders[1],
80-
keystore: cfg.keystore,
84+
dht: d,
85+
LAN: sweepingProviders[0],
86+
WAN: sweepingProviders[1],
87+
keystore: cfg.keystore,
88+
cleanupFuncs: cleanupFuncs,
8189
}, nil
8290
}
8391

@@ -102,9 +110,25 @@ func (s *SweepingProvider) runOnBoth(f func(*provider.SweepingProvider) error) e
102110

103111
// Close stops both DHT providers and releases associated resources.
104112
func (s *SweepingProvider) Close() error {
105-
return s.runOnBoth(func(p *provider.SweepingProvider) error {
113+
err := s.runOnBoth(func(p *provider.SweepingProvider) error {
106114
return p.Close()
107115
})
116+
117+
if s.cleanupFuncs != nil {
118+
// Cleanup keystore and datastore if we created them
119+
var errs []error
120+
for i := len(s.cleanupFuncs) - 1; i >= 0; i-- { // LIFO: last-added is cleaned up first
121+
if f := s.cleanupFuncs[i]; f != nil {
122+
if err := f(); err != nil {
123+
errs = append(errs, err)
124+
}
125+
}
126+
}
127+
if len(errs) > 0 {
128+
err = errors.Join(append(errs, err)...)
129+
}
130+
}
131+
return err
108132
}
109133

110134
// ProvideOnce sends provider records for the specified keys to both DHT swarms

provider/internal/connectivity/connectivity.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ type ConnectivityChecker struct {
5353

5454
// New creates a new ConnectivityChecker instance.
5555
func New(checkFunc func() bool, opts ...Option) (*ConnectivityChecker, error) {
56-
var cfg config
57-
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
56+
cfg, err := getOpts(opts)
5857
if err != nil {
5958
return nil, err
6059
}

provider/internal/connectivity/options.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,21 @@ type config struct {
1414
onOnline func()
1515
}
1616

17-
func (cfg *config) apply(opts ...Option) error {
18-
for i, o := range opts {
19-
if err := o(cfg); err != nil {
20-
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
21-
}
22-
}
23-
return nil
24-
}
25-
2617
type Option func(opt *config) error
2718

28-
var DefaultConfig = func(cfg *config) error {
29-
cfg.onlineCheckInterval = 1 * time.Minute
30-
cfg.offlineDelay = 2 * time.Hour
31-
return nil
19+
// getOpts creates a config and applies Options to it.
20+
func getOpts(opts []Option) (config, error) {
21+
cfg := config{
22+
onlineCheckInterval: 1 * time.Minute,
23+
offlineDelay: 2 * time.Hour,
24+
}
25+
26+
for i, opt := range opts {
27+
if err := opt(&cfg); err != nil {
28+
return config{}, fmt.Errorf("connectivity option %d error: %s", i, err)
29+
}
30+
}
31+
return cfg, nil
3232
}
3333

3434
// WithOnlineCheckInterval sets the minimum interval between online checks.

provider/options.go

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,51 +53,49 @@ type config struct {
5353
maxProvideConnsPerWorker int
5454
}
5555

56-
func (cfg *config) apply(opts ...Option) error {
57-
for i, o := range opts {
58-
if err := o(cfg); err != nil {
59-
return fmt.Errorf("reprovider dht option %d failed: %w", i, err)
56+
type Option func(opt *config) error
57+
58+
// getOpts creates a config and applies Options to it.
59+
func getOpts(opts []Option) (config, error) {
60+
cfg := config{
61+
replicationFactor: amino.DefaultBucketSize,
62+
reprovideInterval: amino.DefaultReprovideInterval,
63+
maxReprovideDelay: DefaultMaxReprovideDelay,
64+
offlineDelay: DefaultOfflineDelay,
65+
connectivityCheckOnlineInterval: DefaultConnectivityCheckOnlineInterval,
66+
67+
maxWorkers: 4,
68+
dedicatedPeriodicWorkers: 2,
69+
dedicatedBurstWorkers: 1,
70+
maxProvideConnsPerWorker: 20,
71+
72+
addLocalRecord: func(mh mh.Multihash) error { return nil },
73+
}
74+
75+
// Apply options
76+
for i, opt := range opts {
77+
if err := opt(&cfg); err != nil {
78+
return config{}, fmt.Errorf("reprovider dht option %d error: %s", i, err)
6079
}
6180
}
62-
return nil
63-
}
6481

65-
func (c *config) validate() error {
66-
if len(c.peerid) == 0 {
67-
return errors.New("reprovider config: peer id is required")
82+
// Validate config
83+
if len(cfg.peerid) == 0 {
84+
return config{}, errors.New("reprovider config: peer id is required")
6885
}
69-
if c.router == nil {
70-
return errors.New("reprovider config: router is required")
86+
if cfg.router == nil {
87+
return config{}, errors.New("reprovider config: router is required")
7188
}
72-
if c.msgSender == nil {
73-
return errors.New("reprovider config: message sender is required")
89+
if cfg.msgSender == nil {
90+
return config{}, errors.New("reprovider config: message sender is required")
7491
}
75-
if c.selfAddrs == nil {
76-
return errors.New("reprovider config: self addrs func is required")
92+
if cfg.selfAddrs == nil {
93+
return config{}, errors.New("reprovider config: self addrs func is required")
7794
}
78-
if c.dedicatedPeriodicWorkers+c.dedicatedBurstWorkers > c.maxWorkers {
79-
return errors.New("reprovider config: total dedicated workers exceed max workers")
95+
if cfg.dedicatedPeriodicWorkers+cfg.dedicatedBurstWorkers > cfg.maxWorkers {
96+
return config{}, errors.New("reprovider config: total dedicated workers exceed max workers")
8097
}
81-
return nil
82-
}
83-
84-
type Option func(opt *config) error
85-
86-
var DefaultConfig = func(cfg *config) error {
87-
cfg.replicationFactor = amino.DefaultBucketSize
88-
cfg.reprovideInterval = amino.DefaultReprovideInterval
89-
cfg.maxReprovideDelay = DefaultMaxReprovideDelay
90-
cfg.offlineDelay = DefaultOfflineDelay
91-
cfg.connectivityCheckOnlineInterval = DefaultConnectivityCheckOnlineInterval
92-
93-
cfg.maxWorkers = 4
94-
cfg.dedicatedPeriodicWorkers = 2
95-
cfg.dedicatedBurstWorkers = 1
96-
cfg.maxProvideConnsPerWorker = 20
97-
98-
cfg.addLocalRecord = func(mh mh.Multihash) error { return nil }
99-
100-
return nil
98+
return cfg, nil
10199
}
102100

103101
// WithReplicationFactor sets the replication factor for provider records. It

provider/provider.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ type SweepingProvider struct {
160160

161161
// New creates a new SweepingProvider instance with the supplied options.
162162
func New(opts ...Option) (*SweepingProvider, error) {
163-
var cfg config
164-
err := cfg.apply(append([]Option{DefaultConfig}, opts...)...)
163+
cfg, err := getOpts(opts)
165164
if err != nil {
166165
return nil, err
167166
}
@@ -176,11 +175,7 @@ func New(opts ...Option) (*SweepingProvider, error) {
176175
cleanup(cleanupFuncs)
177176
return nil, err
178177
}
179-
}
180-
cleanupFuncs = append(cleanupFuncs, cfg.keystore.Close)
181-
if err := cfg.validate(); err != nil {
182-
cleanup(cleanupFuncs)
183-
return nil, err
178+
cleanupFuncs = append(cleanupFuncs, cfg.keystore.Close, func() error { return cfg.keystore.Empty(context.Background()) })
184179
}
185180
meter := otel.Meter("github.com/libp2p/go-libp2p-kad-dht/provider")
186181
providerCounter, err := meter.Int64Counter(

0 commit comments

Comments
 (0)