Skip to content

Commit 3f87c8c

Browse files
committed
MEDIUM: Implement an EventListener to listen for ACME event
1 parent 09c54d3 commit 3f87c8c

3 files changed

Lines changed: 264 additions & 2 deletions

File tree

runtime/acme.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ func parseAcmeStatus(resp string) (models.AcmeStatus, error) {
6565
return false
6666
}
6767

68-
exp, e := time.Parse(time.RFC3339, parts[3])
68+
exp, e := parseAcmeDate(parts[3])
6969
if e != nil {
7070
err = fmt.Errorf("failed to parse date in response: '%s': %w", parts[3], e)
7171
return false
7272
}
73-
sched, e := time.Parse(time.RFC3339, parts[5])
73+
sched, e := parseAcmeDate(parts[5])
7474
if e != nil {
7575
err = fmt.Errorf("failed to parse date in response: '%s': %w", parts[5], e)
7676
return false
@@ -92,3 +92,10 @@ func parseAcmeStatus(resp string) (models.AcmeStatus, error) {
9292

9393
return status, err
9494
}
95+
96+
func parseAcmeDate(s string) (time.Time, error) {
97+
if s == "-" {
98+
return time.Time{}, nil
99+
}
100+
return time.Parse(time.RFC3339, s)
101+
}

runtime/runtime_event_listener.go

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
// Copyright 2025 HAProxy Technologies
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
16+
package runtime
17+
18+
import (
19+
"bufio"
20+
"bytes"
21+
"context"
22+
"fmt"
23+
"net"
24+
"slices"
25+
"strings"
26+
"sync/atomic"
27+
"time"
28+
)
29+
30+
// An Event sent by HAProxy.
31+
type Event struct {
32+
Timestamp time.Time
33+
Message string
34+
}
35+
36+
func (e Event) String() string {
37+
return e.Timestamp.Format(time.StampMicro) + " " + e.Message
38+
}
39+
40+
// An EventListener connects to HAProxy's master socket and listens
41+
// for events on a specific sink using the "show events" command.
42+
type EventListener struct {
43+
conn net.Conn
44+
// Buffered reader for conn.
45+
reader *bufio.Reader
46+
// Network to use with net.Dial (unix, udp...).
47+
network string
48+
// Address or path to HAProxy's master socket.
49+
address string
50+
// HAProxy's sink to read events from.
51+
sink string
52+
// Command used to listen for events.
53+
listenCmd string
54+
// Format used to parse events timestamps. Defaults to HAProxy's LOG_FORMAT_ISO.
55+
// Can be set before calling Listen.
56+
DateFormat string
57+
// Write timeout. Can be set before calling Listen.
58+
WriteTimeout time.Duration
59+
// Message delimiter. Either \n or 0 (zero).
60+
delim byte
61+
events chan Event
62+
lastError error
63+
closed atomic.Bool
64+
}
65+
66+
// NewEventListener connects to HAProxy's master socket and returns a new
67+
// EventListener configured for the given sink and flags. The timeout parameter
68+
// is used both as the default WriteTimeout and as the connect timeout.
69+
//
70+
// The caller must call Close to properly shutdown an EventListener.
71+
func NewEventListener(network, address, sink string, timeout time.Duration, flags ...string) (*EventListener, error) {
72+
var err error
73+
74+
l := &EventListener{
75+
network: network,
76+
address: address,
77+
sink: sink,
78+
listenCmd: fmt.Sprintf("show events %s %s", sink, strings.Join(flags, " ")),
79+
DateFormat: "2006-01-02T15:04:05.000000-07:00", // LOG_FORMAT_TIMED, LOG_FORMAT_ISO
80+
WriteTimeout: timeout,
81+
delim: '\n',
82+
events: make(chan Event),
83+
}
84+
85+
if slices.Contains(flags, "-0") {
86+
l.delim = byte(0)
87+
}
88+
89+
l.conn, err = net.DialTimeout(network, address, timeout)
90+
if err != nil {
91+
return nil, l.errorf("%w", err)
92+
}
93+
94+
l.reader = bufio.NewReader(l.conn)
95+
96+
go l.listen()
97+
98+
return l, nil
99+
}
100+
101+
// Listen for for new events. Blocks until a new event is available,
102+
// or until the ctx deadline is reached. On success, it returns the
103+
// event's payload without its timestamp.
104+
//
105+
// Listen can return an error in the following cases: a network error
106+
// (of type *net.OpError), EOF, an error returned by HAProxy or a parsing error.
107+
//
108+
// In case of a network error, the caller should Close the EventListener
109+
// and create a new one to continue to receive events.
110+
func (l *EventListener) Listen(ctx context.Context) (Event, error) {
111+
select {
112+
case event, ok := <-l.events:
113+
if !ok {
114+
return Event{}, l.lastError
115+
}
116+
return event, nil
117+
case <-ctx.Done():
118+
return Event{}, l.Close()
119+
}
120+
}
121+
122+
// Close the EventListener cleanly.
123+
func (l *EventListener) Close() error {
124+
if l.closed.CompareAndSwap(false, true) {
125+
defer close(l.events)
126+
if err := l.conn.Close(); err != nil {
127+
return l.errorf("%w", err)
128+
}
129+
}
130+
131+
return nil
132+
}
133+
134+
// Listen for events and push them on the events channel.
135+
func (l *EventListener) listen() {
136+
if l.WriteTimeout > 0 {
137+
_ = l.conn.SetWriteDeadline(time.Now().Add(l.WriteTimeout))
138+
}
139+
140+
_, err := fmt.Fprintf(l.conn, "@@1 set severity-output number;%s\n", l.listenCmd)
141+
if err != nil {
142+
l.lastError = l.errorf("%w", err)
143+
_ = l.Close()
144+
return
145+
}
146+
147+
for {
148+
data, err := l.reader.ReadBytes(l.delim)
149+
if err != nil {
150+
l.lastError = l.errorf("%w", err)
151+
_ = l.Close()
152+
return
153+
}
154+
155+
event, err := l.parseEvent(data)
156+
if err != nil {
157+
l.lastError = err
158+
_ = l.Close()
159+
return
160+
}
161+
162+
l.events <- event
163+
}
164+
}
165+
166+
func (l *EventListener) parseEvent(data []byte) (Event, error) {
167+
data = bytes.TrimPrefix(data, []byte{'\n'})
168+
data = bytes.TrimPrefix(data, []byte("<0>")) // syslog artefact
169+
if l.delim == 0 {
170+
data = bytes.TrimSuffix(data, []byte("\n\x00"))
171+
}
172+
data = bytes.TrimSpace(data)
173+
174+
event := string(data)
175+
176+
if len(event) > 4 && event[0] == '[' {
177+
switch event[0:4] {
178+
case "[3]:", "[2]:", "[1]:", "[0]:":
179+
return Event{}, l.errorf("[%c] %s [%s]", event[1], event[4:], l.listenCmd)
180+
}
181+
}
182+
183+
timestamp, msg, found := strings.Cut(event, " ")
184+
if !found {
185+
return Event{}, l.errorf("parsing error: '%s'", event)
186+
}
187+
188+
ts, err := time.Parse(l.DateFormat, timestamp)
189+
if err != nil {
190+
return Event{}, l.errorf("time parsing error: '%s'", timestamp)
191+
}
192+
193+
return Event{Timestamp: ts, Message: msg}, nil
194+
}
195+
196+
func (l *EventListener) errorf(format string, a ...any) error {
197+
format = fmt.Sprintf("EventListener(%s): %s", l.sink, format)
198+
return fmt.Errorf(format, a...)
199+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package runtime
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestEventListener(t *testing.T) {
9+
haProxy := NewHAProxyMock(t)
10+
haProxy.Start()
11+
defer haProxy.Stop()
12+
13+
type fields struct {
14+
address string
15+
sink string
16+
flags []string
17+
}
18+
tests := []struct {
19+
name string
20+
fields fields
21+
want string
22+
wantErr bool
23+
socketResponse map[string]string
24+
}{
25+
{
26+
name: "acme newcert event",
27+
fields: fields{address: haProxy.Addr().String(), sink: "dpapi", flags: []string{"-w", "-0"}},
28+
want: "acme newcert foobar.pem.rsa",
29+
socketResponse: map[string]string{
30+
"show events dpapi -w -0\n": "<0>2025-05-19T15:56:23.059755+02:00 acme newcert foobar.pem.rsa\n\x00",
31+
},
32+
},
33+
}
34+
for _, tt := range tests {
35+
t.Run(tt.name, func(t *testing.T) {
36+
haProxy.SetResponses(&tt.socketResponse)
37+
l, err := NewEventListener("unix", tt.fields.address, tt.fields.sink, time.Second, tt.fields.flags...)
38+
if err != nil {
39+
t.Errorf("NewEventListener() error = %v", err)
40+
return
41+
}
42+
got, err := l.Listen(t.Context())
43+
if (err != nil) != tt.wantErr {
44+
t.Errorf("EventListener.Listen() error = %v, wantErr %v", err, tt.wantErr)
45+
return
46+
}
47+
if got.Message != tt.want {
48+
t.Errorf("EventListener.Listen() = %v, want %v", got, tt.want)
49+
}
50+
// Already closed anyway.
51+
if err = l.Close(); err != nil {
52+
t.Errorf("EventListener.Close() error = %v", err)
53+
}
54+
})
55+
}
56+
}

0 commit comments

Comments
 (0)