Skip to content

Commit 73248f6

Browse files
committed
A submitted Kubernetes job has succeeded with this code.
Error handling still incomplete.
1 parent 034c329 commit 73248f6

1 file changed

Lines changed: 80 additions & 34 deletions

File tree

app/org/thp/cortex/services/K8sJobRunnerSrv.scala

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import play.api.{Configuration, Logger}
1515
import akka.actor.ActorSystem
1616
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, ConfigBuilder, Watcher}
1717
import io.fabric8.kubernetes.api.model.batch.{Job => KJob, JobBuilder => KJobBuilder}
18-
18+
import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder}
1919
// import com.spotify.docker.client.DockerClient.LogsParam
2020
// import com.spotify.docker.client.messages.HostConfig.Bind
2121
// import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
@@ -58,42 +58,88 @@ class K8sJobRunnerSrv(
5858
}.get
5959

6060
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
61-
val kjob = new KJobBuilder()
62-
.withApiVersion("batch/v1")
63-
.withNewMetadata()
64-
.withName(job.workerId())
65-
.withLabels(Map("cortex_workerId" -> job.workerId()).asJava)
66-
.endMetadata()
67-
.withNewSpec()
68-
.withNewTemplate()
69-
.withNewSpec()
70-
.addNewContainer()
71-
.withName("neuron")
72-
.withImage(dockerImage)
73-
.withArgs("/job")
74-
.endContainer()
75-
.withRestartPolicy("Never")
76-
.endSpec()
61+
// Spicy meatball: under Kubernetes, executions can fail for reasons
62+
// other than bad inputs, for example, if a node dies. So we maybe
63+
// can't say, "Kubernetes, only do this once! I will do any
64+
// necessary retrying." But there may be quotas on analyzer usage,
65+
// and retrying outside Cortex may use them up.
66+
67+
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
68+
69+
// FIXME: this collapses case, jeopardizing the uniqueness of the
70+
// identifier.
71+
val kname = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
72+
val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
73+
.withClaimName(persistentVolumeClaimName)
74+
.withReadOnly(false)
75+
.build();
76+
val kjob1 = new KJobBuilder()
77+
.withApiVersion("batch/v1")
78+
.withNewMetadata()
79+
.withName(kname)
80+
.withLabels(Map(
81+
"cortex-job-id" -> job.id,
82+
"cortex-worker-id" -> job.workerId(),
83+
"cortex-job" -> "true").asJava)
84+
.endMetadata()
85+
.withNewSpec()
86+
.withNewTemplate()
87+
.withNewSpec()
88+
.addNewVolume()
89+
.withName("job-directory")
90+
.withPersistentVolumeClaim(pvcvs)
91+
.endVolume()
92+
.addNewContainer()
93+
.withName("neuron")
94+
.withImage(dockerImage)
95+
.withArgs("/job")
96+
.addNewEnv()
97+
.withName("CORTEX_JOB_FOLDER")
98+
.withValue(jobBaseDirectory.relativize(jobDirectory).toString())
99+
.endEnv();
100+
val kjob2 = if (Files.exists(cacertsFile)) {
101+
kjob1.addNewEnv()
102+
.withName("REQUESTS_CA_BUNDLE")
103+
.withValue("/job/input/cacerts")
104+
.endEnv()
105+
} else {
106+
kjob1
107+
}
108+
val kjob3 = kjob2
109+
.addNewVolumeMount()
110+
.withName("job-directory")
111+
.withSubPathExpr("$(CORTEX_JOB_FOLDER)/input")
112+
.withMountPath("/job/input")
113+
.withReadOnly(true)
114+
.endVolumeMount()
115+
.addNewVolumeMount()
116+
.withName("job-directory")
117+
.withSubPathExpr("$(CORTEX_JOB_FOLDER)/output")
118+
.withMountPath("/job/output")
119+
.withReadOnly(false)
120+
.endVolumeMount()
121+
.endContainer()
122+
.withRestartPolicy("Never")
123+
.endSpec()
77124
.endTemplate()
78125
.endSpec()
79126
.build();
80-
logger.info(s"Constructed k8s Job ${job.workerId()}\n")
127+
logger.info(s"Constructed k8s Job ${kjob3.getMetadata().getName()}\n")
81128

82-
val execution = Future {
83-
val created_kjob = client.batch().jobs().create(kjob)
84-
logger.info(s"Created k8s Job ${created_kjob.getMetadata().getUid()}")
85-
logger.info(s"the status at creation time is ${created_kjob.getStatus().toString()}")
86-
// FIXME: use the given timeout value
87-
val ended_kjob = client.batch().jobs().withName(job.workerId())
88-
.waitUntilCondition(j => (j.getStatus().getFailed() > 0 || j.getStatus().getSucceeded() > 0),
89-
5, TimeUnit.MINUTES );
90-
()
91-
}.andThen {
92-
case r =>
93-
val foo_kjob = client.batch().jobs().withName(job.workerId()).get()
94-
logger.info(s"k8s Job ${foo_kjob.getMetadata().getUid()} status ${foo_kjob.getStatus().toString()}")
95-
Future {}
96-
}
97-
execution
129+
val execution = Future {
130+
val created_kjob = client.batch().jobs().create(kjob3)
131+
logger.info(s"Created k8s Job ${created_kjob.getMetadata().getName()}")
132+
// FIXME: use the given timeout value
133+
val ended_kjob = client.batch().jobs().withName(kname)
134+
.waitUntilCondition(j => (j.getStatus().getFailed() > 0 || j.getStatus().getSucceeded() > 0),
135+
5, TimeUnit.MINUTES );
136+
()
137+
}.andThen {
138+
case r =>
139+
val foo_kjob = client.batch().jobs().withName(kname).get()
140+
logger.info(s"k8s Job ${foo_kjob.getMetadata().getUid()} status ${foo_kjob.getStatus().toString()}")
141+
Future {}
142+
}
143+
execution
98144
}
99145
}

0 commit comments

Comments
 (0)