Skip to content

Commit 1701975

Browse files
committed
refactor: support webserver with unix socket
Signed-off-by: thxCode <thxcode0824@gmail.com>
1 parent f776cc8 commit 1701975

8 files changed

Lines changed: 206 additions & 34 deletions

File tree

pkg/devicemanager/cmd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func NewCommand() *cobra.Command {
1717
c := &cobra.Command{
1818
Use: "device-manager",
1919
Aliases: []string{
20-
"devmgr",
20+
"dm",
2121
},
2222
Short: "assists in managing hardware resources.",
2323
}

pkg/devicemanager/option.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func NewOptions() *Options {
4949

5050
func (o *Options) AddFlags(fs *pflag.FlagSet) {
5151
// Server.
52-
o.ServerOptions.AddFlags(fs)
52+
o.ServerOptions.AddFlags(fs, webserver.WithoutBindUnixPath())
5353

5454
// Manager.
5555
o.ManagerOptions.AddFlags(fs, manager.WithoutKubeElectionOptions())

pkg/manager/manager.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,15 @@ func (m *Manager) Prepare(_ context.Context) error {
5858
// before starting the controller manager.
5959
func (m *Manager) Start(ctx context.Context) error {
6060
cm := m.CtrlManager
61-
ms := cm.GetWebhookServer()
61+
mu := cm.GetWebhookServer()
6262

6363
// Register /metrics.
6464
{
6565
h := promhttp.HandlerOpts{
6666
ErrorLog: klog.NewStandardLogger("WARNING"),
6767
ErrorHandling: promhttp.HTTPErrorOnError,
6868
}
69-
ms.Register("/metrics", promhttp.HandlerFor(ctrlmetrics.Registry, h))
69+
mu.Register("/metrics", promhttp.HandlerFor(ctrlmetrics.Registry, h))
7070
}
7171

7272
// Register /readyz.
@@ -78,7 +78,7 @@ func (m *Manager) Start(ctx context.Context) error {
7878
"log": healthz.LogHealthz.Check,
7979
},
8080
}
81-
ms.Register(p, http.StripPrefix(p, h))
81+
mu.Register(p, http.StripPrefix(p, h))
8282
}
8383

8484
// Register /livez.
@@ -102,18 +102,18 @@ func (m *Manager) Start(ctx context.Context) error {
102102
},
103103
},
104104
}
105-
ms.Register(p, http.StripPrefix(p, h))
105+
mu.Register(p, http.StripPrefix(p, h))
106106
}
107107

108108
// Register /debug.
109109
{
110110
runtime.SetBlockProfileRate(1)
111-
ms.Register("/debug/pprof/", httpx.LoopbackAccessHandlerFunc(pprof.Index))
112-
ms.Register("/debug/pprof/cmdline", httpx.LoopbackAccessHandlerFunc(pprof.Cmdline))
113-
ms.Register("/debug/pprof/profile", httpx.LoopbackAccessHandlerFunc(pprof.Profile))
114-
ms.Register("/debug/pprof/symbol", httpx.LoopbackAccessHandlerFunc(pprof.Symbol))
115-
ms.Register("/debug/pprof/trace", httpx.LoopbackAccessHandlerFunc(pprof.Trace))
116-
ms.Register("/debug/flags/v", httpx.LoopbackAccessHandlerFunc(routes.StringFlagPutHandler(logs.GlogSetter)))
111+
mu.Register("/debug/pprof/", httpx.LoopbackAccessHandlerFunc(pprof.Index))
112+
mu.Register("/debug/pprof/cmdline", httpx.LoopbackAccessHandlerFunc(pprof.Cmdline))
113+
mu.Register("/debug/pprof/profile", httpx.LoopbackAccessHandlerFunc(pprof.Profile))
114+
mu.Register("/debug/pprof/symbol", httpx.LoopbackAccessHandlerFunc(pprof.Symbol))
115+
mu.Register("/debug/pprof/trace", httpx.LoopbackAccessHandlerFunc(pprof.Trace))
116+
mu.Register("/debug/flags/v", httpx.LoopbackAccessHandlerFunc(routes.StringFlagPutHandler(logs.GlogSetter)))
117117
}
118118

119119
// Start.

pkg/utils/osx/file.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,14 @@ func Open(path string) (*os.File, error) {
3030
return os.Open(p)
3131
}
3232

33+
// ExistsParentDir checks if the parent directory of the given path exists.
34+
func ExistsParentDir(path string) bool {
35+
p := filepath.Clean(path)
36+
p = InlineTilde(p)
37+
38+
return ExistsDir(filepath.Dir(p))
39+
}
40+
3341
// Exists checks if the given path exists.
3442
func Exists(path string, checks ...func(os.FileInfo) bool) bool {
3543
p := filepath.Clean(path)
@@ -222,14 +230,94 @@ func IsEmptyFile(file string) bool {
222230
return s.Size() == 0
223231
}
224232

233+
// Remove removes the given file or directory,
234+
// and also supports additional checks before removal.
235+
func Remove(path string, checks ...func(os.FileInfo) error) error {
236+
p := filepath.Clean(path)
237+
p = InlineTilde(p)
238+
239+
stat, err := os.Lstat(p)
240+
if err != nil {
241+
if os.IsNotExist(err) {
242+
return nil
243+
}
244+
return err
245+
}
246+
247+
for i := range checks {
248+
if checks[i] == nil {
249+
continue
250+
}
251+
252+
if err = checks[i](stat); err != nil {
253+
return err
254+
}
255+
}
256+
257+
return os.Remove(p)
258+
}
259+
260+
// RemoveDir removes the given directory, and returns an error if the path is not a directory.
261+
func RemoveDir(path string) error {
262+
return Remove(path, func(stat os.FileInfo) error {
263+
if !stat.Mode().IsDir() {
264+
return fmt.Errorf("not a directory")
265+
}
266+
return nil
267+
})
268+
}
269+
270+
// RemoveFile removes the given file, and returns an error if the path is not a regular file.
271+
func RemoveFile(path string) error {
272+
return Remove(path, func(stat os.FileInfo) error {
273+
if !stat.Mode().IsRegular() {
274+
return fmt.Errorf("not a regular file")
275+
}
276+
return nil
277+
})
278+
}
279+
280+
// RemoveLink removes the given symbolic link, and returns an error if the path is not a symbolic link.
281+
func RemoveLink(path string) error {
282+
return Remove(path, func(stat os.FileInfo) error {
283+
if stat.Mode()&os.ModeSymlink == 0 {
284+
return fmt.Errorf("not a symbolic link")
285+
}
286+
return nil
287+
})
288+
}
289+
290+
// RemoveSocket removes the given socket, and returns an error if the path is not a socket.
291+
func RemoveSocket(path string) error {
292+
return Remove(path, func(stat os.FileInfo) error {
293+
if stat.Mode()&os.ModeSocket == 0 {
294+
return fmt.Errorf("not a socket")
295+
}
296+
return nil
297+
})
298+
}
299+
300+
// RemoveDevice removes the given device, and returns an error if the path is not a device.
301+
func RemoveDevice(path string) error {
302+
return Remove(path, func(stat os.FileInfo) error {
303+
if stat.Mode()&os.ModeDevice == 0 {
304+
return fmt.Errorf("not a device")
305+
}
306+
return nil
307+
})
308+
}
309+
225310
// DurableRemove removes the given file or directory,
226311
// and also syncs the parent directory to ensure the removal is durable.
227312
func DurableRemove(path string) error {
228-
err := os.Remove(path)
313+
p := filepath.Clean(path)
314+
p = InlineTilde(p)
315+
316+
err := os.Remove(p)
229317
if err != nil {
230318
return err
231319
}
232-
return syncDir(filepath.Dir(path))
320+
return syncDir(filepath.Dir(p))
233321
}
234322

235323
func syncDir(dir string) error {

pkg/webserver/null.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,6 @@ func (null) WebhookMux() *http.ServeMux {
4040
func (null) HostPort() (string, int, error) {
4141
return "", 0, errors.New("no listener")
4242
}
43+
44+
func (null) NotFoundHandler(http.Handler) {
45+
}

pkg/webserver/option.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ import (
1818
)
1919

2020
type Options struct {
21+
FlagOptions
22+
2123
// Establish.
22-
BindAddress net.IP
23-
BindPort int
24-
CertDir string
24+
BindUnixPath string
25+
BindAddress net.IP
26+
BindPort int
27+
CertDir string
2528
}
2629

2730
func NewOptions() *Options {
@@ -32,8 +35,31 @@ func NewOptions() *Options {
3235
}
3336
}
3437

35-
func (o *Options) AddFlags(fs *pflag.FlagSet) {
38+
type (
39+
FlagOptions struct {
40+
noBindUnixPath bool
41+
}
42+
43+
FlagOption func(opts *FlagOptions)
44+
)
45+
46+
func WithoutBindUnixPath() FlagOption {
47+
return func(opts *FlagOptions) {
48+
opts.noBindUnixPath = true
49+
}
50+
}
51+
52+
func (o *Options) AddFlags(fs *pflag.FlagSet, opts ...FlagOption) {
53+
for i := range opts {
54+
opts[i](&o.FlagOptions)
55+
}
56+
3657
// Establish.
58+
if !o.noBindUnixPath {
59+
fs.StringVar(&o.BindUnixPath, "bind-unix-path", o.BindUnixPath,
60+
"the unix socket path on which to serve. "+
61+
"if specified, --bind-address and --secure-port will be ignored.")
62+
}
3763
fs.IPVar(&o.BindAddress, "bind-address", o.BindAddress,
3864
"the IP address(without port) on which to serve.")
3965
fs.IntVar(&o.BindPort, "secure-port", o.BindPort,
@@ -45,6 +71,12 @@ func (o *Options) AddFlags(fs *pflag.FlagSet) {
4571

4672
func (o *Options) Validate(_ context.Context) error {
4773
// Establish.
74+
if !o.noBindUnixPath && o.BindUnixPath != "" {
75+
if !osx.ExistsParentDir(o.BindUnixPath) {
76+
return errors.New("--bind-unix-path: no found parent directory")
77+
}
78+
return nil
79+
}
4880
if o.BindPort < 1 || o.BindPort > 65535 {
4981
return errors.New("--secure-port: out of range")
5082
}
@@ -91,11 +123,24 @@ func (o *Options) Complete(_ context.Context) (*Config, error) {
91123
tlsCfg.GetCertificate = certMgr.GetCertificate
92124
}
93125

94-
listener, err := tls.Listen("tcp", net.JoinHostPort(o.BindAddress.String(), strconv.Itoa(o.BindPort)), tlsCfg)
126+
var (
127+
lis net.Listener
128+
err error
129+
)
130+
if o.BindUnixPath != "" {
131+
err = osx.RemoveSocket(o.BindUnixPath)
132+
if err != nil {
133+
return nil, fmt.Errorf("remove existing unix socket: %w", err)
134+
}
135+
lis, err = tls.Listen("unix", o.BindUnixPath, tlsCfg)
136+
} else {
137+
lis, err = tls.Listen("tcp",
138+
net.JoinHostPort(o.BindAddress.String(), strconv.Itoa(o.BindPort)), tlsCfg)
139+
}
95140
if err != nil {
96141
return nil, fmt.Errorf("create listener: %w", err)
97142
}
98143

99-
cfg.Listener = listener
144+
cfg.Listener = lis
100145
return &cfg, nil
101146
}

pkg/webserver/server.go

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,16 @@ import (
1616
"gpustack.ai/gpustack/pkg/utils/netx"
1717
)
1818

19+
// Server is a web server that can serve webhooks and other HTTP handlers.
1920
type Server interface {
2021
ctrlwebhook.Server
2122

23+
// HostPort returns the host and port that the server is listening on.
2224
HostPort() (string, int, error)
25+
26+
// NotFoundHandler sets the handler for requests that do not match any registered path.
27+
// If not set, the default http.NotFoundHandler will be used.
28+
NotFoundHandler(http.Handler)
2329
}
2430

2531
// New creates a Server with the given configuration.
@@ -32,7 +38,7 @@ func New(c *Config) (Server, error) {
3238
return nil, errors.New("listener must be a TCP listener")
3339
}
3440

35-
return server{
41+
return &server{
3642
listener: c.Listener,
3743
host: tcpAddr.IP.String(),
3844
port: tcpAddr.Port,
@@ -42,23 +48,24 @@ func New(c *Config) (Server, error) {
4248
}
4349

4450
type server struct {
45-
listener net.Listener
46-
host string
47-
port int
48-
runners []manager.Runnable
49-
mux *http.ServeMux
51+
listener net.Listener
52+
host string
53+
port int
54+
runners []manager.Runnable
55+
mux *http.ServeMux
56+
notFoundHandler http.Handler
5057
}
5158

52-
func (server) NeedLeaderElection() bool {
59+
func (*server) NeedLeaderElection() bool {
5360
return false
5461
}
5562

56-
func (s server) Register(path string, hook http.Handler) {
63+
func (s *server) Register(path string, hook http.Handler) {
5764
s.mux.Handle(path, hook)
5865
}
5966

60-
func (s server) Start(ctx context.Context) error {
61-
srv := newHttpServer(s.mux)
67+
func (s *server) Start(ctx context.Context) error {
68+
srv := s.getServer()
6269

6370
gp := gox.GroupWithContextIn(ctx)
6471
for i := range s.runners {
@@ -84,17 +91,43 @@ func (s server) Start(ctx context.Context) error {
8491
return gp.Wait()
8592
}
8693

87-
func (s server) StartedChecker() healthz.Checker {
94+
func (s *server) StartedChecker() healthz.Checker {
8895
addr := net.JoinHostPort(s.host, strconv.Itoa(s.port))
8996
return func(req *http.Request) error {
9097
return netx.IsConnected(req.Context(), "tls", addr, 10*time.Second)
9198
}
9299
}
93100

94-
func (s server) WebhookMux() *http.ServeMux {
101+
func (s *server) WebhookMux() *http.ServeMux {
95102
return s.mux
96103
}
97104

98-
func (s server) HostPort() (string, int, error) {
105+
func (s *server) HostPort() (string, int, error) {
99106
return s.host, s.port, nil
100107
}
108+
109+
func (s *server) NotFoundHandler(notFoundHandler http.Handler) {
110+
s.notFoundHandler = notFoundHandler
111+
}
112+
113+
func (s *server) getNotFoundHandler() http.Handler {
114+
if s.notFoundHandler == nil {
115+
return http.NotFoundHandler()
116+
}
117+
return s.notFoundHandler
118+
}
119+
120+
func (s *server) getServer() *http.Server {
121+
nf := s.getNotFoundHandler()
122+
123+
h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
124+
h, pattern := s.mux.Handler(r)
125+
if pattern == "" {
126+
nf.ServeHTTP(w, r)
127+
return
128+
}
129+
h.ServeHTTP(w, r)
130+
})
131+
132+
return newHttpServer(h)
133+
}

pkg/worker/cmd.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@ func NewCommand() *cobra.Command {
1313
o := NewOptions()
1414

1515
c := &cobra.Command{
16-
Use: "worker",
16+
Use: "worker",
17+
Aliases: []string{
18+
"w",
19+
},
1720
Short: "assists in managing GPUStack Kubernetes resources.",
1821
PreRunE: func(c *cobra.Command, args []string) error {
1922
return o.Validate(c.Context())

0 commit comments

Comments
 (0)