Skip to content

Commit 266b179

Browse files
Update task span attributes
1 parent 6c11c0d commit 266b179

23 files changed

Lines changed: 1625 additions & 879 deletions

File tree

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ repos:
99
hooks:
1010
- id: sync-with-uv
1111
- repo: https://github.com/charliermarsh/ruff-pre-commit
12-
rev: v0.15.7
12+
rev: v0.15.12
1313
hooks:
1414
- id: ruff-check
1515
args: [--fix, --exit-non-zero-on-fix]

tilebox-datasets/tilebox/datasets/protobuf_conversion/field_types.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ def to_proto(self, value: tuple[float, float, float]) -> LatLonAlt | None:
254254

255255
def infer_field_type(field: FieldDescriptor) -> ProtobufFieldType:
256256
if field.type == FieldDescriptor.TYPE_MESSAGE:
257+
if field.message_type is None:
258+
raise ValueError("Expected message type for field but got None")
257259
message_name = field.message_type.full_name
258260
if message_name not in _MESSAGE_NAMES_TO_FIELDS:
259261
raise ValueError(f"Unsupported message type {message_name}")
@@ -281,6 +283,9 @@ def enum_mapping_from_field_descriptor(field: FieldDescriptor) -> dict[int, str]
281283
if field.type != FieldDescriptor.TYPE_ENUM:
282284
raise ValueError("Expected field to be of type FieldDescriptor.TYPE_ENUM")
283285

286+
if field.enum_type is None:
287+
raise ValueError("Expected enum type for field but got None")
288+
284289
# remove the enum type prefix from the enum values
285290
# e.g. FLIGHT_DIRECTION_ASCENDING of the FlightDirection enum will result in a value of ASCENDING
286291
enum_type_prefix = _camel_to_uppercase(field.enum_type.name) + "_"

tilebox-datasets/tilebox/datasets/protobuf_conversion/protobuf_xarray.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import contextlib
66
from collections.abc import Sequence
7-
from typing import Any, TypeVar
7+
from typing import Any, TypeVar, cast
88

99
import numpy as np
1010
import xarray as xr
@@ -352,7 +352,7 @@ def _create_field_converters(message: Message, buffer_size: int) -> dict[str, _F
352352
for field in message.DESCRIPTOR.fields:
353353
# if we have an unsupported field type we will get a ValueError, so we just skip those fields
354354
with contextlib.suppress(ValueError):
355-
converter = _create_field_converter(field)
355+
converter = _create_field_converter(cast(FieldDescriptor, field))
356356
converters[field.name] = converter
357357

358358
if buffer_size > 0:

tilebox-datasets/tilebox/datasets/protobuf_conversion/to_protobuf.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
from collections import defaultdict
22
from collections.abc import Collection, Iterable, Iterator, Mapping
3-
from typing import Any
3+
from typing import Any, cast
44
from uuid import UUID
55

66
import numpy as np
77
import pandas as pd
88
import xarray as xr
9+
from google.protobuf.descriptor import Descriptor
910
from google.protobuf.message import Message
1011

1112
from tilebox.datasets.protobuf_conversion.field_types import (
@@ -40,7 +41,7 @@ def to_messages( # noqa: C901, PLR0912
4041

4142
ignore = set(ignore_fields or [])
4243

43-
field_descriptors_by_name = message_type.DESCRIPTOR.fields_by_name
44+
field_descriptors_by_name = cast(Descriptor, message_type.DESCRIPTOR).fields_by_name
4445

4546
# let's validate our fields, to make sure that they are all known fields for the given protobuf message
4647
# and that they are all lists of the same length

tilebox-workflows/pyproject.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ dependencies = [
3333
"ipywidgets>=8.1.7",
3434
"python-dateutil>=2.9.0.post0",
3535
"obstore>=0.8.2",
36+
"opentelemetry-proto>=1.30.0",
37+
"structlog>=25.5.0",
38+
# grpcio 1.80.0 contains unwanted log message spam: https://github.com/grpc/grpc/issues/42293
39+
"grpcio<1.80.0",
3640
]
3741

3842
[dependency-groups]

tilebox-workflows/tests/automations/test_storage_event.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from tilebox.workflows.automations import StorageEventTask
99
from tilebox.workflows.data import StorageEventType, StorageLocation
10+
from tilebox.workflows.observability.tracing import NoopWorkflowTracer
1011
from tilebox.workflows.task import RunnerContext
1112

1213

@@ -43,7 +44,7 @@ def test_storage_event_task_de_serialization_roundtrip(storage_location: Storage
4344
triggered_task = task.once(storage_location, "FM171/apid.json", StorageEventType.CREATED)
4445

4546
# serialized task only contains a bucket id, so for deserialization we need to provide a bucket lookup table
46-
context = RunnerContext(storage_locations=[storage_location])
47+
context = RunnerContext(NoopWorkflowTracer(), storage_locations=[storage_location])
4748

4849
serialized = triggered_task._serialize()
4950
assert ExampleStorageEventTask._deserialize(serialized, context) == triggered_task
@@ -55,7 +56,7 @@ def test_storage_event_task_de_serialization_roundtrip_protobuf(storage_location
5556
triggered_task = task.once(storage_location, "FM171/apid.json")
5657

5758
# serialized task only contains a bucket id, so for deserialization we need to provide a bucket lookup table
58-
context = RunnerContext(storage_locations=[storage_location])
59+
context = RunnerContext(NoopWorkflowTracer(), storage_locations=[storage_location])
5960

6061
serialized = triggered_task._serialize()
6162
assert ExampleProtoStorageEventTask._deserialize(serialized, context) == triggered_task

tilebox-workflows/tests/jobs/test_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from tilebox.workflows.data import Job, JobState, uuid_message_to_uuid, uuid_to_uuid_message
1111
from tilebox.workflows.jobs.client import JobClient
1212
from tilebox.workflows.jobs.service import JobService
13+
from tilebox.workflows.observability.tracing import NoopWorkflowTracer
1314
from tilebox.workflows.task import ExecutionContext, Task
1415
from tilebox.workflows.workflows.v1.core_pb2 import Job as JobMessage
1516
from tilebox.workflows.workflows.v1.core_pb2 import JobState as JobStateEnum
@@ -111,7 +112,7 @@ def __init__(self) -> None:
111112
super().__init__()
112113
service = JobService(MagicMock())
113114
service.service = MockJobService() # mock the gRPC service
114-
self.job_client = JobClient(service)
115+
self.job_client = JobClient(service, NoopWorkflowTracer())
115116
self.count_total_submitted = 0
116117

117118
queued_jobs: Bundle[Job] = Bundle("queued_jobs")

tilebox-workflows/tests/runner/test_runner.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from tilebox.workflows.cache import InMemoryCache, JobCache
1111
from tilebox.workflows.client import Client
1212
from tilebox.workflows.data import JobState, ProgressIndicator, RunnerContext, TaskState
13+
from tilebox.workflows.observability.tracing import NoopWorkflowTracer
1314
from tilebox.workflows.runner.task_runner import TaskRunner
1415

1516

@@ -206,10 +207,10 @@ def test_runner_disallow_duplicate_task_identifiers() -> None:
206207
MagicMock(),
207208
"dummy-cluster",
208209
InMemoryCache(),
209-
None,
210+
NoopWorkflowTracer(),
210211
None,
211212
MagicMock(),
212-
RunnerContext(),
213+
RunnerContext(NoopWorkflowTracer()),
213214
)
214215

215216
runner.register(FlakyTask)

tilebox-workflows/tilebox/workflows/client.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,27 +12,30 @@
1212
)
1313
from tilebox.workflows.jobs.client import JobClient
1414
from tilebox.workflows.jobs.service import JobService
15-
from tilebox.workflows.observability.tracing import (
16-
WorkflowTracer,
17-
)
15+
from tilebox.workflows.observability.tracing import WorkflowTracer
1816
from tilebox.workflows.runner.task_runner import TaskRunner, _LeaseRenewer
1917
from tilebox.workflows.runner.task_service import TaskService
2018

2119

2220
class Client:
23-
def __init__(self, *, url: str = "https://api.tilebox.com", token: str | None = None) -> None:
21+
def __init__(
22+
self, *, url: str = "https://api.tilebox.com", token: str | None = None, name: str | None = None
23+
) -> None:
2424
"""
2525
Create a Tilebox workflows client.
2626
2727
Args:
2828
url: Tilebox API Url. Defaults to "https://api.tilebox.com".
2929
token: The API Key to authenticate with. If not set the `TILEBOX_API_KEY` environment variable will be used.
30+
name: An optional name of the client, used as service.name for telemetry. If not set, defaults to
31+
the service name provided by `tilebox.workflows.observability.tracing.configure_otel_tracing`,
32+
or "tilebox-python" if no external tracer is configured.
3033
"""
3134
token = _token_from_env(url, token)
3235
self._auth: dict[str, str] = {"token": token, "url": url}
3336
self._channel = open_channel(url, token)
37+
self._tracer = WorkflowTracer(service=name, url=url, token=token)
3438

35-
self._tracer: WorkflowTracer | None = None
3639
self._logger: logging.Logger | None = None
3740

3841
def configure_tracing(self, tracer: WorkflowTracer) -> None:
@@ -90,8 +93,6 @@ def runner(
9093
if cache is None:
9194
cache = NoCache() # a no-op cache that will raise an error if it's used
9295

93-
tracer = self._tracer or WorkflowTracer()
94-
9596
found_cluster = self.clusters().find(to_cluster_slug(cluster or ""))
9697

9798
try:
@@ -103,7 +104,7 @@ def runner(
103104

104105
runner_context_type = context or RunnerContext
105106
runner_context = runner_context_type(
106-
tracer._tracer, # noqa: SLF001
107+
self._tracer,
107108
datasets_client=DatasetsClient(**self._auth), # ty: ignore[invalid-argument-type]
108109
storage_locations=storage_locations,
109110
)
@@ -112,7 +113,7 @@ def runner(
112113
TaskService(self._channel),
113114
found_cluster.slug,
114115
cache,
115-
tracer,
116+
self._tracer,
116117
self._logger,
117118
_LeaseRenewer(**self._auth),
118119
runner_context,

tilebox-workflows/tilebox/workflows/data.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
from mypy_boto3_s3.client import S3Client
3131
except ModuleNotFoundError:
3232
from typing import Any as S3Client
33-
from opentelemetry.trace import ProxyTracerProvider, Tracer
3433

3534
from tilebox.datasets.sync.client import Client as DatasetsClient
35+
from tilebox.workflows.observability.tracing import NoopWorkflowTracer, WorkflowTracer
3636
from tilebox.workflows.workflows.v1 import automation_pb2 as automation_pb
3737
from tilebox.workflows.workflows.v1 import core_pb2, job_pb2, task_pb2
3838

@@ -675,14 +675,13 @@ def to_message(self) -> automation_pb.AutomationPrototype:
675675
class RunnerContext:
676676
def __init__(
677677
self,
678-
tracer: Tracer | None = None,
678+
tracer: WorkflowTracer | None = None,
679679
datasets_client: DatasetsClient | None = None,
680680
storage_locations: list[StorageLocation] | None = None,
681681
) -> None:
682682
if tracer is None:
683-
self.tracer = ProxyTracerProvider().get_tracer("tilebox.workflows.RunnerContext")
684-
else:
685-
self.tracer = tracer
683+
tracer = NoopWorkflowTracer()
684+
self.tracer = tracer
686685
self.datasets_client = datasets_client
687686
self.storage_locations = {
688687
sl.id: sl._with_runner_context(self) # noqa: SLF001

0 commit comments

Comments
 (0)