-
Notifications
You must be signed in to change notification settings - Fork 856
Expand file tree
/
Copy pathmanager.go
More file actions
253 lines (207 loc) · 6.94 KB
/
manager.go
File metadata and controls
253 lines (207 loc) · 6.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package runtimeconfig
import (
"bytes"
"context"
"crypto/sha256"
"flag"
"fmt"
"io"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/objstore"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/services"
)
// BucketClientFactory builds the objstore.Bucket client that the Manager reads
// the runtime config file from. The Manager calls it from its starting
// function, passing along that function's context.
type BucketClientFactory func(ctx context.Context) (objstore.Bucket, error)
// Loader parses a runtime configuration from r and returns the decoded value.
// The Manager hands it a reader over the complete contents of the config file
// and, when the returned error is nil, stores the returned value as the
// current config.
type Loader func(r io.Reader) (any, error)
// Config holds the config for a Manager instance.
// It holds config related to loading per-tenant config.
type Config struct {
// ReloadPeriod is the interval between reload attempts made by the Manager's
// loop function.
ReloadPeriod time.Duration `yaml:"period"`
// LoadPath contains the path to the runtime config file, requires a
// non-empty value
LoadPath string `yaml:"file"`
// Loader parses the bytes read from LoadPath. It is excluded from YAML.
Loader Loader `yaml:"-"`
// StorageConfig is the bucket configuration, inlined in YAML. New rejects an
// empty Backend.
// NOTE(review): presumably consumed by the BucketClientFactory - confirm at the caller.
StorageConfig bucket.Config `yaml:",inline"`
// DefaultTenantID is the synthetic tenant ID used as a fallback for default
// runtime config values. When set, overrides for this tenant ID are applied
// to all tenants that do not have their own per-tenant override, before
// falling back to CLI flag defaults. This is an experimental feature.
DefaultTenantID string `yaml:"default_tenant_id"`
}
// RegisterFlags binds the runtime-config command-line flags to the fields of
// mc on the flag set f, and registers the storage flags of StorageConfig under
// the "runtime-config." prefix.
func (mc *Config) RegisterFlags(f *flag.FlagSet) {
	// Reload cadence.
	f.DurationVar(&mc.ReloadPeriod, "runtime-config.reload-period", 10*time.Second, "How often to check runtime config file.")

	// Location of the config file and the optional fallback tenant.
	f.StringVar(&mc.LoadPath, "runtime-config.file", "", "File with the configuration that can be updated in runtime.")
	f.StringVar(&mc.DefaultTenantID, "runtime-config.default-tenant-id", "", "[Experimental] Synthetic tenant ID used as fallback for runtime config defaults. When set, overrides for this tenant ID apply to all tenants without per-tenant overrides, before falling back to CLI flag defaults.")

	// Bucket backend flags, registered with bucket.Filesystem as the backend argument.
	mc.StorageConfig.RegisterFlagsWithPrefixAndBackend("runtime-config.", f, bucket.Filesystem)
}
// Manager periodically reloads the configuration from a file, and keeps this
// configuration available for clients.
type Manager struct {
// Service is the embedded service that New builds from the starting, loop
// and stopping functions.
services.Service
cfg Config
logger log.Logger
// listenersMtx guards listeners.
listenersMtx sync.Mutex
// listeners are the channels that are offered every successfully loaded config.
listeners []chan any
// configMtx guards config.
configMtx sync.RWMutex
// config is the most recently loaded value; it stays nil until a load succeeds.
config any
// configLoadSuccess is set to 1 after a successful load and to 0 after a failed one.
configLoadSuccess prometheus.Gauge
// configHash carries the SHA-256 of the active config file in its "sha256" label.
configHash *prometheus.GaugeVec
// bucketClient is assigned in starting from the result of bucketClientFactory.
bucketClient objstore.Bucket
bucketClientFactory BucketClientFactory
}
// New validates cfg and builds a Manager whose embedded service is wired to
// the starting, loop and stopping functions. New itself does not load the
// config; the first load is performed by starting.
//
// It returns an error when cfg.LoadPath or cfg.StorageConfig.Backend is empty.
// The two metrics are created through promauto against registerer.
func New(cfg Config, registerer prometheus.Registerer, logger log.Logger, factory BucketClientFactory) (*Manager, error) {
	switch {
	case cfg.LoadPath == "":
		return nil, errors.New("LoadPath is empty")
	case cfg.StorageConfig.Backend == "":
		return nil, errors.New("Backend should not be explicitly empty")
	}

	// Create the metrics in the same order as they appear in the struct.
	metrics := promauto.With(registerer)
	loadSuccess := metrics.NewGauge(prometheus.GaugeOpts{
		Name: "runtime_config_last_reload_successful",
		Help: "Whether the last runtime-config reload attempt was successful.",
	})
	hashGauge := metrics.NewGaugeVec(prometheus.GaugeOpts{
		Name: "runtime_config_hash",
		Help: "Hash of the currently active runtime config file.",
	}, []string{"sha256"})

	m := &Manager{
		cfg:                 cfg,
		logger:              logger,
		configLoadSuccess:   loadSuccess,
		configHash:          hashGauge,
		bucketClientFactory: factory,
	}
	m.Service = services.NewBasicService(m.starting, m.loop, m.stopping)

	return m, nil
}
// starting is the start function of the Manager service. It creates the bucket
// client through the factory and performs the initial config load; a failure
// of either step is returned. It does nothing when no LoadPath is configured.
func (om *Manager) starting(ctx context.Context) error {
	if om.cfg.LoadPath == "" {
		return nil
	}

	client, err := om.bucketClientFactory(ctx)
	// The field is assigned regardless of the outcome of the factory call.
	om.bucketClient = client
	if err != nil {
		return err
	}

	if loadErr := om.loadConfig(ctx); loadErr != nil {
		return errors.Wrap(loadErr, "failed to load runtime config")
	}
	return nil
}
// CreateListenerChannel registers and returns a new channel, with the given
// buffer size, on which newly loaded config values are delivered.
//
// Delivery is non-blocking: an update is dropped for this channel when no
// receiver is waiting and the buffer has no free slot.
//
// All registered channels are closed when the manager stops, which tells
// receivers that no further updates will arrive.
func (om *Manager) CreateListenerChannel(buffer int) <-chan any {
	listener := make(chan any, buffer)

	om.listenersMtx.Lock()
	om.listeners = append(om.listeners, listener)
	om.listenersMtx.Unlock()

	return listener
}
// CloseListenerChannel unregisters listener so that it no longer receives
// updates, and closes it. A channel that is not currently registered is left
// untouched.
func (om *Manager) CloseListenerChannel(listener <-chan any) {
	om.listenersMtx.Lock()
	defer om.listenersMtx.Unlock()

	for i := range om.listeners {
		if om.listeners[i] != listener {
			continue
		}

		// Keep the bidirectional handle: the caller only holds the
		// receive-only side, which cannot be closed.
		registered := om.listeners[i]
		om.listeners = append(om.listeners[:i], om.listeners[i+1:]...)
		close(registered)
		return
	}
}
// loop is the run function of the Manager service. It reloads the config every
// cfg.ReloadPeriod until ctx is cancelled and always returns nil.
//
// Reloading is skipped, and loop simply waits for ctx to be cancelled, when no
// LoadPath is configured or when ReloadPeriod is not positive. In the latter
// case the config loaded at startup stays active.
func (om *Manager) loop(ctx context.Context) error {
	if om.cfg.LoadPath == "" {
		level.Info(om.logger).Log("msg", "runtime config disabled: file not specified")
		<-ctx.Done()
		return nil
	}

	// time.NewTicker panics on a non-positive interval, which would take the
	// whole process down because of a misconfigured period.
	if om.cfg.ReloadPeriod <= 0 {
		level.Warn(om.logger).Log("msg", "runtime config reload disabled: reload period is not positive", "period", om.cfg.ReloadPeriod)
		<-ctx.Done()
		return nil
	}

	ticker := time.NewTicker(om.cfg.ReloadPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := om.loadConfig(ctx); err != nil {
				// Log but don't stop on error - we don't want to halt all ingesters because of a typo
				level.Error(om.logger).Log("msg", "failed to load config", "err", err)
			}
		case <-ctx.Done():
			return nil
		}
	}
}
// loadConfig reads the config file from the bucket and parses it with the
// configured Loader. On success it stores the parsed value as the current
// config, offers it to all listeners and publishes the SHA-256 of the file in
// the hash metric. On failure it marks the last reload as unsuccessful and
// returns the wrapped error, leaving the current config as it was.
func (om *Manager) loadConfig(ctx context.Context) error {
	// fail records the unsuccessful reload and adds context to the error.
	fail := func(cause error, step string) error {
		om.configLoadSuccess.Set(0)
		return errors.Wrap(cause, step)
	}

	raw, err := om.loadConfigFromBucket(ctx)
	if err != nil {
		return fail(err, "read file")
	}
	digest := sha256.Sum256(raw)

	parsed, err := om.cfg.Loader(bytes.NewReader(raw))
	if err != nil {
		return fail(err, "load file")
	}

	om.configLoadSuccess.Set(1)
	om.setConfig(parsed)
	om.callListeners(parsed)

	// Drop the previous hash series so that only the active one is exposed.
	om.configHash.Reset()
	hexDigest := fmt.Sprintf("%x", digest[:])
	om.configHash.WithLabelValues(hexDigest).Set(1)

	return nil
}
// loadConfigFromBucket fetches the object at cfg.LoadPath from the bucket
// client and returns its complete contents.
//
// The reader obtained from the bucket is closed on every path, including when
// reading fails. A non-nil error means the returned slice is nil.
func (om *Manager) loadConfigFromBucket(ctx context.Context) ([]byte, error) {
	readCloser, err := om.bucketClient.Get(ctx, om.cfg.LoadPath)
	if err != nil {
		return nil, errors.Wrap(err, "open file")
	}

	buf, err := io.ReadAll(readCloser)
	if err != nil {
		// Release the reader even though the read failed. The read error is
		// the one worth reporting, so a secondary Close error is dropped.
		_ = readCloser.Close()
		return nil, errors.Wrap(err, "read entire file")
	}

	if err := readCloser.Close(); err != nil {
		return nil, errors.Wrap(err, "close file")
	}
	return buf, nil
}
// setConfig replaces the current config value while holding the write lock.
func (om *Manager) setConfig(config any) {
	om.configMtx.Lock()
	om.config = config
	om.configMtx.Unlock()
}
// callListeners offers newValue to every registered listener channel without
// blocking. A channel that cannot accept the value right away (no waiting
// receiver and no free buffer slot) misses this update.
func (om *Manager) callListeners(newValue any) {
	om.listenersMtx.Lock()
	defer om.listenersMtx.Unlock()

	for i := range om.listeners {
		select {
		case om.listeners[i] <- newValue:
			// delivered
		default:
			// receiver not ready: skip it
		}
	}
}
// stopping is the stop function of the Manager service. It closes every
// registered listener channel and forgets them, then returns nil. The error
// argument is ignored.
func (om *Manager) stopping(_ error) error {
	om.listenersMtx.Lock()
	defer om.listenersMtx.Unlock()

	for i := range om.listeners {
		close(om.listeners[i])
	}
	// Clear the list so that closed channels are never sent to or closed again.
	om.listeners = nil

	return nil
}
// GetConfig returns the most recently loaded config value under the read
// lock. The result is nil when no config has been stored yet.
func (om *Manager) GetConfig() any {
	om.configMtx.RLock()
	current := om.config
	om.configMtx.RUnlock()

	return current
}