Skip to content

Commit 891158a

Browse files
Query job logs and traces
1 parent c334d76 commit 891158a

8 files changed

Lines changed: 449 additions & 34 deletions

File tree

tilebox-workflows/tests/jobs/test_client.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,18 @@
66
from tests.tasks_data import jobs
77

88
from _tilebox.grpc.error import NotFoundError
9+
from tilebox.datasets.query.pagination import Pagination
910
from tilebox.datasets.query.time_interval import datetime_to_timestamp
10-
from tilebox.workflows.data import Job, JobState, uuid_message_to_uuid, uuid_to_uuid_message
11+
from tilebox.workflows.data import (
12+
Job,
13+
JobState,
14+
LogRecord,
15+
QueryJobLogsResponse,
16+
QueryJobSpansResponse,
17+
Span,
18+
uuid_message_to_uuid,
19+
uuid_to_uuid_message,
20+
)
1121
from tilebox.workflows.jobs.client import JobClient
1222
from tilebox.workflows.jobs.service import JobService
1323
from tilebox.workflows.observability.tracing import NoopWorkflowTracer
@@ -34,6 +44,90 @@ def execute(self, context: ExecutionContext) -> None:
3444
_ = context
3545

3646

47+
def test_query_logs_paginates() -> None:
48+
next_page_start = uuid4()
49+
log_records = [
50+
LogRecord(
51+
time=datetime.now(tz=timezone.utc),
52+
severity_number=9,
53+
severity_text="INFO",
54+
body="first",
55+
trace_id=None,
56+
span_id=None,
57+
attributes={},
58+
runner_attributes={},
59+
),
60+
LogRecord(
61+
time=datetime.now(tz=timezone.utc),
62+
severity_number=9,
63+
severity_text="INFO",
64+
body="second",
65+
trace_id=None,
66+
span_id=None,
67+
attributes={},
68+
runner_attributes={},
69+
),
70+
]
71+
telemetry_service = MagicMock()
72+
telemetry_service.query_job_logs.side_effect = [
73+
QueryJobLogsResponse([log_records[0]], Pagination(starting_after=next_page_start)),
74+
QueryJobLogsResponse([log_records[1]], Pagination()),
75+
]
76+
job_client = JobClient(MagicMock(), telemetry_service, NoopWorkflowTracer())
77+
job_id = uuid4()
78+
79+
assert job_client.query_logs(job_id) == log_records
80+
assert [call.args[1].starting_after for call in telemetry_service.query_job_logs.call_args_list] == [
81+
None,
82+
next_page_start,
83+
]
84+
85+
86+
def test_query_spans_paginates() -> None:
87+
next_page_start = uuid4()
88+
spans = [
89+
Span(
90+
start_time=datetime.now(tz=timezone.utc),
91+
end_time=datetime.now(tz=timezone.utc),
92+
trace_id="00" * 16,
93+
span_id="00" * 8,
94+
parent_span_id=None,
95+
name="first",
96+
status_code="STATUS_CODE_UNSET",
97+
status_message="",
98+
attributes={},
99+
runner_attributes={},
100+
events=[],
101+
),
102+
Span(
103+
start_time=datetime.now(tz=timezone.utc),
104+
end_time=datetime.now(tz=timezone.utc),
105+
trace_id="00" * 16,
106+
span_id="00" * 8,
107+
parent_span_id=None,
108+
name="second",
109+
status_code="STATUS_CODE_UNSET",
110+
status_message="",
111+
attributes={},
112+
runner_attributes={},
113+
events=[],
114+
),
115+
]
116+
telemetry_service = MagicMock()
117+
telemetry_service.query_job_spans.side_effect = [
118+
QueryJobSpansResponse([spans[0]], Pagination(starting_after=next_page_start)),
119+
QueryJobSpansResponse([spans[1]], Pagination()),
120+
]
121+
job_client = JobClient(MagicMock(), telemetry_service, NoopWorkflowTracer())
122+
job_id = uuid4()
123+
124+
assert job_client.query_spans(job_id) == spans
125+
assert [call.args[1].starting_after for call in telemetry_service.query_job_spans.call_args_list] == [
126+
None,
127+
next_page_start,
128+
]
129+
130+
37131
class MockJobService(JobServiceStub):
38132
"""A mock implementation of the gRPC job service, that stores jobs in memory as a dict."""
39133

@@ -112,7 +206,7 @@ def __init__(self) -> None:
112206
super().__init__()
113207
service = JobService(MagicMock())
114208
service.service = MockJobService() # mock the gRPC service
115-
self.job_client = JobClient(service, NoopWorkflowTracer())
209+
self.job_client = JobClient(service, MagicMock(), NoopWorkflowTracer())
116210
self.count_total_submitted = 0
117211

118212
queued_jobs: Bundle[Job] = Bundle("queued_jobs")

tilebox-workflows/tests/observability/test_tracing.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def test_workflow_tracers_do_not_share_client_span_processors(
5353
tracers = [tracing.WorkflowTracer(service=None, url="https://api.tilebox.com", token=None) for _ in range(3)]
5454

5555
for index, tracer in enumerate(tracers):
56-
with tracer.start_as_current_span(f"span-{index}"):
56+
with tracer.span(f"span-{index}"):
5757
pass
5858

5959
assert [processor.span_names for processor in span_processors] == [["span-0"], ["span-1"], ["span-2"]]
@@ -66,7 +66,7 @@ def test_workflow_tracers_copy_configured_span_processors_once(
6666
tracers = [tracing.WorkflowTracer(service=None, url="https://api.tilebox.com", token=None) for _ in range(2)]
6767

6868
for index, tracer in enumerate(tracers):
69-
with tracer.start_as_current_span(f"span-{index}"):
69+
with tracer.span(f"span-{index}"):
7070
pass
7171

7272
assert [processor.span_names for processor in span_processors] == [

tilebox-workflows/tilebox/workflows/client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
)
1515
from tilebox.workflows.jobs.client import JobClient
1616
from tilebox.workflows.jobs.service import JobService
17+
from tilebox.workflows.jobs.telemetry_service import TelemetryService
1718
from tilebox.workflows.observability.logging import (
1819
OTELLoggingHandler,
1920
StructuredLogger,
@@ -101,7 +102,7 @@ def jobs(self) -> JobClient:
101102
A client for the jobs service.
102103
"""
103104

104-
return JobClient(JobService(self._channel), self._tracer)
105+
return JobClient(JobService(self._channel), TelemetryService(self._channel), self._tracer)
105106

106107
def runner(
107108
self,

0 commit comments

Comments
 (0)