Skip to content

Commit 2154376

Browse files
committed
feat: introduce worker gateway
Signed-off-by: thxCode <thxcode0824@gmail.com>
1 parent be2b903 commit 2154376

14 files changed

Lines changed: 1163 additions & 11 deletions

File tree

cmd/gpustack/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"gpustack.ai/gpustack/pkg/devicemanager"
1010
"gpustack.ai/gpustack/pkg/utils/signalx"
1111
"gpustack.ai/gpustack/pkg/worker"
12+
"gpustack.ai/gpustack/pkg/workergateway"
1213
)
1314

1415
func main() {
@@ -18,6 +19,7 @@ func main() {
1819
Short: "manages GPUStack Kubernetes resources.",
1920
}
2021
c.AddCommand(worker.NewCommand())
22+
c.AddCommand(workergateway.NewCommand())
2123
c.AddCommand(devicemanager.NewCommand())
2224
c = cmd.Harness(c)
2325

pkg/utils/httpx/client.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,6 @@ func NewTraceRequest(uri string) (*http.Request, error) {
179179
return http.NewRequest(http.MethodTrace, uri, nil)
180180
}
181181

182-
// Error is similar to http.Error,
183-
// but it can get the error message by the given code.
184-
func Error(rw http.ResponseWriter, code int) {
185-
http.Error(rw, http.StatusText(code), code)
186-
}
187-
188182
// Close closes the http response body without error.
189183
func Close(resp *http.Response) {
190184
if resp != nil && resp.Body != nil {

pkg/utils/httpx/writer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,9 @@ func JSON(w http.ResponseWriter, code int, v any) {
3636
w.WriteHeader(code)
3737
_, _ = w.Write(buf.Bytes())
3838
}
39+
40+
// Error is similar to http.Error,
41+
// but it can get the error message by the given code.
42+
func Error(w http.ResponseWriter, code int) {
43+
http.Error(w, http.StatusText(code), code)
44+
}

pkg/webserver/server.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,20 @@ func New(c *Config) (Server, error) {
3333
if c.Listener == nil {
3434
return nil, errors.New("listener is required")
3535
}
36-
tcpAddr, ok := c.Listener.Addr().(*net.TCPAddr)
37-
if !ok {
38-
return nil, errors.New("listener must be a TCP listener")
36+
37+
var (
38+
host string
39+
port int
40+
)
41+
if tcpAddr, ok := c.Listener.Addr().(*net.TCPAddr); ok {
42+
host = tcpAddr.IP.String()
43+
port = tcpAddr.Port
3944
}
4045

4146
return &server{
4247
listener: c.Listener,
43-
host: tcpAddr.IP.String(),
44-
port: tcpAddr.Port,
48+
host: host,
49+
port: port,
4550
runners: c.Runners,
4651
mux: http.NewServeMux(),
4752
}, nil
@@ -103,6 +108,9 @@ func (s *server) WebhookMux() *http.ServeMux {
103108
}
104109

105110
func (s *server) HostPort() (string, int, error) {
111+
if s.host == "" || s.port == 0 {
112+
return "", 0, errors.New("server is not listening on a TCP address")
113+
}
106114
return s.host, s.port, nil
107115
}
108116

pkg/workergateway/apis/types.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package apis
2+
3+
import (
4+
"k8s.io/apimachinery/pkg/api/resource"
5+
6+
worker "gpustack.ai/gpustack/api/worker/v1"
7+
)
8+
9+
type (
10+
AggregatedInstanceTypeSpec = worker.InstanceTypeSpec
11+
AggregatedInstanceTypeResource = worker.InstanceTypeResource
12+
)
13+
14+
type AggregatedInstanceType struct {
15+
Name string `json:"name"`
16+
17+
Spec AggregatedInstanceTypeSpec `json:"spec"`
18+
Status AggregatedInstanceTypeStatus `json:"status"`
19+
}
20+
21+
type AggregatedInstanceTypeStatus struct {
22+
// Accelerator is the accelerator resource of the AggregatedInstanceType, e.g. "1", "4".
23+
Accelerator AggregatedInstanceTypeResource `json:"accelerator"`
24+
25+
// CPU is the CPU resource of the AggregatedInstanceType, e.g. "4", "8".
26+
CPU AggregatedInstanceTypeResource `json:"cpu"`
27+
28+
// RAM is the RAM resource of the AggregatedInstanceType, e.g. "40G", "16G".
29+
RAM AggregatedInstanceTypeResource `json:"ram"`
30+
31+
// LocalStorage is the local storage resource of the AggregatedInstanceType, e.g. "100G", "500G".
32+
LocalStorage AggregatedInstanceTypeResource `json:"localStorage"`
33+
34+
// Tiers is the list of once max request tiers of the AggregatedInstanceType.
35+
//
36+
// Each tier represents a combination of the AggregatedInstanceTypeOnceMaxRequestCandidate those satisfy the once max request of the tier.
37+
Tiers []AggregatedInstanceTypeOnceMaxRequestTier `json:"tiers"`
38+
}
39+
40+
type AggregatedInstanceTypeOnceMaxRequestTier struct {
41+
// OnceMaxRequest is the accelerator once max request resource of the tier, e.g. "4", "8".
42+
OnceMaxRequest resource.Quantity `json:"onceMaxRequest"`
43+
44+
// Candidates is the list of candidates of the tier.
45+
//
46+
// All candidates in the same tier must satisfy the max request of the tier,
47+
// but may have different resource values.
48+
Candidates []AggregatedInstanceTypeOnceMaxRequestCandidate `json:"candidates"`
49+
}
50+
51+
// AggregatedInstanceTypeOnceMaxRequestCandidate represents a candidate of the max request tier of the AggregatedInstanceType.
52+
type AggregatedInstanceTypeOnceMaxRequestCandidate struct {
53+
// Cluster is the candidate belongs to, e.g. "cluster-a", "cluster-b".
54+
Cluster string `json:"cluster"`
55+
56+
// InstanceType is the instance type name of the candidate, e.g. "nvidia-a100-40g", "nvidia-v100-32g".
57+
InstanceType string `json:"instanceType"`
58+
59+
// CPU is the CPU once max request resource of the candidate, e.g. "4", "8".
60+
CPU resource.Quantity `json:"cpu"`
61+
62+
// RAM is the RAM once max request resource of the candidate, e.g. "40G", "16G".
63+
RAM resource.Quantity `json:"ram"`
64+
65+
// LocalStorage is the local storage once max request resource of the candidate, e.g. "100G", "500G".
66+
LocalStorage resource.Quantity `json:"localStorage"`
67+
}
68+
69+
type AggregatedInstanceTypeList struct {
70+
Items []AggregatedInstanceType `json:"items,omitempty"`
71+
}

pkg/workergateway/cmd.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package workergateway
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
"github.com/spf13/cobra"
9+
)
10+
11+
// NewCommand returns a new cobra command for the worker gateway.
12+
func NewCommand() *cobra.Command {
13+
o := NewOptions()
14+
15+
c := &cobra.Command{
16+
Use: "worker-gateway",
17+
Aliases: []string{
18+
"wg",
19+
},
20+
Short: "aggregates resources from upstream Kubernetes clusters.",
21+
PreRunE: func(c *cobra.Command, args []string) error {
22+
return o.Validate(c.Context())
23+
},
24+
RunE: func(c *cobra.Command, args []string) error {
25+
ctx := c.Context()
26+
27+
cfg, err := o.Complete(ctx)
28+
if err != nil {
29+
return fmt.Errorf("complete config: %w", err)
30+
}
31+
wg, err := cfg.Apply(ctx)
32+
if err != nil {
33+
return fmt.Errorf("apply config: %w", err)
34+
}
35+
if err = wg.Prepare(ctx); err != nil {
36+
return fmt.Errorf("prepare: %w", err)
37+
}
38+
err = wg.Start(ctx)
39+
if err != nil && !errors.Is(err, context.Canceled) {
40+
return err
41+
}
42+
return nil
43+
},
44+
}
45+
46+
o.AddFlags(c.Flags())
47+
48+
return c
49+
}

pkg/workergateway/config.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package workergateway
2+
3+
import (
4+
"context"
5+
6+
"gpustack.ai/gpustack/pkg/webserver"
7+
"gpustack.ai/gpustack/pkg/workergateway/manager"
8+
)
9+
10+
type Config struct {
11+
// Manager.
12+
ManagerConfig *manager.Config
13+
14+
// Server.
15+
ServerConfig *webserver.Config
16+
}
17+
18+
func (c *Config) Apply(ctx context.Context) (*WorkerGateway, error) {
19+
mgr, err := c.ManagerConfig.Apply(ctx)
20+
if err != nil {
21+
return nil, err
22+
}
23+
24+
srv, err := c.ServerConfig.Apply(ctx)
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
return &WorkerGateway{
30+
Manager: mgr,
31+
Server: srv,
32+
}, nil
33+
}

pkg/workergateway/gateway.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package workergateway
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/http/pprof"
8+
"runtime"
9+
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/collectors"
12+
"github.com/prometheus/client_golang/prometheus/promhttp"
13+
"k8s.io/apiserver/pkg/server/healthz"
14+
"k8s.io/apiserver/pkg/server/routes"
15+
"k8s.io/component-base/logs"
16+
"k8s.io/component-base/metrics/legacyregistry"
17+
klog "k8s.io/klog/v2"
18+
ctrlhealthz "sigs.k8s.io/controller-runtime/pkg/healthz"
19+
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
20+
21+
"gpustack.ai/gpustack/pkg/utils/gox"
22+
"gpustack.ai/gpustack/pkg/utils/httpx"
23+
"gpustack.ai/gpustack/pkg/webserver"
24+
"gpustack.ai/gpustack/pkg/workergateway/manager"
25+
)
26+
27+
func init() {
28+
ctrlmetrics.Registry = struct {
29+
prometheus.Registerer
30+
prometheus.Gatherer
31+
}{
32+
Registerer: legacyregistry.Registerer(),
33+
Gatherer: legacyregistry.DefaultGatherer,
34+
}
35+
}
36+
37+
type WorkerGateway struct {
38+
// Manager.
39+
Manager manager.Manager
40+
41+
// Server.
42+
Server webserver.Server
43+
}
44+
45+
// Prepare prepares the runtime for the worker gateway,
46+
// including installing system resources, etc.
47+
func (wg *WorkerGateway) Prepare(ctx context.Context) error {
48+
// Register metric collectors.
49+
{
50+
reg := ctrlmetrics.Registry
51+
cs := []prometheus.Collector{
52+
collectors.NewBuildInfoCollector(),
53+
gox.NewStatsCollector(),
54+
}
55+
for i := range cs {
56+
err := reg.Register(cs[i])
57+
if err != nil {
58+
return fmt.Errorf("register metric collector: %w", err)
59+
}
60+
}
61+
}
62+
63+
return nil
64+
}
65+
66+
func (wg *WorkerGateway) Start(ctx context.Context) error {
67+
mu := wg.Server
68+
69+
// Register /metrics.
70+
{
71+
h := promhttp.HandlerOpts{
72+
ErrorLog: klog.NewStandardLogger("WARNING"),
73+
ErrorHandling: promhttp.HTTPErrorOnError,
74+
}
75+
mu.Register("/metrics", promhttp.HandlerFor(ctrlmetrics.Registry, h))
76+
}
77+
78+
// Register /healthz.
79+
{
80+
p := "/readyz"
81+
h := &ctrlhealthz.Handler{
82+
Checks: map[string]ctrlhealthz.Checker{
83+
"ping": ctrlhealthz.Ping,
84+
"log": healthz.LogHealthz.Check,
85+
},
86+
}
87+
mu.Register(p, http.StripPrefix(p, h))
88+
}
89+
90+
// Register /livez.
91+
{
92+
p := "/livez"
93+
h := &ctrlhealthz.Handler{
94+
Checks: map[string]ctrlhealthz.Checker{
95+
"ping": ctrlhealthz.Ping,
96+
"log": healthz.LogHealthz.Check,
97+
"gopool": func(r *http.Request) error {
98+
return gox.IsHealthy()
99+
},
100+
},
101+
}
102+
mu.Register(p, http.StripPrefix(p, h))
103+
}
104+
105+
// Register /debug.
106+
{
107+
runtime.SetBlockProfileRate(1)
108+
mu.Register("/debug/pprof/", httpx.LoopbackAccessHandlerFunc(pprof.Index))
109+
mu.Register("/debug/pprof/cmdline", httpx.LoopbackAccessHandlerFunc(pprof.Cmdline))
110+
mu.Register("/debug/pprof/profile", httpx.LoopbackAccessHandlerFunc(pprof.Profile))
111+
mu.Register("/debug/pprof/symbol", httpx.LoopbackAccessHandlerFunc(pprof.Symbol))
112+
mu.Register("/debug/pprof/trace", httpx.LoopbackAccessHandlerFunc(pprof.Trace))
113+
mu.Register("/debug/flags/v", httpx.LoopbackAccessHandlerFunc(routes.StringFlagPutHandler(logs.GlogSetter)))
114+
}
115+
116+
// Register API routes.
117+
{
118+
p := "/apis"
119+
mu.Register(p+"/", http.StripPrefix(p, wg.getHandleApis()))
120+
}
121+
122+
klog.Info("starting worker gateway")
123+
return mu.Start(ctx)
124+
}

0 commit comments

Comments
 (0)