Skip to content

Commit

Permalink
Reset task's delay only after viability threshold has been reached
Browse files Browse the repository at this point in the history
  • Loading branch information
Florent Flament authored and pierrecdn committed Feb 5, 2016
1 parent 0937b75 commit f405db4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@ private[launchqueue] class RateLimiter(clock: Clock) {
/** The task launch delays per app and their last config change. */
private[this] var taskLaunchDelays = Map[(PathId, Timestamp), Delay]()

def cleanUpOverdueDelays(): Unit = {
def getMinimumTaskExecutionSeconds: Long = minimumTaskExecutionSeconds

/**
* Reset delay for tasks that have reached the viability
* threshold. The deadline indicates when the task has been
* launched for the last time.
*/
def resetViableTasksDelays(): Unit = {
taskLaunchDelays = taskLaunchDelays.filter {
case (_, delay) => delay.deadline > clock.now()
case (_, delay) =>
clock.now() - FiniteDuration(minimumTaskExecutionSeconds, TimeUnit.SECONDS) < delay.deadline
}
}

def getDelay(app: AppDefinition): Timestamp =
def getDeadline(app: AppDefinition): Timestamp =
taskLaunchDelays.get(app.id -> app.versionInfo.lastConfigChangeVersion).map(_.deadline) getOrElse clock.now()

def addDelay(app: AppDefinition): Timestamp = {
Expand Down Expand Up @@ -69,10 +77,16 @@ private[launchqueue] class RateLimiter(clock: Clock) {
}

private object RateLimiter {
/**
* The viability threshold may be tuned later, once the mechanism
* has been validated.
*/
private val minimumTaskExecutionSeconds: Long = 60

private val log = LoggerFactory.getLogger(getClass.getName)

private object Delay {
def apply(clock: Clock, app: AppDefinition): Delay = Delay(clock, app.backoff)
def apply(clock: Clock, app: AppDefinition): Delay = Delay(clock.now() + app.backoff, app.backoff)
def apply(clock: Clock, delay: FiniteDuration): Delay = Delay(clock.now() + delay, delay)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props }
import akka.event.LoggingReceive
import mesosphere.marathon.core.launchqueue.impl.RateLimiterActor.{
AddDelay,
CleanupOverdueDelays,
ResetViableTasksDelays,
DecreaseDelay,
DelayUpdate,
GetDelay,
Expand Down Expand Up @@ -35,7 +35,7 @@ private[launchqueue] object RateLimiterActor {
private[impl] case class AddDelay(app: AppDefinition)
private[impl] case class DecreaseDelay(app: AppDefinition)

private case object CleanupOverdueDelays
private case object ResetViableTasksDelays
}

private class RateLimiterActor private (
Expand All @@ -47,7 +47,7 @@ private class RateLimiterActor private (

override def preStart(): Unit = {
import context.dispatcher
cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, CleanupOverdueDelays)
cleanup = context.system.scheduler.schedule(10.seconds, 10.seconds, self, ResetViableTasksDelays)
log.info("started RateLimiterActor")
}

Expand All @@ -62,27 +62,29 @@ private class RateLimiterActor private (
).reduceLeft(_.orElse[Any, Unit](_))
}

/**
* If an app gets removed or updated, the delay should be reset. If
* an app is considered viable, the delay should be reset too. We
* check and reset viable tasks' delays periodically.
*/
private[this] def receiveCleanup: Receive = {
case CleanupOverdueDelays =>
// If an app gets removed or updated, the delay should be reset.
// Still, we can remove overdue delays before that and also make leaks less likely
// by calling this periodically.
rateLimiter.cleanUpOverdueDelays()
case ResetViableTasksDelays =>
rateLimiter.resetViableTasksDelays()
}

private[this] def receiveDelayOps: Receive = {
case GetDelay(app) =>
sender() ! DelayUpdate(app, rateLimiter.getDelay(app))
sender() ! DelayUpdate(app, rateLimiter.getDeadline(app))

case AddDelay(app) =>
rateLimiter.addDelay(app)
launchQueueRef ! DelayUpdate(app, rateLimiter.getDelay(app))
launchQueueRef ! DelayUpdate(app, rateLimiter.getDeadline(app))

case DecreaseDelay(app) => // ignore for now

case ResetDelay(app) =>
rateLimiter.resetDelay(app)
launchQueueRef ! DelayUpdate(app, rateLimiter.getDelay(app))
launchQueueRef ! DelayUpdate(app, rateLimiter.getDeadline(app))
sender() ! ResetDelayResponse
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class RateLimiterTest extends TestKit(ActorSystem("system")) with MarathonSpec w

limiter.addDelay(app)

limiter.getDelay(app) should be(clock.now() + 10.seconds)
limiter.getDeadline(app) should be(clock.now() + 10.seconds)
}

test("addDelay for existing delay") {
Expand All @@ -29,22 +29,27 @@ class RateLimiterTest extends TestKit(ActorSystem("system")) with MarathonSpec w
limiter.addDelay(app)
limiter.addDelay(app)

limiter.getDelay(app) should be(clock.now() + 20.seconds)
limiter.getDeadline(app) should be(clock.now() + 20.seconds)
}

test("cleanupOverdueDelays") {
test("resetViableTasksDelays") {
val time_origin = clock.now()
val limiter = new RateLimiter(clock)
val overdue = AppDefinition(id = "overdue".toPath, backoff = 10.seconds)
limiter.addDelay(overdue)
val stillWaiting = AppDefinition(id = "test".toPath, backoff = 20.seconds)
val threshold = limiter.getMinimumTaskExecutionSeconds
val viable = AppDefinition(id = "viable".toPath, backoff = 10.seconds)
limiter.addDelay(viable)
val notYetViable = AppDefinition(id = "notYetViable".toPath, backoff = 20.seconds)
limiter.addDelay(notYetViable)
val stillWaiting = AppDefinition(id = "test".toPath, backoff = (threshold + 20).seconds)
limiter.addDelay(stillWaiting)

clock += 11.seconds
clock += (threshold + 11).seconds

limiter.cleanUpOverdueDelays()
limiter.resetViableTasksDelays()

limiter.getDelay(overdue) should be(clock.now())
limiter.getDelay(stillWaiting) should be(clock.now() + 9.seconds)
limiter.getDeadline(viable) should be(clock.now())
limiter.getDeadline(notYetViable) should be(time_origin + 20.seconds)
limiter.getDeadline(stillWaiting) should be(time_origin + (threshold + 20).seconds)
}

test("resetDelay") {
Expand All @@ -55,7 +60,7 @@ class RateLimiterTest extends TestKit(ActorSystem("system")) with MarathonSpec w

limiter.resetDelay(app)

limiter.getDelay(app) should be(clock.now())
limiter.getDeadline(app) should be(clock.now())
}

}
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.14.1"
version in ThisBuild := "0.14.1-criteo"

0 comments on commit f405db4

Please sign in to comment.