Skip to content

Commit 3ab28b2

Browse files
committed
refactor sink && add perf ut case
1 parent 4c7acdf commit 3ab28b2

8 files changed

Lines changed: 489 additions & 50 deletions

File tree

objwatch/event_handls.py

Lines changed: 94 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import sys
55
import json
6+
import time
67
import signal
78
import atexit
89
from functools import lru_cache
@@ -88,8 +89,24 @@ def _log_event(self, lineno: int, event_type: EventType, message: str, call_dept
8889
call_depth (int): Current depth of the call stack.
8990
index_info (str): Information about the index to track in a multi-process environment.
9091
"""
92+
# For backward compatibility, still format the message for direct logging
9193
prefix = self._generate_prefix(lineno, call_depth)
92-
log_debug(f"{index_info}{prefix}{event_type.label} {message}")
94+
formatted_msg = f"{index_info}{prefix}{event_type.label} {message}"
95+
96+
# But also include raw event data for lazy serialization in sinks like ZeroMQSink
97+
log_debug(
98+
formatted_msg,
99+
extra={
100+
'raw_event': {
101+
'event_type': event_type.label,
102+
'lineno': lineno,
103+
'call_depth': call_depth,
104+
'index_info': index_info,
105+
'message': message,
106+
'timestamp': time.time(),
107+
}
108+
},
109+
)
93110

94111
def _add_json_event(self, event_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
95112
"""
@@ -137,7 +154,26 @@ def _handle_collection_change(
137154
diff_msg = f" ({value_type.__name__})(len){old_value_len} -> {current_value_len}"
138155
logger_msg = f"{class_name}.{key}{diff_msg}"
139156

140-
self._log_event(lineno, event_type, logger_msg, call_depth, index_info)
157+
# Log with raw event data for lazy serialization
158+
import time
159+
160+
raw_event = {
161+
'event_type': event_type.label,
162+
'lineno': lineno,
163+
'call_depth': call_depth,
164+
'index_info': index_info,
165+
'class_name': class_name,
166+
'key': key,
167+
'value_type': value_type,
168+
'old_value_len': old_value_len,
169+
'current_value_len': current_value_len,
170+
'timestamp': time.time(),
171+
}
172+
173+
# For backward compatibility, still format the message for direct logging
174+
prefix = self._generate_prefix(lineno, call_depth)
175+
formatted_msg = f"{index_info}{prefix}{event_type.label} {logger_msg}"
176+
log_debug(formatted_msg, extra={'raw_event': raw_event})
141177

142178
if self.output_json:
143179
self._add_json_event(
@@ -173,21 +209,33 @@ def handle_run(
173209
func_data['call_msg'] = call_msg
174210
logger_msg += ' <- ' + call_msg
175211

176-
self._log_event(lineno, EventType.RUN, logger_msg, call_depth, index_info)
212+
# Log with raw event data for lazy serialization
213+
import time
214+
215+
raw_event = {
216+
'event_type': EventType.RUN.label,
217+
'lineno': lineno,
218+
'call_depth': call_depth,
219+
'index_info': index_info,
220+
'func_info': func_info,
221+
'timestamp': time.time(),
222+
}
223+
224+
# For backward compatibility, still format the message for direct logging
225+
prefix = self._generate_prefix(lineno, call_depth)
226+
formatted_msg = f"{index_info}{prefix}{EventType.RUN.label} {logger_msg}"
227+
log_debug(formatted_msg, extra={'raw_event': raw_event})
177228

178229
if self.output_json:
179-
function_event = self._add_json_event('Function', func_data)
230+
function_event = self._add_json_event(
231+
'Function',
232+
func_data,
233+
)
180234
# Push the function's events list to the stack to maintain hierarchy
181235
self.current_node.append(function_event['events'])
182236

183237
def handle_end(
184-
self,
185-
lineno: int,
186-
func_info: dict,
187-
abc_wrapper: Optional[Any],
188-
call_depth: int,
189-
index_info: str,
190-
result: Any,
238+
self, lineno: int, func_info: dict, abc_wrapper: Optional[Any], call_depth: int, index_info: str, result: Any
191239
) -> None:
192240
"""
193241
Handle the 'end' event indicating the end of a function or method execution.
@@ -199,7 +247,22 @@ def handle_end(
199247
return_msg = abc_wrapper.wrap_return(func_info['symbol'], result)
200248
logger_msg += ' -> ' + return_msg
201249

202-
self._log_event(lineno, EventType.END, logger_msg, call_depth, index_info)
250+
# Log with raw event data for lazy serialization
251+
import time
252+
253+
raw_event = {
254+
'event_type': EventType.END.label,
255+
'lineno': lineno,
256+
'call_depth': call_depth,
257+
'index_info': index_info,
258+
'func_info': func_info,
259+
'timestamp': time.time(),
260+
}
261+
262+
# For backward compatibility, still format the message for direct logging
263+
prefix = self._generate_prefix(lineno, call_depth)
264+
formatted_msg = f"{index_info}{prefix}{EventType.END.label} {logger_msg}"
265+
log_debug(formatted_msg, extra={'raw_event': raw_event})
203266

204267
if self.output_json and len(self.current_node) > 1:
205268
# Find the corresponding function event in the parent node
@@ -248,7 +311,25 @@ def handle_upd(
248311
diff_msg = f" {old_msg} -> {current_msg}"
249312
logger_msg = f"{class_name}.{key}{diff_msg}"
250313

251-
self._log_event(lineno, EventType.UPD, logger_msg, call_depth, index_info)
314+
# Log with raw event data for lazy serialization
315+
import time
316+
317+
raw_event = {
318+
'event_type': EventType.UPD.label,
319+
'lineno': lineno,
320+
'call_depth': call_depth,
321+
'index_info': index_info,
322+
'class_name': class_name,
323+
'key': key,
324+
'old_value': old_value,
325+
'current_value': current_value,
326+
'timestamp': time.time(),
327+
}
328+
329+
# For backward compatibility, still format the message for direct logging
330+
prefix = self._generate_prefix(lineno, call_depth)
331+
formatted_msg = f"{index_info}{prefix}{EventType.UPD.label} {logger_msg}"
332+
log_debug(formatted_msg, extra={'raw_event': raw_event})
252333

253334
if self.output_json:
254335
self._add_json_event(

objwatch/events.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
# Copyright (c) 2025 aeeeeeep
33

44
from enum import Enum
5+
from dataclasses import dataclass, asdict
6+
from typing import Dict, Any, Optional
57

68

79
class EventType(Enum):
@@ -27,3 +29,43 @@ class EventType(Enum):
2729
def __init__(self, value):
2830
labels = {1: 'run', 2: 'end', 3: 'upd', 4: 'apd', 5: 'pop'}
2931
self.label = labels[value]
32+
33+
34+
@dataclass
35+
class LogEvent:
36+
"""
37+
Lightweight data structure for raw tracing events.
38+
Contains minimal data needed to reconstruct the final log message.
39+
"""
40+
41+
# Basic event information (all required, no defaults)
42+
timestamp: float
43+
event_type: str
44+
lineno: int
45+
call_depth: int
46+
47+
# Optional context information with defaults
48+
level: str = "INFO"
49+
index_info: str = ""
50+
51+
# Function information (for run/end events)
52+
func_info: Optional[Dict[str, Any]] = None
53+
54+
# Update information (for upd events)
55+
class_name: Optional[str] = None
56+
key: Optional[str] = None
57+
old_value: Optional[Any] = None
58+
current_value: Optional[Any] = None
59+
60+
# Collection change information (for apd/pop events)
61+
value_type: Optional[type] = None
62+
old_value_len: Optional[int] = None
63+
current_value_len: Optional[int] = None
64+
65+
# Additional metadata
66+
process_id: Optional[str] = None
67+
output_file: Optional[str] = None
68+
69+
def to_dict(self) -> Dict[str, Any]:
70+
"""Convert to dictionary, handling non-serializable types."""
71+
return asdict(self)

objwatch/sinks/consumer.py

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
from typing import Dict, Any, Optional
1212
from collections import OrderedDict
1313

14+
from ..events import LogEvent
15+
from .formatter import Formatter
16+
1417

1518
class ZeroMQFileConsumer:
1619
"""
@@ -90,25 +93,30 @@ def _disconnect(self) -> None:
9093
self.context = None
9194
self.logger.info("Disconnected from ZeroMQ endpoint")
9295

93-
def _process_event(self, event: Dict[str, Any]) -> str:
96+
def _process_event(self, event_dict: Dict[str, Any]) -> str:
9497
"""
95-
Process the event into a string format suitable for logging.
98+
Process the event dictionary into a string format suitable for logging.
9699
97100
Args:
98-
event: The event dictionary to process
101+
event_dict: The event dictionary to process
99102
100103
Returns:
101104
str: Formatted log line
102105
"""
103-
# Extract event fields
104-
level = event.get('level', 'INFO')
105-
msg = event.get('msg', '')
106-
timestamp = event.get('time', time.time())
107-
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
108-
name = event.get('name', 'unknown')
109-
110-
# Format the log line similar to standard logging format
111-
return f"[{time_str}] [{level}] {name}: {msg}\n"
106+
# Check if this is a raw LogEvent (new format)
107+
if 'event_type' in event_dict and 'lineno' in event_dict and 'call_depth' in event_dict:
108+
# Convert dict to LogEvent object
109+
log_event = LogEvent(**event_dict)
110+
return Formatter.format(log_event)
111+
else:
112+
# Legacy format - keep for backward compatibility
113+
level = event_dict.get('level', 'INFO')
114+
msg = event_dict.get('msg', '')
115+
timestamp = event_dict.get('time', time.time())
116+
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
117+
name = event_dict.get('name', 'unknown')
118+
119+
return f"[{time_str}] [{level}] {name}: {msg}\n"
112120

113121
def _run(self) -> None:
114122
"""
@@ -430,24 +438,31 @@ def _disconnect(self) -> None:
430438
self.context = None
431439
self.logger.info("Disconnected from ZeroMQ endpoint")
432440

433-
def _process_event(self, event: Dict[str, Any]) -> str:
441+
def _process_event(self, event_dict: Dict[str, Any]) -> str:
434442
"""
435-
Process the event into a string format suitable for logging.
443+
Process the event dictionary into a string format suitable for logging.
436444
437445
Args:
438-
event: The event dictionary to process
446+
event_dict: The event dictionary to process
439447
440448
Returns:
441449
str: Formatted log line
442450
"""
443-
level = event.get('level', 'INFO')
444-
msg = event.get('msg', '')
445-
timestamp = event.get('time', time.time())
446-
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
447-
name = event.get('name', 'unknown')
448-
process_id = event.get('process_id', 'unknown')
449-
450-
return f"[{time_str}] [{level}] [PID:{process_id}] {name}: {msg}\n"
451+
# Check if this is a raw LogEvent (new format)
452+
if 'event_type' in event_dict and 'lineno' in event_dict and 'call_depth' in event_dict:
453+
# Convert dict to LogEvent object
454+
log_event = LogEvent(**event_dict)
455+
return Formatter.format(log_event)
456+
else:
457+
# Legacy format - keep for backward compatibility
458+
level = event_dict.get('level', 'INFO')
459+
msg = event_dict.get('msg', '')
460+
timestamp = event_dict.get('time', time.time())
461+
time_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
462+
name = event_dict.get('name', 'unknown')
463+
process_id = event_dict.get('process_id', 'unknown')
464+
465+
return f"[{time_str}] [{level}] [PID:{process_id}] {name}: {msg}\n"
451466

452467
def _run(self) -> None:
453468
"""

0 commit comments

Comments
 (0)