Skip to content

Commit b679ec6

Browse files
committed
feat(agent/syslog): add RFC 5424 octet counting framing support and improve message handling
1 parent ef7f47d commit b679ec6

1 file changed

Lines changed: 109 additions & 5 deletions

File tree

agent/modules/syslog.go

Lines changed: 109 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"io"
1010
"net"
1111
"os"
12+
"strconv"
1213
"strings"
1314
"time"
1415

@@ -20,6 +21,20 @@ import (
2021
"github.com/utmstack/UTMStack/agent/utils"
2122
)
2223

24+
const (
25+
MinBufferSize = 480
26+
RecommendedBufferSize = 2048
27+
MaxBufferSize = 8192
28+
UDPBufferSize = 2048
29+
)
30+
31+
type FramingMethod int
32+
33+
const (
34+
FramingNewline FramingMethod = iota
35+
FramingOctetCounting
36+
)
37+
2338
type SyslogModule struct {
2439
DataType string
2540
TCPListener listenerTCP
@@ -204,7 +219,7 @@ func (m *SyslogModule) enableUDP() {
204219
m.UDPListener.Listener = listener
205220
m.UDPListener.CTX, m.UDPListener.Cancel = context.WithCancel(context.Background())
206221

207-
buffer := make([]byte, 1024)
222+
buffer := make([]byte, UDPBufferSize)
208223
msgChannel := make(chan config.MSGDS)
209224

210225
go m.handleConnectionUDP(msgChannel)
@@ -291,6 +306,88 @@ func (m *SyslogModule) disableUDP() {
291306
}
292307
}
293308

309+
// detectFramingMethod detects the syslog framing method by peeking at the first byte
310+
func detectFramingMethod(reader *bufio.Reader) (FramingMethod, error) {
311+
firstByte, err := reader.Peek(1)
312+
if err != nil {
313+
utils.Logger.ErrorF("failed to peek first byte for framing detection: %v", err)
314+
return 0, fmt.Errorf("failed to peek first byte: %w", err)
315+
}
316+
317+
if firstByte[0] >= '0' && firstByte[0] <= '9' {
318+
return FramingOctetCounting, nil
319+
}
320+
321+
if firstByte[0] == '<' {
322+
return FramingNewline, nil
323+
}
324+
325+
utils.Logger.ErrorF("unknown framing method detected, first byte: 0x%02x", firstByte[0])
326+
return 0, fmt.Errorf("unknown framing method, first byte: 0x%02x", firstByte[0])
327+
}
328+
329+
// readOctetCountingFrame reads a syslog message using octet counting framing method
330+
func readOctetCountingFrame(reader *bufio.Reader) (string, error) {
331+
lengthStr, err := reader.ReadString(' ')
332+
if err != nil {
333+
utils.Logger.ErrorF("failed to read message length in octet counting frame: %v", err)
334+
return "", fmt.Errorf("failed to read message length: %w", err)
335+
}
336+
337+
lengthStr = strings.TrimSuffix(lengthStr, " ")
338+
msgLen, err := strconv.Atoi(lengthStr)
339+
if err != nil {
340+
utils.Logger.ErrorF("invalid message length '%s' in octet counting frame: %v", lengthStr, err)
341+
return "", fmt.Errorf("invalid message length '%s': %w", lengthStr, err)
342+
}
343+
344+
if msgLen < 1 {
345+
utils.Logger.ErrorF("message length %d is too small (minimum 1 byte)", msgLen)
346+
return "", fmt.Errorf("message length %d is too small (minimum 1)", msgLen)
347+
}
348+
if msgLen > MaxBufferSize {
349+
utils.Logger.ErrorF("message length %d exceeds maximum %d bytes", msgLen, MaxBufferSize)
350+
return "", fmt.Errorf("message length %d exceeds maximum %d", msgLen, MaxBufferSize)
351+
}
352+
353+
msgBytes := make([]byte, msgLen)
354+
_, err = io.ReadFull(reader, msgBytes)
355+
if err != nil {
356+
utils.Logger.ErrorF("failed to read %d byte message body: %v", msgLen, err)
357+
return "", fmt.Errorf("failed to read %d byte message body: %w", msgLen, err)
358+
}
359+
360+
return string(msgBytes), nil
361+
}
362+
363+
// readNewlineFrame reads a syslog message using newline-delimited framing method
364+
func readNewlineFrame(reader *bufio.Reader) (string, error) {
365+
message, err := reader.ReadString('\n')
366+
if err != nil {
367+
utils.Logger.ErrorF("failed to read newline-delimited message: %v", err)
368+
return "", fmt.Errorf("failed to read newline-delimited message: %w", err)
369+
}
370+
return message, nil
371+
}
372+
373+
// readSyslogMessage reads a syslog message with automatic framing detection
374+
func readSyslogMessage(reader *bufio.Reader) (string, error) {
375+
method, err := detectFramingMethod(reader)
376+
if err != nil {
377+
return "", err
378+
}
379+
380+
switch method {
381+
case FramingOctetCounting:
382+
return readOctetCountingFrame(reader)
383+
case FramingNewline:
384+
return readNewlineFrame(reader)
385+
default:
386+
utils.Logger.ErrorF("unsupported framing method: %d", method)
387+
return "", fmt.Errorf("unsupported framing method: %d", method)
388+
}
389+
}
390+
294391
func (m *SyslogModule) handleConnectionTCP(c net.Conn) {
295392
defer c.Close()
296393
reader := bufio.NewReader(c)
@@ -336,12 +433,17 @@ func (m *SyslogModule) handleConnectionTCP(c net.Conn) {
336433
case <-m.TCPListener.CTX.Done():
337434
return
338435
default:
339-
message, err := reader.ReadString('\n')
436+
message, err := readSyslogMessage(reader)
340437
if err != nil {
341-
if err == io.EOF || err.(net.Error).Timeout() {
438+
if err == io.EOF {
439+
utils.Logger.Info("TCP connection closed by %s", remoteAddr)
440+
return
441+
}
442+
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
443+
utils.Logger.Info("TCP connection timeout from %s", remoteAddr)
342444
return
343445
}
344-
utils.Logger.ErrorF("error reading tcp data: %v", err)
446+
utils.Logger.ErrorF("error reading syslog message from %s: %v", remoteAddr, err)
345447
return
346448
}
347449
msgChannel <- config.MSGDS{
@@ -398,12 +500,14 @@ func (m *SyslogModule) handleTLSConnection(conn net.Conn) {
398500
default:
399501
// Set read timeout for each message
400502
conn.SetDeadline(time.Now().Add(30 * time.Second))
401-
message, err := reader.ReadString('\n')
503+
message, err := readSyslogMessage(reader)
402504
if err != nil {
403505
if err == io.EOF {
506+
utils.Logger.Info("TLS connection closed by %s", remoteAddr)
404507
return
405508
}
406509
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
510+
utils.Logger.Info("TLS connection timeout from %s", remoteAddr)
407511
return
408512
}
409513
utils.Logger.ErrorF("error reading TLS data from %s: %v", remoteAddr, err)

0 commit comments

Comments
 (0)