Skip to content

Commit c334d76

Browse files
context.logger for task execute methods
1 parent 266b179 commit c334d76

9 files changed

Lines changed: 475 additions & 79 deletions

File tree

tilebox-workflows/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@ dependencies = [
3434
"python-dateutil>=2.9.0.post0",
3535
"obstore>=0.8.2",
3636
"opentelemetry-proto>=1.30.0",
37-
"structlog>=25.5.0",
3837
# grpcio 1.80.0 contains unwanted log message spam: https://github.com/grpc/grpc/issues/42293
3938
"grpcio<1.80.0",
39+
"opentelemetry-instrumentation-logging>=0.62b1",
4040
]
4141

4242
[dependency-groups]
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
from collections.abc import Iterator
2+
3+
import pytest
4+
from opentelemetry.context import Context
5+
from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider
6+
from opentelemetry.sdk.trace.export import SpanProcessor
7+
8+
from tilebox.workflows.observability import tracing
9+
10+
11+
class RecordingSpanProcessor(SpanProcessor):
12+
def __init__(self) -> None:
13+
self.span_names: list[str] = []
14+
15+
def on_start(self, span: Span, parent_context: Context | None = None) -> None:
16+
pass
17+
18+
def on_end(self, span: ReadableSpan) -> None:
19+
self.span_names.append(span.name)
20+
21+
def shutdown(self) -> None:
22+
pass
23+
24+
def force_flush(self, timeout_millis: int = 30000) -> bool: # noqa: ARG002
25+
return True
26+
27+
28+
@pytest.fixture(autouse=True)
29+
def reset_tilebox_tracing() -> Iterator[None]:
30+
tracing._set_tilebox_tracer_provider(TracerProvider())
31+
tracing._workflow_tracers.clear()
32+
yield
33+
tracing._set_tilebox_tracer_provider(TracerProvider())
34+
tracing._workflow_tracers.clear()
35+
36+
37+
@pytest.fixture
38+
def span_processors(monkeypatch: pytest.MonkeyPatch) -> list[RecordingSpanProcessor]:
39+
processors: list[RecordingSpanProcessor] = []
40+
41+
def create_processor(*args: object, **kwargs: object) -> RecordingSpanProcessor: # noqa: ARG001
42+
processor = RecordingSpanProcessor()
43+
processors.append(processor)
44+
return processor
45+
46+
monkeypatch.setattr(tracing, "_otel_span_exporter", create_processor)
47+
return processors
48+
49+
50+
def test_workflow_tracers_do_not_share_client_span_processors(
51+
span_processors: list[RecordingSpanProcessor],
52+
) -> None:
53+
tracers = [tracing.WorkflowTracer(service=None, url="https://api.tilebox.com", token=None) for _ in range(3)]
54+
55+
for index, tracer in enumerate(tracers):
56+
with tracer.start_as_current_span(f"span-{index}"):
57+
pass
58+
59+
assert [processor.span_names for processor in span_processors] == [["span-0"], ["span-1"], ["span-2"]]
60+
61+
62+
def test_workflow_tracers_copy_configured_span_processors_once(
63+
span_processors: list[RecordingSpanProcessor],
64+
) -> None:
65+
tracing.configure_otel_tracing(endpoint="https://otel.example.com")
66+
tracers = [tracing.WorkflowTracer(service=None, url="https://api.tilebox.com", token=None) for _ in range(2)]
67+
68+
for index, tracer in enumerate(tracers):
69+
with tracer.start_as_current_span(f"span-{index}"):
70+
pass
71+
72+
assert [processor.span_names for processor in span_processors] == [
73+
["span-0", "span-1"],
74+
["span-0"],
75+
["span-1"],
76+
]

tilebox-workflows/tests/runner/test_runner.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,10 @@ def test_runner_disallow_duplicate_task_identifiers() -> None:
208208
"dummy-cluster",
209209
InMemoryCache(),
210210
NoopWorkflowTracer(),
211-
None,
212211
MagicMock(),
213212
RunnerContext(NoopWorkflowTracer()),
213+
MagicMock(),
214+
MagicMock(),
214215
)
215216

216217
runner.register(FlakyTask)

tilebox-workflows/tilebox/workflows/client.py

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import logging
22
import os
3+
import warnings
4+
from uuid import uuid4
35

46
from _tilebox.grpc.channel import open_channel, parse_channel_info
57
from tilebox.datasets.sync.client import Client as DatasetsClient
@@ -12,6 +14,12 @@
1214
)
1315
from tilebox.workflows.jobs.client import JobClient
1416
from tilebox.workflows.jobs.service import JobService
17+
from tilebox.workflows.observability.logging import (
18+
OTELLoggingHandler,
19+
StructuredLogger,
20+
_create_tilebox_logger,
21+
_create_tilebox_logger_provider,
22+
)
1523
from tilebox.workflows.observability.tracing import WorkflowTracer
1624
from tilebox.workflows.runner.task_runner import TaskRunner, _LeaseRenewer
1725
from tilebox.workflows.runner.task_service import TaskService
@@ -34,22 +42,27 @@ def __init__(
3442
token = _token_from_env(url, token)
3543
self._auth: dict[str, str] = {"token": token, "url": url}
3644
self._channel = open_channel(url, token)
37-
self._tracer = WorkflowTracer(service=name, url=url, token=token)
3845

39-
self._logger: logging.Logger | None = None
46+
# configure logging and tracing
47+
self._client_id = uuid4() # a random uuid to scope loggers to this client instance
48+
self._logger_provider = _create_tilebox_logger_provider(service=name, url=url, token=token)
4049

41-
def configure_tracing(self, tracer: WorkflowTracer) -> None:
42-
"""
43-
Configure the tracer to use for tracing of tasks and jobs within the workflow clients.
50+
# task logger is the logger available for users to emit logs from within a Task.execute method, via
51+
# context.logger
52+
self._task_logger = _create_tilebox_logger(self._client_id, scope="tasks")
53+
self._task_logger_handler = OTELLoggingHandler(level=logging.INFO, logger_provider=self._logger_provider)
54+
self._task_logger.addHandler(self._task_logger_handler)
4455

45-
The tracer will be used by all task runners and job clients created by this client.
56+
# runner logger is the logger used for logging internal events within a task runner, for example when a
57+
# Tilebox API call fails, or when unexpected errors occur. This logger is not exposed to users,
58+
# and is only used for logging internal events within the client and task runners.
59+
self._runner_logger = _create_tilebox_logger(self._client_id, scope="runner")
60+
self._runner_logger_handler = OTELLoggingHandler(level=logging.INFO, logger_provider=self._logger_provider)
61+
self._runner_logger.addHandler(self._runner_logger_handler)
4662

47-
Calling this method multiple times will replace the existing tracing configuration. However, task runners and
48-
job clients that have already been created will not be affected by subsequent calls to this method.
49-
"""
50-
self._tracer = tracer
63+
self._tracer = WorkflowTracer(service=name, url=url, token=token)
5164

52-
def configure_logging(self, logger: logging.Logger) -> None:
65+
def configure_logging(self, level: int | logging.Logger, runner_level: int | None = None) -> None:
5366
"""
5467
Configure the logger to use for logging of internal events within workflow clients.
5568
@@ -61,7 +74,25 @@ def configure_logging(self, logger: logging.Logger) -> None:
6174
Args:
6275
logger: The logger to use for logging.
6376
"""
64-
self._logger = logger
77+
if not isinstance(level, int):
78+
warning_message = (
79+
"Configuring a logger instance directly on a client is deprecated and will be removed in a future "
80+
"version. If you want to export logs to an external system, configure the tilebox root logger "
81+
"instance, which you can get with `tilebox.workflows.observability.logging.get_logger()`."
82+
)
83+
warnings.warn(
84+
warning_message,
85+
DeprecationWarning,
86+
stacklevel=2,
87+
)
88+
# to preserve backwards compatibility with the old API where the first argument was a logger
89+
self._runner_logger = level
90+
else:
91+
# always adjust the level of the handler, not the loggers themselves, to make sure that other logger
92+
# handlers still receive the logs (for example, if the user configured the tilebox root logger to export
93+
# all logs at DEBUG level to a file)
94+
self._task_logger_handler.setLevel(level)
95+
self._runner_logger_handler.setLevel(runner_level or level)
6596

6697
def jobs(self) -> JobClient:
6798
"""Get a client for the jobs service.
@@ -114,9 +145,10 @@ def runner(
114145
found_cluster.slug,
115146
cache,
116147
self._tracer,
117-
self._logger,
118148
_LeaseRenewer(**self._auth),
119149
runner_context,
150+
task_logger=StructuredLogger(self._task_logger, {}),
151+
runner_logger=StructuredLogger(self._runner_logger, {}),
120152
)
121153

122154
if tasks is not None:

tilebox-workflows/tilebox/workflows/observability/logging.py

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,19 @@
66
import re
77
import sys
88
import traceback
9-
from datetime import timedelta
9+
from datetime import datetime, timedelta
1010
from functools import lru_cache
1111
from importlib.metadata import PackageNotFoundError, version
12-
from typing import ClassVar, TextIO
13-
from uuid import uuid4
12+
from typing import Any, ClassVar, TextIO
13+
from uuid import UUID, uuid4
1414

1515
from opentelemetry.exporter.otlp.proto.http._log_exporter import (
1616
DEFAULT_LOGS_EXPORT_PATH,
1717
OTLPLogExporter,
1818
_append_logs_path,
1919
)
20-
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
20+
from opentelemetry.instrumentation.logging.handler import LoggingHandler
21+
from opentelemetry.sdk._logs import LoggerProvider
2122
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
2223
from opentelemetry.sdk.resources import (
2324
HOST_ARCH,
@@ -31,6 +32,7 @@
3132
Resource,
3233
)
3334
from opentelemetry.semconv.attributes import exception_attributes
35+
from opentelemetry.trace import get_current_span
3436
from opentelemetry.util.types import _ExtendedAttributes
3537

3638
# prefix for stdlib loggers
@@ -44,6 +46,8 @@
4446
_OTEL_LOGS_ENDPOINT_ENV_VAR = "OTEL_LOGS_ENDPOINT"
4547
_OTEL_EXPORT_INTERVAL_ENV_VAR = "OTEL_EXPORT_INTERVAL"
4648

49+
_WORKFLOW_LOG_ATTRIBUTES = "tilebox_structured_log_attributes"
50+
4751
# process-unique identifier to distinguish different instances of the same service running on the same host
4852
_instance_id = str(uuid4())
4953

@@ -87,10 +91,51 @@ def _root_logger() -> logging.Logger:
8791
return root_logger
8892

8993

94+
def _current_span_attributes() -> dict[str, str]:
95+
span_context = get_current_span().get_span_context()
96+
if not span_context.is_valid:
97+
return {}
98+
99+
return {
100+
"trace_id": f"{span_context.trace_id:032x}",
101+
"span_id": f"{span_context.span_id:016x}",
102+
}
103+
104+
105+
def _sanitize_otel_attribute_value(
106+
value: Any,
107+
) -> str | bool | int | float | bytes | list[str | bool | int | float | bytes]:
108+
if isinstance(value, str | bool | int | float | bytes):
109+
return value
110+
111+
if isinstance(value, datetime):
112+
return value.isoformat()
113+
114+
if isinstance(value, tuple | list):
115+
values = []
116+
for item in value:
117+
if isinstance(item, str | bool | int | float | bytes):
118+
values.append(item)
119+
else:
120+
values.append(_sanitize_otel_attribute_value(item))
121+
return values
122+
123+
return str(value)
124+
125+
126+
def _sanitize_otel_attributes(attributes: dict[str, Any]) -> _ExtendedAttributes:
127+
return {str(key): _sanitize_otel_attribute_value(value) for key, value in attributes.items()}
128+
129+
90130
class OTELLoggingHandler(LoggingHandler):
91-
@staticmethod
92-
def _get_attributes(record: logging.LogRecord) -> _ExtendedAttributes:
93-
attributes = {}
131+
def _get_attributes(self, record: logging.LogRecord) -> _ExtendedAttributes:
132+
attributes: dict[str, Any] = {}
133+
attributes.update(_current_span_attributes())
134+
135+
workflow_attributes = getattr(record, _WORKFLOW_LOG_ATTRIBUTES, None)
136+
if isinstance(workflow_attributes, dict):
137+
attributes.update(workflow_attributes)
138+
94139
# the default implementation returns attributes for the filepath, lineno and function of the log record
95140
# we don't want that by default, so we override it to return an empty dict
96141
if record.exc_info:
@@ -104,7 +149,87 @@ def _get_attributes(record: logging.LogRecord) -> _ExtendedAttributes:
104149
attributes[exception_attributes.EXCEPTION_STACKTRACE] = "".join(
105150
traceback.format_exception(*record.exc_info)
106151
)
107-
return attributes
152+
return _sanitize_otel_attributes(attributes)
153+
154+
155+
class StructuredLogger:
156+
"""A small structured logging wrapper for logs emitted during task execution."""
157+
158+
def __init__(self, logger: logging.Logger, attributes: dict[str, Any] | None = None) -> None:
159+
self._logger = logger
160+
self._attributes = attributes or {}
161+
162+
def bind(self, **attributes: Any) -> "StructuredLogger":
163+
"""Return a new logger that includes the given attributes in every log record."""
164+
return StructuredLogger(self._logger, self._attributes | attributes)
165+
166+
def log(self, level: int, message: object, /, *args: Any, **attributes: Any) -> None:
167+
"""Log a message with structured attributes."""
168+
self._log(level, message, args, attributes, exc_info=False)
169+
170+
def debug(self, message: object, /, *args: Any, **attributes: Any) -> None:
171+
"""Log a debug message with structured attributes."""
172+
self._log(logging.DEBUG, message, args, attributes, exc_info=False)
173+
174+
def info(self, message: object, /, *args: Any, **attributes: Any) -> None:
175+
"""Log an info message with structured attributes."""
176+
self._log(logging.INFO, message, args, attributes, exc_info=False)
177+
178+
def warning(self, message: object, /, *args: Any, **attributes: Any) -> None:
179+
"""Log a warning message with structured attributes."""
180+
self._log(logging.WARNING, message, args, attributes, exc_info=False)
181+
182+
def error(self, message: object, /, *args: Any, **attributes: Any) -> None:
183+
"""Log an error message with structured attributes."""
184+
self._log(logging.ERROR, message, args, attributes, exc_info=False)
185+
186+
def exception(self, message: object, /, *args: Any, **attributes: Any) -> None:
187+
"""Log an error message with structured attributes and the current exception information."""
188+
self._log(logging.ERROR, message, args, attributes, exc_info=True)
189+
190+
def critical(self, message: object, /, *args: Any, **attributes: Any) -> None:
191+
"""Log a critical message with structured attributes."""
192+
self._log(logging.CRITICAL, message, args, attributes, exc_info=False)
193+
194+
def _log(
195+
self,
196+
level: int,
197+
message: object,
198+
args: tuple[Any, ...],
199+
attributes: dict[str, Any],
200+
*,
201+
exc_info: bool,
202+
) -> None:
203+
if not self._logger.isEnabledFor(level):
204+
return
205+
206+
workflow_attributes = self._attributes | attributes | _current_span_attributes()
207+
self._logger.log(
208+
level,
209+
message,
210+
*args,
211+
exc_info=exc_info,
212+
extra={_WORKFLOW_LOG_ATTRIBUTES: workflow_attributes},
213+
stacklevel=3,
214+
)
215+
216+
217+
@lru_cache(maxsize=16) # reuse logger providers for the same credentials if possible
218+
def _create_tilebox_logger_provider(service: str | None, url: str, token: str | None) -> LoggerProvider:
219+
provider = LoggerProvider(resource=_get_default_resource(service))
220+
batch_exporter = _otel_log_exporter(
221+
endpoint=url,
222+
headers={"Authorization": f"Bearer {token}"} if token is not None else None,
223+
)
224+
provider.add_log_record_processor(batch_exporter)
225+
return provider
226+
227+
228+
def _create_tilebox_logger(client_id: UUID, scope: str) -> logging.Logger:
229+
logger = logging.getLogger(f"{_LOGGING_NAMESPACE}.clients.{client_id}.{scope}")
230+
logger.setLevel(logging.DEBUG) # always debug, so that other handlers can still filter by that level
231+
logger.propagate = True
232+
return logger
108233

109234

110235
def _otel_log_exporter(

0 commit comments

Comments
 (0)