-
-
Notifications
You must be signed in to change notification settings - Fork 222
Expand file tree
/
Copy pathserver.go
More file actions
136 lines (111 loc) · 3.68 KB
/
server.go
File metadata and controls
136 lines (111 loc) · 3.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package api
import (
"context"
"errors"
"net"
"net/http"
"reflect"
"sync"
"github.com/dstackai/dstack/runner/internal/common/api"
"github.com/dstackai/dstack/runner/internal/common/log"
"github.com/dstackai/dstack/runner/internal/shim"
"github.com/dstackai/dstack/runner/internal/shim/components"
"github.com/dstackai/dstack/runner/internal/shim/dcgm"
"github.com/dstackai/dstack/runner/internal/shim/netmeter"
)
// TaskRunner is the set of task operations the ShimServer depends on.
// The server holds one implementation in its runner field; the concrete
// semantics of each method are defined by that implementation.
type TaskRunner interface {
	// Submit hands a task configuration to the runner.
	Submit(ctx context.Context, cfg shim.TaskConfig) error
	// Run acts on the task identified by taskID.
	Run(ctx context.Context, taskID string) error
	// Terminate acts on the task identified by taskID, passing along a
	// timeout plus a reason and message.
	// NOTE(review): the unit of timeout is not visible here — confirm against the implementation.
	Terminate(ctx context.Context, taskID string, timeout uint, reason string, message string) error
	// Remove acts on the task identified by taskID.
	Remove(ctx context.Context, taskID string) error
	// Resources returns a shim.Resources value.
	Resources(ctx context.Context) shim.Resources
	// TaskList returns a list of task items.
	TaskList() []*shim.TaskListItem
	// TaskInfo returns the shim.TaskInfo for taskID.
	TaskInfo(taskID string) shim.TaskInfo
}
// ShimServer bundles the HTTP server exposing the shim API together with the
// collaborators its handlers use and the state needed to shut it down.
type ShimServer struct {
	httpServer *http.Server

	// mu guards the shutdown flags below.
	mu sync.RWMutex

	// ctx is the context passed to NewShimServer; it is also the base context
	// of every incoming request.
	ctx context.Context

	// Shutdown state: inShutdown is set by any shutdown request,
	// inForceShutdown only by a forced one.
	inShutdown, inForceShutdown bool

	// Background jobs: bgJobsCtx is derived from ctx and cancelled via
	// bgJobsCancel on shutdown; a graceful shutdown then waits on bgJobsGroup.
	bgJobsCtx    context.Context
	bgJobsCancel context.CancelFunc
	bgJobsGroup  *sync.WaitGroup

	runner TaskRunner

	dcgmExporter *dcgm.DCGMExporter
	dcgmWrapper  dcgm.DCGMWrapperInterface // interface with nil value normalized to plain nil

	runnerManager, shimManager components.ComponentManager

	netMeter *netmeter.NetMeter // may be nil if metering is unavailable

	version string
}
// NewShimServer constructs a ShimServer bound to address and registers all of
// its HTTP routes. It does not start listening; call Serve for that.
//
// ctx becomes the base context of every incoming request and the parent of the
// background-jobs context that Shutdown cancels. dcgmWrapper may be an
// interface wrapping a nil value; it is normalized to a plain nil before being
// stored. nm may be nil if metering is unavailable.
func NewShimServer(
	ctx context.Context, address string, version string,
	runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, dcgmWrapper dcgm.DCGMWrapperInterface,
	runnerManager components.ComponentManager, shimManager components.ComponentManager,
	nm *netmeter.NetMeter,
) *ShimServer {
	bgJobsCtx, bgJobsCancel := context.WithCancel(ctx)

	// An interface holding a typed nil (e.g. a nil pointer) compares unequal
	// to nil, so collapse it to a plain nil. reflect.Value.IsNil panics for
	// kinds that cannot be nil (such as a struct value implementing the
	// interface), so only call it for nilable kinds.
	if dcgmWrapper != nil {
		switch v := reflect.ValueOf(dcgmWrapper); v.Kind() {
		case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
			reflect.Pointer, reflect.Slice, reflect.UnsafePointer:
			if v.IsNil() {
				dcgmWrapper = nil
			}
		}
	}

	r := api.NewRouter()
	s := &ShimServer{
		httpServer: &http.Server{
			Addr:        address,
			Handler:     r,
			BaseContext: func(l net.Listener) context.Context { return ctx },
		},

		ctx: ctx,

		bgJobsCtx:    bgJobsCtx,
		bgJobsCancel: bgJobsCancel,
		bgJobsGroup:  &sync.WaitGroup{},

		runner: runner,

		dcgmExporter: dcgmExporter,
		dcgmWrapper:  dcgmWrapper,

		runnerManager: runnerManager,
		shimManager:   shimManager,

		netMeter: nm,

		version: version,
	}

	// The healthcheck endpoint should stay backward compatible, as it is used for negotiation
	r.AddHandler("GET", "/api/healthcheck", s.HealthcheckHandler)
	r.AddHandler("POST", "/api/shutdown", s.ShutdownHandler)
	r.AddHandler("GET", "/api/instance/health", s.InstanceHealthHandler)
	r.AddHandler("GET", "/api/components", s.ComponentListHandler)
	r.AddHandler("POST", "/api/components/install", s.ComponentInstallHandler)
	r.AddHandler("GET", "/api/tasks", s.TaskListHandler)
	r.AddHandler("GET", "/api/tasks/{id}", s.TaskInfoHandler)
	r.AddHandler("POST", "/api/tasks", s.TaskSubmitHandler)
	r.AddHandler("POST", "/api/tasks/{id}/terminate", s.TaskTerminateHandler)
	r.AddHandler("POST", "/api/tasks/{id}/remove", s.TaskRemoveHandler)
	// The metrics endpoint is registered through HandleFunc rather than AddHandler.
	r.HandleFunc("GET /metrics/tasks/{id}", s.TaskMetricsHandler)

	return s
}
// Serve runs the HTTP server until it stops. The http.ErrServerClosed error
// that ListenAndServe reports after the server has been shut down or closed is
// treated as a clean exit and mapped to nil; any other error is returned as is.
func (s *ShimServer) Serve() error {
	serveErr := s.httpServer.ListenAndServe()
	if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
		return nil
	}
	return serveErr
}
// Shutdown stops the server, either gracefully or forcibly.
//
// A request is ignored (nil is returned) when a forced shutdown is already in
// progress, or when any shutdown is in progress and this request is not
// forced. A forced request is therefore allowed to escalate a graceful
// shutdown that is still running.
//
// In both modes the background-jobs context is cancelled first. A forced
// shutdown then closes the HTTP server immediately and returns its error. A
// graceful shutdown calls http.Server.Shutdown with ctx, waits for the
// background jobs group, and returns the error from http.Server.Shutdown.
func (s *ShimServer) Shutdown(ctx context.Context, force bool) error {
	// Check and update the shutdown flags atomically under the mutex.
	ignore := func() bool {
		s.mu.Lock()
		defer s.mu.Unlock()

		if s.inForceShutdown || (s.inShutdown && !force) {
			log.Info(ctx, "Already shutting down, ignoring request")
			return true
		}
		s.inShutdown = true
		if force {
			s.inForceShutdown = true
		}
		return false
	}()
	if ignore {
		return nil
	}

	log.Info(ctx, "Shutting down", "force", force)
	s.bgJobsCancel()

	if !force {
		shutdownErr := s.httpServer.Shutdown(ctx)
		// Wait for background jobs only on the graceful path.
		s.bgJobsGroup.Wait()
		return shutdownErr
	}
	return s.httpServer.Close()
}