Skip to content

Commit c852398

Browse files
authored
Set up background tasks Sentry tracing (#2932)
1 parent b5f26c8 commit c852398

15 files changed

+65
-10
lines changed

src/dstack/_internal/server/app.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from fastapi.responses import HTMLResponse, RedirectResponse
1414
from fastapi.staticfiles import StaticFiles
1515
from prometheus_client import Counter, Histogram
16+
from sentry_sdk.types import SamplingContext
1617

1718
from dstack._internal.cli.utils.common import console
1819
from dstack._internal.core.errors import ForbiddenError, ServerClientError
@@ -81,16 +82,6 @@
8182

8283

8384
def create_app() -> FastAPI:
84-
if settings.SENTRY_DSN is not None:
85-
sentry_sdk.init(
86-
dsn=settings.SENTRY_DSN,
87-
release=DSTACK_VERSION,
88-
environment=settings.SERVER_ENVIRONMENT,
89-
enable_tracing=True,
90-
traces_sample_rate=settings.SENTRY_TRACES_SAMPLE_RATE,
91-
profiles_sample_rate=settings.SENTRY_PROFILES_SAMPLE_RATE,
92-
)
93-
9485
app = FastAPI(
9586
docs_url="/api/docs",
9687
lifespan=lifespan,
@@ -102,6 +93,15 @@ def create_app() -> FastAPI:
10293
@asynccontextmanager
10394
async def lifespan(app: FastAPI):
10495
configure_logging()
96+
if settings.SENTRY_DSN is not None:
97+
sentry_sdk.init(
98+
dsn=settings.SENTRY_DSN,
99+
release=DSTACK_VERSION,
100+
environment=settings.SERVER_ENVIRONMENT,
101+
enable_tracing=True,
102+
traces_sampler=_sentry_traces_sampler,
103+
profiles_sample_rate=settings.SENTRY_PROFILES_SAMPLE_RATE,
104+
)
105105
server_executor = ThreadPoolExecutor(max_workers=settings.SERVER_EXECUTOR_MAX_WORKERS)
106106
asyncio.get_running_loop().set_default_executor(server_executor)
107107
await migrate()
@@ -379,3 +379,15 @@ def _print_dstack_logo():
379379
╰━━┻━━┻╯╱╰╯╰━━┻╯
380380
[/]"""
381381
)
382+
383+
384+
def _sentry_traces_sampler(sampling_context: SamplingContext) -> float:
385+
parent_sampling_decision = sampling_context["parent_sampled"]
386+
if parent_sampling_decision is not None:
387+
return float(parent_sampling_decision)
388+
transaction_context = sampling_context["transaction_context"]
389+
name = transaction_context.get("name")
390+
if name is not None:
391+
if name.startswith("background."):
392+
return settings.SENTRY_TRACES_BACKGROUND_SAMPLE_RATE
393+
return settings.SENTRY_TRACES_SAMPLE_RATE

src/dstack/_internal/server/background/tasks/process_fleets.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
is_fleet_in_use,
2020
)
2121
from dstack._internal.server.services.locking import get_locker
22+
from dstack._internal.server.utils import sentry_utils
2223
from dstack._internal.utils.common import get_current_datetime
2324
from dstack._internal.utils.logging import get_logger
2425

@@ -29,6 +30,7 @@
2930
MIN_PROCESSING_INTERVAL = timedelta(seconds=30)
3031

3132

33+
@sentry_utils.instrument_background_task
3234
async def process_fleets():
3335
lock, lockset = get_locker(get_db().dialect_name).get_lockset(FleetModel.__tablename__)
3436
async with get_session_ctx() as session:

src/dstack/_internal/server/background/tasks/process_gateways.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from dstack._internal.server.services.locking import advisory_lock_ctx, get_locker
1919
from dstack._internal.server.services.logging import fmt
20+
from dstack._internal.server.utils import sentry_utils
2021
from dstack._internal.utils.common import get_current_datetime
2122
from dstack._internal.utils.logging import get_logger
2223

@@ -28,6 +29,7 @@ async def process_gateways_connections():
2829
await _process_active_connections()
2930

3031

32+
@sentry_utils.instrument_background_task
3133
async def process_gateways():
3234
lock, lockset = get_locker(get_db().dialect_name).get_lockset(GatewayModel.__tablename__)
3335
async with get_session_ctx() as session:

src/dstack/_internal/server/background/tasks/process_idle_volumes.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717
get_volume_configuration,
1818
volume_model_to_volume,
1919
)
20+
from dstack._internal.server.utils import sentry_utils
2021
from dstack._internal.utils import common
2122
from dstack._internal.utils.common import get_current_datetime
2223
from dstack._internal.utils.logging import get_logger
2324

2425
logger = get_logger(__name__)
2526

2627

28+
@sentry_utils.instrument_background_task
2729
async def process_idle_volumes():
2830
lock, lockset = get_locker(get_db().dialect_name).get_lockset(VolumeModel.__tablename__)
2931
async with get_session_ctx() as session:

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
from dstack._internal.server.services.runner import client as runner_client
106106
from dstack._internal.server.services.runner.client import HealthStatus
107107
from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel
108+
from dstack._internal.server.utils import sentry_utils
108109
from dstack._internal.utils.common import (
109110
get_current_datetime,
110111
get_or_error,
@@ -136,6 +137,7 @@ async def process_instances(batch_size: int = 1):
136137
await asyncio.gather(*tasks)
137138

138139

140+
@sentry_utils.instrument_background_task
139141
async def _process_next_instance():
140142
lock, lockset = get_locker(get_db().dialect_name).get_lockset(InstanceModel.__tablename__)
141143
async with get_session_ctx() as session:

src/dstack/_internal/server/background/tasks/process_metrics.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from dstack._internal.server.services.jobs import get_job_provisioning_data, get_job_runtime_data
1616
from dstack._internal.server.services.runner import client
1717
from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel
18+
from dstack._internal.server.utils import sentry_utils
1819
from dstack._internal.utils.common import batched, get_current_datetime, get_or_error, run_async
1920
from dstack._internal.utils.logging import get_logger
2021

@@ -26,6 +27,7 @@
2627
MIN_COLLECT_INTERVAL_SECONDS = 9
2728

2829

30+
@sentry_utils.instrument_background_task
2931
async def collect_metrics():
3032
async with get_session_ctx() as session:
3133
res = await session.execute(
@@ -45,6 +47,7 @@ async def collect_metrics():
4547
await _collect_jobs_metrics(batch)
4648

4749

50+
@sentry_utils.instrument_background_task
4851
async def delete_metrics():
4952
now_timestamp_micro = int(get_current_datetime().timestamp() * 1_000_000)
5053
running_timestamp_micro_cutoff = (

src/dstack/_internal/server/background/tasks/process_placement_groups.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
from dstack._internal.server.services import backends as backends_services
1313
from dstack._internal.server.services.locking import get_locker
1414
from dstack._internal.server.services.placement import placement_group_model_to_placement_group
15+
from dstack._internal.server.utils import sentry_utils
1516
from dstack._internal.utils.common import get_current_datetime, run_async
1617
from dstack._internal.utils.logging import get_logger
1718

1819
logger = get_logger(__name__)
1920

2021

22+
@sentry_utils.instrument_background_task
2123
async def process_placement_groups():
2224
lock, lockset = get_locker(get_db().dialect_name).get_lockset(
2325
PlacementGroupModel.__tablename__

src/dstack/_internal/server/background/tasks/process_prometheus_metrics.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from dstack._internal.server.services.jobs import get_job_provisioning_data, get_job_runtime_data
2020
from dstack._internal.server.services.runner import client
2121
from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel
22+
from dstack._internal.server.utils import sentry_utils
2223
from dstack._internal.server.utils.common import gather_map_async
2324
from dstack._internal.utils.common import batched, get_current_datetime, get_or_error, run_async
2425
from dstack._internal.utils.logging import get_logger
@@ -34,6 +35,7 @@
3435
METRICS_TTL_SECONDS = 600
3536

3637

38+
@sentry_utils.instrument_background_task
3739
async def collect_prometheus_metrics():
3840
now = get_current_datetime()
3941
cutoff = now - timedelta(seconds=MIN_COLLECT_INTERVAL_SECONDS)
@@ -61,6 +63,7 @@ async def collect_prometheus_metrics():
6163
await _collect_jobs_metrics(batch, now)
6264

6365

66+
@sentry_utils.instrument_background_task
6467
async def delete_prometheus_metrics():
6568
now = get_current_datetime()
6669
cutoff = now - timedelta(seconds=METRICS_TTL_SECONDS)

src/dstack/_internal/server/background/tasks/process_running_jobs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
)
7474
from dstack._internal.server.services.secrets import get_project_secrets_mapping
7575
from dstack._internal.server.services.storage import get_default_storage
76+
from dstack._internal.server.utils import sentry_utils
7677
from dstack._internal.utils import common as common_utils
7778
from dstack._internal.utils.interpolator import InterpolatorError, VariablesInterpolator
7879
from dstack._internal.utils.logging import get_logger
@@ -94,6 +95,7 @@ async def process_running_jobs(batch_size: int = 1):
9495
await asyncio.gather(*tasks)
9596

9697

98+
@sentry_utils.instrument_background_task
9799
async def _process_next_running_job():
98100
lock, lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__)
99101
async with get_session_ctx() as session:
@@ -159,6 +161,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
159161
job_model.status = JobStatus.TERMINATING
160162
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
161163
job_model.last_processed_at = common_utils.get_current_datetime()
164+
await session.commit()
162165
return
163166

164167
job = find_job(run.jobs, job_model.replica_num, job_model.job_num)
@@ -204,6 +207,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
204207
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
205208
job_model.termination_reason_message = e.args[0]
206209
job_model.last_processed_at = common_utils.get_current_datetime()
210+
await session.commit()
207211
return
208212

209213
server_ssh_private_keys = get_instance_ssh_private_keys(

src/dstack/_internal/server/background/tasks/process_runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
)
4444
from dstack._internal.server.services.secrets import get_project_secrets_mapping
4545
from dstack._internal.server.services.services import update_service_desired_replica_count
46+
from dstack._internal.server.utils import sentry_utils
4647
from dstack._internal.utils import common
4748
from dstack._internal.utils.logging import get_logger
4849

@@ -59,6 +60,7 @@ async def process_runs(batch_size: int = 1):
5960
await asyncio.gather(*tasks)
6061

6162

63+
@sentry_utils.instrument_background_task
6264
async def _process_next_run():
6365
run_lock, run_lockset = get_locker(get_db().dialect_name).get_lockset(RunModel.__tablename__)
6466
job_lock, job_lockset = get_locker(get_db().dialect_name).get_lockset(JobModel.__tablename__)

0 commit comments

Comments
 (0)