Skip to content

Commit b7b0cd0

Browse files
committed
refactor: worker gateway
Signed-off-by: thxCode <thxcode0824@gmail.com>
1 parent 7bf8dd8 commit b7b0cd0

6 files changed

Lines changed: 169 additions & 39 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package httpx
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"io"
7+
"net"
8+
"net/http"
9+
10+
klog "k8s.io/klog/v2"
11+
)
12+
13+
type loggingResponseWriter struct {
14+
http.ResponseWriter
15+
status int
16+
}
17+
18+
func (w *loggingResponseWriter) WriteHeader(code int) {
19+
w.status = code
20+
w.ResponseWriter.WriteHeader(code)
21+
}
22+
23+
func (w *loggingResponseWriter) Write(p []byte) (int, error) {
24+
if w.status == 0 {
25+
w.status = http.StatusOK
26+
}
27+
return w.ResponseWriter.Write(p)
28+
}
29+
30+
func (w *loggingResponseWriter) Flush() {
31+
if flusher, ok := w.ResponseWriter.(http.Flusher); ok {
32+
flusher.Flush()
33+
}
34+
}
35+
36+
func (w *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
37+
hijacker, ok := w.ResponseWriter.(http.Hijacker)
38+
if !ok {
39+
return nil, nil, errors.New("response writer does not support hijacking")
40+
}
41+
return hijacker.Hijack()
42+
}
43+
44+
func (w *loggingResponseWriter) Push(target string, opts *http.PushOptions) error {
45+
pusher, ok := w.ResponseWriter.(http.Pusher)
46+
if !ok {
47+
return http.ErrNotSupported
48+
}
49+
return pusher.Push(target, opts)
50+
}
51+
52+
func (w *loggingResponseWriter) ReadFrom(r io.Reader) (n int64, err error) {
53+
if w.status == 0 {
54+
w.status = http.StatusOK
55+
}
56+
if readerFrom, ok := w.ResponseWriter.(io.ReaderFrom); ok {
57+
return readerFrom.ReadFrom(r)
58+
}
59+
return io.Copy(w.ResponseWriter, r)
60+
}
61+
62+
func (w *loggingResponseWriter) Unwrap() http.ResponseWriter {
63+
return w.ResponseWriter
64+
}
65+
66+
// AccessLog logs request method/path and response status for each request.
67+
func AccessLog(logger klog.Logger, queryAsValues bool) func(http.Handler) http.Handler {
68+
return func(next http.Handler) http.Handler {
69+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
70+
kvs := []any{
71+
"client", r.RemoteAddr,
72+
"method", r.Method,
73+
"path", r.URL.Path,
74+
}
75+
if queryAsValues {
76+
for k, v := range r.URL.Query() {
77+
if k == "token" || k == "password" {
78+
v = []string{"***"}
79+
}
80+
if len(v) == 1 {
81+
kvs = append(kvs, k, v[0])
82+
} else {
83+
kvs = append(kvs, k, v)
84+
}
85+
}
86+
}
87+
slg := logger.WithValues(kvs...)
88+
slg.Info("request")
89+
rw := &loggingResponseWriter{ResponseWriter: w}
90+
next.ServeHTTP(rw, r.WithContext(klog.NewContext(r.Context(), slg)))
91+
if rw.status == 0 {
92+
rw.status = http.StatusOK
93+
}
94+
if rw.status >= 400 {
95+
slg.Error(nil, "response", "status", rw.status)
96+
} else {
97+
slg.V(2).Info("response", "status", rw.status)
98+
}
99+
})
100+
}
101+
}

pkg/workergateway/manager/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
type (
1111
// ConstructRestConfigFunc defines a function type for constructing rest.Config for a given cluster.
12-
ConstructRestConfigFunc = func(cluster string) (*rest.Config, error)
12+
ConstructRestConfigFunc = func(cluster, token string) (*rest.Config, error)
1313

1414
Config struct {
1515
ConstructRestConfig ConstructRestConfigFunc

pkg/workergateway/manager/manager.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type (
6868
Manager interface {
6969
// SubscribeWorker subscribes a worker for the given cluster.
7070
// If force is true, it will unsubscribe the existing worker before subscribing a new one.
71-
SubscribeWorker(ctx context.Context, cluster string, force bool) error
71+
SubscribeWorker(ctx context.Context, cluster, token string, force bool) error
7272
// UnsubscribeWorker unsubscribes the worker for the given cluster.
7373
UnsubscribeWorker(ctx context.Context, cluster string)
7474
// ListWorkers lists all subscribed workers with their status.
@@ -140,17 +140,17 @@ func New(ctx context.Context, config *Config) (Manager, error) {
140140
}, nil
141141
}
142142

143-
func (wm *_Manager) SubscribeWorker(ctx context.Context, cluster string, force bool) error {
143+
func (wm *_Manager) SubscribeWorker(ctx context.Context, cluster, token string, force bool) error {
144144
logger := wm.Logger.WithValues("cluster", cluster)
145145

146146
if force {
147147
wm.UnsubscribeWorker(ctx, cluster)
148148
} else if wm.hasWorker(cluster) {
149-
logger.Info("worker already exists, skip")
149+
logger.V(2).Info("worker already exists, skip")
150150
return nil
151151
}
152152

153-
cfg, err := wm.ConstructRestConfig(cluster)
153+
cfg, err := wm.ConstructRestConfig(cluster, token)
154154
if err != nil {
155155
logger.Error(err, "construct rest config")
156156
return fmt.Errorf("construct rest config for cluster %q: %w", cluster, err)
@@ -172,6 +172,7 @@ func (wm *_Manager) SubscribeWorker(ctx context.Context, cluster string, force b
172172
default:
173173
}
174174

175+
logger.Info("checking api services")
175176
if err := apis.WaitForServicesReady(wkCtx, cli); err != nil {
176177
logger.Error(err, "wait for api services ready")
177178
continue
@@ -441,7 +442,7 @@ func registerEventHandler(
441442
DeleteFunc: func(obj any) { publishEvent(WorkerEventDeleted, obj) },
442443
}
443444
opts := cache.HandlerOptions{
444-
Logger: ptr.To(klog.FromContext(ctx).WithValues("gvk", gvk)),
445+
Logger: ptr.To(klog.FromContext(ctx).WithValues("gvk", gvk).V(4)),
445446
}
446447

447448
_, _ = inf.AddEventHandlerWithOptions(handler, opts)

pkg/workergateway/manager/option.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (o *Options) Complete(_ context.Context) (*Config, error) {
115115
}
116116

117117
func constructRestConfigByGPUStackAPI(port int, o *Options) ConstructRestConfigFunc {
118-
return func(cluster string) (*rest.Config, error) {
118+
return func(cluster, token string) (*rest.Config, error) {
119119
scheme := o.WorkerConnGPUStackAPIScheme
120120
if scheme == "" {
121121
scheme = "http"
@@ -125,7 +125,8 @@ func constructRestConfigByGPUStackAPI(port int, o *Options) ConstructRestConfigF
125125
}
126126

127127
cfg := &rest.Config{
128-
Host: fmt.Sprintf("%s://localhost:%d/v2/clusters/%s/proxy", scheme, port, cluster),
128+
Host: fmt.Sprintf("%s://localhost:%d/v2/clusters/%s/proxy", scheme, port, cluster),
129+
BearerToken: token,
129130
}
130131
cfg.UserAgent = version.GetUserAgent()
131132
cfg.Timeout = o.KubeConnTimeout
@@ -152,7 +153,7 @@ func constructRestConfigByLoopback(o *Options) (ConstructRestConfigFunc, error)
152153
cfg.Burst = o.KubeConnBurst
153154
cfg.ContentType = o.KubeContentType
154155

155-
return func(_ string) (*rest.Config, error) {
156+
return func(_, _ string) (*rest.Config, error) {
156157
return cfg, nil
157158
}, nil
158159
}

pkg/workergateway/service/helper.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ type ListAggregateInstanceTypes struct {
1919

2020
func OpListAggregateInstanceTypes() *ListAggregateInstanceTypes {
2121
return &ListAggregateInstanceTypes{
22+
list: AggregatedInstanceTypeList{
23+
Items: make([]AggregatedInstanceType, 0),
24+
},
2225
itemIndexer: make(map[AggregatedInstanceTypeSpec]int),
2326
}
2427
}
@@ -359,7 +362,11 @@ type ListClusterInstanceTypes struct {
359362
}
360363

361364
func OpListClusterInstanceTypes() *ListClusterInstanceTypes {
362-
return &ListClusterInstanceTypes{}
365+
return &ListClusterInstanceTypes{
366+
list: ClusterInstanceTypeList{
367+
Items: make([]ClusterInstanceType, 0),
368+
},
369+
}
363370
}
364371

365372
func (in *ListClusterInstanceTypes) Next(cluster string, obj runtime.Object) error {
@@ -411,7 +418,11 @@ type ListClusterInstances struct {
411418
}
412419

413420
func OpListClusterInstances() *ListClusterInstances {
414-
return &ListClusterInstances{}
421+
return &ListClusterInstances{
422+
list: ClusterInstanceList{
423+
Items: make([]ClusterInstance, 0),
424+
},
425+
}
415426
}
416427

417428
func (in *ListClusterInstances) Next(cluster string, obj runtime.Object) error {
@@ -468,7 +479,11 @@ type ListClusterInstancePersistentVolumeTypes struct {
468479
}
469480

470481
func OpListClusterInstancePersistentVolumeTypes() *ListClusterInstancePersistentVolumeTypes {
471-
return &ListClusterInstancePersistentVolumeTypes{}
482+
return &ListClusterInstancePersistentVolumeTypes{
483+
list: ClusterInstancePersistentVolumeTypeList{
484+
Items: make([]ClusterInstancePersistentVolumeType, 0),
485+
},
486+
}
472487
}
473488

474489
func (in *ListClusterInstancePersistentVolumeTypes) Next(cluster string, obj runtime.Object) error {
@@ -577,7 +592,11 @@ type ListClusterInstanceImagePullSecrets struct {
577592
}
578593

579594
func OpListClusterInstanceImagePullSecrets() *ListClusterInstanceImagePullSecrets {
580-
return &ListClusterInstanceImagePullSecrets{}
595+
return &ListClusterInstanceImagePullSecrets{
596+
list: ClusterInstanceImagePullSecretList{
597+
Items: make([]ClusterInstanceImagePullSecret, 0),
598+
},
599+
}
581600
}
582601

583602
func (in *ListClusterInstanceImagePullSecrets) Next(cluster string, obj runtime.Object) error {
@@ -634,7 +653,11 @@ type ListClusterInstanceSSHPublicKeys struct {
634653
}
635654

636655
func OpListClusterInstanceSSHPublicKeys() *ListClusterInstanceSSHPublicKeys {
637-
return &ListClusterInstanceSSHPublicKeys{}
656+
return &ListClusterInstanceSSHPublicKeys{
657+
list: ClusterInstanceSSHPublicKeyList{
658+
Items: make([]ClusterInstanceSSHPublicKey, 0),
659+
},
660+
}
638661
}
639662

640663
func (in *ListClusterInstanceSSHPublicKeys) Next(cluster string, obj runtime.Object) error {

0 commit comments

Comments
 (0)