Skip to content

Commit e5fe70e

Browse files
authored
Merge pull request #77 from KaminariOS/log_dedup
Add log dedup logic
2 parents 6bb38d5 + 0a09f27 commit e5fe70e

4 files changed

Lines changed: 335 additions & 8 deletions

File tree

aiopslab/orchestrator/actions/base.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@
1515
from aiopslab.observer.metric_api import PrometheusAPI
1616
from aiopslab.observer.trace_api import TraceAPI
1717

18+
from aiopslab.orchestrator.actions.log_deduplication import greedy_compress_lines
19+
20+
import re
21+
22+
# Regex fragments, one per CLI tool, for shell commands whose output is
# log-like (and therefore a candidate for timestamp deduplication).
_KUBECTL_LOG_COMMANDS = r"kubectl\s+(?:logs|get\s+events|describe|get\s+\S+\s+-w)"  # logs/events/describe/watch
_DOCKER_LOG_COMMANDS = r"docker\s+(?:logs|events)"  # docker logs/events

# Full pattern: either tool fragment on a word boundary, followed by the
# remainder of that command line (everything up to the next newline).
LOG_COMMAND_PATTERN: str = (
    r"\b(?:" + _KUBECTL_LOG_COMMANDS + r"|" + _DOCKER_LOG_COMMANDS + r")\b(?:[^\n]*)"
)
1828

1929
class TaskActions:
2030
"""Base class for task actions."""
@@ -60,6 +70,7 @@ def get_logs(namespace: str, service: str) -> str:
6070
except Exception as e:
6171
return "Error: Your service/namespace does not exist. Use kubectl to check."
6272

73+
logs = greedy_compress_lines(logs)
6374
print(logs)
6475

6576
return logs
@@ -90,7 +101,14 @@ def exec_shell(command: str, timeout: int = 30) -> str:
90101
if pattern in command:
91102
return error
92103

93-
return Shell.exec(command, timeout=timeout)
104+
result = Shell.exec(command)
105+
106+
if re.search(LOG_COMMAND_PATTERN, command):
107+
result = greedy_compress_lines(result)
108+
109+
print(result)
110+
111+
return result
94112

95113
@staticmethod
96114
@read
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
import re
5+
import sys
6+
import argparse
7+
from pathlib import Path
8+
import os
9+
10+
# -------------------------------
11+
# Timestamp detection
12+
# -------------------------------
13+
# One regex fragment per recognised timestamp/duration shape.  The fragments
# are kept in their original order because regex alternation tries them from
# left to right.
_TIMESTAMP_ALTERNATIVES: tuple[str, ...] = (
    # ISO-like: 2025-09-24 18:41:09 or 2025-09-24T18:41:09Z
    r"\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2})?)",
    # Abbreviated month: 2025-Sep-24 18:41:09.830218
    r"\d{4}-[A-Z][a-z]{2}-\d{2} \d{2}:\d{2}:\d{2}(?:\.\d+)?",
    # Time only: 18:41:09
    r"\b\d{2}:\d{2}:\d{2}\b",
    # Durations: 10m5s, 30s
    r"\b\d+m(?:\d+s)?\b",
    r"\b\d+s\b",
    # syslog-like: Wed Sep 24 18:41:09 2025
    r"[A-Z][a-z]{2} [A-Z][a-z]{2} \d{2} \d{2}:\d{2}:\d{2} \d{4}",
)

# The alternatives wrapped in a single non-capturing group.
DEFAULT_TIMESTAMP_REGEX = "(?:" + "|".join(_TIMESTAMP_ALTERNATIVES) + ")"

# Pre-compiled form, used as the default matcher by the dedup functions.
DEFAULT_TS_RX = re.compile(DEFAULT_TIMESTAMP_REGEX)
35+
36+
def find_timestamp_spans(line: str, ts_rx: re.Pattern[str]) -> list[tuple[int, int]]:
    """Locate every match of ``ts_rx`` in ``line``.

    Args:
        line: Text to scan (may span several lines).
        ts_rx: Compiled timestamp pattern.

    Returns:
        The ``(start, end)`` offsets of each non-overlapping match, in
        left-to-right order; an empty list when nothing matches.
    """
    spans: list[tuple[int, int]] = []
    for match in ts_rx.finditer(line):
        spans.append(match.span())
    return spans
38+
39+
def make_blocks(lines: list[str], block_size: int) -> list[str]:
    """Group ``lines`` into consecutive newline-joined chunks.

    Args:
        lines: Individual log lines.
        block_size: Number of lines per chunk; the final chunk may be shorter.

    Returns:
        One string per chunk, with the chunk's lines joined by ``"\\n"``.

    Raises:
        ValueError: If ``block_size`` is zero or negative.
    """
    if block_size <= 0:
        raise ValueError("block_size must be positive")

    blocks: list[str] = []
    for start in range(0, len(lines), block_size):
        chunk = lines[start:start + block_size]
        blocks.append("\n".join(chunk))
    return blocks
43+
44+
# -------------------------------
45+
# Greedy compressor (single pass)
46+
# -------------------------------
47+
def _mask_timestamps(text: str, spans: list[tuple[int, int]]) -> str:
    """Return ``text`` with every span overwritten by spaces of equal length."""
    pieces: list[str] = []
    cursor = 0
    for start, end in spans:
        pieces.append(text[cursor:start])
        # Same-width padding keeps the text after the timestamp at its
        # original offset.
        pieces.append(" " * (end - start))
        cursor = end
    pieces.append(text[cursor:])
    return "".join(pieces)


def greedy_compress_pass(
    lines: list[str],
    ts_rx: re.Pattern[str],
    block_size: int
) -> list[str]:
    """Collapse adjacent blocks that differ only in their timestamps.

    ``lines`` is cut into blocks of ``block_size`` lines and each block is
    compared with the one immediately before it.  Two neighbours are treated
    as duplicates when both contain timestamps, they contain the same number
    of them starting at the same offsets, and their text is equal once the
    timestamps are blanked out.  In that case the later block replaces the
    earlier one; otherwise the later block is appended.

    Args:
        lines: Log lines (or blocks produced by an earlier pass).
        ts_rx: Compiled timestamp pattern.
        block_size: Number of input entries per compared block.

    Returns:
        The surviving blocks, each a newline-joined string.  An empty input
        yields an empty list.
    """
    if not lines:
        return []

    blocks = make_blocks(lines, block_size)
    kept: list[str] = [blocks[0]]
    last_spans = find_timestamp_spans(blocks[0], ts_rx)

    for current in blocks[1:]:
        current_spans = find_timestamp_spans(current, ts_rx)

        # Blocks are only comparable when both carry timestamps laid out at
        # the same positions.
        comparable = (
            bool(last_spans)
            and bool(current_spans)
            and len(current_spans) == len(last_spans)
            and [begin for begin, _ in current_spans] == [begin for begin, _ in last_spans]
        )

        if comparable and (
            _mask_timestamps(kept[-1], last_spans)
            == _mask_timestamps(current, current_spans)
        ):
            # Only the timestamps changed: keep the most recent occurrence.
            kept[-1] = current
        else:
            kept.append(current)

        last_spans = current_spans

    return kept
102+
103+
# -------------------------------
104+
# Multi-pass driver
105+
# -------------------------------
106+
def greedy_compress_lines(
    raw_str: str,
    ts_rx: re.Pattern[str] = DEFAULT_TS_RX,
    window_size: "int | None" = None,
) -> str:
    """Deduplicate timestamp-only repeats in ``raw_str`` using several passes.

    A greedy pass is run for every block size from 1 up to the window size,
    each pass consuming the output of the previous one.

    Args:
        raw_str: Raw log text.
        ts_rx: Compiled timestamp pattern used to recognise timestamps.
        window_size: Largest block size to try.  When omitted, the value is
            read from the ``LOG_TRIM`` environment variable.

    Returns:
        The compressed text with blocks joined by ``"\\n"``.  The input is
        returned unchanged when deduplication is disabled, i.e. when no
        window size is given and ``LOG_TRIM`` is unset or not an integer, or
        when the effective window size is not positive.
    """
    if window_size is None:
        # No explicit window: fall back to the LOG_TRIM environment switch.
        # An unset or non-integer value disables deduplication.
        try:
            window_size = int(os.environ.get("LOG_TRIM", ""))
        except ValueError:
            window_size = None

    if window_size is None or window_size <= 0:
        return raw_str

    result = raw_str.splitlines()
    for size in range(1, window_size + 1):
        result = greedy_compress_pass(result, ts_rx, size)
    return "\n".join(result)
128+
129+
# -------------------------------
130+
# CLI
131+
# -------------------------------
132+
def main():
    """Command-line entry point: deduplicate a log file into an output file.

    Reads the input file, runs greedy deduplication passes for block sizes
    1 through ``--window-size`` and writes the result to the output path,
    creating parent directories as needed.  Exits with status 1 when the
    input file does not exist or the timestamp regex does not compile.
    """
    ap = argparse.ArgumentParser(
        description="Greedy log deduplicator with multi-pass window size support."
    )
    ap.add_argument("input", help="Path to input text file")
    ap.add_argument("output", help="Path to output text file")
    ap.add_argument(
        "--timestamp-regex",
        default=DEFAULT_TIMESTAMP_REGEX,
        help="Regex for timestamps. Default matches ISO, RFC, and k8s durations.",
    )
    ap.add_argument(
        "--window-size",
        type=int,
        default=2,
        help="Maximum block size for multi-pass deduplication. Default: 2",
    )
    args = ap.parse_args()

    in_path = Path(args.input)
    out_path = Path(args.output)

    if not in_path.exists():
        print(f"Input file not found: {in_path}", file=sys.stderr)
        sys.exit(1)

    raw_text = in_path.read_text(encoding="utf-8", errors="replace")

    try:
        ts_rx = re.compile(args.timestamp_regex)
    except re.error as e:
        print(f"Invalid timestamp regex: {e}", file=sys.stderr)
        sys.exit(1)

    # Drive the passes here so that --window-size is honoured regardless of
    # the LOG_TRIM environment variable.
    deduped_lines = raw_text.splitlines()
    for size in range(1, args.window_size + 1):
        deduped_lines = greedy_compress_pass(deduped_lines, ts_rx, size)
    deduped = "\n".join(deduped_lines)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(deduped, encoding="utf-8")
170+
171+
if __name__ == "__main__":
172+
main()
173+

aiopslab/service/shell.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,10 @@ def local_exec(command: str, input_data=None, cwd=None, timeout=30):
5555

5656
if out.returncode != 0:
5757
error_message = out.stderr.decode("utf-8")
58-
print(f"[ERROR] Command execution failed: {error_message}")
58+
error_message = f"[ERROR] Command execution failed: {error_message}"
5959
return error_message
6060
else:
6161
output_message = out.stdout.decode("utf-8") + out.stderr.decode("utf-8")
62-
print(output_message)
6362
return output_message
6463

6564
except Exception as e:
@@ -89,7 +88,6 @@ def ssh_exec(host: str, user: str, ssh_key_path: str, command: str, timeout=30):
8988
return error_message
9089
else:
9190
output_message = stdout.read().decode("utf-8")
92-
print(output_message)
9391
return output_message
9492

9593
except Exception as e:
@@ -116,12 +114,10 @@ def docker_exec(container_name: str, command: str, timeout=30):
116114

117115
if out.stderr or out.returncode != 0:
118116
error_message = out.stderr.decode("utf-8")
119-
print(f"[ERROR] Docker command execution failed: {error_message}")
120-
return error_message
117+
return f"[ERROR] Docker command execution failed: {error_message}"
121118
else:
122119
output_message = out.stdout.decode("utf-8")
123-
print(output_message)
124120
return output_message
125121

126122
except Exception as e:
127-
raise RuntimeError(f"Failed to execute command in Docker container: {container_name}\nError: {str(e)}")
123+
raise RuntimeError(f"Failed to execute command in Docker container: {container_name}\nError: {str(e)}")

tests/orchestrator/log_dedup.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
# test_log_deduplication.py
2+
import os
3+
import unittest
4+
from aiopslab.orchestrator.actions import log_deduplication as ld
5+
6+
7+
class TestTimestampDetection(unittest.TestCase):
    """Checks ``ld.find_timestamp_spans`` against the default timestamp regex."""

    @staticmethod
    def _spans(text):
        """Run span detection on ``text`` with the module's default pattern."""
        return ld.find_timestamp_spans(text, ld.DEFAULT_TS_RX)

    def test_find_timestamp_spans_iso(self):
        # A single ISO-like timestamp at the start of the line.
        line = "2025-09-24 18:41:09 some log message"
        found = self._spans(line)
        self.assertEqual(len(found), 1)
        begin, finish = found[0]
        self.assertEqual(line[begin:finish], "2025-09-24 18:41:09")

    def test_find_timestamp_spans_multiple(self):
        # Two timestamps on one line yield two spans.
        found = self._spans("Start 2025-09-24T18:41:09Z end 2025-09-24T19:00:00Z")
        self.assertEqual(len(found), 2)

    def test_find_timestamp_spans_no_match(self):
        # Text without any timestamp yields an empty list.
        self.assertEqual(self._spans("no timestamp here"), [])
24+
25+
26+
class TestMakeBlocks(unittest.TestCase):
    """Checks chunking and argument validation of ``ld.make_blocks``."""

    def test_make_blocks_basic(self):
        # Five lines in blocks of two: the last block holds the leftover line.
        produced = ld.make_blocks(["a", "b", "c", "d", "e"], 2)
        expected = ["a\nb", "c\nd", "e"]
        self.assertEqual(produced, expected)

    def test_make_blocks_invalid(self):
        # A zero block size is rejected.
        self.assertRaises(ValueError, ld.make_blocks, ["x"], 0)
35+
36+
37+
class TestGreedyCompressPass(unittest.TestCase):
    """Checks a single ``ld.greedy_compress_pass`` run with block size 1."""

    def setUp(self):
        self.ts_rx = ld.DEFAULT_TS_RX

    def _compress(self, lines):
        """Run one pass over ``lines`` with one line per block."""
        return ld.greedy_compress_pass(lines, self.ts_rx, 1)

    def test_no_lines(self):
        # Empty input produces empty output.
        self.assertEqual(self._compress([]), [])

    def test_dedup_identical_except_timestamps(self):
        # Same message, different timestamp: only one entry survives.
        compressed = self._compress(
            [
                '{"time":"2025-09-25T04:36:33Z","msg":"foo"}',
                '{"time":"2025-09-25T04:36:34Z","msg":"foo"}',
            ]
        )
        self.assertEqual(len(compressed), 1)
        self.assertIn('"foo"', compressed[0])

    def test_different_messages_not_deduped(self):
        # Same timestamp, different message: both entries survive.
        compressed = self._compress(
            [
                '{"time":"2025-09-25T04:36:33Z","msg":"foo"}',
                '{"time":"2025-09-25T04:36:33Z","msg":"bar"}',
            ]
        )
        self.assertEqual(len(compressed), 2)
61+
62+
63+
class TestGreedyCompressLines(unittest.TestCase):
    """Checks how ``ld.greedy_compress_lines`` reacts to the LOG_TRIM variable."""

    def setUp(self):
        self.ts_rx = ld.DEFAULT_TS_RX
        # The tests below mutate LOG_TRIM; snapshot it and restore it after
        # each test so the setting cannot leak into unrelated tests.
        self.addCleanup(self._restore_log_trim, os.environ.get("LOG_TRIM"))

    @staticmethod
    def _restore_log_trim(previous):
        """Put LOG_TRIM back to ``previous`` (``None`` means it was unset)."""
        if previous is None:
            os.environ.pop("LOG_TRIM", None)
        else:
            os.environ["LOG_TRIM"] = previous

    def test_env_disabled(self):
        """Should return input unchanged when LOG_TRIM is unset or invalid."""
        os.environ.pop("LOG_TRIM", None)
        text = "some logs\nmore logs"
        result = ld.greedy_compress_lines(text)
        self.assertEqual(result, text)

    def test_env_enabled(self):
        """Should dedup when LOG_TRIM is set."""
        os.environ["LOG_TRIM"] = "2"
        text = (
            '{"time":"2025-09-25T04:36:33Z","message":"TLS disabled."}\n'
            '{"time":"2025-09-25T04:36:34Z","message":"TLS disabled."}\n'
        )
        result = ld.greedy_compress_lines(text, self.ts_rx)
        # Expect only one entry remains after deduplication
        self.assertIn("TLS disabled", result)
        self.assertEqual(result.count("TLS disabled"), 1)

    def test_invalid_env(self):
        """Invalid LOG_TRIM should skip dedup."""
        os.environ["LOG_TRIM"] = "invalid"
        text = "hello world"
        result = ld.greedy_compress_lines(text)
        self.assertEqual(result, text)
92+
93+
94+
class TestRealisticLogs(unittest.TestCase):
    """Integration-style tests using the provided log sequences."""

    def setUp(self):
        self.ts_rx = ld.DEFAULT_TS_RX
        # Enable deduplication for these tests only: restore the previous
        # LOG_TRIM value afterwards so it does not leak into other tests.
        self.addCleanup(self._restore_log_trim, os.environ.get("LOG_TRIM"))
        os.environ["LOG_TRIM"] = "3"

    @staticmethod
    def _restore_log_trim(previous):
        """Put LOG_TRIM back to ``previous`` (``None`` means it was unset)."""
        if previous is None:
            os.environ.pop("LOG_TRIM", None)
        else:
            os.environ["LOG_TRIM"] = previous

    def test_dedup_kubernetes_events(self):
        sample = """
Error: Your service/namespace does not exist. Use kubectl to check.
{"level":"info","time":"2025-09-25T04:36:33Z","message":"TLS disabled."}
{"level":"info","time":"2025-09-25T04:36:33Z","message":"TLS disabled."}
{"level":"info","time":"2025-09-25T04:36:34Z","message":"TLS disabled."}
"""
        result = ld.greedy_compress_lines(sample.strip(), self.ts_rx)
        # Should remove duplicate lines differing only by timestamps
        self.assertIn("TLS disabled", result)
        self.assertEqual(result.count("TLS disabled"), 1)

    def test_k8s_container_events(self):
        sample = """
2025-09-25T04:36:33Z INF cmd/geo/db.go:29 > New session successfull...
2025-09-25T04:36:34Z INF cmd/geo/db.go:29 > New session successfull...
2025-09-25T04:36:35Z INF cmd/geo/db.go:31 > Generating test data...
"""
        result = ld.greedy_compress_lines(sample.strip(), self.ts_rx)
        # Should collapse identical log messages differing only by timestamp
        self.assertIn("New session successfull...", result)
        self.assertEqual(result.count("New session successfull..."), 1)
        # Should preserve distinct lines
        self.assertIn("Generating test data...", result)

    def test_multiblock_dedup(self):
        # Three entries with the same message and consecutive timestamps.
        sample = "\n".join(
            [
                f'{{"level":"info","time":"2025-09-25T04:37:{i:02d}Z","message":"Tune: setGCPercent to 100"}}'
                for i in range(3)
            ]
        )
        result = ld.greedy_compress_lines(sample, self.ts_rx)
        self.assertIn("Tune: setGCPercent to 100", result)
        self.assertEqual(result.count("Tune: setGCPercent to 100"), 1)
136+
137+
138+
if __name__ == "__main__":
139+
unittest.main()
140+

0 commit comments

Comments
 (0)