Skip to content

Commit 6624cd8

Browse files
committed
create a Kubernetes job. the job does not run properly yet
1 parent 5ed8373 commit 6624cd8

1 file changed

Lines changed: 45 additions & 3 deletions

File tree

app/org/thp/cortex/services/K8sJobRunnerSrv.scala

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package org.thp.cortex.services
22

3+
import java.util.concurrent.TimeUnit
34
import java.nio.charset.StandardCharsets
45
import java.nio.file._
56

67
import scala.concurrent.duration.FiniteDuration
78
import scala.concurrent.{ExecutionContext, Future}
89
import scala.util.Try
10+
import scala.collection.JavaConverters._
911

1012
import play.api.libs.json.Json
1113
import play.api.{Configuration, Logger}
1214

1315
import akka.actor.ActorSystem
14-
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, ConfigBuilder}
16+
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, ConfigBuilder, Watcher}
17+
import io.fabric8.kubernetes.api.model.batch.{Job => KJob, JobBuilder => KJobBuilder}
18+
1519
// import com.spotify.docker.client.DockerClient.LogsParam
1620
// import com.spotify.docker.client.messages.HostConfig.Bind
1721
// import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
@@ -44,14 +48,52 @@ class K8sJobRunnerSrv(
4448

4549
lazy val isAvailable: Boolean =
4650
Try {
47-
logger.info(s"Kubernetes is available:\n${client.getVersion().toString()}")
51+
val ver = client.getVersion()
52+
logger.info(s"Kubernetes is available: major ${ver.getMajor()} minor ${ver.getMinor()} git ${ver.getGitCommit()}")
4853
true
4954
}.recover {
5055
case error =>
5156
logger.info(s"Kubernetes is not available", error)
5257
false
5358
}.get
5459

55-
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = { Future {} }
60+
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration])(implicit ec: ExecutionContext): Future[Unit] = {
61+
val kjob = new KJobBuilder()
62+
.withApiVersion("batch/v1")
63+
.withNewMetadata()
64+
.withName(job.workerId())
65+
.withLabels(Map("cortex_workerId" -> job.workerId()).asJava)
66+
.endMetadata()
67+
.withNewSpec()
68+
.withNewTemplate()
69+
.withNewSpec()
70+
.addNewContainer()
71+
.withName("neuron")
72+
.withImage(dockerImage)
73+
.withArgs("/job")
74+
.endContainer()
75+
.withRestartPolicy("Never")
76+
.endSpec()
77+
.endTemplate()
78+
.endSpec()
79+
.build();
80+
logger.info(s"Constructed k8s Job ${job.workerId()}\n")
5681

82+
val execution = Future {
83+
val created_kjob = client.batch().jobs().create(kjob)
84+
logger.info(s"Created k8s Job ${created_kjob.getMetadata().getUid()}")
85+
logger.info(s"the status at creation time is ${created_kjob.getStatus().toString()}")
86+
// FIXME: use the given timeout value
87+
val ended_kjob = client.batch().jobs().withName(job.workerId())
88+
.waitUntilCondition(j => (j.getStatus().getFailed() > 0 || j.getStatus().getSucceeded() > 0),
89+
5, TimeUnit.MINUTES );
90+
()
91+
}.andThen {
92+
case r =>
93+
val foo_kjob = client.batch().jobs().withName(job.workerId()).get()
94+
logger.info(s"k8s Job ${foo_kjob.getMetadata().getUid()} status ${foo_kjob.getStatus().toString()}")
95+
Future {}
96+
}
97+
execution
98+
}
5799
}

0 commit comments

Comments
 (0)