@@ -20,6 +20,7 @@ import org.thp.cortex.models._
2020import org .elastic4play ._
2121import org .elastic4play .controllers ._
2222import org .elastic4play .services ._
23+ import org .elastic4play .utils .Hasher
2324
2425@ Singleton
2526class JobSrv (
@@ -243,11 +244,10 @@ class JobSrv(
243244 parameters : JsObject ,
244245 label : Option [String ],
245246 force : Boolean )(implicit authContext : AuthContext ): Future [Job ] = {
246- val previousJob = if (force) Future .successful(None )
247- else findSimilarJob(worker, dataType, dataAttachment, tlp, parameters)
247+ val previousJob = findSimilarJob(worker, dataType, dataAttachment, tlp, parameters, force)
248248 previousJob.flatMap {
249- case Some (job) ⇒ Future .successful(job)
250- case None ⇒ isUnderRateLimit(worker).flatMap {
249+ case Right (job) ⇒ Future .successful(job)
250+ case Left (cacheTag) ⇒ isUnderRateLimit(worker).flatMap {
251251 case true ⇒
252252 val fields = Fields (Json .obj(
253253 " workerDefinitionId" → worker.workerDefinitionId(),
@@ -260,7 +260,8 @@ class JobSrv(
260260 " pap" → pap,
261261 " message" → message,
262262 " parameters" → parameters.toString,
263- " type" → worker.tpe()))
263+ " type" → worker.tpe(),
264+ " cacheTag" → cacheTag))
264265 .set(" label" , label.map(JsString .apply))
265266 val fieldWithData = dataAttachment match {
266267 case Left (data) ⇒ fields.set(" data" , data)
@@ -298,28 +299,27 @@ class JobSrv(
298299 .getOrElse(Future .successful(true ))
299300 }
300301
301- def findSimilarJob (worker : Worker , dataType : String , dataAttachment : Either [String , Attachment ], tlp : Long , parameters : JsObject ): Future [Option [Job ]] = {
302- val cache = worker.jobCache().fold(jobCache)(_.minutes)
303- if (cache.length == 0 || worker.tpe() == WorkerType .responder) {
302+ def findSimilarJob (worker : Worker , dataType : String , dataAttachment : Either [String , Attachment ], tlp : Long , parameters : JsObject , force : Boolean ): Future [Either [String , Job ]] = {
303+ val cacheTag = Hasher (" MD5" ).fromString(s " ${worker.id}| $dataType| $tlp| ${dataAttachment.fold(data ⇒ data, attachment ⇒ attachment.id)}| $parameters" ).head.toString()
304+ lazy val cache = worker.jobCache().fold(jobCache)(_.minutes)
305+ if (force || cache.length == 0 || worker.tpe() == WorkerType .responder) {
304306 logger.info(" Job cache is disabled" )
305- Future .successful(None )
307+ Future .successful(Left (cacheTag) )
306308 }
307309 else {
308310 import org .elastic4play .services .QueryDSL ._
309311 logger.info(s " Looking for similar job in the last ${cache.toMinutes} minutes (worker= ${worker.id}, dataType= $dataType, data= $dataAttachment, tlp= $tlp, parameters= $parameters) " )
312+
310313 val now = new Date ().getTime
311314 find(and(
312- " workerId " ~= worker.id ,
315+ " cacheTag " ~= cacheTag ,
313316 " status" ~!= JobStatus .Failure ,
314317 " status" ~!= JobStatus .Deleted ,
315- " startDate" ~>= (now - cache.toMillis),
316- " dataType" ~= dataType,
317- " tlp" ~= tlp,
318- dataAttachment.fold(data ⇒ " data" ~= data, attachment ⇒ " attachment.id" ~= attachment.id),
319- " parameters" ~= parameters.toString), Some (" 0-1" ), Seq (" -createdAt" ))
318+ " startDate" ~>= (now - cache.toMillis)), Some (" 0-1" ), Seq (" -createdAt" ))
320319 ._1
321320 .map(j ⇒ new Job (jobModel, j.attributes + (" fromCache" → JsBoolean (true ))))
322321 .runWith(Sink .headOption)
322+ .map(_.toRight(cacheTag))
323323 }
324324 }
325325
0 commit comments