@@ -43,6 +43,7 @@ import org.apache.mesos.v1.scheduler.Protos.Call
4343import org .apache .mesos .v1 .scheduler .Protos .Call ._
4444import org .apache .mesos .v1 .scheduler .Protos .Event
4545import pureconfig ._
46+ import pureconfig .loadConfigOrThrow
4647import scala .collection .JavaConverters ._
4748import scala .collection .mutable
4849import scala .collection .mutable .Buffer
@@ -119,7 +120,7 @@ case class CommandURIDef(uri: URI, extract: Boolean = true, cache: Boolean = fal
119120
120121// task states
// Common base type of the task state case classes declared below.
121122sealed abstract class TaskState ()
// State of a task that has been requested but not launched yet.
//   reqs        - the definition of the task to launch
//   promise     - promise of the Running state for this task; it is failed with a CapacityFailure when the
//                 task stays unmatched for too many offer cycles
//   offerCycles - counter that starts at 1 and is increased by one in each offer cycle in which the task is
//                 still pending and has not been failed; compared against
//                 MesosActorConfig.failPendingOfferCycles
122- case class SubmitPending (reqs : TaskDef , promise : Promise [Running ]) extends TaskState
123+ case class SubmitPending (reqs : TaskDef , promise : Promise [Running ], offerCycles : Int = 1 ) extends TaskState
123124case class Submitted (pending : SubmitPending ,
124125 taskInfo : TaskInfo ,
125126 offer : OfferID ,
@@ -137,10 +138,19 @@ case class DeletePending(taskId: String, promise: Promise[Deleted]) extends Task
// Task state carrying the id of a deleted task and its TaskStatus; this is also the value type of
// the promise held by DeletePending.
137138case class Deleted (taskId : String , taskStatus : TaskStatus ) extends TaskState
// Task state carrying the id of a failed task and the id of an agent.
// NOTE(review): presumably the agent the task was running on - confirm against where Failed is created.
138139case class Failed (taskId : String , agentId : String ) extends TaskState
139140
// Settings of the mesos client actor.
//   agentStatsTTL           - NOTE(review): presumably how long recorded agent stats stay valid (it is handed to
//                             MesosClient.getOfferStats as part of this config); usage is not visible here - confirm
//   agentStatsPruningPeriod - interval at which the PruneStats message is scheduled
//   failPendingOfferCycles  - when defined, a SubmitPending task whose offerCycles counter exceeds this value
//                             is failed with a CapacityFailure and removed; when None that check is skipped
140- case class MesosActorConfig (agentStatsTTL : FiniteDuration , agentStatsPruningPeriod : FiniteDuration )
141+ case class MesosActorConfig (agentStatsTTL : FiniteDuration ,
142+ agentStatsPruningPeriod : FiniteDuration ,
143+ failPendingOfferCycles : Option [Int ])
// Resource figures (mem, cpu, ports) recorded for one agent, together with the instant at which the
// record expires. NOTE(review): units of mem/cpu are not visible here - confirm.
141144case class AgentStats (mem : Double , cpu : Double , ports : Int , expiration : Instant )
142145
// Collection of AgentStats entries keyed by a String.
// NOTE(review): keys are presumably agent hostnames, as in agentOfferHistory - confirm.
143146case class MesosAgentStats (stats : Map [String , AgentStats ])
147+
// Exception used to fail the promise of a pending task that was not matched to any offer within the
// configured number of offer cycles. The exception message is fixed; the details are in the fields.
//   requiredMem / requiredCpu / requiredPorts - what the task asked for (its mem, cpus and number of ports)
//   remainingResources - the values of the leftover-resources result returned by the task matcher
//                        NOTE(review): each tuple is presumably (mem, cpu, ports) - confirm against the matcher
148+ case class CapacityFailure (requiredMem : Float ,
149+ requiredCpu : Float ,
150+ requiredPorts : Int ,
151+ remainingResources : List [(Float , Float , Int )])
152+ extends MesosException (" cluster does not have capacity" )
153+
144154// TODO: mesos authentication
145155trait MesosClientActor extends Actor with ActorLogging with MesosClientConnection {
146156 implicit val ec : ExecutionContext = context.dispatcher
@@ -167,7 +177,7 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
167177 var agentOfferHistory = Map .empty[String , AgentStats ] // Map[<agent hostname> -> <stats>] track the most recent offer stats per agent hostname
168178 val listener : Option [ActorRef ]
169179
// Actor settings. Declared abstract: the concrete actor supplies the value (MesosClient receives it as a
// constructor parameter) instead of it being loaded from the "mesos-actor" configuration here.
170- val config = loadConfigOrThrow[ MesosActorConfig ]( " mesos-actor " )
180+ val config : MesosActorConfig
171181
172182 if (autoSubscribe) {
173183 log.info(s " auto-subscribing ${self} to mesos master at ${master}" )
@@ -191,13 +201,14 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
191201 }
// Periodic tick message: it is scheduled at the configured pruning period and relayed by a helper child
// actor to the client actor, so that the client actor itself handles it.
192202 case object PruneStats
193203
/**
 * Starts the periodic PruneStats tick when this actor starts.
 *
 * A helper child actor owns the timer: the first tick fires after 30 seconds and subsequent ticks
 * follow every `config.agentStatsPruningPeriod`. The child relays each tick to this actor.
 *
 * The child cancels its own timer in `postStop`, so the timer ends together with the child
 * (i.e. when this actor stops or restarts) instead of continuing to fire at a dead actor.
 */
override def preStart(): Unit = {
  // resolve what the child needs up front, so the child does not read this actor's members later
  val scheduler = actorSystem.scheduler
  val pruningPeriod = config.agentStatsPruningPeriod

  context.actorOf(Props(new Actor {
    // handle of the running timer; kept so it can be cancelled when this child stops
    private var ticker: Option[akka.actor.Cancellable] = None

    override def preStart(): Unit =
      ticker = Some(scheduler.schedule(30.seconds, pruningPeriod, self, PruneStats))

    override def postStop(): Unit = ticker.foreach(_.cancel())

    override def receive: Receive = {
      case PruneStats =>
        context.parent ! PruneStats // client actor needs to handle PruneStats to avoid concurrent update to stats map
    }
  }))
  ()
}
// Reuse the framework id remembered on the MesosClient object when there is one; otherwise build one
// from id() and remember it there, so that a restarted actor can reconnect with the same id.
private val frameworkID = MesosClient.frameworkID.getOrElse {
  val freshId = FrameworkID.newBuilder().setValue(id()).build()
  MesosClient.frameworkID = Some(freshId)
  freshId
}
@@ -234,7 +245,7 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
234245 tasks.get(taskID.getValue) match {
235246 case Some (taskDetails) =>
236247 taskDetails match {
237- case SubmitPending (taskDef, promise) => {
248+ case SubmitPending (taskDef, promise, _ ) => {
238249 log.info(s " deleting unlaunched task ${taskDef.taskId}" )
239250 tasks.remove(taskDef.taskId)
240251 }
@@ -434,7 +445,8 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
434445
435446 log.debug(s " agent offer stats: {} " , agentOfferHistory)
436447
437- val matchedTasks = taskMatcher.matchTasksToOffers(role, pending, event.getOffersList.asScala.toList, taskBuilder)
448+ val (matchedTasks, remaining) =
449+ taskMatcher.matchTasksToOffers(role, pending, event.getOffersList.asScala.toList, taskBuilder)
438450
439451 val matchedCount = matchedTasks.foldLeft(0 )(_ + _._2.size)
440452 log.info(s " matched ${matchedCount} tasks to ${matchedTasks.size} offers out of ${pending.size} pending tasks " )
@@ -474,7 +486,7 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
474486 matchedTasks.foreach(entry => {
475487 entry._2.foreach(task => {
476488 tasks(task._1.getTaskId.getValue) match {
477- case s @ SubmitPending (reqs, promise) =>
489+ case s @ SubmitPending (reqs, promise, _ ) =>
478490 // dig the hostname out of the offer whose agent id matches the agent id in the task info
479491 val hostname =
480492 event.getOffersList.asScala.find(p => p.getAgentId == task._1.getAgentId).get.getHostname
@@ -493,6 +505,27 @@ trait MesosClientActor extends Actor with ActorLogging with MesosClientConnectio
493505 }
494506
495507 }
// generate failures for pending tasks that did not fit any offers
// (only when failPendingOfferCycles is configured; with None this whole check is skipped)
//
// Every task that is still SubmitPending at this point either
//   - has its offerCycles counter increased by one, or
//   - once that counter exceeds the configured maximum, gets its promise failed with a
//     CapacityFailure and is removed from the task store.
// Since the counter starts at 1, a task is failed in the (maxOfferCycles + 1)-th offer cycle
// in which it is found pending.
config.failPendingOfferCycles.foreach { maxOfferCycles =>
  // leftover resources reported with every failure; computed at most once per offer cycle
  lazy val unusedResources = remaining.values.toList
  // snapshot the pending tasks first, because the task store is modified while walking them
  val stillPending = tasks.collect { case (_, s: SubmitPending) => s }.toList

  stillPending.foreach { task =>
    val taskId = task.reqs.taskId
    if (task.offerCycles > maxOfferCycles) {
      log.warning(s"failing task ${task.reqs.taskId} after ${task.offerCycles} unmatching offer cycles")
      // tryFailure instead of failure: a promise that was already completed elsewhere must not
      // throw IllegalStateException here and abort the handling of this offer cycle
      val failedNow = task.promise.tryFailure(
        new CapacityFailure(
          task.reqs.mem.toFloat,
          task.reqs.cpus.toFloat,
          task.reqs.ports.size,
          unusedResources))
      if (!failedNow) {
        log.debug(s"promise of task ${taskId} was already completed")
      }
      tasks.remove(taskId)
    } else {
      tasks.update(taskId, task.copy(offerCycles = task.offerCycles + 1)) // increase the offer cycles this task has seen
    }
  }
}
496529
497530 // store a reference of last memory offer (total) from each agent
498531 val newOfferStats = MesosClient .getOfferStats(config, role, agentOfferMap)
@@ -691,7 +724,8 @@ class MesosClient(val id: () => String,
691724 val tasks : TaskStore ,
692725 val refuseSeconds : Double ,
693726 val heartbeatMaxFailures : Int ,
694- val listener : Option [ActorRef ])
727+ val listener : Option [ActorRef ],
728+ val config : MesosActorConfig )
695729 extends MesosClientActor
696730 with MesosClientHttpConnection {}
697731
@@ -710,7 +744,8 @@ object MesosClient {
710744 taskStore : TaskStore ,
711745 refuseSeconds : Double = 5.0 ,
712746 heartbeatMaxFailures : Int = 2 ,
713- listener : Option [ActorRef ] = None ): Props =
747+ listener : Option [ActorRef ] = None ,
748+ config : MesosActorConfig = loadConfigOrThrow[MesosActorConfig ](" mesos-actor" )): Props =
714749 Props (
715750 new MesosClient (
716751 id,
@@ -724,7 +759,8 @@ object MesosClient {
724759 taskStore,
725760 refuseSeconds,
726761 heartbeatMaxFailures,
727- listener))
762+ listener,
763+ config))
728764
729765 // TODO: allow task persistence/reconcile
730766
0 commit comments