Skip to content

Commit 7563669

Browse files
committed
Add PeriodicWorker to worker package
Signed-off-by: Bryan Frimin <bryan@frimin.fr>
1 parent 816f0c9 commit 7563669

2 files changed

Lines changed: 399 additions & 0 deletions

File tree

worker/periodic.go

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
// Copyright (c) 2024 Bryan Frimin <bryan@frimin.fr>.
2+
//
3+
// Permission to use, copy, modify, and/or distribute this software
4+
// for any purpose with or without fee is hereby granted, provided
5+
// that the above copyright notice and this permission notice appear
6+
// in all copies.
7+
//
8+
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL
9+
// WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
10+
// WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
11+
// AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
12+
// CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
13+
// OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
14+
// NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
15+
// CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16+
17+
package worker
18+
19+
import (
20+
"context"
21+
"sync"
22+
"time"
23+
24+
"github.com/prometheus/client_golang/prometheus"
25+
"go.gearno.de/kit/internal/version"
26+
"go.gearno.de/kit/log"
27+
"go.opentelemetry.io/otel"
28+
"go.opentelemetry.io/otel/attribute"
29+
"go.opentelemetry.io/otel/codes"
30+
"go.opentelemetry.io/otel/trace"
31+
)
32+
33+
type (
34+
// PeriodicHandler defines a single operation to execute on each
35+
// tick of a PeriodicWorker.
36+
PeriodicHandler interface {
37+
Run(ctx context.Context) error
38+
}
39+
40+
// PeriodicWorker runs a PeriodicHandler on a fixed interval,
41+
// skipping ticks when the previous run is still in progress.
42+
PeriodicWorker struct {
43+
name string
44+
handler PeriodicHandler
45+
logger *log.Logger
46+
tracer trace.Tracer
47+
interval time.Duration
48+
49+
cyclesTotal *prometheus.CounterVec
50+
runsTotal *prometheus.CounterVec
51+
runDuration *prometheus.HistogramVec
52+
skippedTotal *prometheus.CounterVec
53+
}
54+
)
55+
56+
// NewPeriodic creates a PeriodicWorker named name that runs handler
57+
// on every tick. The name identifies this worker in metrics, logs,
58+
// and traces.
59+
func NewPeriodic(name string, handler PeriodicHandler, logger *log.Logger, opts ...Option) *PeriodicWorker {
60+
o := options{
61+
interval: 10 * time.Second,
62+
tracerProvider: otel.GetTracerProvider(),
63+
registerer: prometheus.DefaultRegisterer,
64+
}
65+
66+
for _, opt := range opts {
67+
opt(&o)
68+
}
69+
70+
workerLabel := []string{"worker"}
71+
statusLabels := []string{"worker", "status"}
72+
73+
cyclesTotal := prometheus.NewCounterVec(
74+
prometheus.CounterOpts{
75+
Subsystem: "periodic_worker",
76+
Name: "cycles_total",
77+
Help: "Total number of periodic worker cycles.",
78+
},
79+
workerLabel,
80+
)
81+
if err := o.registerer.Register(cyclesTotal); err != nil {
82+
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
83+
cyclesTotal = are.ExistingCollector.(*prometheus.CounterVec)
84+
} else {
85+
panic(err)
86+
}
87+
}
88+
89+
runsTotal := prometheus.NewCounterVec(
90+
prometheus.CounterOpts{
91+
Subsystem: "periodic_worker",
92+
Name: "runs_total",
93+
Help: "Total number of periodic handler runs.",
94+
},
95+
statusLabels,
96+
)
97+
if err := o.registerer.Register(runsTotal); err != nil {
98+
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
99+
runsTotal = are.ExistingCollector.(*prometheus.CounterVec)
100+
} else {
101+
panic(err)
102+
}
103+
}
104+
105+
runDuration := prometheus.NewHistogramVec(
106+
prometheus.HistogramOpts{
107+
Subsystem: "periodic_worker",
108+
Name: "run_duration_seconds",
109+
Help: "Duration of periodic handler runs in seconds.",
110+
Buckets: prometheus.DefBuckets,
111+
},
112+
statusLabels,
113+
)
114+
if err := o.registerer.Register(runDuration); err != nil {
115+
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
116+
runDuration = are.ExistingCollector.(*prometheus.HistogramVec)
117+
} else {
118+
panic(err)
119+
}
120+
}
121+
122+
skippedTotal := prometheus.NewCounterVec(
123+
prometheus.CounterOpts{
124+
Subsystem: "periodic_worker",
125+
Name: "skipped_total",
126+
Help: "Total number of ticks skipped because the previous run was still in progress.",
127+
},
128+
workerLabel,
129+
)
130+
if err := o.registerer.Register(skippedTotal); err != nil {
131+
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
132+
skippedTotal = are.ExistingCollector.(*prometheus.CounterVec)
133+
} else {
134+
panic(err)
135+
}
136+
}
137+
138+
return &PeriodicWorker{
139+
name: name,
140+
handler: handler,
141+
logger: logger.Named(name),
142+
tracer: o.tracerProvider.Tracer(
143+
tracerName,
144+
trace.WithInstrumentationVersion(
145+
version.New(0).Alpha(1),
146+
),
147+
),
148+
interval: o.interval,
149+
cyclesTotal: cyclesTotal,
150+
runsTotal: runsTotal,
151+
runDuration: runDuration,
152+
skippedTotal: skippedTotal,
153+
}
154+
}
155+
156+
// Run starts the periodic worker loop. It blocks until ctx is
157+
// cancelled, then waits for the in-flight run to complete before
158+
// returning.
159+
func (w *PeriodicWorker) Run(ctx context.Context) error {
160+
var (
161+
wg sync.WaitGroup
162+
mu sync.Mutex
163+
ticker = time.NewTicker(w.interval)
164+
)
165+
166+
defer ticker.Stop()
167+
defer wg.Wait()
168+
169+
for {
170+
select {
171+
case <-ctx.Done():
172+
return context.Cause(ctx)
173+
case <-ticker.C:
174+
w.cyclesTotal.WithLabelValues(w.name).Inc()
175+
176+
if !mu.TryLock() {
177+
w.skippedTotal.WithLabelValues(w.name).Inc()
178+
w.logger.WarnCtx(ctx, "skipping tick: previous run still in progress")
179+
continue
180+
}
181+
182+
wg.Go(func() {
183+
defer mu.Unlock()
184+
w.run(ctx)
185+
})
186+
}
187+
}
188+
}
189+
190+
func (w *PeriodicWorker) run(ctx context.Context) {
191+
nonCancelableCtx := context.WithoutCancel(ctx)
192+
193+
runCtx, span := w.tracer.Start(
194+
nonCancelableCtx,
195+
"periodic_worker.run",
196+
trace.WithSpanKind(trace.SpanKindInternal),
197+
trace.WithAttributes(
198+
attribute.String("worker.name", w.name),
199+
),
200+
)
201+
defer span.End()
202+
203+
start := time.Now()
204+
err := w.handler.Run(runCtx)
205+
duration := time.Since(start)
206+
207+
status := "succeeded"
208+
if err != nil {
209+
status = "failed"
210+
span.RecordError(err)
211+
span.SetStatus(codes.Error, err.Error())
212+
w.logger.ErrorCtx(
213+
runCtx,
214+
"run failed",
215+
log.Error(err),
216+
log.Duration("duration", duration),
217+
)
218+
} else {
219+
w.logger.InfoCtx(
220+
runCtx,
221+
"run succeeded",
222+
log.Duration("duration", duration),
223+
)
224+
}
225+
226+
w.runsTotal.WithLabelValues(w.name, status).Inc()
227+
w.runDuration.WithLabelValues(w.name, status).Observe(duration.Seconds())
228+
}

0 commit comments

Comments
 (0)