Skip to content

Commit 4ecbb72

Browse files
committed
Add support for adding callables as extra tags
1 parent 1dc706a commit 4ecbb72

5 files changed

Lines changed: 213 additions & 16 deletions

File tree

README.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,65 @@ logger = logging.getLogger("my-logger")
8888
logger.addHandler(handler)
8989
logger.error(...)
9090
```
91+
92+
Adding extra callable tags
93+
--------------------------
94+
95+
Having a prior definition of:
96+
```python
97+
import logging
98+
import logging_loki
99+
from multiprocessing import Queue
100+
from myapp.tracing import tracer
101+
102+
get_context = lambda: tracer.active_span.context
103+
add_trace_id = lambda: hex(get_context().trace_id)[
104+
2:] if tracer is not None and tracer.active_span is not None else None
105+
add_span_id = lambda: hex(get_context().span_id)[2:] if tracer is not None and tracer.active_span else None
106+
```
107+
108+
If you want to add extra span IDs or trace IDs do the following:
109+
110+
```python
111+
handler = logging_loki.LokiQueueHandler(
112+
Queue(-1),
113+
url="https://my-loki-instance/loki/api/v1/push",
114+
tags={"application": "my-app", 'span_id': add_span_id, 'trace_id': add_trace_id},
115+
auth=("username", "password"),
116+
version="1"
117+
)
118+
logger = logging.getLogger("my-logger")
119+
logger.addHandler(handler)
120+
logger.error(...)
121+
```
122+
123+
Basically if your callable returns a non-None value, it will be added as a tag. No casting to string will be made.
124+
125+
You can use also the blocking approach of:
126+
127+
```python
128+
handler = logging_loki.LokiHandler(
129+
url="https://my-loki-instance/loki/api/v1/push",
130+
tags={"application": "my-app", "trace_id": add_trace_id, "span_id": add_span_id},
131+
auth=("username", "password"),
132+
version="1",
133+
)
134+
135+
logger = logging.getLogger("my-logger")
136+
logger.addHandler(handler)
137+
logger.error(
138+
"Something happened",
139+
extra={"tags": {"service": "my-service"}},
140+
)
141+
```
142+
143+
144+
Supplying extra tags
145+
--------------------
146+
147+
If you want to supply extra tags, you can do it twofold:
148+
149+
```python
150+
logger.error('Something happened', extra={'test': 4})
151+
logger.error('Something happened', extra={'tags': {'test': 4}})
152+
```

logging_loki/emitter.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
BasicAuth = Optional[Tuple[str, str]]
2121

2222

23+
KEYS_TO_SKIP = {'severity', 'logger', 'msg', 'message', 'tags', 'lineno'}
24+
25+
2326
class LokiEmitter(abc.ABC):
2427
"""Base Loki emitter class."""
2528

@@ -30,6 +33,17 @@ class LokiEmitter(abc.ABC):
3033
label_replace_with = const.label_replace_with
3134
session_class = requests.Session
3235

36+
@staticmethod
37+
def get_entry_labels(record: logging.LogRecord, line: int) -> dict:
38+
labels = {}
39+
for key, value in record.__dict__.items():
40+
if key in KEYS_TO_SKIP:
41+
continue
42+
if value:
43+
labels[key] = value
44+
labels['line_no'] = line
45+
return labels
46+
3347
def __init__(self, url: str, tags: Optional[dict] = None, auth: BasicAuth = None):
3448
"""
3549
Create new Loki emitter.
@@ -88,8 +102,13 @@ def format_label(self, label: str) -> str:
88102

89103
def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
90104
"""Return tags that must be send to Loki with a log record."""
91-
tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags
92-
tags = copy.deepcopy(tags)
105+
old_tags = dict(self.tags) if isinstance(self.tags, ConvertingDict) else self.tags
106+
tags = {}
107+
for key, value in old_tags.items():
108+
if callable(value):
109+
value = value()
110+
if value:
111+
tags[key] = value
93112
tags[self.level_tag] = record.levelname.lower()
94113
tags[self.logger_tag] = record.name
95114

@@ -99,9 +118,8 @@ def build_tags(self, record: logging.LogRecord) -> Dict[str, Any]:
99118

100119
for tag_name, tag_value in extra_tags.items():
101120
cleared_name = self.format_label(tag_name)
102-
if cleared_name:
121+
if cleared_name and tag_value:
103122
tags[cleared_name] = tag_value
104-
105123
return tags
106124

107125

@@ -138,6 +156,6 @@ def build_payload(self, record: logging.LogRecord, line) -> dict:
138156
ts = str(int(time.time() * ns))
139157
stream = {
140158
"stream": labels,
141-
"values": [[ts, line]],
159+
"values": [[ts, line, LokiEmitter.get_entry_labels(record, line)]],
142160
}
143161
return {"streams": [stream]}

logging_loki/handlers.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,60 @@
11
# -*- coding: utf-8 -*-
2-
2+
import copy
33
import logging
44
import warnings
55
from logging.handlers import QueueHandler
66
from logging.handlers import QueueListener
77
from queue import Queue
8-
from typing import Dict
8+
from typing import Dict, Callable, Any, Union
99
from typing import Optional
1010
from typing import Type
1111

1212
from logging_loki import const
1313
from logging_loki import emitter
1414

1515

16-
class LokiQueueHandler(QueueHandler):
16+
class TagMixin:
17+
"""
18+
A mixin class to support callable tags.
19+
20+
This is to be inherited from as a first class, eg
21+
>>> class Handler(TagMixin, logging.Handler):
22+
>>> pass
23+
"""
24+
25+
def __init__(self, tags=None):
26+
self.tags = tags or {}
27+
28+
def prepare(self, record):
29+
# This is invoked in the same thread in which logging is invoked
30+
# assume the second class has a proper solution for prepare()
31+
try:
32+
record = self.__class__.__bases__[1].prepare(self, record)
33+
except AttributeError: # logging.Handler has no prepare
34+
pass
35+
record.tags = getattr(record, 'tags', {})
36+
for key, value in (self.tags | record.tags).items():
37+
if callable(value):
38+
value = value()
39+
if value is None:
40+
continue
41+
record.__dict__[key] = value
42+
return record
43+
44+
45+
class LokiQueueHandler(TagMixin, QueueHandler):
1746
"""This handler automatically creates listener and `LokiHandler` to handle logs queue."""
1847

1948
def __init__(self, queue: Queue, **kwargs):
2049
"""Create new logger handler with the specified queue and kwargs for the `LokiHandler`."""
21-
super().__init__(queue)
50+
QueueHandler.__init__(self, queue)
51+
TagMixin.__init__(self, kwargs.get("tags"))
2252
self.handler = LokiHandler(**kwargs) # noqa: WPS110
2353
self.listener = QueueListener(self.queue, self.handler)
2454
self.listener.start()
2555

2656

27-
class LokiHandler(logging.Handler):
57+
class LokiHandler(TagMixin, logging.Handler):
2858
"""
2959
Log handler that sends log records to Loki.
3060
@@ -39,7 +69,7 @@ class LokiHandler(logging.Handler):
3969
def __init__(
4070
self,
4171
url: str,
42-
tags: Optional[dict] = None,
72+
tags: Optional[Dict[str, Union[Any, Callable]]] = None,
4373
auth: Optional[emitter.BasicAuth] = None,
4474
version: Optional[str] = None,
4575
):
@@ -53,7 +83,8 @@ def __init__(
5383
version: Version of Loki emitter to use.
5484
5585
"""
56-
super().__init__()
86+
logging.Handler.__init__(self)
87+
TagMixin.__init__(self, tags)
5788

5889
if version is None and const.emitter_ver == "0":
5990
msg = (
@@ -65,9 +96,10 @@ def __init__(
6596
warnings.warn(" ".join(msg), DeprecationWarning)
6697

6798
version = version or const.emitter_ver
68-
if version not in self.emitters:
69-
raise ValueError("Unknown emitter version: {0}".format(version))
70-
self.emitter = self.emitters[version](url, tags, auth)
99+
try:
100+
self.emitter = self.emitters[version](url, tags, auth)
101+
except KeyError as exc:
102+
raise ValueError("Unknown emitter version: {0}".format(version)) from exc
71103

72104
def handleError(self, record): # noqa: N802
73105
"""Close emitter and let default handler take actions on error."""
@@ -76,8 +108,9 @@ def handleError(self, record): # noqa: N802
76108

77109
def emit(self, record: logging.LogRecord):
78110
"""Send log record to Loki."""
111+
record = self.prepare(record)
79112
# noinspection PyBroadException
80113
try:
81-
self.emitter(record, self.format(record))
114+
self.emitter(record, record.lineno)
82115
except Exception:
83116
self.handleError(record)

tests/test_emitter_v1.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,4 @@ def test_can_build_tags_from_converting_dict(emitter_v1):
176176
logger = logging.getLogger(logger_name)
177177
emitter: LokiEmitterV1 = logger.handlers[0].handler.emitter
178178
emitter.build_tags(create_record())
179+
payload = emitter.build_payload(create_record(), 10)

tests/test_real_logs.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import time
2+
3+
from logging_loki.emitter import LokiEmitter, LokiEmitterV1
4+
import logging
5+
import logging_loki
6+
7+
8+
def test_callable_tags():
9+
class MyEmitter(LokiEmitterV1):
10+
11+
def build_payload(self, record, line) -> dict:
12+
labels = self.build_tags(record)
13+
ns = 1e9
14+
ts = str(int(time.time() * ns))
15+
stream = {
16+
"stream": labels,
17+
"values": [[ts, line, self.get_entry_labels(record, line)]],
18+
}
19+
return {"streams": [stream]}
20+
21+
def __call__(self, record, line_no):
22+
payload = self.build_payload(record, line_no)
23+
stream = payload['streams'][0]['values'][0][2]
24+
assert 'application' in stream
25+
assert stream['value'] == 5
26+
assert stream['device'] == 'test'
27+
assert stream['levelname'] == 'WARNING'
28+
29+
# Register a mock emitter
30+
logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter
31+
32+
handler = logging_loki.LokiHandler(
33+
url="https://example.com/loki/api/v1/push",
34+
tags={"application": "my-app", 'value': lambda: 5},
35+
auth=("username", "password"),
36+
version="mock_emitter"
37+
)
38+
logger = logging.getLogger("my-logger")
39+
logger.addHandler(handler)
40+
logger.warning('Error occurred', extra={'device': 'test'})
41+
42+
43+
import time
44+
45+
from logging_loki.emitter import LokiEmitter, LokiEmitterV1
46+
import logging
47+
import logging_loki
48+
49+
50+
def test_callable_tags():
51+
class MyEmitter(LokiEmitterV1):
52+
53+
def build_payload(self, record, line) -> dict:
54+
labels = self.build_tags(record)
55+
ns = 1e9
56+
ts = str(int(time.time() * ns))
57+
stream = {
58+
"stream": labels,
59+
"values": [[ts, line, self.get_entry_labels(record, line)]],
60+
}
61+
return {"streams": [stream]}
62+
63+
def __call__(self, record, line_no):
64+
payload = self.build_payload(record, line_no)
65+
stream = payload['streams'][0]['values'][0][2]
66+
assert 'application' in stream
67+
assert stream['value'] == 5
68+
assert stream['device'] == 'test'
69+
assert stream['levelname'] == 'WARNING'
70+
71+
# Register a mock emitter
72+
logging_loki.LokiHandler.emitters['mock_emitter'] = MyEmitter
73+
74+
handler = logging_loki.LokiHandler(
75+
url="https://example.com/loki/api/v1/push",
76+
tags={"application": "my-app", 'value': lambda: 5},
77+
auth=("username", "password"),
78+
version="mock_emitter"
79+
)
80+
logger = logging.getLogger("my-logger")
81+
logger.addHandler(handler)
82+
logger.warning('Error occurred', extra={'tags': {'device': 'test'}})
83+
logger.warning('Error occurred', extra={'device': 'test'})

0 commit comments

Comments
 (0)