Skip to content

Commit

Permalink
finagle/finage-core: Make FailureAccrual markDeadFor use exponential …
Browse files Browse the repository at this point in the history
…backoff by default

Problem
The default value for markDeadFor in FailureAccrualFactory is a constant,
so frequently failing nodes are regularly reinstated after a timeout.

Solution
FailureAccrualFactory uses jittered backoffs (starting at 5s, up to 300s)
as the duration to mark dead for, if markDeadFor is not configured.

RB_ID=746930
  • Loading branch information
jcrossley authored and jenkins committed Oct 15, 2015
1 parent 5dfe12d commit 0aec0a7
Show file tree
Hide file tree
Showing 9 changed files with 372 additions and 48 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ Runtime Behavior Changes
* finagle-core: `RetryPolicy.tries` now uses jittered backoffs instead of
having no delay. ``RB_ID=752629``

* finagle-core: `FailureAccrualFactory` uses jittered backoffs as the duration
to mark dead for, if `markDeadFor` is not configured. ``RB_ID=746930``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.net.{SocketAddress, InetSocketAddress}

object DefaultClient {
private def defaultFailureAccrual(sr: StatsReceiver): ServiceFactoryWrapper =
FailureAccrualFactory.wrapper(sr, 5, () => 5.seconds, "DefaultClient", DefaultLogger, unconnected)(DefaultTimer.twitter)
FailureAccrualFactory.wrapper(sr, 5, FailureAccrualFactory.jitteredBackoff, "DefaultClient", DefaultLogger, unconnected)(DefaultTimer.twitter)

/** marker trait for uninitialized failure accrual */
private[finagle] trait UninitializedFailureAccrual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object Backoff {
*
* @param start must be greater than 0 and less than or equal to `maximum`.
* @param maximum must be greater than 0 and greater than or equal to `start`.
* @see [[decorrelatedJittered]] for an alternative jittered approach.
* @see [[decorrelatedJittered]] and [[equalJittered]] for alternative jittered approaches.
*/
def exponentialJittered(start: Duration, maximum: Duration): Stream[Duration] =
exponentialJittered(start, maximum, Rng.threadLocal)
Expand Down Expand Up @@ -74,7 +74,7 @@ object Backoff {
*
* @param start must be greater than 0 and less than or equal to `maximum`.
* @param maximum must be greater than 0 and greater than or equal to `start`.
* @see [[exponentialJittered]] for an alternative jittered approach.
* @see [[exponentialJittered]] and [[equalJittered]] for alternative jittered approaches.
*/
def decorrelatedJittered(start: Duration, maximum: Duration): Stream[Duration] =
decorrelatedJittered(start, maximum, Rng.threadLocal)
Expand Down Expand Up @@ -103,6 +103,33 @@ object Backoff {
start #:: next(start)
}

/**
* Create backoffs that keep half of the exponential growth, and jitter
* between 0 and that amount.
*
* @see [[exponentialJittered]] and [[decorelatedJittered]] for alternative jittered approaches.
*/
def equalJittered(start: Duration, maximum: Duration): Stream[Duration] =
equalJittered(start, maximum, Rng.threadLocal)

/** Exposed for testing */
private[service] def equalJittered(
start: Duration,
maximum: Duration,
rng: Rng = Rng.threadLocal
): Stream[Duration] = {
require(start > Duration.Zero)
require(maximum > Duration.Zero)
require(start <= maximum)
// this is "equal jitter" via http://www.awsarchitectureblog.com/2015/03/backoff.html
def next(attempt: Int): Stream[Duration] = {
val halfExp = start * (1L << (attempt - 1))
val backoff = maximum.min(halfExp + Duration.fromNanoseconds(rng.nextLong(halfExp.inNanoseconds)))
backoff #:: next(attempt + 1)
}
start #:: next(1)
}

/**
* Create backoffs that grow linear by `offset`.
*/
Expand All @@ -122,6 +149,14 @@ object Backoff {
def const(start: Duration): Stream[Duration] =
Backoff(start)(Function.const(start))

/**
* Create backoffs with values produced by a given generation function.
*/
def fromFunction(f: () => Duration): Stream[Duration] = {
def next(): Stream[Duration] = f() #:: next()
next()
}

/**
* Convert a [[Stream]] of [[Duration Durations]] into a Java-friendly representation.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object FailureAccrualFactory {
private[finagle] def wrapper(
statsReceiver: StatsReceiver,
numFailures: Int,
markDeadFor: () => Duration,
markDeadFor: Stream[Duration],
label: String,
logger: Logger,
endpoint: SocketAddress
Expand All @@ -30,7 +30,12 @@ object FailureAccrualFactory {

private[this] val rng = new Random

private[FailureAccrualFactory] val AliveNoFailures = Alive(0)
// Use equalJittered backoff in order to wait more time in between
// each revival attempt on successive failures; if an endpoint has failed
// previous requests, it is likely to do so again. The recent
// "failure history" should influence how long to mark the endpoint
// dead for.
private[finagle] val jitteredBackoff: Stream[Duration] = Backoff.equalJittered(5.seconds, 300.seconds)

/**
* Add jitter in `markDeadFor` to reduce correlation.
Expand Down Expand Up @@ -60,11 +65,15 @@ object FailureAccrualFactory {
}

private[finagle] object Param {
case class Configured(numFailures: Int, markDeadFor: () => Duration) extends Param
case class Configured(numFailures: Int, markDeadFor: Stream[Duration]) extends Param {
def this(numFailures: Int, markDeadFor: () => Duration) =
this(numFailures, Backoff.fromFunction(markDeadFor))
}

case class Replaced(factory: Timer => ServiceFactoryWrapper) extends Param
case object Disabled extends Param

implicit val param: Stack.Param[Param] = Stack.Param(Param(5, () => 5.seconds))
implicit val param: Stack.Param[Param] = Stack.Param(Param.Configured(5, jitteredBackoff))
}

// -Implementation notes-
Expand All @@ -86,7 +95,7 @@ object FailureAccrualFactory {
* @param markDeadFor The duration to mark an endpoint as dead.
*/
def Param(numFailures: Int, markDeadFor: () => Duration): Param =
Param.Configured(numFailures, markDeadFor)
Param.Configured(numFailures, Backoff.fromFunction(markDeadFor))

/**
* Configures the [[FailureAccrualFactory]].
Expand All @@ -95,7 +104,7 @@ object FailureAccrualFactory {
* @param markDeadFor The duration to mark an endpoint as dead.
*/
def Param(numFailures: Int, markDeadFor: Duration): Param =
Param.Configured(numFailures, () => markDeadFor)
Param.Configured(numFailures, Backoff.const(markDeadFor))

/**
* Replaces the [[FailureAccrualFactory]] with the [[ServiceFactoryWrapper]]
Expand Down Expand Up @@ -172,11 +181,13 @@ object FailureAccrualFactory {
// Dead |
// `---> ProbeOpen

protected[finagle] sealed trait State
protected[finagle] case class Alive(failureCount: Int) extends State
protected[finagle] case object Dead extends State
protected[finagle] case object ProbeOpen extends State
protected[finagle] case object ProbeClosed extends State
protected[finagle] sealed trait State {
val nextMarkDeadFor: Stream[Duration]
}
protected[finagle] case class Alive(failureCount: Int, val nextMarkDeadFor: Stream[Duration]) extends State
protected[finagle] case class Dead(val nextMarkDeadFor: Stream[Duration]) extends State
protected[finagle] case class ProbeOpen(val nextMarkDeadFor: Stream[Duration]) extends State
protected[finagle] case class ProbeClosed(val nextMarkDeadFor: Stream[Duration]) extends State
}

/**
Expand All @@ -189,7 +200,7 @@ object FailureAccrualFactory {
class FailureAccrualFactory[Req, Rep] private[finagle](
underlying: ServiceFactory[Req, Rep],
numFailures: Int,
markDeadFor: () => Duration,
markDeadFor: Stream[Duration],
timer: Timer,
statsReceiver: StatsReceiver,
label: String = "",
Expand All @@ -207,9 +218,23 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
label: String,
logger: Logger,
endpoint: SocketAddress
) = this(underlying, numFailures, () => markDeadFor, timer, statsReceiver, label, logger, endpoint)

@volatile private[this] var state: State = FailureAccrualFactory.AliveNoFailures
) = this(
underlying,
numFailures,
Backoff.const(markDeadFor),
timer,
statsReceiver,
label,
logger,
endpoint)

// Pad the back of the stream to back off for 300 seconds when the given
// stream runs out.
private[this] val AliveNoFailures = Alive(0, markDeadFor ++ Backoff.const(300.seconds))

// The head of `nextMarkDeadFor` in `state` is next duration to mark dead for.
// The tail is the remainder of the durations.
@volatile private[this] var state: State = AliveNoFailures

private[this] var reviveTimerTask: Option[TimerTask] = None

Expand All @@ -218,25 +243,30 @@ class FailureAccrualFactory[Req, Rep] private[finagle](

private[this] def didFail() = synchronized {
state match {
case Alive(failureCount) =>
case Alive(failureCount, nextMarkDeadFor) =>
if (failureCount + 1 >= numFailures) markDead()
else state = Alive(failureCount + 1)
case ProbeClosed => markDead()
else state = Alive(failureCount + 1, nextMarkDeadFor)
case ProbeClosed(_) => markDead()
case _ =>
}
}

protected def didSucceed() = synchronized {
// Only count revivals when the probe succeeds.
if (state == ProbeClosed) revivalCounter.incr()
state = FailureAccrualFactory.AliveNoFailures
state match {
case ProbeClosed(_) => revivalCounter.incr()
case _ =>
}
state = AliveNoFailures
}

protected def markDead() = synchronized {
state = Dead
removalCounter.incr()
val timerTask = timer.schedule(state.nextMarkDeadFor.head.fromNow) { startProbing() }

// Consume the next duration to mark dead for.
state = Dead(state.nextMarkDeadFor.tail)

val timerTask = timer.schedule(markDeadFor().fromNow) { startProbing() }
reviveTimerTask = Some(timerTask)

if (logger.isLoggable(Level.DEBUG))
Expand All @@ -248,7 +278,7 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
* The service must satisfy one request before accepting more.
*/
private[this] def startProbing() = synchronized {
state = ProbeOpen
state = ProbeOpen(state.nextMarkDeadFor)
cancelReviveTimerTasks()
}

Expand All @@ -265,10 +295,15 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
// ProbeClosed state. The result of first to complete will determine
// whether the factory transitions to Alive (successful) or Dead
// (unsuccessful).
if (state == ProbeOpen) {
synchronized {
if (state == ProbeOpen) state = ProbeClosed
}
state match {
case ProbeOpen(_) =>
synchronized {
state match {
case ProbeOpen(next) => state = ProbeClosed(next)
case _ =>
}
}
case _ =>
}

service(request).respond { response =>
Expand All @@ -285,8 +320,8 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
}

override def status = state match {
case Alive(_) | ProbeOpen => underlying.status
case Dead | ProbeClosed => Status.Busy
case Alive(_, _) | ProbeOpen(_) => underlying.status
case Dead(_) | ProbeClosed(_) => Status.Busy
}

protected[this] def getState: State = state
Expand All @@ -308,5 +343,12 @@ class FailureAccrualFactory[Req, Rep] private[finagle](
numFailures: Int,
markDeadFor: Duration,
timer: Timer,
label: String) = this(underlying, numFailures, () => markDeadFor, timer, NullStatsReceiver, label)
label: String
) = this(
underlying,
numFailures,
Backoff.const(markDeadFor),
timer,
NullStatsReceiver,
label)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.twitter.finagle.client
import com.twitter.concurrent.AsyncQueue
import com.twitter.finagle._
import com.twitter.finagle.dispatch.SerialClientDispatcher
import com.twitter.finagle.service.FailureAccrualFactory
import com.twitter.finagle.service.{Backoff, FailureAccrualFactory}
import com.twitter.finagle.transport.{QueueTransport, Transport}
import com.twitter.util.{Await, Future, MockTimer, Time, Var, Closable, Return}
import com.twitter.util.TimeConversions.intToTimeableNumber
Expand Down Expand Up @@ -247,7 +247,7 @@ class DefaultClientTest extends FunSuite with Eventually with IntegrationPatienc
timer = timer,
statsReceiver = statsReceiver,
failureAccrual = { factory: ServiceFactory[Int, Int] =>
FailureAccrualFactory.wrapper(statsReceiver, 6, () => 3.seconds, name, DefaultLogger, unconnected)(timer) andThen factory
FailureAccrualFactory.wrapper(statsReceiver, 6, Backoff.const(3.seconds), name, DefaultLogger, unconnected)(timer) andThen factory
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.twitter.finagle.service

import com.twitter.conversions.time._
import com.twitter.finagle.util.Rng
import com.twitter.util.Duration
import org.junit.runner.RunWith
import org.scalacheck.Gen
import org.scalatest.FunSuite
Expand Down Expand Up @@ -67,6 +68,24 @@ class BackoffTest extends FunSuite
}
}

test("equalJittered") {
forAll { seed: Long =>
val rng = Rng(seed)
val maximum = 120.millis
val backoffs = Backoff.equalJittered(5.millis, maximum, rng)
.take(10).force.toSeq.map(_.inMillis)

assert(5 == backoffs.head)

val ranges = Seq((5, 10), (10, 20), (20, 40), (40, 80),
(80, 120), (80, 120), (80, 120), (80, 120), (80, 120))
backoffs.tail.zip(ranges).foreach { case (b, (min, max)) =>
assert(b >= min)
assert(b <= max)
}
}
}

test("linear") {
val backoffs = Backoff.linear(2.seconds, 10.seconds) take 10
assert(backoffs.head === 2.seconds)
Expand All @@ -83,4 +102,14 @@ class BackoffTest extends FunSuite
assert(backoffs.force.toSeq === (0 until 10 map { _ => 10.seconds}))
}

test("from function") {
forAll { seed: Long =>
val fRng, rng = Rng(seed)
val f: () => Duration = () => {
Duration.fromNanoseconds(fRng.nextLong(10))
}
val backoffs = Backoff.fromFunction(f).take(10).force.toSeq.map(_.inNanoseconds)
backoffs.foreach { b => assert(b == rng.nextLong(10)) }
}
}
}
Loading

0 comments on commit 0aec0a7

Please sign in to comment.