Skip to content

Commit 009c997

Browse files
committed
(redux) add K8sJobRunnerSrv, which runs Cortex jobs using Kubernetes Jobs
1 parent 619a70f commit 009c997

8 files changed

Lines changed: 179 additions & 3 deletions

File tree

app/org/thp/cortex/services/JobRunnerSrv.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class JobRunnerSrv @Inject() (
2727
artifactModel: ArtifactModel,
2828
processJobRunnerSrv: ProcessJobRunnerSrv,
2929
dockerJobRunnerSrv: DockerJobRunnerSrv,
30+
k8sJobRunnerSrv: K8sJobRunnerSrv,
3031
workerSrv: WorkerSrv,
3132
createSrv: CreateSrv,
3233
updateSrv: UpdateSrv,
@@ -47,6 +48,7 @@ class JobRunnerSrv @Inject() (
4748
.getOrElse(Seq("docker", "process"))
4849
.map(_.toLowerCase)
4950
.collect {
51+
case "kubernetes" if k8sJobRunnerSrv.isAvailable => "kubernetes"
5052
case "docker" if dockerJobRunnerSrv.isAvailable => "docker"
5153
case "process" =>
5254
Seq("", "2", "3").foreach { pythonVersion =>
@@ -65,6 +67,7 @@ class JobRunnerSrv @Inject() (
6567

6668
lazy val processRunnerIsEnable: Boolean = runners.contains("process")
6769
lazy val dockerRunnerIsEnable: Boolean = runners.contains("docker")
70+
lazy val k8sRunnerIsEnable: Boolean = runners.contains("kubernetes")
6871

6972
private object deleteVisitor extends SimpleFileVisitor[Path] {
7073
override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
@@ -218,6 +221,14 @@ class JobRunnerSrv @Inject() (
218221
maybeJobFolder = Some(jobFolder)
219222
runners
220223
.foldLeft[Option[Try[Unit]]](None) {
224+
case (None, "kubernetes") =>
225+
worker
226+
.dockerImage()
227+
.map(dockerImage => k8sJobRunnerSrv.run(jobFolder, dockerImage, job, worker.jobTimeout().map(_.minutes)))
228+
.orElse {
229+
logger.warn(s"worker ${worker.id} can't be run with kubernetes (doesn't have image)")
230+
None
231+
}
221232
case (None, "docker") =>
222233
worker
223234
.dockerImage()
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package org.thp.cortex.services
2+
3+
import java.util.concurrent.TimeUnit
4+
import java.nio.file._
5+
6+
import scala.concurrent.duration.FiniteDuration
7+
import scala.util.{Try}
8+
import scala.collection.JavaConverters._
9+
10+
import play.api.{Configuration, Logger}
11+
12+
import akka.actor.ActorSystem
13+
import io.fabric8.kubernetes.client.{DefaultKubernetesClient}
14+
import io.fabric8.kubernetes.api.model.batch.{JobBuilder => KJobBuilder}
15+
import io.fabric8.kubernetes.api.model.{PersistentVolumeClaimVolumeSourceBuilder}
16+
import javax.inject.{Inject, Singleton}
17+
import org.thp.cortex.models._
18+
19+
@Singleton
20+
class K8sJobRunnerSrv(
21+
client: DefaultKubernetesClient,
22+
config: Configuration,
23+
autoUpdate: Boolean,
24+
jobBaseDirectory: Path,
25+
persistentVolumeClaimName: String,
26+
implicit val system: ActorSystem
27+
) {
28+
29+
@Inject()
30+
def this(config: Configuration, system: ActorSystem) =
31+
this(
32+
new DefaultKubernetesClient(),
33+
config,
34+
config.getOptional[Boolean]("job.kubernetes.autoUpdate").getOrElse(true),
35+
Paths.get(config.get[String]("job.directory")),
36+
config.get[String]("job.kubernetes.persistentVolumeClaimName"),
37+
system: ActorSystem
38+
)
39+
40+
lazy val logger = Logger(getClass)
41+
42+
lazy val isAvailable: Boolean =
43+
Try {
44+
val ver = client.getVersion()
45+
logger.info(s"Kubernetes is available: major ${ver.getMajor()} minor ${ver.getMinor()} git ${ver.getGitCommit()}")
46+
true
47+
}.recover {
48+
case error =>
49+
logger.info(s"Kubernetes is not available", error)
50+
false
51+
}.get
52+
53+
def run(jobDirectory: Path, dockerImage: String, job: Job, timeout: Option[FiniteDuration]): Try[Unit] = {
54+
val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
55+
val relativeJobDirectory = jobBaseDirectory.relativize(jobDirectory).toString()
56+
// make the default longer than likely values, but still not infinite
57+
val timeout_or_default = timeout getOrElse new FiniteDuration(8, TimeUnit.HOURS)
58+
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
59+
// FIXME: this collapses case, jeopardizing the uniqueness of the
60+
// identifier. LDH: lowercase, digits, hyphens.
61+
val ldh_jobid = "_".r.replaceAllIn(job.id.map(_.toLower), "-")
62+
val kjobName = "neuron-job-" + ldh_jobid
63+
val pvcvs = new PersistentVolumeClaimVolumeSourceBuilder()
64+
.withClaimName(persistentVolumeClaimName)
65+
.withReadOnly(false)
66+
.build();
67+
val kjob1 = new KJobBuilder()
68+
.withApiVersion("batch/v1")
69+
.withNewMetadata()
70+
.withName(kjobName)
71+
.withLabels(Map(
72+
"cortex-job-id" -> job.id,
73+
"cortex-worker-id" -> job.workerId(),
74+
"cortex-neuron-job" -> "true").asJava)
75+
.endMetadata()
76+
.withNewSpec()
77+
.withNewTemplate()
78+
.withNewSpec()
79+
.addNewVolume()
80+
.withName("job-directory")
81+
.withPersistentVolumeClaim(pvcvs)
82+
.endVolume()
83+
.addNewContainer()
84+
.withName("neuron")
85+
.withImage(dockerImage)
86+
.withArgs("/job")
87+
.addNewEnv()
88+
.withName("CORTEX_JOB_FOLDER")
89+
.withValue(relativeJobDirectory)
90+
.endEnv();
91+
val kjob2 = if (Files.exists(cacertsFile)) {
92+
kjob1.addNewEnv()
93+
.withName("REQUESTS_CA_BUNDLE")
94+
.withValue("/job/input/cacerts")
95+
.endEnv()
96+
} else {
97+
kjob1
98+
}
99+
val kjob3 = kjob2
100+
.addNewVolumeMount()
101+
.withName("job-directory")
102+
.withSubPathExpr("$(CORTEX_JOB_FOLDER)/input")
103+
.withMountPath("/job/input")
104+
.withReadOnly(true)
105+
.endVolumeMount()
106+
.addNewVolumeMount()
107+
.withName("job-directory")
108+
.withSubPathExpr("$(CORTEX_JOB_FOLDER)/output")
109+
.withMountPath("/job/output")
110+
.withReadOnly(false)
111+
.endVolumeMount()
112+
.endContainer()
113+
.withRestartPolicy("Never")
114+
.endSpec()
115+
.endTemplate()
116+
.endSpec()
117+
.build();
118+
119+
val execution = Try {
120+
val created_kjob = client.batch().jobs().create(kjob3)
121+
val created_env = created_kjob
122+
.getSpec().getTemplate().getSpec().getContainers().get(0)
123+
.getEnv().asScala;
124+
logger.info(
125+
s"Created Kubernetes Job ${created_kjob.getMetadata().getName()}\n" +
126+
s" timeout: ${timeout_or_default.toString}\n" +
127+
s" image : $dockerImage\n" +
128+
s" mount : pvc ${persistentVolumeClaimName} subdir ${relativeJobDirectory} as /job" +
129+
created_env.map(ev => s"\n env : ${ev.getName()} = ${ev.getValue()}").mkString)
130+
val ended_kjob = client.batch().jobs().withLabel("cortex-job-id", job.id)
131+
.waitUntilCondition((x => Option(x).flatMap(j =>
132+
Option(j.getStatus).flatMap(s =>
133+
Some(s.getConditions.asScala.map(_.getType).filter(t =>
134+
t.equals("Complete") || t.equals("Failed")).nonEmpty)))
135+
getOrElse false),
136+
timeout_or_default.length, timeout_or_default.unit);
137+
if(ended_kjob != null) {
138+
logger.info(s"Kubernetes Job ${ended_kjob.getMetadata().getName()} " +
139+
s"(for job ${job.id}) status is now ${ended_kjob.getStatus().toString()}")
140+
} else {
141+
logger.info(s"Kubernetes Job for ${job.id} no longer exists")
142+
}
143+
}
144+
// let's find the job by the attribute we know is fundamentally
145+
// unique, rather than one constructed from it
146+
val deleted = client.batch().jobs().withLabel("cortex-job-id", job.id).delete()
147+
if(deleted) {
148+
logger.info(s"Deleted Kubernetes Job for job ${job.id}")
149+
} else {
150+
logger.info(s"While trying to delete Kubernetes Job for ${job.id}, the job was not found; this is OK")
151+
}
152+
execution
153+
}
154+
}

app/org/thp/cortex/services/WorkerSrv.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,10 +175,14 @@ class WorkerSrv @Inject() (
175175
workerDefinitions.filter {
176176
case w if w.command.isDefined && jobRunnerSrv.processRunnerIsEnable => true
177177
case w if w.dockerImage.isDefined && jobRunnerSrv.dockerRunnerIsEnable => true
178+
case w if w.dockerImage.isDefined && jobRunnerSrv.k8sRunnerIsEnable => true
178179
case w =>
179180
val reason =
180181
if (w.command.isDefined) "process runner is disabled"
181-
else if (w.dockerImage.isDefined) "Docker runner is disabled"
182+
else if (w.dockerImage.isDefined && !jobRunnerSrv.dockerRunnerIsEnable)
183+
"Docker runner is disabled"
184+
else if (w.dockerImage.isDefined && !jobRunnerSrv.k8sRunnerIsEnable)
185+
"Kubernetes runner is disabled"
182186
else "it doesn't have image nor command"
183187

184188
logger.warn(s"$workerType ${w.name} is disabled because $reason")

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ lazy val cortex = (project in file("."))
2020
Dependencies.reflections,
2121
Dependencies.zip4j,
2222
Dependencies.dockerClient,
23+
Dependencies.k8sClient,
2324
Dependencies.akkaCluster,
2425
Dependencies.akkaClusterTyped
2526
),

conf/reference.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ cache {
1111

1212
job {
1313
timeout = 30 minutes
14-
runners = [docker, process]
14+
runners = [kubernetes, docker, process]
1515
directory = ${java.io.tmpdir}
1616
dockerDirectory = ${job.directory}
1717
keepJobFolder = false

package/docker/entrypoint

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ SHOW_SECRET=${show_secret:-0}
2020
DAEMON_USER=${daemon_user:-cortex}
2121
JOB_DIRECTORY=${job_directory:-/tmp/cortex-jobs}
2222
DOCKER_JOB_DIRECTORY=${docker_job_directory:-}
23+
KUBERNETES_JOB_PVC=${kubernetes_job_pvc:-}
2324

2425
function usage {
2526
cat <<- _EOF_
@@ -33,6 +34,7 @@ function usage {
3334
--show-secret | show the generated secret
3435
--job-directory <dir> | use this directory to store job files
3536
--docker-job-directory <dir> | indicate the job directory in the host (not inside container)
37+
--kubernetes-job-pvc <name> | indicate the ReadWriteMany persistent volume claim holding job directory
3638
--analyzer-url <url> | where analyzers are located (url or path)
3739
--responder-url <url> | where responders are located (url or path)
3840
"--start-docker | start an internal docker (inside container) to run analyzers/responders
@@ -56,6 +58,7 @@ do
5658
"--show-secret") SHOW_SECRET=1;;
5759
"--job-directory") shift; JOB_DIRECTORY=$1;;
5860
"--docker-job-directory") shift; DOCKER_JOB_DIRECTORY=$1;;
61+
"--kubernetes-job-pvc") shift; KUBERNETES_JOB_PVC=$1;;
5962
"--analyzer-path") echo "--analyzer-path is deprecated, please use --analyzer-url"
6063
shift; ANALYZER_URLS+=("$1");;
6164
"--responder-path") echo "--responder-path is deprecated, please use --responder-url"
@@ -112,6 +115,7 @@ then
112115

113116
test -n "$JOB_DIRECTORY" && echo "job.directory=\"$JOB_DIRECTORY\"" >> "$CONFIG_FILE"
114117
test -n "$DOCKER_JOB_DIRECTORY" && echo "job.dockerDirectory=\"$DOCKER_JOB_DIRECTORY\"" >> "$CONFIG_FILE"
118+
test -n "$KUBERNETES_JOB_PVC" && echo "job.kubernetes.persistentVolumeClaimName=\"$KUBERNETES_JOB_PVC\"" >> "$CONFIG_FILE"
115119

116120
function join_urls {
117121
echo -n "\"$1\""

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ object Dependencies {
2020
val zip4j = "net.lingala.zip4j" % "zip4j" % "2.10.0"
2121
val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6"
2222
val dockerClient = "com.spotify" % "docker-client" % "8.14.4"
23+
val k8sClient = "io.fabric8" % "kubernetes-client" % "5.0.2"
2324
val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion
2425
val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion
2526
}

www/src/app/pages/analyzers/analyzers.service.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export default class AnalyzerService {
4343

4444
if (def.dockerImage && def.dockerImage !== null) {
4545
def.runners.push('Docker');
46+
def.runners.push('Kubernetes');
4647
}
4748
});
4849

@@ -232,4 +233,4 @@ export default class AnalyzerService {
232233
return this.$http.post('./api/analyzer/' + id + '/run', postData);
233234
}
234235
}
235-
}
236+
}

0 commit comments

Comments
 (0)