Skip to content

Commit 26038ec

Browse files
Worker execution runtime
1 parent 80122b7 commit 26038ec

30 files changed

Lines changed: 832 additions & 669 deletions

tilebox-datasets/tilebox/datasets/datasets/v1/collections_pb2_grpc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tilebox.datasets.datasets.v1 import core_pb2 as datasets_dot_v1_dot_core__pb2
77

88

9-
class CollectionServiceStub(object):
9+
class CollectionServiceStub:
1010
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
1111
"""
1212

@@ -38,7 +38,7 @@ def __init__(self, channel):
3838
_registered_method=True)
3939

4040

41-
class CollectionServiceServicer(object):
41+
class CollectionServiceServicer:
4242
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
4343
"""
4444

@@ -97,7 +97,7 @@ def add_CollectionServiceServicer_to_server(servicer, server):
9797

9898

9999
# This class is part of an EXPERIMENTAL API.
100-
class CollectionService(object):
100+
class CollectionService:
101101
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
102102
"""
103103

tilebox-datasets/tilebox/datasets/datasets/v1/data_access_pb2_grpc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tilebox.datasets.datasets.v1 import data_access_pb2 as datasets_dot_v1_dot_data__access__pb2
77

88

9-
class DataAccessServiceStub(object):
9+
class DataAccessServiceStub:
1010
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
1111
"""
1212

@@ -28,7 +28,7 @@ def __init__(self, channel):
2828
_registered_method=True)
2929

3030

31-
class DataAccessServiceServicer(object):
31+
class DataAccessServiceServicer:
3232
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
3333
"""
3434

@@ -67,7 +67,7 @@ def add_DataAccessServiceServicer_to_server(servicer, server):
6767

6868

6969
# This class is part of an EXPERIMENTAL API.
70-
class DataAccessService(object):
70+
class DataAccessService:
7171
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
7272
"""
7373

tilebox-datasets/tilebox/datasets/datasets/v1/data_ingestion_pb2_grpc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from tilebox.datasets.datasets.v1 import data_ingestion_pb2 as datasets_dot_v1_dot_data__ingestion__pb2
66

77

8-
class DataIngestionServiceStub(object):
8+
class DataIngestionServiceStub:
99
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
1010
"""
1111

@@ -27,7 +27,7 @@ def __init__(self, channel):
2727
_registered_method=True)
2828

2929

30-
class DataIngestionServiceServicer(object):
30+
class DataIngestionServiceServicer:
3131
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
3232
"""
3333

@@ -64,7 +64,7 @@ def add_DataIngestionServiceServicer_to_server(servicer, server):
6464

6565

6666
# This class is part of an EXPERIMENTAL API.
67-
class DataIngestionService(object):
67+
class DataIngestionService:
6868
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
6969
"""
7070

tilebox-datasets/tilebox/datasets/datasets/v1/datasets_pb2_grpc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from tilebox.datasets.datasets.v1 import datasets_pb2 as datasets_dot_v1_dot_datasets__pb2
77

88

9-
class DatasetServiceStub(object):
9+
class DatasetServiceStub:
1010
"""DatasetsService is the CRUD service for Tilebox datasets.
1111
"""
1212

@@ -43,7 +43,7 @@ def __init__(self, channel):
4343
_registered_method=True)
4444

4545

46-
class DatasetServiceServicer(object):
46+
class DatasetServiceServicer:
4747
"""DatasetsService is the CRUD service for Tilebox datasets.
4848
"""
4949

@@ -113,7 +113,7 @@ def add_DatasetServiceServicer_to_server(servicer, server):
113113

114114

115115
# This class is part of an EXPERIMENTAL API.
116-
class DatasetService(object):
116+
class DatasetService:
117117
"""DatasetsService is the CRUD service for Tilebox datasets.
118118
"""
119119

tilebox-workflows/tilebox/workflows/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55

66
from tilebox.workflows.client import Client
77
from tilebox.workflows.data import Job
8+
from tilebox.workflows.runner.runner import Runner
89
from tilebox.workflows.task import ExecutionContext, Task
910

10-
__all__ = ["Client", "ExecutionContext", "Job", "Task"]
11+
__all__ = ["Client", "ExecutionContext", "Job", "Runner", "Task"]
1112

1213

1314
def _init_logging(level: str = "INFO") -> None:

tilebox-workflows/tilebox/workflows/client.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from uuid import uuid4
55

66
from _tilebox.grpc.channel import open_channel, parse_channel_info
7-
from tilebox.datasets.sync.client import Client as DatasetsClient
87
from tilebox.workflows.automations.client import AutomationClient, AutomationService
98
from tilebox.workflows.cache import JobCache, NoCache
109
from tilebox.workflows.clusters.client import ClusterClient, ClusterSlugLike, to_cluster_slug
@@ -22,8 +21,10 @@
2221
_create_tilebox_logger_provider,
2322
)
2423
from tilebox.workflows.observability.tracing import WorkflowTracer
24+
from tilebox.workflows.runner.runner import Runner
2525
from tilebox.workflows.runner.task_runner import TaskRunner, _LeaseRenewer
2626
from tilebox.workflows.runner.task_service import TaskService
27+
from tilebox.workflows.task import Task
2728

2829

2930
class Client:
@@ -107,9 +108,10 @@ def jobs(self) -> JobClient:
107108
def runner(
108109
self,
109110
cluster: ClusterSlugLike | None = None,
110-
tasks: list[type] | None = None,
111+
tasks: list[type[Task]] | None = None,
111112
cache: JobCache | None = None,
112113
context: type[RunnerContext] | None = None,
114+
runner: Runner | None = None,
113115
) -> TaskRunner:
114116
"""Initialize a task runner.
115117
@@ -118,12 +120,17 @@ def runner(
118120
tasks: A list of task the runner is able to execute.
119121
cache: The cache to share between tasks.
120122
context: The type of the runner context to use. Defaults to RunnerContext.
123+
runner: A runner definition containing tasks, cache and context configuration.
121124
122125
Returns:
123126
A task runner.
124127
"""
128+
if runner is not None and (tasks is not None or cache is not None or context is not None):
129+
raise ValueError("Pass either runner or tasks/cache/context, not both.")
130+
131+
runner_definition = runner or Runner(tasks=tasks, cache=cache, context=context)
125132
if cache is None:
126-
cache = NoCache() # a no-op cache that will raise an error if it's used
133+
cache = runner_definition.cache or NoCache() # a no-op cache that will raise an error if it's used
127134

128135
found_cluster = self.clusters().find(to_cluster_slug(cluster or ""))
129136

@@ -134,14 +141,13 @@ def runner(
134141
# lets refactor this to a lazy loading mechanism in the future
135142
storage_locations = []
136143

137-
runner_context_type = context or RunnerContext
144+
runner_context_type = runner_definition.context or RunnerContext
138145
runner_context = runner_context_type(
139146
self._tracer,
140-
datasets_client=DatasetsClient(**self._auth), # ty: ignore[invalid-argument-type]
141147
storage_locations=storage_locations,
142148
)
143149

144-
runner = TaskRunner(
150+
task_runner = TaskRunner(
145151
TaskService(self._channel),
146152
found_cluster.slug,
147153
cache,
@@ -152,11 +158,10 @@ def runner(
152158
runner_logger=StructuredLogger(self._runner_logger, {}),
153159
)
154160

155-
if tasks is not None:
156-
for task in tasks:
157-
runner.register(task)
161+
for task in runner_definition.tasks_by_identifier.values():
162+
task_runner.register(task)
158163

159-
return runner
164+
return task_runner
160165

161166
def clusters(self) -> ClusterClient:
162167
"""

tilebox-workflows/tilebox/workflows/data.py

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
except ModuleNotFoundError:
3737
from typing import Any as S3Client
3838

39-
from tilebox.datasets.sync.client import Client as DatasetsClient
4039
from tilebox.workflows.observability.tracing import NoopWorkflowTracer, WorkflowTracer
4140
from tilebox.workflows.workflows.v1 import automation_pb2 as automation_pb
4241
from tilebox.workflows.workflows.v1 import core_pb2, job_pb2, task_pb2, workflows_pb2
@@ -332,10 +331,26 @@ def to_message(self) -> workflows_pb2.Cluster:
332331
return workflows_pb2.Cluster(slug=self.slug, display_name=self.display_name, deletable=self.deletable)
333332

334333

334+
@dataclass(order=True, frozen=True)
335+
class Workflow:
336+
slug: str
337+
name: str
338+
description: str
339+
340+
@classmethod
341+
def from_message(cls, workflow: workflows_pb2.Workflow) -> "Workflow":
342+
"""Convert a Workflow protobuf message to a Workflow object."""
343+
return cls(slug=workflow.slug, name=workflow.name, description=workflow.description)
344+
345+
def to_message(self) -> workflows_pb2.Workflow:
346+
"""Convert a Workflow object to a Workflow protobuf message."""
347+
return workflows_pb2.Workflow(slug=self.slug, name=self.name, description=self.description)
348+
349+
335350
@dataclass
336351
class NextTaskToRun:
337352
cluster_slug: str
338-
identifiers: dict[TaskIdentifier, type]
353+
identifiers: dict[TaskIdentifier, type[Any]]
339354

340355
# from message not needed, as we never return this from the server
341356

@@ -474,6 +489,51 @@ def to_message(self) -> task_pb2.ComputedTask:
474489
)
475490

476491

492+
@dataclass
493+
class FailedTask:
494+
task_id: UUID
495+
display: str | None
496+
was_workflow_error: bool
497+
progress_updates: list[ProgressIndicator]
498+
499+
@classmethod
500+
def from_message(cls, failed_task: task_pb2.TaskFailedRequest) -> "FailedTask":
501+
"""Convert a TaskFailedRequest protobuf message to a FailedTask object."""
502+
return cls(
503+
task_id=uuid_message_to_uuid(failed_task.task_id),
504+
display=failed_task.display,
505+
was_workflow_error=failed_task.was_workflow_error,
506+
progress_updates=[ProgressIndicator.from_message(progress) for progress in failed_task.progress_updates],
507+
)
508+
509+
@classmethod
510+
def from_task_error(
511+
cls,
512+
task: Task,
513+
error: Exception,
514+
was_workflow_error: bool,
515+
progress_updates: list[ProgressIndicator],
516+
) -> "FailedTask":
517+
# job output is limited to 1KB, so truncate the error message if necessary
518+
error_message = repr(error)[: (1024 - len(task.display or "None") - 1)]
519+
display = f"{task.display}" if error_message == "" else f"{task.display}\n{error_message}"
520+
return cls(
521+
task_id=task.id,
522+
display=display,
523+
was_workflow_error=was_workflow_error,
524+
progress_updates=progress_updates,
525+
)
526+
527+
def to_message(self) -> task_pb2.TaskFailedRequest:
528+
"""Convert a FailedTask object to a TaskFailedRequest protobuf message."""
529+
return task_pb2.TaskFailedRequest(
530+
task_id=uuid_to_uuid_message(self.task_id),
531+
display=self.display,
532+
was_workflow_error=self.was_workflow_error,
533+
progress_updates=[progress.to_message() for progress in self.progress_updates],
534+
)
535+
536+
477537
def _parse_version(version: str) -> tuple[int, int]:
478538
"""
479539
Parse the major and minor version from a string in the format "vMajor.Minor" and returns them as tuple of ints.
@@ -924,13 +984,11 @@ class RunnerContext:
924984
def __init__(
925985
self,
926986
tracer: WorkflowTracer | None = None,
927-
datasets_client: DatasetsClient | None = None,
928987
storage_locations: list[StorageLocation] | None = None,
929988
) -> None:
930989
if tracer is None:
931990
tracer = NoopWorkflowTracer()
932991
self.tracer = tracer
933-
self.datasets_client = datasets_client
934992
self.storage_locations = {
935993
sl.id: sl._with_runner_context(self) # noqa: SLF001
936994
for sl in storage_locations or []

tilebox-workflows/tilebox/workflows/interceptors.py

Lines changed: 0 additions & 62 deletions
This file was deleted.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from tilebox.workflows.runner.runner import Runner
2+
from tilebox.workflows.runner.task_runner import TaskRunner
3+
4+
__all__ = ["Runner", "TaskRunner"]

0 commit comments

Comments
 (0)