Skip to content

Commit e363015

Browse files
to_pandas conversion for job logs/traces
1 parent 891158a commit e363015

5 files changed

Lines changed: 63 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.52.0]
11+
12+
### Added
13+
14+
- `tilebox-workflows`: Added automatic OpenTelemetry-based logging and tracing instrumentation to a Tilebox
15+
observability backend.
16+
- `tilebox-workflows`: Added `ExecutionContext.logger` and `ExecutionContext.tracer` access for task implementations to
17+
emit structured logs and custom spans during task execution.
18+
- `tilebox-workflows`: Added `JobClient.query_logs()` and `JobClient.query_spans()` to fetch telemetry for a job, with
19+
list-like results that can be converted to pandas DataFrames via `to_pandas()`.
20+
1021
## [0.51.0] - 2026-04-07
1122

1223
### Changed
@@ -357,7 +368,8 @@ the first client that does not cache data (since it's already on the local file
357368
- Released under the [MIT](https://opensource.org/license/mit) license.
358369
- Released packages: `tilebox-datasets`, `tilebox-workflows`, `tilebox-storage`, `tilebox-grpc`
359370

360-
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.51.0...HEAD
371+
[Unreleased]: https://github.com/tilebox/tilebox-python/compare/v0.52.0...HEAD
372+
[0.52.0]: https://github.com/tilebox/tilebox-python/compare/v0.51.0...v0.52.0
361373
[0.51.0]: https://github.com/tilebox/tilebox-python/compare/v0.50.1...v0.51.0
362374
[0.50.1]: https://github.com/tilebox/tilebox-python/compare/v0.50.0...v0.50.1
363375
[0.50.0]: https://github.com/tilebox/tilebox-python/compare/v0.49.0...v0.50.0

tilebox-workflows/tests/jobs/test_client.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
Job,
1313
JobState,
1414
LogRecord,
15+
LogRecords,
1516
QueryJobLogsResponse,
1617
QueryJobSpansResponse,
1718
Span,
19+
Spans,
1820
uuid_message_to_uuid,
1921
uuid_to_uuid_message,
2022
)
@@ -70,13 +72,16 @@ def test_query_logs_paginates() -> None:
7072
]
7173
telemetry_service = MagicMock()
7274
telemetry_service.query_job_logs.side_effect = [
73-
QueryJobLogsResponse([log_records[0]], Pagination(starting_after=next_page_start)),
74-
QueryJobLogsResponse([log_records[1]], Pagination()),
75+
QueryJobLogsResponse(LogRecords([log_records[0]]), Pagination(starting_after=next_page_start)),
76+
QueryJobLogsResponse(LogRecords([log_records[1]]), Pagination()),
7577
]
7678
job_client = JobClient(MagicMock(), telemetry_service, NoopWorkflowTracer())
7779
job_id = uuid4()
7880

79-
assert job_client.query_logs(job_id) == log_records
81+
logs = job_client.query_logs(job_id)
82+
83+
assert logs == log_records
84+
assert list(logs.to_pandas()["body"]) == ["first", "second"]
8085
assert [call.args[1].starting_after for call in telemetry_service.query_job_logs.call_args_list] == [
8186
None,
8287
next_page_start,
@@ -115,13 +120,16 @@ def test_query_spans_paginates() -> None:
115120
]
116121
telemetry_service = MagicMock()
117122
telemetry_service.query_job_spans.side_effect = [
118-
QueryJobSpansResponse([spans[0]], Pagination(starting_after=next_page_start)),
119-
QueryJobSpansResponse([spans[1]], Pagination()),
123+
QueryJobSpansResponse(Spans([spans[0]]), Pagination(starting_after=next_page_start)),
124+
QueryJobSpansResponse(Spans([spans[1]]), Pagination()),
120125
]
121126
job_client = JobClient(MagicMock(), telemetry_service, NoopWorkflowTracer())
122127
job_id = uuid4()
123128

124-
assert job_client.query_spans(job_id) == spans
129+
queried_spans = job_client.query_spans(job_id)
130+
131+
assert queried_spans == spans
132+
assert list(queried_spans.to_pandas()["name"]) == ["first", "second"]
125133
assert [call.args[1].starting_after for call in telemetry_service.query_job_spans.call_args_list] == [
126134
None,
127135
next_page_start,

tilebox-workflows/tilebox/workflows/data.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import re
22
import warnings
33
from collections.abc import Callable
4-
from dataclasses import dataclass, field
4+
from dataclasses import asdict, dataclass, field
55
from datetime import datetime, timedelta, timezone
66
from enum import Enum
77
from functools import lru_cache
@@ -10,6 +10,7 @@
1010
from uuid import UUID
1111

1212
import boto3
13+
import pandas as pd
1314
from google.cloud.storage import Client as GoogleStorageClient
1415
from google.cloud.storage.bucket import Bucket
1516
from google.protobuf.duration_pb2 import Duration
@@ -542,6 +543,12 @@ def to_message(self) -> logs_pb2.ResourceLogs:
542543
)
543544

544545

546+
class LogRecords(list[LogRecord]):
547+
def to_pandas(self) -> Any:
548+
"""Convert log records to a pandas DataFrame."""
549+
return pd.DataFrame([asdict(record) for record in self])
550+
551+
545552
@dataclass
546553
class Span:
547554
start_time: datetime
@@ -614,6 +621,17 @@ def to_message(self) -> trace_pb2.ResourceSpans:
614621
)
615622

616623

624+
class Spans(list[Span]):
625+
def to_pandas(self) -> Any:
626+
"""Convert spans to a pandas DataFrame."""
627+
rows = []
628+
for span in self:
629+
row = asdict(span)
630+
row["duration"] = span.duration
631+
rows.append(row)
632+
return pd.DataFrame(rows)
633+
634+
617635
def _datetime_from_unix_nanos(unix_nanos: int) -> datetime:
618636
return datetime.fromtimestamp(unix_nanos / 1_000_000_000, tz=timezone.utc)
619637

@@ -635,7 +653,7 @@ def _any_value_to_python(value: common_pb2.AnyValue) -> Any: # noqa: PLR0911
635653
case "bytes_value":
636654
return value.bytes_value
637655
case "array_value":
638-
return [_any_value_to_python(item) for item in value.array_value.values]
656+
return [_any_value_to_python(item) for item in value.array_value.values] # noqa: PD011
639657
case "kvlist_value":
640658
return _key_values_to_dict(value.kvlist_value.values)
641659
case _:
@@ -691,26 +709,26 @@ def _span_event_to_message(event: dict[str, Any]) -> trace_pb2.Span.Event:
691709

692710
@dataclass(frozen=True)
693711
class QueryJobLogsResponse:
694-
logs: list[LogRecord]
712+
logs: LogRecords
695713
next_page: Pagination
696714

697715
@classmethod
698716
def from_message(cls, page: Any) -> "QueryJobLogsResponse":
699717
return cls(
700-
logs=[LogRecord.from_message(log_record) for log_record in page.resource_logs],
718+
logs=LogRecords(LogRecord.from_message(log_record) for log_record in page.resource_logs),
701719
next_page=Pagination.from_message(page.next_page),
702720
)
703721

704722

705723
@dataclass(frozen=True)
706724
class QueryJobSpansResponse:
707-
spans: list[Span]
725+
spans: Spans
708726
next_page: Pagination
709727

710728
@classmethod
711729
def from_message(cls, page: Any) -> "QueryJobSpansResponse":
712730
return cls(
713-
spans=[Span.from_message(span) for span in page.resource_spans],
731+
spans=Spans(Span.from_message(span) for span in page.resource_spans),
714732
next_page=Pagination.from_message(page.next_page),
715733
)
716734

tilebox-workflows/tilebox/workflows/jobs/client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
from tilebox.workflows.data import (
1111
Job,
1212
JobState,
13-
LogRecord,
13+
LogRecords,
1414
QueryFilters,
1515
QueryJobLogsResponse,
1616
QueryJobSpansResponse,
1717
QueryJobsResponse,
18-
Span,
18+
Spans,
1919
TaskState,
2020
)
2121
from tilebox.workflows.jobs.service import JobService
@@ -138,7 +138,7 @@ def find(self, job_id: JobIDLike) -> Job:
138138
"""
139139
return self._service.get_by_id(_to_uuid(job_id))
140140

141-
def query_logs(self, job_id: JobIDLike) -> list[LogRecord]:
141+
def query_logs(self, job_id: JobIDLike) -> LogRecords:
142142
"""Query logs emitted while running a job.
143143
144144
Args:
@@ -154,12 +154,12 @@ def request(page: PaginationProtocol) -> QueryJobLogsResponse:
154154

155155
pages = paginated_request(request, Pagination())
156156

157-
logs = []
157+
logs = LogRecords()
158158
for page in pages:
159159
logs.extend(page.logs)
160160
return logs
161161

162-
def query_spans(self, job_id: JobIDLike) -> list[Span]:
162+
def query_spans(self, job_id: JobIDLike) -> Spans:
163163
"""Query spans emitted while running a job.
164164
165165
Args:
@@ -175,7 +175,7 @@ def request(page: PaginationProtocol) -> QueryJobSpansResponse:
175175

176176
pages = paginated_request(request, Pagination())
177177

178-
spans = []
178+
spans = Spans()
179179
for page in pages:
180180
spans.extend(page.spans)
181181
return spans

tilebox-workflows/tilebox/workflows/runner/task_runner.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import contextlib
22
import json
3+
import logging
34
import random
45
import signal
56
import threading
@@ -540,7 +541,11 @@ def __init__(self, runner: TaskRunner, task: Task, job_cache: JobCache) -> None:
540541
self.job_cache = job_cache
541542
self._sub_tasks: list[FutureTask] = []
542543
self._progress_indicators: dict[str | None, ProgressUpdate] = {}
543-
self._logger = runner.task_logger.bind(task_id=str(task.id))
544+
if runner is None or task is None:
545+
# Some tests instantiate an execution context only to exercise local subtask merging helpers.
546+
self._logger = StructuredLogger(logging.getLogger("tilebox.workflows.noop"))
547+
else:
548+
self._logger = runner.task_logger.bind(task_id=str(task.id))
544549

545550
def submit_subtask(
546551
self,

0 commit comments

Comments
 (0)