Skip to content

Commit df2db32

Browse files
inerplatdonghyun-k
authored andcommitted
Add optional backend dial timeout
Add a configurable backend dial timeout for frontend dial requests. The option defaults to 0 to preserve existing behavior, and operators can opt in to bound DIAL_REQ send and DIAL_RSP wait time. Clean up pending dials on timeout and report backend_dial_timeout as a dial failure reason. Add an HTTP CONNECT regression test for a blocked backend DIAL_REQ send. Signed-off-by: DH Kim <inerplat@gmail.com>
1 parent ecd8d51 commit df2db32

7 files changed

Lines changed: 253 additions & 13 deletions

File tree

cmd/server/app/options/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ type ProxyRunOptions struct {
122122
NeedsKubernetesClient bool
123123
// Graceful shutdown timeout duration
124124
GracefulShutdownTimeout time.Duration
125+
// Backend dial timeout duration for frontend dial requests
126+
BackendDialTimeout time.Duration
125127
}
126128

127129
func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
@@ -164,6 +166,7 @@ func (o *ProxyRunOptions) Flags() *pflag.FlagSet {
164166
flags.StringVar(&o.LeaseNamespace, "lease-namespace", o.LeaseNamespace, "The namespace where lease objects are managed by the controller.")
165167
flags.StringVar(&o.LeaseLabel, "lease-label", o.LeaseLabel, "The labels on which the lease objects are managed.")
166168
flags.DurationVar(&o.GracefulShutdownTimeout, "graceful-shutdown-timeout", o.GracefulShutdownTimeout, "Timeout duration for graceful shutdown of the server. The server will wait for active connections to close before forcefully terminating. Set to 0 to disable graceful shutdown (default: 0).")
169+
flags.DurationVar(&o.BackendDialTimeout, "backend-dial-timeout", o.BackendDialTimeout, "Timeout duration for sending DIAL_REQ packets to backend agent streams and waiting for DIAL_RSP. Set to 0 to disable timeout.")
167170
flags.Bool("warn-on-channel-limit", true, "This behavior is now thread safe and always on. This flag will be removed in a future release.")
168171
flags.MarkDeprecated("warn-on-channel-limit", "This behavior is now thread safe and always on. This flag will be removed in a future release.")
169172

@@ -209,6 +212,7 @@ func (o *ProxyRunOptions) Print() {
209212
klog.V(1).Infof("TLSMinVersion set to %q.\n", o.TLSMinVersion)
210213
klog.V(1).Infof("XfrChannelSize set to %d.\n", o.XfrChannelSize)
211214
klog.V(1).Infof("GracefulShutdownTimeout set to %v.\n", o.GracefulShutdownTimeout)
215+
klog.V(1).Infof("BackendDialTimeout set to %v.\n", o.BackendDialTimeout)
212216
}
213217

214218
func (o *ProxyRunOptions) Validate() error {
@@ -368,6 +372,9 @@ func (o *ProxyRunOptions) Validate() error {
368372
if o.GracefulShutdownTimeout < 0 {
369373
return fmt.Errorf("graceful-shutdown-timeout must be >= 0, got %v", o.GracefulShutdownTimeout)
370374
}
375+
if o.BackendDialTimeout < 0 {
376+
return fmt.Errorf("backend-dial-timeout must be >= 0, got %v", o.BackendDialTimeout)
377+
}
371378

372379
o.NeedsKubernetesClient = usingServiceAccountAuth || o.EnableLeaseController
373380

@@ -414,6 +421,7 @@ func NewProxyRunOptions() *ProxyRunOptions {
414421
LeaseNamespace: "kube-system",
415422
LeaseLabel: "k8s-app=konnectivity-server",
416423
GracefulShutdownTimeout: 0,
424+
BackendDialTimeout: 0,
417425
}
418426
return &o
419427
}

cmd/server/app/options/options_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func TestDefaultServerOptions(t *testing.T) {
6666
assertDefaultValue(t, "XfrChannelSize", defaultServerOptions.XfrChannelSize, 10)
6767
assertDefaultValue(t, "APIContentType", defaultServerOptions.APIContentType, "application/vnd.kubernetes.protobuf")
6868
assertDefaultValue(t, "GracefulShutdownTimeout", defaultServerOptions.GracefulShutdownTimeout, 0*time.Second)
69+
assertDefaultValue(t, "BackendDialTimeout", defaultServerOptions.BackendDialTimeout, 0*time.Second)
6970

7071
}
7172

@@ -220,6 +221,21 @@ func TestValidate(t *testing.T) {
220221
value: 30 * time.Second,
221222
expected: nil,
222223
},
224+
"NegativeBackendDialTimeout": {
225+
field: "BackendDialTimeout",
226+
value: -1 * time.Second,
227+
expected: fmt.Errorf("backend-dial-timeout must be >= 0, got -1s"),
228+
},
229+
"ZeroBackendDialTimeout": {
230+
field: "BackendDialTimeout",
231+
value: 0 * time.Second,
232+
expected: nil,
233+
},
234+
"PositiveBackendDialTimeout": {
235+
field: "BackendDialTimeout",
236+
value: 30 * time.Second,
237+
expected: nil,
238+
},
223239
} {
224240
t.Run(desc, func(t *testing.T) {
225241
testServerOptions := NewProxyRunOptions()

cmd/server/app/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func (p *Proxy) Run(o *options.ProxyRunOptions, stopCh <-chan struct{}) error {
140140
return err
141141
}
142142
p.server = server.NewProxyServer(o.ServerID, ps, o.ServerCount, authOpt, o.XfrChannelSize)
143+
p.server.SetBackendDialTimeout(o.BackendDialTimeout)
143144

144145
frontendStop, err := p.runFrontendServer(ctx, o, p.server)
145146
if err != nil {

pkg/server/metrics/metrics.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ const (
328328
DialFailureUnrecognizedResponse DialFailureReason = "unrecognized_response" // Dial repsonse received for unrecognozide dial ID.
329329
DialFailureSendResponse DialFailureReason = "send_rsp" // Successful dial response from agent, but failed to send to frontend.
330330
DialFailureBackendClose DialFailureReason = "backend_close" // Received a DIAL_CLS from the backend before the dial completed.
331+
DialFailureBackendDialTimeout DialFailureReason = "backend_dial_timeout" // Timed out sending DIAL_REQ to or waiting for DIAL_RSP from the backend.
331332
DialFailureFrontendClose DialFailureReason = "frontend_close" // Received a DIAL_CLS from the frontend before the dial completed.
332333
)
333334

pkg/server/server.go

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,10 @@ const (
9090
ModeHTTPConnect = "http-connect"
9191
)
9292

93+
const defaultBackendDialTimeout = 0
94+
95+
var errBackendDialTimeout = errors.New("timed out waiting for backend dial")
96+
9397
type ProxyClientConnection struct {
9498
Mode string
9599
HTTP io.ReadWriter
@@ -274,6 +278,8 @@ type ProxyServer struct {
274278
// TODO: move strategies into BackendStorage
275279
proxyStrategies []proxystrategies.ProxyStrategy
276280
xfrChannelSize int
281+
282+
backendDialTimeout time.Duration
277283
}
278284

279285
// AgentTokenAuthenticationOptions contains list of parameters required for agent token based authentication
@@ -317,6 +323,64 @@ func (s *ProxyServer) getBackend(reqHost string) (*Backend, error) {
317323
return nil, &ErrNotFound{}
318324
}
319325

326+
func (s *ProxyServer) sendDialRequestToBackend(backend *Backend, pkt *client.Packet) error {
327+
timeout := s.backendDialTimeout
328+
if timeout <= 0 {
329+
return backend.Send(pkt)
330+
}
331+
332+
errCh := make(chan error, 1)
333+
go func() {
334+
errCh <- backend.Send(pkt)
335+
}()
336+
337+
timer := time.NewTimer(timeout)
338+
defer timer.Stop()
339+
340+
ctx := backend.Context()
341+
select {
342+
case err := <-errCh:
343+
return err
344+
case <-ctx.Done():
345+
return ctx.Err()
346+
case <-timer.C:
347+
return errBackendDialTimeout
348+
}
349+
}
350+
351+
func (s *ProxyServer) startPendingDialTimeout(random int64, backend *Backend, frontend *GrpcFrontend) {
352+
timeout := s.backendDialTimeout
353+
if timeout <= 0 {
354+
return
355+
}
356+
357+
go func() {
358+
timer := time.NewTimer(timeout)
359+
defer timer.Stop()
360+
361+
select {
362+
case <-timer.C:
363+
if s.PendingDial.Remove(random) == nil {
364+
return
365+
}
366+
metrics.Metrics.ObserveDialFailure(metrics.DialFailureBackendDialTimeout)
367+
resp := &client.Packet{
368+
Type: client.PacketType_DIAL_RSP,
369+
Payload: &client.Packet_DialResponse{
370+
DialResponse: &client.DialResponse{
371+
Random: random,
372+
Error: errBackendDialTimeout.Error(),
373+
},
374+
},
375+
}
376+
if err := frontend.Send(resp); err != nil {
377+
klog.V(5).InfoS("Failed to send DIAL_RSP for backend dial timeout", "error", err, "dialID", random)
378+
}
379+
case <-backend.Context().Done():
380+
}
381+
}()
382+
}
383+
320384
func (s *ProxyServer) addBackend(backend *Backend) {
321385
// TODO: refactor BackendStorage to acquire lock once, not up to 3 times.
322386
for _, bm := range s.BackendManagers {
@@ -455,12 +519,17 @@ func NewProxyServer(serverID string, proxyStrategies []proxystrategies.ProxyStra
455519
BackendManagers: bms,
456520
AgentAuthenticationOptions: agentAuthenticationOptions,
457521
// use the first backend-manager as the Readiness Manager
458-
Readiness: bms[0],
459-
proxyStrategies: proxyStrategies,
460-
xfrChannelSize: channelSize,
522+
Readiness: bms[0],
523+
proxyStrategies: proxyStrategies,
524+
xfrChannelSize: channelSize,
525+
backendDialTimeout: defaultBackendDialTimeout,
461526
}
462527
}
463528

529+
func (s *ProxyServer) SetBackendDialTimeout(timeout time.Duration) {
530+
s.backendDialTimeout = timeout
531+
}
532+
464533
// Proxy handles incoming streams from gRPC frontend.
465534
func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error {
466535
metrics.Metrics.ConnectionInc(metrics.Proxy)
@@ -613,11 +682,31 @@ func (s *ProxyServer) serveRecvFrontend(frontend *GrpcFrontend, recvCh <-chan *c
613682
backend: backend,
614683
dialAddress: address,
615684
})
616-
if err := backend.Send(pkt); err != nil {
685+
if err := s.sendDialRequestToBackend(backend, pkt); err != nil {
617686
klog.ErrorS(err, "DIAL_REQ to Backend failed", "dialID", random)
618-
} else {
619-
klog.V(5).InfoS("DIAL_REQ sent to backend", "dialID", random)
687+
if s.PendingDial.Remove(random) != nil {
688+
reason := metrics.DialFailureBackendClose
689+
if errors.Is(err, errBackendDialTimeout) {
690+
reason = metrics.DialFailureBackendDialTimeout
691+
}
692+
metrics.Metrics.ObserveDialFailure(reason)
693+
}
694+
resp := &client.Packet{
695+
Type: client.PacketType_DIAL_RSP,
696+
Payload: &client.Packet_DialResponse{
697+
DialResponse: &client.DialResponse{
698+
Random: random,
699+
Error: err.Error(),
700+
},
701+
},
702+
}
703+
if err := frontend.Send(resp); err != nil {
704+
klog.V(5).InfoS("Failed to send DIAL_RSP for backend send failure", "error", err, "dialID", random)
705+
}
706+
return
620707
}
708+
klog.V(5).InfoS("DIAL_REQ sent to backend", "dialID", random)
709+
s.startPendingDialTimeout(random, backend, frontend)
621710

622711
case client.PacketType_CLOSE_REQ:
623712
connID := pkt.GetCloseRequest().ConnectID

pkg/server/server_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@ limitations under the License.
1717
package server
1818

1919
import (
20+
"bufio"
2021
"context"
2122
"fmt"
2223
"io"
24+
"net"
25+
"net/http"
26+
"net/http/httptest"
27+
"net/url"
2328
"reflect"
2429
"sync"
2530
"testing"
@@ -219,6 +224,95 @@ func TestRemovePendingDialForStream(t *testing.T) {
219224
}
220225
}
221226

227+
func TestHTTPConnectTunnelBlockedBackendDialSendTimesOutAndCleansPendingDial(t *testing.T) {
228+
ctrl := gomock.NewController(t)
229+
defer ctrl.Finish()
230+
231+
metrics.Metrics.Reset()
232+
233+
proxyServer := NewProxyServer(uuid.New().String(), []proxystrategies.ProxyStrategy{proxystrategies.ProxyStrategyDefault}, 1, &AgentTokenAuthenticationOptions{}, xfrChannelSize)
234+
proxyServer.SetBackendDialTimeout(500 * time.Millisecond)
235+
236+
agentConn, _ := prepareAgentConnMD(t, ctrl, proxyServer, nil)
237+
sendStarted := make(chan *client.Packet, 1)
238+
releaseSend := make(chan struct{})
239+
sendReleased := make(chan struct{})
240+
agentConn.EXPECT().Send(gomock.AssignableToTypeOf(&client.Packet{})).DoAndReturn(func(pkt *client.Packet) error {
241+
sendStarted <- pkt
242+
<-releaseSend
243+
close(sendReleased)
244+
return nil
245+
}).Times(1)
246+
247+
front := httptest.NewServer(&Tunnel{Server: proxyServer})
248+
defer front.Close()
249+
250+
frontURL, err := url.Parse(front.URL)
251+
if err != nil {
252+
t.Fatalf("failed to parse front URL: %v", err)
253+
}
254+
conn, err := net.Dial("tcp", frontURL.Host)
255+
if err != nil {
256+
t.Fatalf("failed to connect to HTTP CONNECT front: %v", err)
257+
}
258+
defer conn.Close()
259+
260+
if _, err := fmt.Fprintf(conn, "CONNECT 127.0.0.1:443 HTTP/1.1\r\nHost: 127.0.0.1:443\r\n\r\n"); err != nil {
261+
t.Fatalf("failed to write CONNECT request: %v", err)
262+
}
263+
264+
var sent *client.Packet
265+
select {
266+
case sent = <-sendStarted:
267+
case <-time.After(time.Second):
268+
t.Fatal("timed out waiting for backend DIAL_REQ send to start")
269+
}
270+
if sent.Type != client.PacketType_DIAL_REQ {
271+
t.Fatalf("expected DIAL_REQ to backend, got %v", sent.Type)
272+
}
273+
if got := pendingDialCount(proxyServer); got != 1 {
274+
t.Fatalf("expected one pending dial while backend Send is blocked, got %d", got)
275+
}
276+
277+
respCh := make(chan struct {
278+
resp *http.Response
279+
err error
280+
}, 1)
281+
go func() {
282+
resp, err := http.ReadResponse(bufio.NewReader(conn), nil)
283+
respCh <- struct {
284+
resp *http.Response
285+
err error
286+
}{resp: resp, err: err}
287+
}()
288+
289+
var resp *http.Response
290+
select {
291+
case got := <-respCh:
292+
if got.err != nil {
293+
t.Fatalf("failed to read CONNECT response: %v", got.err)
294+
}
295+
resp = got.resp
296+
case <-time.After(2 * time.Second):
297+
t.Fatal("timed out waiting for backend dial timeout response")
298+
}
299+
defer resp.Body.Close()
300+
301+
if resp.StatusCode != http.StatusGatewayTimeout {
302+
t.Fatalf("expected %d for blocked backend dial send, got %s", http.StatusGatewayTimeout, resp.Status)
303+
}
304+
if got := pendingDialCount(proxyServer); got != 0 {
305+
t.Fatalf("expected pending dial to be cleaned after backend dial timeout, got %d", got)
306+
}
307+
308+
close(releaseSend)
309+
select {
310+
case <-sendReleased:
311+
case <-time.After(time.Second):
312+
t.Fatal("timed out waiting for blocked backend Send goroutine to exit")
313+
}
314+
}
315+
222316
func TestAddRemoveFrontends(t *testing.T) {
223317
agent1ConnID1 := new(ProxyClientConnection)
224318
agent1ConnID2 := new(ProxyClientConnection)
@@ -950,6 +1044,12 @@ func assertEstablishedConnsMetric(t testing.TB, expect int) {
9501044
}
9511045
}
9521046

1047+
func pendingDialCount(p *ProxyServer) int {
1048+
p.PendingDial.mu.RLock()
1049+
defer p.PendingDial.mu.RUnlock()
1050+
return len(p.PendingDial.pendingDial)
1051+
}
1052+
9531053
func assertReadyBackendsMetric(t testing.TB, expect int) {
9541054
t.Helper()
9551055
if err := metricstest.DefaultTester.ExpectServerReadyBackends(expect); err != nil {

0 commit comments

Comments
 (0)