Skip to content

Commit 1851c28

Browse files
authored
Automatically remove dangling tasks from shim (#3036)
- Terminate and remove dangling tasks from shim when processing the instance. Tasks can become dangling if shim was unavailable while the job was in the `terminating` status.
- Adjust the shim API to return task statuses to avoid redundant termination. Dangling task termination and removal work with the old API too, but may result in redundant termination requests and warnings if a task is already terminated but not yet removed. This is particularly noticeable if `DSTACK_SERVER_KEEP_SHIM_TASKS` is set.
1 parent 453bd9f commit 1851c28

File tree

15 files changed

+245
-26
lines changed

15 files changed

+245
-26
lines changed

runner/docs/shim.openapi.yaml

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -396,13 +396,21 @@ components:
396396
title: shim.api.TaskListResponse
397397
type: object
398398
properties:
399-
ids:
399+
tasks:
400400
type: array
401401
items:
402-
$ref: "#/components/schemas/TaskID"
403-
description: A list of all task IDs tracked by shim
402+
type: object
403+
properties:
404+
id:
405+
$ref: "#/components/schemas/TaskID"
406+
status:
407+
$ref: "#/components/schemas/TaskStatus"
408+
required:
409+
- id
410+
- status
411+
description: A list of all tasks tracked by shim, each with its ID and status
404412
required:
405-
- ids
413+
- tasks
406414
additionalProperties: false
407415

408416
TaskInfoResponse:

runner/internal/shim/api/api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ func (ds *DummyRunner) Remove(context.Context, string) error {
3434
return nil
3535
}
3636

37-
func (ds *DummyRunner) TaskIDs() []string {
38-
return []string{}
37+
func (ds *DummyRunner) TaskList() []*shim.TaskListItem {
38+
return []*shim.TaskListItem{}
3939
}
4040

4141
func (ds *DummyRunner) TaskInfo(taskID string) shim.TaskInfo {

runner/internal/shim/api/handlers.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ func (s *ShimServer) InstanceHealthHandler(w http.ResponseWriter, r *http.Reques
3636
}
3737

3838
func (s *ShimServer) TaskListHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {
39-
return &TaskListResponse{IDs: s.runner.TaskIDs()}, nil
39+
tasks := s.runner.TaskList()
40+
return &TaskListResponse{tasks}, nil
4041
}
4142

4243
func (s *ShimServer) TaskInfoHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) {

runner/internal/shim/api/schemas.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type InstanceHealthResponse struct {
1515
}
1616

1717
type TaskListResponse struct {
18-
IDs []string `json:"ids"`
18+
Tasks []*shim.TaskListItem `json:"tasks"`
1919
}
2020

2121
type TaskInfoResponse struct {

runner/internal/shim/api/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type TaskRunner interface {
1919
Remove(ctx context.Context, taskID string) error
2020

2121
Resources(context.Context) shim.Resources
22-
TaskIDs() []string
22+
TaskList() []*shim.TaskListItem
2323
TaskInfo(taskID string) shim.TaskInfo
2424
}
2525

runner/internal/shim/docker.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,13 @@ func (d *DockerRunner) Resources(ctx context.Context) Resources {
216216
}
217217
}
218218

219-
func (d *DockerRunner) TaskIDs() []string {
220-
return d.tasks.IDs()
219+
func (d *DockerRunner) TaskList() []*TaskListItem {
220+
tasks := d.tasks.List()
221+
result := make([]*TaskListItem, 0, len(tasks))
222+
for _, task := range tasks {
223+
result = append(result, &TaskListItem{ID: task.ID, Status: task.Status})
224+
}
225+
return result
221226
}
222227

223228
func (d *DockerRunner) TaskInfo(taskID string) TaskInfo {

runner/internal/shim/models.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ type TaskConfig struct {
104104
ContainerSshKeys []string `json:"container_ssh_keys"`
105105
}
106106

107+
type TaskListItem struct {
108+
ID string `json:"id"`
109+
Status TaskStatus `json:"status"`
110+
}
111+
107112
type TaskInfo struct {
108113
ID string
109114
Status TaskStatus

runner/internal/shim/task.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,15 @@ type TaskStorage struct {
148148
mu sync.RWMutex
149149
}
150150

151-
func (ts *TaskStorage) IDs() []string {
151+
// Get a _copy_ of all tasks. To "commit" changes, use Update()
152+
func (ts *TaskStorage) List() []Task {
152153
ts.mu.RLock()
153154
defer ts.mu.RUnlock()
154-
ids := make([]string, 0, len(ts.tasks))
155-
for id := range ts.tasks {
156-
ids = append(ids, id)
155+
tasks := make([]Task, 0, len(ts.tasks))
156+
for _, task := range ts.tasks {
157+
tasks = append(tasks, task)
157158
}
158-
return ids
159+
return tasks
159160
}
160161

161162
// Get a _copy_ of the task. To "commit" changes, use Update()

src/dstack/_internal/server/background/tasks/process_instances.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,10 @@
8585
get_instance_provisioning_data,
8686
get_instance_requirements,
8787
get_instance_ssh_private_keys,
88+
remove_dangling_tasks_from_instance,
8889
)
8990
from dstack._internal.server.services.locking import get_locker
91+
from dstack._internal.server.services.logging import fmt
9092
from dstack._internal.server.services.offers import is_divisible_into_blocks
9193
from dstack._internal.server.services.placement import (
9294
get_fleet_placement_group_models,
@@ -789,6 +791,7 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non
789791
ssh_private_keys,
790792
job_provisioning_data,
791793
None,
794+
instance=instance,
792795
check_instance_health=check_instance_health,
793796
)
794797
if instance_check is False:
@@ -935,7 +938,7 @@ async def _wait_for_instance_provisioning_data(
935938

936939
@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1)
937940
def _check_instance_inner(
938-
ports: Dict[int, int], *, check_instance_health: bool = False
941+
ports: Dict[int, int], *, instance: InstanceModel, check_instance_health: bool = False
939942
) -> InstanceCheck:
940943
instance_health_response: Optional[InstanceHealthResponse] = None
941944
shim_client = runner_client.ShimClient(port=ports[DSTACK_SHIM_HTTP_PORT])
@@ -955,6 +958,10 @@ def _check_instance_inner(
955958
args = (method.__func__.__name__, e.__class__.__name__, e)
956959
logger.exception(template, *args)
957960
return InstanceCheck(reachable=False, message=template % args)
961+
try:
962+
remove_dangling_tasks_from_instance(shim_client, instance)
963+
except Exception as e:
964+
logger.exception("%s: error removing dangling tasks: %s", fmt(instance), e)
958965
return runner_client.healthcheck_response_to_instance_check(
959966
healthcheck_response, instance_health_response
960967
)

src/dstack/_internal/server/schemas/runner.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,16 @@ class GPUDevice(CoreModel):
159159
path_in_container: str
160160

161161

162+
class TaskListItem(CoreModel):
163+
id: str
164+
status: TaskStatus
165+
166+
167+
class TaskListResponse(CoreModel):
168+
ids: Optional[list[str]] = None # returned by pre-0.19.26 shim
169+
tasks: Optional[list[TaskListItem]] = None # returned by 0.19.26+ shim
170+
171+
162172
class TaskInfoResponse(CoreModel):
163173
id: str
164174
status: TaskStatus

0 commit comments

Comments
 (0)