Skip to content

Commit e44058a

Browse files
authored
Merge pull request #91 from hyperledger/sync-init
Provide synchronous init option for FS wallet
2 parents f100c35 + 48a5c9e commit e44058a

7 files changed

Lines changed: 119 additions & 21 deletions

File tree

config.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ nav_order: 2
5454
|count|The maximum number of times to retry|`int`|`5`
5555
|enabled|Enables retries|`boolean`|`false`
5656
|errorStatusCodeRegex|The regex that the error response status code must match to trigger retry|`string`|`<nil>`
57+
|factor|The retry backoff factor|`float32`|`2`
5758
|initWaitTime|The initial retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`250ms`
5859
|maxWaitTime|The maximum retry delay|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
5960

@@ -83,6 +84,7 @@ nav_order: 2
8384

8485
|Key|Description|Type|Default Value|
8586
|---|-----------|----|-------------|
87+
|backgroundConnect|When true the connection is established in the background with infinite reconnect (makes initialConnectAttempts redundant when set)|`boolean`|`false`
8688
|connectionTimeout|The amount of time to wait while establishing a connection (or auto-reconnection)|[`time.Duration`](https://pkg.go.dev/time#Duration)|`45s`
8789
|heartbeatInterval|The amount of time to wait between heartbeat signals on the WebSocket connection|[`time.Duration`](https://pkg.go.dev/time#Duration)|`30s`
8890
|initialConnectAttempts|The number of attempts FireFly will make to connect to the WebSocket when starting up, before failing|`int`|`5`

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/fsnotify/fsnotify v1.7.0
88
github.com/go-resty/resty/v2 v2.11.0
99
github.com/gorilla/mux v1.8.1
10-
github.com/hyperledger/firefly-common v1.5.1
10+
github.com/hyperledger/firefly-common v1.5.5
1111
github.com/karlseguin/ccache v2.0.3+incompatible
1212
github.com/pelletier/go-toml v1.9.5
1313
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
4646
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
4747
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
4848
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
49-
github.com/hyperledger/firefly-common v1.5.1 h1:/DREi1ye1HfYr3GDLBhXuugeMzT4zg9EN2uTYFlVY6M=
50-
github.com/hyperledger/firefly-common v1.5.1/go.mod h1:1Xawm5PUhxT7k+CL/Kr3i1LE3cTTzoQwZMLimvlW8rs=
49+
github.com/hyperledger/firefly-common v1.5.5 h1:UaANgBIT0aBvAk0Yt+Qrn6qXxpwNIrFfwnW3EBivrQs=
50+
github.com/hyperledger/firefly-common v1.5.5/go.mod h1:1Xawm5PUhxT7k+CL/Kr3i1LE3cTTzoQwZMLimvlW8rs=
5151
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
5252
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
5353
github.com/jarcoal/httpmock v1.2.0 h1:gSvTxxFR/MEMfsGrvRbdfpRUMBStovlSRLw0Ep1bwwc=

pkg/fswallet/fslistener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (w *fsWallet) fsListenerLoop(ctx context.Context, done func(), events chan
6060
log.L(ctx).Tracef("FSEvent [%s]: %s", event.Op, event.Name)
6161
fi, err := os.Stat(event.Name)
6262
if err == nil {
63-
w.notifyNewFiles(ctx, fi)
63+
_ = w.notifyNewFiles(ctx, fi)
6464
}
6565
}
6666
case err, ok := <-errors:

pkg/fswallet/fslistener_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package fswallet
1919
import (
2020
"context"
2121
"fmt"
22-
"io/ioutil"
2322
"os"
2423
"path"
2524
"testing"
@@ -65,16 +64,16 @@ func TestFileListener(t *testing.T) {
6564
listener2 := make(chan ethtypes.Address0xHex, 1)
6665
f.AddListener(listener2)
6766

68-
testPWFIle, err := ioutil.ReadFile("../../test/keystore_toml/1f185718734552d08278aa70f804580bab5fd2b4.pwd")
67+
testPWFIle, err := os.ReadFile("../../test/keystore_toml/1f185718734552d08278aa70f804580bab5fd2b4.pwd")
6968
assert.NoError(t, err)
7069

71-
err = ioutil.WriteFile(path.Join(f.conf.Path, "1f185718734552d08278aa70f804580bab5fd2b4.pwd"), testPWFIle, 0644)
70+
err = os.WriteFile(path.Join(f.conf.Path, "1f185718734552d08278aa70f804580bab5fd2b4.pwd"), testPWFIle, 0644)
7271
assert.NoError(t, err)
7372

74-
testKeyFIle, err := ioutil.ReadFile("../../test/keystore_toml/1f185718734552d08278aa70f804580bab5fd2b4.key.json")
73+
testKeyFIle, err := os.ReadFile("../../test/keystore_toml/1f185718734552d08278aa70f804580bab5fd2b4.key.json")
7574
assert.NoError(t, err)
7675

77-
err = ioutil.WriteFile(path.Join(f.conf.Path, "1f185718734552d08278aa70f804580bab5fd2b4.key.json"), testKeyFIle, 0644)
76+
err = os.WriteFile(path.Join(f.conf.Path, "1f185718734552d08278aa70f804580bab5fd2b4.key.json"), testKeyFIle, 0644)
7877
assert.NoError(t, err)
7978

8079
newAddr1 := <-listener1

pkg/fswallet/fswallet.go

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,15 @@ import (
4242
"gopkg.in/yaml.v2"
4343
)
4444

45+
type SyncAddressCallback func(context.Context, ethtypes.Address0xHex) error
46+
4547
// Wallet is a directory containing a set of KeystoreV3 files, conforming
4648
// to the ethsigner.Wallet interface and providing notifications when new
4749
// keys are added to the wallet (via FS listener).
4850
type Wallet interface {
4951
ethsigner.WalletTypedData
5052
GetWalletFile(ctx context.Context, addr ethtypes.Address0xHex) (keystorev3.WalletFile, error)
53+
SetSyncAddressCallback(SyncAddressCallback)
5154
AddListener(listener chan<- ethtypes.Address0xHex)
5255
}
5356

@@ -99,6 +102,7 @@ type fsWallet struct {
99102
metadataKeyFileProperty *template.Template
100103
metadataPasswordFileProperty *template.Template
101104
primaryMatchRegex *regexp.Regexp
105+
syncAddressCallback SyncAddressCallback
102106

103107
mux sync.Mutex
104108
addressToFileMap map[ethtypes.Address0xHex]string // map for lookup to filename
@@ -139,12 +143,29 @@ func (w *fsWallet) Initialize(ctx context.Context) error {
139143
return w.Refresh(ctx)
140144
}
141145

146+
// Asynchronously listen for all addresses as they are detected - during startup, or after startup
142147
func (w *fsWallet) AddListener(listener chan<- ethtypes.Address0xHex) {
143148
w.mux.Lock()
144149
defer w.mux.Unlock()
145150
w.listeners = append(w.listeners, listener)
146151
}
147152

153+
// As an alternative to registering a listener are able to supply a single *synchronous* callback
154+
// that will process all of the addresses that exist on-disk at the time of refresh in-line.
155+
// This is very useful if you want to be sure your application using this module does not advertise it
156+
// is available until it has built a lookup map for all of the files that existed before it started.
157+
//
158+
// This will (by definition) delay initialize/refresh while that processing happens.
159+
//
160+
// Note there is no guarantee that this callback will be called on a single go-routine, as no
161+
// locks are held while calling it. So if refresh is called on two goroutines concurrently, it
162+
// will be called concurrently. However, it is guaranteed to be called in-line with the
163+
// initialize/refresh so you know once that calls returns all new keys detected by it have
164+
// driven the callback.
165+
func (w *fsWallet) SetSyncAddressCallback(callback SyncAddressCallback) {
166+
w.syncAddressCallback = callback
167+
}
168+
148169
// GetAccounts returns the currently cached list of known addresses
149170
func (w *fsWallet) GetAccounts(_ context.Context) ([]*ethtypes.Address0xHex, error) {
150171
w.mux.Lock()
@@ -197,17 +218,14 @@ func (w *fsWallet) Refresh(ctx context.Context) error {
197218
files = append(files, fi)
198219
}
199220
}
200-
if len(files) > 0 {
201-
w.notifyNewFiles(ctx, files...)
202-
}
203-
return nil
221+
return w.notifyNewFiles(ctx, files...)
204222
}
205223

206-
func (w *fsWallet) notifyNewFiles(ctx context.Context, files ...fs.FileInfo) {
224+
func (w *fsWallet) processNewFiles(ctx context.Context, files ...fs.FileInfo) (listeners []chan<- ethtypes.Address0xHex, newAddresses []*ethtypes.Address0xHex) {
207225
// Lock now we have the list
208226
w.mux.Lock()
209227
defer w.mux.Unlock()
210-
newAddresses := make([]*ethtypes.Address0xHex, 0)
228+
newAddresses = make([]*ethtypes.Address0xHex, 0)
211229
for _, f := range files {
212230
addr := w.matchFilename(ctx, f)
213231
if addr != nil {
@@ -221,17 +239,43 @@ func (w *fsWallet) notifyNewFiles(ctx context.Context, files ...fs.FileInfo) {
221239
}
222240
}
223241
}
224-
listeners := make([]chan<- ethtypes.Address0xHex, len(w.listeners))
242+
listeners = make([]chan<- ethtypes.Address0xHex, len(w.listeners))
225243
copy(listeners, w.listeners)
226244
log.L(ctx).Debugf("Processed %d files. Found %d new addresses", len(files), len(newAddresses))
227-
// Avoid holding the lock while calling the listeners, by using a go-routine
228-
go func() {
229-
for _, l := range w.listeners {
245+
return listeners, newAddresses
246+
}
247+
248+
func (w *fsWallet) notifyNewFiles(ctx context.Context, files ...fs.FileInfo) error {
249+
250+
// This function takes the lock and releases with a copy of the listeners, and a list of new addresses
251+
listeners, newAddresses := w.processNewFiles(ctx, files...)
252+
253+
if len(newAddresses) > 0 {
254+
255+
if w.syncAddressCallback != nil {
256+
// Sync callbacks are called here in-line, but outside the lock.
230257
for _, addr := range newAddresses {
231-
l <- *addr
258+
if err := w.syncAddressCallback(ctx, *addr); err != nil {
259+
log.L(ctx).Errorf("sync listener returned error for address %s: %s", addr, err)
260+
return err
261+
}
232262
}
233263
}
234-
}()
264+
265+
if len(listeners) > 0 {
266+
// Avoid any blocking of this routine using a separate go-routine that will deliver async callbacks
267+
go func() {
268+
for _, l := range listeners {
269+
for _, addr := range newAddresses {
270+
l <- *addr
271+
}
272+
}
273+
}()
274+
}
275+
276+
}
277+
278+
return nil
235279
}
236280

237281
func (w *fsWallet) Close() error {

pkg/fswallet/fswallet_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package fswallet
1919
import (
2020
"context"
2121
"encoding/json"
22+
"fmt"
2223
"os"
2324
"path"
2425
"testing"
@@ -436,3 +437,55 @@ func TestLoadKeyBadPath(t *testing.T) {
436437
assert.Regexp(t, "FF22015", err)
437438

438439
}
440+
441+
func TestSyncInitCallbackOK(t *testing.T) {
442+
config.RootConfigReset()
443+
logrus.SetLevel(logrus.TraceLevel)
444+
445+
unitTestConfig := config.RootSection("ut_fs_config")
446+
InitConfig(unitTestConfig)
447+
unitTestConfig.Set(ConfigPath, "../../test/keystore_toml")
448+
unitTestConfig.Set(ConfigFilenamesPrimaryMatchRegex, "^((0x)?[0-9a-z]+).key.json$")
449+
unitTestConfig.Set(ConfigFilenamesPasswordExt, ".pwd")
450+
unitTestConfig.Set(ConfigDisableListener, true)
451+
ctx := context.Background()
452+
453+
ff, err := NewFilesystemWallet(ctx, ReadConfig(unitTestConfig))
454+
assert.NoError(t, err)
455+
defer ff.Close()
456+
457+
callCount := 0
458+
ff.SetSyncAddressCallback(func(ctx context.Context, ah ethtypes.Address0xHex) error {
459+
callCount++
460+
return nil
461+
})
462+
463+
err = ff.Initialize(ctx)
464+
assert.NoError(t, err)
465+
466+
assert.Equal(t, callCount, 2)
467+
}
468+
469+
func TestSyncInitCallbackError(t *testing.T) {
470+
config.RootConfigReset()
471+
logrus.SetLevel(logrus.TraceLevel)
472+
473+
unitTestConfig := config.RootSection("ut_fs_config")
474+
InitConfig(unitTestConfig)
475+
unitTestConfig.Set(ConfigPath, "../../test/keystore_toml")
476+
unitTestConfig.Set(ConfigFilenamesPrimaryMatchRegex, "^((0x)?[0-9a-z]+).key.json$")
477+
unitTestConfig.Set(ConfigFilenamesPasswordExt, ".pwd")
478+
unitTestConfig.Set(ConfigDisableListener, true)
479+
ctx := context.Background()
480+
481+
ff, err := NewFilesystemWallet(ctx, ReadConfig(unitTestConfig))
482+
assert.NoError(t, err)
483+
defer ff.Close()
484+
485+
ff.SetSyncAddressCallback(func(ctx context.Context, ah ethtypes.Address0xHex) error {
486+
return fmt.Errorf("pop")
487+
})
488+
489+
err = ff.Initialize(ctx)
490+
assert.Regexp(t, "pop", err)
491+
}

0 commit comments

Comments
 (0)