Skip to content

Commit c3b0b0f

Browse files
authored
fix: add runtime tcp readiness probes (#88)
1 parent df4b761 commit c3b0b0f

4 files changed

Lines changed: 96 additions & 14 deletions

File tree

internal/runtime/kubernetes/procedure_attempts.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (b *Backend) resumeRestartProcedureIndex(ctx context.Context, root string,
8080
return resumeIndex, nil
8181
}
8282

83-
func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace string, root string, commandName string, procedureName string, baseName string, procedure *domain.Procedure, env map[string]string) (*batchv1.Job, error) {
83+
func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace string, root string, commandName string, procedureName string, baseName string, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string) (*batchv1.Job, error) {
8484
_, pvc, err := parseRef(root)
8585
if err != nil {
8686
return nil, err
@@ -134,7 +134,7 @@ func (b *Backend) createOrReuseProcedureJob(ctx context.Context, namespace strin
134134
return active, nil
135135
}
136136
name := procedureAttemptName(baseName, nextAttempt)
137-
job, err := procedureJobSpec(namespace, root, commandName, procedureName, name, nextAttempt, procedure, env, b.config.RegistrySecret)
137+
job, err := procedureJobSpec(namespace, root, commandName, procedureName, name, nextAttempt, procedure, globalPorts, env, b.config.RegistrySecret)
138138
if err != nil {
139139
return nil, err
140140
}

internal/runtime/kubernetes/procedures.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ func (b *Backend) runJobProcedure(scrollID string, commandName string, procedure
179179
zap.Int("expected_ports", len(procedure.ExpectedPorts)),
180180
zap.Int("mounts", len(procedure.Mounts)),
181181
)
182-
createdJob, err := b.createOrReuseProcedureJob(ctx, namespace, root, commandName, procedureName, resourceName, procedure, env)
182+
createdJob, err := b.createOrReuseProcedureJob(ctx, namespace, root, commandName, procedureName, resourceName, procedure, globalPorts, env)
183183
if err != nil {
184184
logger.Log().Error("Failed to create Kubernetes job procedure", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("namespace", namespace), zap.String("base_job", resourceName), zap.Error(err))
185185
return nil, err
@@ -237,7 +237,7 @@ func (b *Backend) ensurePersistentProcedure(ctx context.Context, scrollID string
237237
logger.Log().Error("Kubernetes persistent procedure root ref invalid", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("root", root), zap.Error(err))
238238
return err
239239
}
240-
statefulSet, err := procedureStatefulSetSpec(namespace, root, commandName, procedureName, resourceName, procedure, env, b.config.RegistrySecret)
240+
statefulSet, err := procedureStatefulSetSpec(namespace, root, commandName, procedureName, resourceName, procedure, globalPorts, env, b.config.RegistrySecret)
241241
if err != nil {
242242
logger.Log().Error("Failed to build Kubernetes persistent procedure StatefulSet", zap.String("scroll_id", scrollID), zap.String("command", commandName), zap.String("procedure", procedureName), zap.String("namespace", namespace), zap.Error(err))
243243
return err

internal/runtime/kubernetes/resources.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func helperJobSpec(namespace string, jobName string, pvc string, image string, c
148148
}
149149
}
150150

151-
func procedureJobSpec(namespace string, root string, commandName string, procedureName string, resourceName string, attempt int, procedure *domain.Procedure, env map[string]string, registrySecret string) (*batchv1.Job, error) {
151+
func procedureJobSpec(namespace string, root string, commandName string, procedureName string, resourceName string, attempt int, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string, registrySecret string) (*batchv1.Job, error) {
152152
_, pvc, err := parseRef(root)
153153
if err != nil {
154154
return nil, err
@@ -173,6 +173,7 @@ func procedureJobSpec(namespace string, root string, commandName string, procedu
173173
Env: envVars(env),
174174
VolumeMounts: volumeMounts(procedure.Mounts),
175175
}
176+
applyProcedureReadiness(&container, procedure, globalPorts)
176177
podSpec := corev1.PodSpec{
177178
RestartPolicy: corev1.RestartPolicyNever,
178179
Containers: []corev1.Container{container},
@@ -197,7 +198,7 @@ func procedureJobSpec(namespace string, root string, commandName string, procedu
197198
}, nil
198199
}
199200

200-
func procedureStatefulSetSpec(namespace string, root string, commandName string, procedureName string, resourceName string, procedure *domain.Procedure, env map[string]string, registrySecret string) (*appsv1.StatefulSet, error) {
201+
func procedureStatefulSetSpec(namespace string, root string, commandName string, procedureName string, resourceName string, procedure *domain.Procedure, globalPorts []domain.Port, env map[string]string, registrySecret string) (*appsv1.StatefulSet, error) {
201202
_, pvc, err := parseRef(root)
202203
if err != nil {
203204
return nil, err
@@ -221,6 +222,7 @@ func procedureStatefulSetSpec(namespace string, root string, commandName string,
221222
Env: envVars(env),
222223
VolumeMounts: volumeMounts(procedure.Mounts),
223224
}
225+
applyProcedureReadiness(&container, procedure, globalPorts)
224226
podSpec := corev1.PodSpec{
225227
Containers: []corev1.Container{container},
226228
Volumes: []corev1.Volume{pvcVolume("data", pvc)},
@@ -246,6 +248,38 @@ func procedureStatefulSetSpec(namespace string, root string, commandName string,
246248
}, nil
247249
}
248250

251+
// applyProcedureReadiness attaches a TCP readiness probe to container when the
// procedure has an expected port that firstTCPExpectedPort accepts.
//
// container is mutated in place; procedure and globalPorts are only read.
// When no suitable port is found the container is left untouched, so any
// ReadinessProbe already set on it is preserved.
func applyProcedureReadiness(container *corev1.Container, procedure *domain.Procedure, globalPorts []domain.Port) {
252+
port, ok := firstTCPExpectedPort(procedure, globalPorts)
253+
// No probeable port (nil procedure, unknown names, or UDP only): skip the probe.
if !ok {
254+
return
255+
}
256+
container.ReadinessProbe = &corev1.Probe{
257+
ProbeHandler: corev1.ProbeHandler{
258+
// NOTE(review): port.Port is passed through unvalidated — confirm callers
// guarantee a valid port number. Newer apimachinery releases also mark
// intstr.FromInt as deprecated in favour of FromInt32; check the vendored version.
TCPSocket: &corev1.TCPSocketAction{Port: intstr.FromInt(port.Port)},
259+
},
260+
// Probe every 2 seconds with a failure threshold of 3; all other probe
// fields are left at their zero values.
PeriodSeconds: 2,
261+
FailureThreshold: 3,
262+
}
263+
}
264+
265+
// firstTCPExpectedPort resolves the procedure's expected ports against
// globalPorts by name and returns the first one whose protocol is not UDP.
//
// Expected ports are visited in the order they appear in
// procedure.ExpectedPorts. The boolean result is false, with a zero
// domain.Port, when procedure is nil, when no expected name is present in
// globalPorts, or when every matched port normalizes to "udp".
func firstTCPExpectedPort(procedure *domain.Procedure, globalPorts []domain.Port) (domain.Port, bool) {
266+
if procedure == nil {
267+
return domain.Port{}, false
268+
}
269+
// Index the global port definitions so each expected name is a single lookup.
ports := portsByName(globalPorts)
270+
for _, expected := range procedure.ExpectedPorts {
271+
port, ok := ports[expected.Name]
272+
// Expected port names without a global definition are ignored.
if !ok {
273+
continue
274+
}
275+
// Only "udp" is excluded here; any other normalized protocol value is
// treated as probeable over TCP.
// NOTE(review): presumably normalizeProtocol maps an empty protocol to
// "tcp" — confirm against its definition.
if normalizeProtocol(port.Protocol) == "udp" {
276+
continue
277+
}
278+
return port, true
279+
}
280+
return domain.Port{}, false
281+
}
282+
249283
func devStatefulSetSpec(namespace string, root string, pvc string, image string, action ports.RuntimeDevAction, registrySecret string) *appsv1.StatefulSet {
250284
labels := baseLabels(pvc)
251285
labels[labelProcedure] = "dev"

internal/runtime/kubernetes/resources_test.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestProcedureJobSpecBuildsDeterministicMountsAndLabels(t *testing.T) {
9595
Mounts: []domain.Mount{{Path: "/work", SubPath: "cache"}},
9696
}
9797

98-
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, procedure.Env, "registry-secret")
98+
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, nil, procedure.Env, "registry-secret")
9999
if err != nil {
100100
t.Fatal(err)
101101
}
@@ -129,7 +129,7 @@ func TestProcedureJobSpecUsesProvidedRuntimeEnv(t *testing.T) {
129129
"PROCEDURE_ONLY": "ignored",
130130
},
131131
}
132-
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, map[string]string{
132+
job, err := procedureJobSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", 1, procedure, nil, map[string]string{
133133
"DRUID_PORT_HTTP": "8080",
134134
}, "registry-secret")
135135
if err != nil {
@@ -141,14 +141,59 @@ func TestProcedureJobSpecUsesProvidedRuntimeEnv(t *testing.T) {
141141
}
142142
}
143143

144+
// TestProcedureJobSpecAddsTCPReadinessProbe checks that procedureJobSpec puts a
// TCP socket readiness probe on the first container when the procedure's only
// expected port ("main") is defined as a TCP port in the global port list.
func TestProcedureJobSpecAddsTCPReadinessProbe(t *testing.T) {
145+
procedure := &domain.Procedure{
146+
Image: "itzg/minecraft-server",
147+
ExpectedPorts: []domain.ExpectedPort{{Name: "main"}},
148+
}
149+
// The env map is nil; only the global port list matters for this test.
job, err := procedureJobSpec("druid", ref("druid", "druid-minecraft-data"), "start", "start", "minecraft-start-0", 1, procedure, []domain.Port{{Name: "main", Protocol: "tcp", Port: 25565}}, nil, "registry-secret")
150+
if err != nil {
151+
t.Fatal(err)
152+
}
153+
// The probe must exist, be a TCP socket probe, and target the port number of "main".
if probe := job.Spec.Template.Spec.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 25565 {
154+
t.Fatalf("readiness probe = %#v, want tcp 25565", probe)
155+
}
156+
}
157+
158+
// TestProcedureReadinessProbeSkipsUDPOnlyPorts checks that
// procedureStatefulSetSpec leaves the container without a readiness probe when
// the procedure's only expected port ("query") is defined as UDP.
func TestProcedureReadinessProbeSkipsUDPOnlyPorts(t *testing.T) {
159+
procedure := &domain.Procedure{
160+
Image: "steam",
161+
ExpectedPorts: []domain.ExpectedPort{{Name: "query"}},
162+
}
163+
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-game-data"), "start", "start", "game-start-0", procedure, []domain.Port{{Name: "query", Protocol: "udp", Port: 27015}}, nil, "registry-secret")
164+
if err != nil {
165+
t.Fatal(err)
166+
}
167+
// A UDP-only port set must not produce a readiness probe.
if probe := statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe; probe != nil {
168+
t.Fatalf("readiness probe = %#v, want nil for udp-only ports", probe)
169+
}
170+
}
171+
172+
// TestProcedureReadinessProbeUsesFirstTCPExpectedPort checks that a UDP
// expected port listed first ("query") is skipped and the readiness probe
// targets the following TCP expected port ("rcon").
func TestProcedureReadinessProbeUsesFirstTCPExpectedPort(t *testing.T) {
173+
procedure := &domain.Procedure{
174+
Image: "steam",
175+
// The UDP port is deliberately listed before the TCP port.
ExpectedPorts: []domain.ExpectedPort{{Name: "query"}, {Name: "rcon"}},
176+
}
177+
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-game-data"), "start", "start", "game-start-0", procedure, []domain.Port{
178+
{Name: "query", Protocol: "udp", Port: 27015},
179+
{Name: "rcon", Protocol: "tcp", Port: 27020},
180+
}, nil, "registry-secret")
181+
if err != nil {
182+
t.Fatal(err)
183+
}
184+
// The probe must target the "rcon" TCP port, not the earlier UDP "query" port.
if probe := statefulSet.Spec.Template.Spec.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 27020 {
185+
t.Fatalf("readiness probe = %#v, want tcp 27020", probe)
186+
}
187+
}
188+
144189
func TestProcedureStatefulSetSpecUsesProvidedRuntimeEnv(t *testing.T) {
145190
procedure := &domain.Procedure{
146191
Image: "nginx:1.27",
147192
Env: map[string]string{
148193
"PROCEDURE_ONLY": "ignored",
149194
},
150195
}
151-
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, map[string]string{
196+
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, nil, map[string]string{
152197
"DRUID_PORT_HTTP": "8080",
153198
}, "registry-secret")
154199
if err != nil {
@@ -168,7 +213,7 @@ func TestProcedureStatefulSetSpecBuildsPersistentWorkload(t *testing.T) {
168213
Mounts: []domain.Mount{{Path: "/usr/share/nginx/html", SubPath: "site", ReadOnly: true}},
169214
}
170215

171-
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, procedure.Env, "registry-secret")
216+
statefulSet, err := procedureStatefulSetSpec("druid", ref("druid", "druid-static-web-data"), "start", "start", "static-web-start-0", procedure, []domain.Port{{Name: "http", Protocol: "tcp", Port: 8080}}, procedure.Env, "registry-secret")
172217
if err != nil {
173218
t.Fatal(err)
174219
}
@@ -189,6 +234,9 @@ func TestProcedureStatefulSetSpecBuildsPersistentWorkload(t *testing.T) {
189234
if len(pod.ImagePullSecrets) != 1 || pod.ImagePullSecrets[0].Name != "registry-secret" {
190235
t.Fatalf("image pull secrets = %#v", pod.ImagePullSecrets)
191236
}
237+
if probe := pod.Containers[0].ReadinessProbe; probe == nil || probe.TCPSocket == nil || probe.TCPSocket.Port.IntVal != 8080 {
238+
t.Fatalf("readiness probe = %#v, want tcp 8080", probe)
239+
}
192240
container := pod.Containers[0]
193241
if container.Image != "nginx:1.27" {
194242
t.Fatalf("image = %s", container.Image)
@@ -338,7 +386,7 @@ func TestCreateOrReuseProcedureJobRetainsFailedBaseAndCreatesRetry(t *testing.T)
338386
})
339387
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)
340388

341-
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil)
389+
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil, nil)
342390
if err != nil {
343391
t.Fatal(err)
344392
}
@@ -359,7 +407,7 @@ func TestCreateOrReuseProcedureJobUsesNextRetryAttempt(t *testing.T) {
359407
)
360408
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)
361409

362-
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil)
410+
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "start.1", base, &domain.Procedure{Image: "alpine"}, nil, nil)
363411
if err != nil {
364412
t.Fatal(err)
365413
}
@@ -382,7 +430,7 @@ func TestCreateOrReuseProcedureJobReusesActiveAttempt(t *testing.T) {
382430
client := fake.NewSimpleClientset(active)
383431
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)
384432

385-
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil)
433+
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "start", "coldstart", base, &domain.Procedure{Image: "alpine"}, nil, nil)
386434
if err != nil {
387435
t.Fatal(err)
388436
}
@@ -478,7 +526,7 @@ func TestCreateOrReuseProcedureJobDeletesSucceededAttempt(t *testing.T) {
478526
})
479527
backend := NewWithClient(Config{Namespace: "druid"}, coreservices.NewConsoleManager(coreservices.NewLogManager()), client)
480528

481-
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil)
529+
created, err := backend.createOrReuseProcedureJob(context.Background(), "druid", root, "install", "install", base, &domain.Procedure{Image: "alpine"}, nil, nil)
482530
if err != nil {
483531
t.Fatal(err)
484532
}

0 commit comments

Comments
 (0)