Skip to content

Commit fa2c111

Browse files
authored
Merge pull request #193 from Aiven-Open/dogukancagatay/fix-octet-counted-overflow-edge-case
rsyslog: fix octet-counted framing on oversize messages
2 parents b898922 + 0386209 commit fa2c111

4 files changed

Lines changed: 244 additions & 20 deletions

File tree

README.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,14 @@ By default, log messages are sent using non-transparent framing and are terminat
595595
Set it to ``true`` if the server supports octet-counted framing and messages are multi-line.
596596
(see `RFC 6587 for Syslog over TCP message transfer <https://datatracker.ietf.org/doc/html/rfc6587#section-3.4.1>`_)
597597

598+
``max_message_size`` (default ``2048``)
599+
600+
Maximum size in bytes of the message body sent to the syslog server. Messages longer than this
601+
are truncated. With non-transparent framing the message is cut to ``max_message_size - 1`` bytes and terminated with a newline, so at most ``max_message_size`` bytes are sent.
602+
With octet-counted framing the truncated body is wrapped in a ``<length> SP <body>`` frame
603+
whose length matches the actual body bytes sent (RFC 6587 section 3.4.1), so the stream
604+
framing is never corrupted by an oversize message.
605+
598606
``structured_data`` (default ``null``)
599607

600608
Content of structured data section (optional, required by some services to identify the sender).

journalpump/rsyslog.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -147,29 +147,32 @@ def close(self):
147147
finally:
148148
self.socket = None
149149

150-
def send(self, message):
150+
def send(self, message: bytes) -> None:
151151
for retry in [True, False]:
152152
try:
153153
if self.socket is None:
154154
self._connect()
155+
assert self.socket is not None
155156

156-
# Syslog over TCP (RFC 6587) supports 2 message framing methods:
157-
# - Octet-counted framing: Each message is prefixed with its
158-
# length and a space (<length> <message>).
159-
# - Non-transparent framing: Each message is terminated by a
160-
# newline (\n).
161-
# So far, only non-transparent framing was used by journalpump
162-
# and multi-line log messages were terminated at first newline.
163-
# We keep this behavior by default, but now expose a new config
164-
# allowing to switch to octet-counted framing. Provided that this
165-
# method is supported by the syslog server, multi-line messages
166-
# will only get truncated according to the max_message_size.
157+
# RFC 6587 defines two framing methods for syslog over TCP.
167158
if self.octet_counted_framing:
168-
message = f"{len(message)} ".encode("utf-8") + message
169-
170-
self.socket.sendall(message[: self.max_msg - 1])
171-
if len(message) >= self.max_msg:
172-
self.socket.sendall(b"\n")
159+
# Octet-counted framing (RFC 6587 section 3.4.1):
160+
# <MSGLEN> SP <SYSLOG-MSG>. The receiver reads exactly
161+
# <MSGLEN> bytes after the space, so the advertised length
162+
# must equal the body bytes actually sent. max_message_size
163+
# bounds the body (matching rsyslog MaxMessageSize semantics).
164+
# No trailing newline — it would be parsed as the start of
165+
# the next frame's length prefix and desync the stream.
166+
body = message[: self.max_msg]
167+
frame = f"{len(body)} ".encode("utf-8") + body
168+
self.socket.sendall(frame)
169+
else:
170+
# Non-transparent framing (RFC 6587 section 3.4.2): frames
171+
# are delimited by a trailing newline. The formatter already
172+
# appends '\n'; when we truncate we add it explicitly.
173+
self.socket.sendall(message[: self.max_msg - 1])
174+
if len(message) >= self.max_msg:
175+
self.socket.sendall(b"\n")
173176

174177
break
175178
except Exception as ex: # pylint: disable=broad-except

systest/test_rsyslog.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ def _run_pump_test(
116116
messages_to_send,
117117
expected_message_count,
118118
expected_multiline_support,
119+
expected_subsequent_message=None,
119120
):
120121
journalpump = None
121122
threads = []
@@ -156,6 +157,7 @@ def _run_pump_test(
156157
# Check the results
157158
found = 0
158159
multiline_support = False
160+
subsequent_message_found = False
159161
with open(logfile, "r", encoding="utf-8") as fp:
160162
lines = fp.readlines()
161163
for txt in ["Info", "Warning", "Error", "Critical"]:
@@ -167,14 +169,24 @@ def _run_pump_test(
167169
if txt == "Info":
168170
multiline_support = line.endswith("example#012stack#012trace {%} -\n")
169171
break
172+
if expected_subsequent_message is not None:
173+
subsequent_pattern = re.compile(rf".*{expected_subsequent_message} for {identifier}.*")
174+
for line in lines:
175+
if subsequent_pattern.match(line):
176+
log.info("Found subsequent message: %s", line)
177+
subsequent_message_found = True
178+
break
179+
assert (
180+
subsequent_message_found
181+
), "Subsequent message not found — connection likely desynced after oversize octet-counted frame"
170182
assert found == expected_message_count, "Expected messages not found in syslog"
171183
assert (
172184
multiline_support == expected_multiline_support
173185
), f"Multi-line support is {multiline_support} which does not match expected {expected_multiline_support}"
174186

175187

176188
@pytest.mark.parametrize(
177-
"messages_to_send,octet_counted_framing_config,expected_message_count,expected_multiline_support",
189+
"messages_to_send,sender_config,expected_message_count,expected_multiline_support,expected_subsequent_message",
178190
[
179191
(
180192
[
@@ -192,6 +204,7 @@ def _run_pump_test(
192204
{}, # config not specified, octet_counted_framing is False by default
193205
4,
194206
False,
207+
None,
195208
),
196209
(
197210
[
@@ -203,6 +216,7 @@ def _run_pump_test(
203216
{"octet_counted_framing": False},
204217
1,
205218
False,
219+
None,
206220
),
207221
(
208222
[
@@ -214,15 +228,35 @@ def _run_pump_test(
214228
{"octet_counted_framing": True},
215229
1,
216230
True,
231+
None,
232+
),
233+
# Verify that an oversize octet-counted frame is truncated without desyncing the stream.
234+
# A short subsequent message on the same connection must still be delivered correctly.
235+
(
236+
[
237+
{
238+
"text": "Info message for {identifier}\n" + "X" * 300,
239+
"PRIORITY": journal.LOG_INFO,
240+
},
241+
{
242+
"text": "Critical message for {identifier}",
243+
"PRIORITY": journal.LOG_CRIT,
244+
},
245+
],
246+
{"octet_counted_framing": True, "max_message_size": 80},
247+
2,
248+
False,
249+
"Critical message",
217250
),
218251
],
219252
)
220253
def test_rsyslogd_tcp_sender(
221254
tmpdir,
222255
messages_to_send,
223-
octet_counted_framing_config,
256+
sender_config,
224257
expected_message_count,
225258
expected_multiline_support,
259+
expected_subsequent_message,
226260
):
227261
workdir = tmpdir.dirname
228262
logfile = f"{workdir}/test.log"
@@ -239,7 +273,7 @@ def test_rsyslogd_tcp_sender(
239273
"rsyslog_port": 5140,
240274
"format": "custom",
241275
"logline": "<%pri%>%timestamp% %HOSTNAME% %app-name%[%procid%]: %msg% {%%} %not-valid-tag%",
242-
**dict(octet_counted_framing_config),
276+
**dict(sender_config),
243277
},
244278
},
245279
},
@@ -256,6 +290,7 @@ def test_rsyslogd_tcp_sender(
256290
messages_to_send=messages_to_send,
257291
expected_message_count=expected_message_count,
258292
expected_multiline_support=expected_multiline_support,
293+
expected_subsequent_message=expected_subsequent_message,
259294
)
260295
finally:
261296
rsyslogd.stop()

test/test_rsyslog.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# Copyright 2019, Aiven, https://aiven.io/
2+
#
3+
# This file is under the Apache License, Version 2.0.
4+
# See the file `LICENSE` for details.
5+
from journalpump.rsyslog import SyslogTcpClient
6+
from unittest import mock
7+
8+
import pytest
9+
10+
11+
class _StubSocket:
12+
"""Minimal socket stub that records bytes passed to sendall."""
13+
14+
def __init__(self) -> None:
15+
self.sent = bytearray()
16+
17+
def sendall(self, data: bytes) -> None:
18+
self.sent.extend(data)
19+
20+
def close(self) -> None:
21+
pass
22+
23+
24+
def _make_client(*, max_msg: int, octet_counted_framing: bool) -> tuple[SyslogTcpClient, _StubSocket]:
25+
with mock.patch.object(SyslogTcpClient, "_connect"):
26+
client = SyslogTcpClient(
27+
server="127.0.0.1",
28+
port=1,
29+
rfc="RFC5424",
30+
max_msg=max_msg,
31+
octet_counted_framing=octet_counted_framing,
32+
)
33+
sock = _StubSocket()
34+
client.socket = sock
35+
return client, sock
36+
37+
38+
def _parse_octet_counted_frames(stream: bytes) -> list[bytes]:
39+
"""Parse an RFC 6587 octet-counted stream into individual message bodies."""
40+
frames: list[bytes] = []
41+
i = 0
42+
while i < len(stream):
43+
sp = stream.index(b" ", i)
44+
n = int(stream[i:sp])
45+
body = stream[sp + 1 : sp + 1 + n]
46+
assert len(body) == n, f"stream desync at offset {i}: advertised {n} bytes, got {len(body)}"
47+
frames.append(bytes(body))
48+
i = sp + 1 + n
49+
return frames
50+
51+
52+
class TestNonTransparentFraming:
53+
"""octet_counted_framing=False — must be byte-for-byte identical to the pre-fix behavior."""
54+
55+
def test_small_body_sent_verbatim(self):
56+
body = b"<13>1 2024-01-01T00:00:00Z host app - - hello\n"
57+
max_msg = len(body) + 10
58+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=False)
59+
60+
client.send(body)
61+
62+
assert bytes(sock.sent) == body
63+
64+
def test_small_body_no_extra_newline(self):
65+
body = b"<13>1 2024-01-01T00:00:00Z host app - - hello\n"
66+
max_msg = len(body) + 10
67+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=False)
68+
69+
client.send(body)
70+
71+
assert sock.sent.count(b"\n") == 1
72+
73+
def test_oversize_body_truncated_and_newline_appended(self):
74+
body = b"A" * 200 + b"\n"
75+
max_msg = 50
76+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=False)
77+
78+
client.send(body)
79+
80+
expected = body[: max_msg - 1] + b"\n"
81+
assert bytes(sock.sent) == expected
82+
83+
def test_body_exactly_max_msg_triggers_truncation(self):
84+
# len(body) == max_msg is the boundary case: the ``len(message) >= max_msg`` truncation branch fires
85+
max_msg = 50
86+
body = b"B" * max_msg
87+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=False)
88+
89+
client.send(body)
90+
91+
expected = body[: max_msg - 1] + b"\n"
92+
assert bytes(sock.sent) == expected
93+
94+
95+
class TestOctetCountedFraming:
96+
"""octet_counted_framing=True — must produce RFC 6587 section 3.4.1 frames."""
97+
98+
def test_small_multiline_body_framed_correctly(self):
99+
body = b"<13>1 2024-01-01T00:00:00Z host app - - line1\nline2\nline3\n"
100+
max_msg = len(body) + 50
101+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=True)
102+
103+
client.send(body)
104+
105+
wire = bytes(sock.sent)
106+
sp = wire.index(b" ")
107+
advertised_len = int(wire[:sp])
108+
sent_body = wire[sp + 1 :]
109+
110+
assert advertised_len == len(body)
111+
assert sent_body == body
112+
113+
def test_small_body_no_trailing_newline_outside_frame(self):
114+
body = b"<13>1 2024-01-01T00:00:00Z host app - - msg\n"
115+
max_msg = len(body) + 50
116+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=True)
117+
118+
client.send(body)
119+
120+
wire = bytes(sock.sent)
121+
sp = wire.index(b" ")
122+
n = int(wire[:sp])
123+
# Everything after the prefix+space+body must be empty (no stray newline)
124+
assert len(wire) == sp + 1 + n
125+
126+
def test_oversize_body_truncated_and_prefix_matches(self):
127+
body = b"X" * 300
128+
max_msg = 100
129+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=True)
130+
131+
client.send(body)
132+
133+
wire = bytes(sock.sent)
134+
sp = wire.index(b" ")
135+
advertised_len = int(wire[:sp])
136+
sent_body = wire[sp + 1 :]
137+
138+
assert advertised_len == len(sent_body)
139+
assert len(sent_body) <= max_msg
140+
assert sent_body == body[:max_msg]
141+
142+
def test_back_to_back_oversize_messages_no_desync(self):
143+
"""Two oversize messages on the same connection must parse without desync."""
144+
body1 = b"<13>1 2024-01-01T00:00:00Z h a - - " + b"M" * 300 + b"\n"
145+
body2 = b"<14>1 2024-01-01T00:00:01Z h b - - short\n"
146+
max_msg = 60
147+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=True)
148+
149+
client.send(body1)
150+
client.send(body2)
151+
152+
frames = _parse_octet_counted_frames(bytes(sock.sent))
153+
assert len(frames) == 2
154+
assert frames[0] == body1[:max_msg]
155+
assert frames[1] == body2[:max_msg]
156+
157+
def test_very_small_max_msg_produces_self_consistent_frame(self):
158+
"""Even with max_msg=8 the frame prefix must equal the body length."""
159+
body = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ\n"
160+
max_msg = 8
161+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=True)
162+
163+
client.send(body)
164+
165+
frames = _parse_octet_counted_frames(bytes(sock.sent))
166+
assert len(frames) == 1
167+
assert len(frames[0]) <= max_msg
168+
169+
@pytest.mark.parametrize("max_msg", [1, 4, 8, 16, 32])
170+
def test_various_small_max_msg_always_self_consistent(self, max_msg: int):
171+
body = b"Hello, multi\nline\nbody\n" * 5
172+
client, sock = _make_client(max_msg=max_msg, octet_counted_framing=True)
173+
174+
client.send(body)
175+
176+
frames = _parse_octet_counted_frames(bytes(sock.sent))
177+
assert len(frames) == 1
178+
assert len(frames[0]) <= max_msg

0 commit comments

Comments
 (0)