Skip to content

Commit

Permalink
Reset task's delay only after viability threshold has been reached
Browse files Browse the repository at this point in the history
Conflicts:
	src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiter.scala
	src/main/scala/mesosphere/marathon/core/launchqueue/impl/RateLimiterActor.scala
  • Loading branch information
Florent Flament authored and pierrecdn committed Sep 19, 2016
1 parent 383bae4 commit 1cb5724
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@ private[launchqueue] class RateLimiter(clock: Clock) {
/** The task launch delays per run spec and their last config change. */
private[this] var taskLaunchDelays = Map[(PathId, Timestamp), Delay]()

def cleanUpOverdueDelays(): Unit = {
def getMinimumTaskExecutionSeconds: Long = minimumTaskExecutionSeconds

/**
* Reset the delay of every run spec that has reached the viability
* threshold: an entry is dropped once its deadline lies at least
* `minimumTaskExecutionSeconds` in the past. The deadline indicates
* when the task has been launched for the last time.
*
* Entries whose deadline is more recent than that are kept unchanged.
*/
def resetViableTasksDelays(): Unit = {
taskLaunchDelays = taskLaunchDelays.filter {
// NOTE(review): this first predicate appears to be the pre-change line of the diff
// (keep only delays whose deadline is still in the future).
case (_, delay) => delay.deadline > clock.now()
case (_, delay) =>
// Keep the entry while (now - threshold) is still strictly before the deadline.
clock.now() - FiniteDuration(minimumTaskExecutionSeconds, TimeUnit.SECONDS) < delay.deadline
}
}

/**
* Look up the launch deadline recorded for the given run spec.
*
* The lookup key is the pair (spec id, last config change version). When no
* delay is recorded for that key, the current clock time is returned.
*/
def getDelay(spec: RunSpec): Timestamp =
def getDeadline(spec: RunSpec): Timestamp =
taskLaunchDelays.get(spec.id -> spec.versionInfo.lastConfigChangeVersion).map(_.deadline) getOrElse clock.now()

def addDelay(runSpec: RunSpec): Timestamp = {
Expand Down Expand Up @@ -68,10 +76,16 @@ private[launchqueue] class RateLimiter(clock: Clock) {
}

private object RateLimiter {
/**
* Viability threshold, in seconds: how far in the past a delay's deadline
* must lie before the delay is reset.
*
* The value may be tuned later, once the mechanism has been validated.
*/
private val minimumTaskExecutionSeconds: Long = 60

private val log = LoggerFactory.getLogger(getClass.getName)

/**
* Factory helpers for `Delay`. Both build a delay whose deadline is the
* current clock time plus the given duration, and pass that duration on as
* the second constructor argument.
*/
private object Delay {
// Initial delay for a run spec: its configured backoff, counted from now.
def apply(clock: Clock, runSpec: RunSpec): Delay = Delay(clock, runSpec.backoff)
def apply(clock: Clock, runSpec: RunSpec): Delay = Delay(clock.now() + runSpec.backoff, runSpec.backoff)
// Delay of an explicit duration, counted from now.
def apply(clock: Clock, delay: FiniteDuration): Delay = Delay(clock.now() + delay, delay)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props }
import akka.event.LoggingReceive
import mesosphere.marathon.core.launchqueue.impl.RateLimiterActor.{
AddDelay,
CleanupOverdueDelays,
ResetViableTasksDelays,
DecreaseDelay,
DelayUpdate,
GetDelay,
Expand Down Expand Up @@ -32,7 +32,7 @@ private[launchqueue] object RateLimiterActor {
private[impl] case class AddDelay(runSpec: RunSpec)
private[impl] case class DecreaseDelay(runSpec: RunSpec)

private case object CleanupOverdueDelays
private case object ResetViableTasksDelays
}

private class RateLimiterActor private (
Expand All @@ -42,7 +42,7 @@ private class RateLimiterActor private (

/**
* Start the periodic tick that makes this actor revisit the stored delays.
* The returned `Cancellable` is kept in `cleanup`.
*/
override def preStart(): Unit = {
// The scheduler call below takes an implicit ExecutionContext; use the actor's dispatcher.
import context.dispatcher
// First tick after 10 seconds, then one every 10 seconds, sent to this actor itself.
cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, CleanupOverdueDelays)
cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, ResetViableTasksDelays)
log.info("started RateLimiterActor")
}

Expand All @@ -57,27 +57,29 @@ private class RateLimiterActor private (
).reduceLeft(_.orElse[Any, Unit](_))
}

/**
* Handles the periodic tick scheduled in `preStart`.
*
* If an app gets removed or updated, the delay should be reset. If
* an app is considered viable, the delay should be reset too. We
* check and reset viable tasks' delays periodically.
*/
private[this] def receiveCleanup: Receive = {
case CleanupOverdueDelays =>
// If a run spec gets removed or updated, the delay should be reset.
// Still, we can remove overdue delays before that, and calling this
// periodically also makes leaks less likely.
rateLimiter.cleanUpOverdueDelays()
case ResetViableTasksDelays =>
// Delegate to the rate limiter, which drops the delays of viable tasks.
rateLimiter.resetViableTasksDelays()
}

/**
* Handles the delay operations sent to this actor: querying, adding and
* resetting the delay of a run spec. `DecreaseDelay` is accepted but ignored.
*/
private[this] def receiveDelayOps: Receive = {
// Query: reply to the asker with the deadline currently recorded for the run spec.
case GetDelay(runSpec) =>
sender() ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec))
sender() ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec))

// Add a delay, then publish the resulting deadline to the launch queue.
case AddDelay(runSpec) =>
rateLimiter.addDelay(runSpec)
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec))
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec))

case DecreaseDelay(runSpec) => // ignore for now

// Reset the delay, publish the new deadline to the launch queue,
// then acknowledge the requester with ResetDelayResponse.
case ResetDelay(runSpec) =>
rateLimiter.resetDelay(runSpec)
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDelay(runSpec))
launchQueueRef ! DelayUpdate(runSpec, rateLimiter.getDeadline(runSpec))
sender() ! ResetDelayResponse
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class RateLimiterTest extends MarathonActorSupport with MarathonSpec with Matche

limiter.addDelay(app)

limiter.getDelay(app) should be(clock.now() + 10.seconds)
limiter.getDeadline(app) should be(clock.now() + 10.seconds)
}

test("addDelay for existing delay") {
Expand All @@ -28,22 +28,27 @@ class RateLimiterTest extends MarathonActorSupport with MarathonSpec with Matche
limiter.addDelay(app)
limiter.addDelay(app)

limiter.getDelay(app) should be(clock.now() + 20.seconds)
limiter.getDeadline(app) should be(clock.now() + 20.seconds)
}

// Verifies that only delays whose deadline is at least `threshold` seconds in
// the past are reset, while more recent or still-pending delays are kept.
test("cleanupOverdueDelays") {
test("resetViableTasksDelays") {
// Reference instant: every deadline below is origin + the app's backoff.
val time_origin = clock.now()
val limiter = new RateLimiter(clock)
val overdue = AppDefinition(id = "overdue".toPath, backoff = 10.seconds)
limiter.addDelay(overdue)
val stillWaiting = AppDefinition(id = "test".toPath, backoff = 20.seconds)
val threshold = limiter.getMinimumTaskExecutionSeconds
// Deadline at origin + 10s: will be older than (now - threshold) after the clock jump.
val viable = AppDefinition(id = "viable".toPath, backoff = 10.seconds)
limiter.addDelay(viable)
// Deadline at origin + 20s: already passed after the jump, but not by a full threshold.
val notYetViable = AppDefinition(id = "notYetViable".toPath, backoff = 20.seconds)
limiter.addDelay(notYetViable)
// Deadline at origin + threshold + 20s: still in the future after the jump.
val stillWaiting = AppDefinition(id = "test".toPath, backoff = (threshold + 20).seconds)
limiter.addDelay(stillWaiting)

clock += 11.seconds
// After this jump, (now - threshold) equals origin + 11s.
clock += (threshold + 11).seconds

limiter.cleanUpOverdueDelays()
limiter.resetViableTasksDelays()

limiter.getDelay(overdue) should be(clock.now())
limiter.getDelay(stillWaiting) should be(clock.now() + 9.seconds)
// The viable entry was dropped, so the lookup falls back to the current time.
limiter.getDeadline(viable) should be(clock.now())
// The other two entries keep their original deadlines.
limiter.getDeadline(notYetViable) should be(time_origin + 20.seconds)
limiter.getDeadline(stillWaiting) should be(time_origin + (threshold + 20).seconds)
}

test("resetDelay") {
Expand All @@ -54,7 +59,7 @@ class RateLimiterTest extends MarathonActorSupport with MarathonSpec with Matche

limiter.resetDelay(app)

limiter.getDelay(app) should be(clock.now())
limiter.getDeadline(app) should be(clock.now())
}

}

0 comments on commit 1cb5724

Please sign in to comment.