diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 6601fae5..86728f4d 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -262,11 +262,11 @@ func (s *Proxy) handleWebsocket(w http.ResponseWriter, req *http.Request) { } proxy.kubernetesAPIServerProxy.rwLock.Lock() - proxy.kubernetesAPIServerProxy.httpClient = append(proxy.kubernetesAPIServerProxy.httpClient, &http.Client{Transport: k8sTransport}) + proxy.kubernetesAPIServerProxy.Clients = append(proxy.kubernetesAPIServerProxy.Clients, &ApiClient{client: &http.Client{Transport: k8sTransport}, bearerToken: c.BearerToken}) proxy.kubernetesAPIServerProxy.rwLock.Unlock() proxy.kubesphereAPIServerProxy.rwLock.Lock() - proxy.kubesphereAPIServerProxy.httpClient = append(proxy.kubesphereAPIServerProxy.httpClient, &http.Client{Transport: ksTransport}) + proxy.kubesphereAPIServerProxy.Clients = append(proxy.kubesphereAPIServerProxy.Clients, &ApiClient{client: &http.Client{Transport: ksTransport}, bearerToken: c.BearerToken}) proxy.kubesphereAPIServerProxy.rwLock.Unlock() } @@ -283,11 +283,11 @@ func (s *Proxy) handleWebsocket(w http.ResponseWriter, req *http.Request) { proxy.kubernetesAPIServerProxy.rwLock.Lock() defer proxy.kubernetesAPIServerProxy.rwLock.Unlock() - k8sLen := len(proxy.kubernetesAPIServerProxy.httpClient) + k8sLen := len(proxy.kubernetesAPIServerProxy.Clients) proxy.kubesphereAPIServerProxy.rwLock.Lock() defer proxy.kubesphereAPIServerProxy.rwLock.Unlock() - ksLen := len(proxy.kubesphereAPIServerProxy.httpClient) + ksLen := len(proxy.kubesphereAPIServerProxy.Clients) // httpClientLength <= 1 means there is not enough agent connection // we need to delete the key, call cancel(), update cluster status @@ -300,18 +300,18 @@ func (s *Proxy) handleWebsocket(w http.ResponseWriter, req *http.Request) { return s.Update(client, false) }) } else { - for i, v := range proxy.kubernetesAPIServerProxy.httpClient { - if v.Transport == k8sTransport { - proxy.kubernetesAPIServerProxy.httpClient = append(proxy.kubernetesAPIServerProxy.httpClient[:i], - proxy.kubernetesAPIServerProxy.httpClient[i+1:]...) + for i, v := range proxy.kubernetesAPIServerProxy.Clients { + if v.client.Transport == k8sTransport { + proxy.kubernetesAPIServerProxy.Clients = append(proxy.kubernetesAPIServerProxy.Clients[:i], + proxy.kubernetesAPIServerProxy.Clients[i+1:]...) break } } - for i, v := range proxy.kubesphereAPIServerProxy.httpClient { - if v.Transport == ksTransport { - proxy.kubesphereAPIServerProxy.httpClient = append(proxy.kubesphereAPIServerProxy.httpClient[:i], - proxy.kubesphereAPIServerProxy.httpClient[i+1:]...) + for i, v := range proxy.kubesphereAPIServerProxy.Clients { + if v.client.Transport == ksTransport { + proxy.kubesphereAPIServerProxy.Clients = append(proxy.kubesphereAPIServerProxy.Clients[:i], + proxy.kubesphereAPIServerProxy.Clients[i+1:]...) break } } diff --git a/pkg/proxy/proxy_server.go b/pkg/proxy/proxy_server.go index ad90f38d..e46875e0 100644 --- a/pkg/proxy/proxy_server.go +++ b/pkg/proxy/proxy_server.go @@ -19,6 +19,12 @@ import ( "kubesphere.io/tower/pkg/utils" ) +// Each kubernetesClient has a bearerToken. The bearerToken has an expiration date. +type ApiClient struct { + client *http.Client + bearerToken []byte +} + type Server struct { // Server name used to identify name string @@ -35,17 +41,14 @@ type Server struct { // server *http.Server - // http client to do the real proxy - httpClient []*http.Client + // kubernetes client to do the real proxy + Clients []*ApiClient // RWMutex to implement safe operation while read or update httpClient Slice rwLock sync.RWMutex // Whether to use bearer token, if false, need to pass TLS client certificates useBearerToken bool - - // Bearer token to do oauth - bearerToken []byte } func newProxyServer(name, host, scheme string, port uint16, useBearerToken bool, transport *http.Transport, servertlsConfig *tls.Config, bearerToken []byte) (*Server, error) { @@ -60,11 +63,13 @@ func newProxyServer(name, host, scheme string, port uint16, useBearerToken bool, scheme: scheme, port: port, server: server, - httpClient: []*http.Client{ - {Transport: transport}, + Clients: []*ApiClient{ + { + client: &http.Client{Transport: transport}, + bearerToken: bearerToken, + }, }, useBearerToken: useBearerToken, - bearerToken: bearerToken, }, nil } @@ -168,17 +173,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { u.Host = s.host u.Scheme = s.scheme - if s.useBearerToken && len(s.bearerToken) > 0 { - req = utilnet.CloneRequest(req) - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.bearerToken)) - } - // we choose one httpClient randomly rand.Seed(time.Now().UnixNano()) s.rwLock.RLock() - index := rand.Intn(len(s.httpClient)) - klog.V(5).Infof("server %s current agent connection length %d, random slice index %d", s.name, len(s.httpClient), index) - httpProxy := k8sproxy.NewUpgradeAwareHandler(&u, s.httpClient[index].Transport, false, false, s) + index := rand.Intn(len(s.Clients)) + if s.useBearerToken && len(s.Clients[index].bearerToken) > 0 { + req = utilnet.CloneRequest(req) + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.Clients[index].bearerToken)) + } + klog.V(5).Infof("server %s current agent connection length %d, random slice index %d", s.name, len(s.Clients), index) + httpProxy := k8sproxy.NewUpgradeAwareHandler(&u, s.Clients[index].client.Transport, false, false, s) s.rwLock.RUnlock() httpProxy.ServeHTTP(w, req) }