Skip to content

Commit 2b70503

Browse files
committed
Add support for adding callables as extra tags
1 parent 1dc706a commit 2b70503

5 files changed

Lines changed: 199 additions & 15 deletions

File tree

README.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,65 @@ logger = logging.getLogger("my-logger")
8888
logger.addHandler(handler)
8989
logger.error(...)
9090
```
91+
92+
Adding extra callable tags
93+
--------------------------
94+
95+
Having a prior definition of:
96+
```python
97+
import logging
98+
import logging_loki
99+
from multiprocessing import Queue
100+
from myapp.tracing import tracer
101+
102+
get_context = lambda: tracer.active_span.context
103+
add_trace_id = lambda: hex(get_context().trace_id)[
104+
2:] if tracer is not None and tracer.active_span is not None else None
105+
add_span_id = lambda: hex(get_context().span_id)[2:] if tracer is not None and tracer.active_span else None
106+
```
107+
108+
If you want to add extra span IDs or trace IDs do the following:
109+
110+
```python
111+
handler = logging_loki.LokiQueueHandler(
112+
Queue(-1),
113+
url="https://my-loki-instance/loki/api/v1/push",
114+
tags={"application": "my-app", 'span_id': add_span_id, 'trace_id': add_trace_id},
115+
auth=("username", "password"),
116+
version="1"
117+
)
118+
logger = logging.getLogger("my-logger")
119+
logger.addHandler(handler)
120+
logger.error(...)
121+
```
122+
123+
Basically if your callable returns a non-None value, it will be added as a tag. No casting to string will be made.
124+
125+
You can use also the blocking approach of:
126+
127+
```python
128+
handler = logging_loki.LokiHandler(
129+
url="https://my-loki-instance/loki/api/v1/push",
130+
tags={"application": "my-app", "trace_id": add_trace_id, "span_id": add_span_id},
131+
auth=("username", "password"),
132+
version="1",
133+
)
134+
135+
logger = logging.getLogger("my-logger")
136+
logger.addHandler(handler)
137+
logger.error(
138+
"Something happened",
139+
extra={"tags": {"service": "my-service"}},
140+
)
141+
```
142+
143+
144+
Supplying extra tags
145+
--------------------
146+
147+
If you want to supply extra tags, you can do it twofold:
148+
149+
```python
150+
logger.error('Something happened', extra={'test': 4})
151+
logger.error('Something happened', extra={'tags': {'test': 4}})
152+
```

logging_loki/emitter.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
BasicAuth = Optional[Tuple[str, str]]
2121

2222

23+
KEYS_TO_SKIP = {'severity', 'logger', 'msg', 'message', 'tags', 'lineno'}
24+
25+
2326
class LokiEmitter(abc.ABC):
2427
"""Base Loki emitter class."""
2528

@@ -30,6 +33,17 @@ class LokiEmitter(abc.ABC):
3033
label_replace_with = const.label_replace_with
3134
session_class = requests.Session
3235

36+
@staticmethod
37+
def get_entry_labels(record: logging.LogRecord, line: int) -> dict:
38+
labels = {}
39+
for key, value in record.__dict__.items():
40+
if key in KEYS_TO_SKIP:
41+
continue
42+
if value:
43+
labels[key] = value
44+
labels['line_no'] = line
45+
return labels
46+
3347
def __init__(self, url: str, tags: Optional[dict] = None, auth: BasicAuth = None):
3448
"""
3549
Create new Loki emitter.
@@ -89,7 +103,6 @@ def format_label(self, label: str) -> str:
89103
def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
90104
"""Return tags that must be send to Loki with a log record."""
91105
tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags
92-
tags = copy.deepcopy(tags)
93106
tags[self.level_tag] = record.levelname.lower()
94107
tags[self.logger_tag] = record.name
95108

@@ -99,9 +112,8 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
99112

100113
for tag_name, tag_value in extra_tags.items():
101114
cleared_name = self.format_label(tag_name)
102-
if cleared_name:
115+
if cleared_name and tag_value:
103116
tags[cleared_name] = tag_value
104-
105117
return tags
106118

107119

@@ -138,6 +150,6 @@ def build_payload(self, record: logging.LogRecord, line) -> dict:
138150
ts = str(int(time.time() * ns))
139151
stream = {
140152
"stream": labels,
141-
"values": [[ts, line]],
153+
"values": [[ts, line, LokiEmitter.get_entry_labels(record, line)]],
142154
}
143155
return {"streams": [stream]}

logging_loki/handlers.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,60 @@
11
# -*- coding: utf-8 -*-
2-
2+
import copy
33
import logging
44
import warnings
55
from logging.handlers import QueueHandler
66
from logging.handlers import QueueListener
77
from queue import Queue
8-
from typing import Dict
8+
from typing import Dict, Callable, Any, Union
99
from typing import Optional
1010
from typing import Type
1111

1212
from logging_loki import const
1313
from logging_loki import emitter
1414

1515

16-
class LokiQueueHandler(QueueHandler):
16+
class TagMixin:
17+
"""
18+
A mixin class to support callable tags.
19+
20+
This is to be inherited from as a first class, eg
21+
>>> class Handler(TagMixin, logging.Handler):
22+
>>> pass
23+
"""
24+
25+
def __init__(self, tags=None):
26+
self.tags = tags or {}
27+
28+
def prepare(self, record):
29+
# This is invoked in the same thread in which logging is invoked
30+
# assume the second class has a proper solution for prepare()
31+
try:
32+
record = self.__class__.__bases__[1].prepare(self, record)
33+
except AttributeError: # logging.Handler has no prepare
34+
pass
35+
record.tags = getattr(record, 'tags', {})
36+
for key, value in (self.tags | record.tags).items():
37+
if callable(value):
38+
value = value()
39+
if value is None:
40+
continue
41+
record.__dict__[key] = value
42+
return record
43+
44+
45+
class LokiQueueHandler(TagMixin, QueueHandler):
1746
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""
1847

1948
def __init__(self, queue: Queue, **kwargs):
2049
"""Create new logger handler with the specified queue and kwargs for the `LokiHandler`."""
21-
super().__init__(queue)
50+
QueueHandler.__init__(self, queue)
51+
TagMixin.__init__(self, kwargs.get("tags"))
2252
self.handler = LokiHandler(**kwargs) # noqa: WPS110
2353
self.listener = QueueListener(self.queue, self.handler)
2454
self.listener.start()
2555

2656

27-
class LokiHandler(logging.Handler):
57+
class LokiHandler(TagMixin, logging.Handler):
2858
"""
2959
Log handler that sends log records to Loki.
3060
@@ -39,7 +69,7 @@ class LokiHandler(logging.Handler):
3969
def __init__(
4070
self,
4171
url: str,
42-
tags: Optional[dict] = None,
72+
tags: Optional[Dict[str, Union[Any, Callable]]] = None,
4373
auth: Optional[emitter.BasicAuth] = None,
4474
version: Optional[str] = None,
4575
):
@@ -53,7 +83,8 @@ def __init__(
5383
version: Version of Loki emitter to use.
5484
5585
"""
56-
super().__init__()
86+
logging.Handler.__init__(self)
87+
TagMixin.__init__(self, tags)
5788

5889
if version is None and const.emitter_ver == "0":
5990
msg = (
@@ -65,9 +96,10 @@ def __init__(
6596
warnings.warn(" ".join(msg), DeprecationWarning)
6697

6798
version = version or const.emitter_ver
68-
if version not in self.emitters:
69-
raise ValueError("Unknown emitter version: {0}".format(version))
70-
self.emitter = self.emitters[version](url, tags, auth)
99+
try:
100+
self.emitter = self.emitters[version](url, tags, auth)
101+
except KeyError as exc:
102+
raise ValueError("Unknown emitter version: {0}".format(version)) from exc
71103

72104
def handleError(self, record): # noqa: N802
73105
"""Close emitter and let default handler take actions on error."""
@@ -76,8 +108,9 @@ def handleError(self, record): # noqa: N802
76108

77109
def emit(self, record: logging.LogRecord):
78110
"""Send log record to Loki."""
111+
record = self.prepare(record)
79112
# noinspection PyBroadException
80113
try:
81-
self.emitter(record, self.format(record))
114+
self.emitter(record, record.lineno)
82115
except Exception:
83116
self.handleError(record)

tests/test_emitter_v1.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,4 @@ def test_can_build_tags_from_converting_dict(emitter_v1):
176176
logger = logging.getLogger(logger_name)
177177
emitter: LokiEmitterV1 = logger.handlers[0].handler.emitter
178178
emitter.build_tags(create_record())
179+
payload = emitter.build_payload(create_record(), 10)

tests/test_real_logs.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
def test_callable_tags():
2+
class MyEmitter(LokiEmitterV1):
3+
4+
def build_payload(self, record, line) -> dict:
5+
labels = self.build_tags(record)
6+
ns = 1e9
7+
ts = str(int(time.time() * ns))
8+
stream = {
9+
"stream": labels,
10+
"values": [[ts, line, self.get_entry_labels(record, line)]],
11+
}
12+
return {"streams": [stream]}
13+
14+
def __call__(self, record, line_no):
15+
payload = self.build_payload(record, line_no)
16+
stream = payload['streams'][0]['values'][0][2]
17+
assert 'application' in stream
18+
assert stream['value'] == 5
19+
assert stream['device'] == 'test'
20+
assert stream['levelname'] == 'WARNING'
21+
22+
# Register a mock emitter
23+
logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter
24+
25+
handler = logging_loki.LokiHandler(
26+
url="https://example.com/loki/api/v1/push",
27+
tags={"application": "my-app", 'value': lambda: 5},
28+
auth=("username", "password"),
29+
version="mock_emitter"
30+
)
31+
logger = logging.getLogger("my-logger")
32+
logger.addHandler(handler)
33+
logger.warning('Error occurred', extra={'device': 'test'})
34+
35+
36+
import logging
37+
import time
38+
39+
import logging_loki
40+
from logging_loki.emitter import LokiEmitterV1
41+
42+
43+
def test_callable_tags():
44+
class MyEmitter(LokiEmitterV1):
45+
46+
def build_payload(self, record, line) -> dict:
47+
labels = self.build_tags(record)
48+
ns = 1e9
49+
ts = str(int(time.time() * ns))
50+
stream = {
51+
"stream": labels,
52+
"values": [[ts, line, self.get_entry_labels(record, line)]],
53+
}
54+
return {"streams": [stream]}
55+
56+
def __call__(self, record, line_no):
57+
payload = self.build_payload(record, line_no)
58+
stream = payload['streams'][0]['values'][0][2]
59+
assert 'application' in stream
60+
assert stream['value'] == 5
61+
assert stream['device'] == 'test'
62+
assert stream['levelname'] == 'WARNING'
63+
64+
# Register a mock emitter
65+
logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter
66+
67+
handler = logging_loki.LokiHandler(
68+
url="https://example.com/loki/api/v1/push",
69+
tags={"application": "my-app", 'value': lambda: 5},
70+
auth=("username", "password"),
71+
version="mock_emitter"
72+
)
73+
logger = logging.getLogger("my-logger")
74+
logger.addHandler(handler)
75+
logger.warning('Error occurred', extra={'tags': {'device': 'test'}})
76+
logger.warning('Error occurred', extra={'device': 'test'})

0 commit comments

Comments
 (0)