Skip to content

Commit 6cdbd2f

Browse files
authored
Merge pull request #337 from nais/haproxy_http2_fix
fix(hookd): recover clusters after dispatch stream failure
2 parents e90640a + c0f6b72 commit 6cdbd2f

6 files changed

Lines changed: 235 additions & 0 deletions

File tree

.envrc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
use flake

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@
2121
.env
2222

2323
/.testbin/
24+
/.direnv/

flake.lock

Lines changed: 81 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flake.nix

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
{
2+
inputs = {
3+
flake-utils.url = "github:numtide/flake-utils";
4+
nixpkgs.url = "nixpkgs/nixos-unstable";
5+
treefmt-nix = {
6+
url = "github:numtide/treefmt-nix";
7+
inputs.nixpkgs.follows = "nixpkgs";
8+
};
9+
};
10+
11+
outputs =
12+
inputs:
13+
inputs.flake-utils.lib.eachDefaultSystem (
14+
system:
15+
let
16+
pkgs = import inputs.nixpkgs {
17+
localSystem = {
18+
inherit system;
19+
};
20+
};
21+
treefmt = inputs.treefmt-nix.lib.evalModule pkgs {
22+
projectRootFile = "flake.nix";
23+
programs.nixfmt.enable = true;
24+
};
25+
envtest-bins = pkgs.symlinkJoin {
26+
name = "envtest-bins";
27+
paths = [
28+
pkgs.etcd
29+
pkgs.kubernetes
30+
];
31+
};
32+
deploy = pkgs.buildGoModule {
33+
pname = "deploy";
34+
version = "0.0.0";
35+
src = ./.;
36+
vendorHash = "sha256-afFOb7DpB4gGyFErwM3lROMU2E1GVlT7+nSLU4zAV8E=";
37+
38+
subPackages = [
39+
"cmd/crypt"
40+
"cmd/deploy"
41+
"cmd/deployd"
42+
"cmd/hookd"
43+
"cmd/leakdetect"
44+
];
45+
46+
nativeBuildInputs = with pkgs; [
47+
protobuf
48+
protoc-gen-go
49+
protoc-gen-go-grpc
50+
];
51+
52+
doCheck = false;
53+
};
54+
in
55+
{
56+
packages.default = deploy;
57+
58+
devShells.default = pkgs.mkShell {
59+
inputsFrom = [
60+
deploy
61+
envtest-bins
62+
];
63+
shellHook = ''
64+
export KUBEBUILDER_ASSETS="${envtest-bins}/bin"
65+
mkdir -p .testbin
66+
ln -sfn ${pkgs.etcd}/bin/etcd .testbin/etcd
67+
ln -sfn ${pkgs.kubernetes}/bin/kube-apiserver .testbin/kube-apiserver
68+
ln -sfn ${pkgs.kubernetes}/bin/kubectl .testbin/kubectl
69+
'';
70+
};
71+
72+
formatter = treefmt.config.build.wrapper;
73+
}
74+
);
75+
}

pkg/grpc/dispatchserver/dispatchserver.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ func (s *dispatchServer) Deployments(opts *pb.GetDeploymentOpts, stream pb.Dispa
129129
case req := <-c:
130130
err := stream.Send(req.request)
131131
req.wait <- err
132+
if err != nil {
133+
return err
134+
}
132135
case <-time.After(30 * time.Minute):
133136
log.Warnf("Connection from cluster '%s' timed out", opts.Cluster)
134137
return fmt.Errorf("timeout")

pkg/grpc/dispatchserver/dispatchserver_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,84 @@ import (
1616
"google.golang.org/grpc"
1717
"google.golang.org/grpc/codes"
1818
"google.golang.org/grpc/credentials/insecure"
19+
"google.golang.org/grpc/metadata"
1920
"google.golang.org/grpc/status"
2021
"google.golang.org/grpc/test/bufconn"
2122
)
2223

24+
type erringDeploymentsStream struct {
25+
ctx context.Context
26+
}
27+
28+
func (s *erringDeploymentsStream) Send(*pb.DeploymentRequest) error {
29+
return status.Error(codes.Internal, "stream terminated by RST_STREAM with error code: INTERNAL_ERROR")
30+
}
31+
32+
func (s *erringDeploymentsStream) Context() context.Context { return s.ctx }
33+
34+
func (s *erringDeploymentsStream) SetHeader(metadata.MD) error { return nil }
35+
36+
func (s *erringDeploymentsStream) SendHeader(metadata.MD) error { return nil }
37+
38+
func (s *erringDeploymentsStream) SetTrailer(metadata.MD) {}
39+
40+
func (s *erringDeploymentsStream) SendMsg(any) error { return nil }
41+
42+
func (s *erringDeploymentsStream) RecvMsg(any) error { return nil }
43+
44+
func TestDeploymentsUnregistersClusterWhenSendFails(t *testing.T) {
45+
ctx, cancel := context.WithCancel(context.Background())
46+
defer cancel()
47+
_, _ = telemetry.New(ctx, "test", "")
48+
49+
deploymentStore := database.MockDeploymentStore{}
50+
deploymentStore.On("HistoricDeployments", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
51+
52+
mockApiClients, _ := apiclient.NewMockClient(t)
53+
ds := New(&deploymentStore, mockApiClients.Deployments()).(*dispatchServer)
54+
55+
stream := &erringDeploymentsStream{ctx: ctx}
56+
done := make(chan error, 1)
57+
go func() {
58+
done <- ds.Deployments(&pb.GetDeploymentOpts{Cluster: "dev", StartupTime: pb.TimeAsTimestamp(time.Now())}, stream)
59+
}()
60+
61+
requireEventually(t, time.Second, func() bool {
62+
return len(ds.onlineClusters()) == 1
63+
})
64+
65+
err := ds.SendDeploymentRequest(ctx, &pb.DeploymentRequest{Cluster: "dev"})
66+
if status.Code(err) != codes.Internal {
67+
t.Fatalf("expected send error to be propagated as Internal, got %v", err)
68+
}
69+
70+
requireEventually(t, time.Second, func() bool {
71+
return len(ds.onlineClusters()) == 0
72+
})
73+
74+
select {
75+
case err := <-done:
76+
if err == nil {
77+
t.Fatal("expected Deployments to exit with send error")
78+
}
79+
case <-time.After(time.Second):
80+
t.Fatal("Deployments did not exit after stream Send failed")
81+
}
82+
}
83+
84+
func requireEventually(t *testing.T, timeout time.Duration, condition func() bool) {
85+
t.Helper()
86+
87+
deadline := time.Now().Add(timeout)
88+
for time.Now().Before(deadline) {
89+
if condition() {
90+
return
91+
}
92+
time.Sleep(10 * time.Millisecond)
93+
}
94+
t.Fatal("condition was not satisfied before timeout")
95+
}
96+
2397
func bufDialer(b *bufconn.Listener) func(context.Context, string) (net.Conn, error) {
2498
return func(context.Context, string) (net.Conn, error) {
2599
return b.Dial()

0 commit comments

Comments
 (0)