-
Notifications
You must be signed in to change notification settings - Fork 20
Expand file tree
/
Copy pathorchestrator_main.py
More file actions
134 lines (105 loc) · 4.92 KB
/
Copy pathorchestrator_main.py
File metadata and controls
134 lines (105 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import logging
import pathlib
import os
import sqlalchemy
from sqlalchemy import orm
from cloud_pipelines_backend import orchestrator_sql
from cloud_pipelines_backend.instrumentation import bugsnag_instrumentation
from cloud_pipelines_backend.instrumentation import opentelemetry as otel
from cloud_pipelines_backend.launchers import kubernetes_launchers
from cloud_pipelines.orchestration.storage_providers import local_storage
def _build_launcher():
    """Build the container launcher selected by the TANGLE_LAUNCHER env var.

    Values (matched case-insensitively after stripping whitespace):
        "kubernetes" (default) — KubernetesWithHostPathContainerLauncher
        "kubernetes_gcs"       — KubernetesWithGcsFuseContainerLauncher (GKE)
        "skypilot"             — SkyPilotKubernetesLauncher (requires the
                                 `skypilot` extra)

    An unset or blank TANGLE_LAUNCHER selects "kubernetes".

    The "skypilot" launcher is additionally configured from SKYPILOT_INFRA
    (default "kubernetes"), SKYPILOT_POOL, DEFAULT_CONTAINER_IMAGE and
    DEFAULT_PRIORITY_CLASS.

    Returns:
        The launcher instance to pass to the orchestrator.

    Raises:
        ValueError: TANGLE_LAUNCHER is set to an unrecognized value, so a typo
            fails at startup instead of silently selecting the default.
    """
    valid_choices = ("kubernetes", "kubernetes_gcs", "skypilot")
    raw_choice = os.environ.get("TANGLE_LAUNCHER", "")
    choice = raw_choice.strip().lower() or "kubernetes"
    if choice not in valid_choices:
        raise ValueError(
            f"Unknown TANGLE_LAUNCHER value {raw_choice!r}; "
            f"expected one of: {', '.join(valid_choices)}"
        )

    if choice == "skypilot":
        # Lazy import so deployments without the [skypilot] extra don't pay for it.
        from cloud_pipelines_backend.launchers.skypilot_launchers import (
            SkyPilotKubernetesLauncher,
        )

        return SkyPilotKubernetesLauncher(
            infra=os.environ.get("SKYPILOT_INFRA", "kubernetes"),
            pool=os.environ.get("SKYPILOT_POOL"),
            default_image=os.environ.get("DEFAULT_CONTAINER_IMAGE"),
            priority_class=os.environ.get("DEFAULT_PRIORITY_CLASS"),
            default_labels={"managed-by": "tangle"},
        )

    from kubernetes import client as k8s_client_lib
    from kubernetes import config as k8s_config_lib

    try:
        k8s_config_lib.load_incluster_config()
    except k8s_config_lib.ConfigException:
        # No in-cluster configuration available: fall back to the local
        # kubeconfig. Only ConfigException triggers the fallback; any other
        # error propagates instead of being masked.
        k8s_config_lib.load_kube_config()
    k8s_client = k8s_client_lib.ApiClient()
    # Probe the API server once (5 s timeout) so connection problems surface
    # at startup; the returned version info itself is not used.
    k8s_client_lib.VersionApi(k8s_client).get_code(_request_timeout=5)

    if choice == "kubernetes_gcs":
        return kubernetes_launchers.KubernetesWithGcsFuseContainerLauncher(
            api_client=k8s_client,
        )
    return kubernetes_launchers.KubernetesWithHostPathContainerLauncher(
        api_client=k8s_client,
    )
def _configure_logging():
    """Attach the orchestrator's log handlers and return this module's logger.

    Records from the ``cloud_pipelines_backend.orchestrator_sql`` logger (set
    to DEBUG) and from this module's logger are written to
    ``orchestrator_main.log`` in the current working directory.

    A stderr handler (INFO and above) is added to this module's logger only
    when the root logger has no handlers. Records also propagate to the root
    logger, so adding it when the root already prints to the console (e.g.
    after ``logging.basicConfig()``) would emit every message twice.
    """
    logger = logging.getLogger(__name__)
    orchestrator_logger = logging.getLogger("cloud_pipelines_backend.orchestrator_sql")
    orchestrator_logger.setLevel(logging.DEBUG)

    formatter = logging.Formatter("%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s")

    file_handler = logging.FileHandler("orchestrator_main.log")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    orchestrator_logger.addHandler(file_handler)
    logger.addHandler(file_handler)

    # TODO: orchestrator_logger reaches the console only through propagation
    # to the root logger; disable that default output instead of relying on
    # not adding a console handler to it here.
    if not logging.getLogger().handlers:
        stderr_handler = logging.StreamHandler()
        stderr_handler.setLevel(logging.INFO)
        stderr_handler.setFormatter(formatter)
        logger.addHandler(stderr_handler)
    return logger


def main():
    """Set up logging and instrumentation, build the orchestrator, and run it.

    Environment:
        DATABASE_URI: SQLAlchemy database URL (default ``sqlite:///db.sqlite``).
        TANGLE_LAUNCHER and related variables are read by ``_build_launcher``.

    Ends by calling ``orchestrator.run_loop()``; presumably that blocks for
    the lifetime of the process.
    """
    logger = _configure_logging()
    logger.info("Starting the orchestrator")

    bugsnag_instrumentation.setup(service_name="tangle-orchestrator")
    otel.setup_providers()

    default_database_uri = "sqlite:///db.sqlite"
    database_uri = os.environ.get("DATABASE_URI", default_database_uri)
    db_engine = sqlalchemy.create_engine(url=database_uri)
    logger.info("Completed sqlalchemy.create_engine")

    # autoflush=False: pending changes are not flushed automatically before
    # queries; they are written only on an explicit flush/commit.
    session_factory = orm.sessionmaker(
        autocommit=False, autoflush=False, bind=db_engine
    )

    # Artifacts and logs are stored on the local filesystem under ./tmp.
    artifact_store_root_dir = (pathlib.Path.cwd() / "tmp" / "artifacts").as_posix()
    log_store_root_dir = (pathlib.Path.cwd() / "tmp" / "logs").as_posix()

    # Default resource annotations handed to the orchestrator (presumably
    # applied to tasks that do not set their own — confirm in orchestrator_sql).
    default_task_annotations = {
        kubernetes_launchers.RESOURCES_CPU_ANNOTATION_KEY: "1",
        kubernetes_launchers.RESOURCES_MEMORY_ANNOTATION_KEY: "512Mi",
    }

    launcher = _build_launcher()

    orchestrator = orchestrator_sql.OrchestratorService_Sql(
        session_factory=session_factory,
        launcher=launcher,
        storage_provider=local_storage.LocalStorageProvider(),
        data_root_uri=artifact_store_root_dir,
        logs_root_uri=log_store_root_dir,
        default_task_annotations=default_task_annotations,
        sleep_seconds_between_queue_sweeps=5.0,
    )
    orchestrator.run_loop()
if __name__ == "__main__":
    # Give the root logger a console handler; logging.basicConfig writes to
    # sys.stderr by default. The root level is set to NOTSET instead of the
    # default WARNING, so loggers that do not set their own level inherit it
    # and emit records of every severity.
    logging.basicConfig(
        format="%(asctime)s\t%(levelname)s\t%(message)s", level=logging.NOTSET
    )
    main()