-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathintegration_test.go
More file actions
128 lines (115 loc) · 2.88 KB
/
integration_test.go
File metadata and controls
128 lines (115 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package otfranz
import (
"context"
"encoding/json"
"io/ioutil"
"os"
"strings"
"testing"
"github.com/DoNewsCode/core"
"github.com/DoNewsCode/core/config"
knoaf_json "github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/providers/file"
"github.com/oklog/run"
"github.com/stretchr/testify/assert"
"github.com/twmb/franz-go/pkg/kgo"
)
type channelWatcher struct {
ch chan struct{}
afterReload chan struct{}
}
func (c *channelWatcher) Watch(ctx context.Context, reload func() error) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-c.ch:
reload()
c.afterReload <- struct{}{}
}
}
}
func TestModule_hotReload(t *testing.T) {
if os.Getenv("KAFKA_ADDR") == "" {
t.Skip("set KAFKA_ADDR to run TestModule_ProvideRunGroup")
return
}
addrs := strings.Split(os.Getenv("KAFKA_ADDR"), ",")
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cw := &channelWatcher{}
cw.ch = make(chan struct{})
cw.afterReload = make(chan struct{})
conf := map[string]interface{}{
"http": map[string]bool{
"disable": true,
},
"grpc": map[string]bool{
"disable": true,
},
"cron": map[string]bool{
"disable": true,
},
"kafka": map[string]interface{}{
"default": map[string]interface{}{
"seed_brokers": addrs,
"default_produce_topic": "foo",
},
},
"log": map[string]string{
"level": "none",
},
}
path := createFile(conf)
c := core.Default(core.WithConfigStack(file.Provider(path), knoaf_json.Parser()), core.WithConfigWatcher(cw))
defer c.Shutdown()
c.Provide(Providers(
WithReload(true),
WithInterceptor(func(name string, config *Config) {
config.MaxBytes = 100
}),
))
c.AddModuleFunc(config.New)
var group run.Group
c.ApplyRunGroup(&group)
group.Add(func() error {
<-ctx.Done()
return ctx.Err()
}, func(err error) {
cancel()
})
go group.Run()
c.Invoke(func(f Factory) {
cli, err := f.Make("default")
assert.NoError(t, err)
record := &kgo.Record{Value: []byte("bar")}
cli.Produce(ctx, record, func(r *kgo.Record, err error) {
assert.Equal(t, "foo", r.Topic)
})
})
// Reload config
conf["kafka"].(map[string]interface{})["default"].(map[string]interface{})["default_produce_topic"] = "bar"
overwriteFile(path, conf)
cw.ch <- struct{}{}
<-cw.afterReload
// Test reloaded values
c.Invoke(func(f Factory) {
cli, err := f.Make("default")
assert.NoError(t, err)
record := &kgo.Record{Value: []byte("bar")}
cli.Produce(ctx, record, func(r *kgo.Record, err error) {
assert.Equal(t, "bar", r.Topic)
})
})
}
func createFile(content map[string]interface{}) string {
f, _ := ioutil.TempFile("", "*")
data, _ := json.Marshal(content)
ioutil.WriteFile(f.Name(), data, os.ModePerm)
return f.Name()
}
func overwriteFile(path string, content map[string]interface{}) {
data, _ := json.Marshal(content)
ioutil.WriteFile(path, data, os.ModePerm)
}