Skip to content

Commit

Permalink
finagle-core: Ability to configure StatsFilter with a HistogramCounte…
Browse files Browse the repository at this point in the history
…rFactory to track request burstiness

Problem

While we have a counter for the number of requests, we're limited by the frequency
of metrics collection to see how these requests are spread out; that is, how "bursty"
they are.

Solution

Introduce a HistogramCounter, created via a HistogramCounterFactory, that can be used
to track request burstiness. The factory is configured on a client/server, and, if configured,
is used in StatsFilter to track request burstiness.

Differential Revision: https://phabricator.twitter.biz/D1180751
  • Loading branch information
jcrossley authored and jenkins committed Nov 11, 2024
1 parent 05f5618 commit bc815d5
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 3 deletions.
12 changes: 12 additions & 0 deletions finagle-core/src/main/scala/com/twitter/finagle/param/Params.scala
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,15 @@ object ExceptionStatsHandler {
lazy val default = ExceptionStatsHandler(StatsFilter.DefaultExceptions)
}
}

/**
* A class eligible for configuring a
* [[com.twitter.finagle.stats.HistogramCounterFactory]] throughout finagle servers
* and clients.
*/
private[twitter] case class HistogramCounterFactory(
histogramCounterFactoryOpt: Option[stats.HistogramCounterFactory])
object HistogramCounterFactory {
implicit val param: Stack.Param[HistogramCounterFactory] =
Stack.Param(HistogramCounterFactory(None))
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ object StatsFilter {
* Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.service.StatsFilter]].
*/
def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new Stack.Module7[
new Stack.Module8[
param.Stats,
param.ExceptionStatsHandler,
param.ResponseClassifier,
Param,
Now,
param.MetricBuilders,
param.StandardStats,
param.HistogramCounterFactory,
ServiceFactory[Req, Rep]
] {
val role: Stack.Role = StatsFilter.role
Expand All @@ -65,6 +66,7 @@ object StatsFilter {
now: Now,
metrics: param.MetricBuilders,
standardStats: param.StandardStats,
histogramCounterFactory: param.HistogramCounterFactory,
next: ServiceFactory[Req, Rep]
): ServiceFactory[Req, Rep] = {
val param.Stats(statsReceiver) = _stats
Expand All @@ -78,7 +80,8 @@ object StatsFilter {
_param.unit,
now.nowOrDefault(_param.unit),
metrics.registry,
standardStats.standardStats
standardStats.standardStats,
histogramCounterFactory.histogramCounterFactoryOpt
).andThen(next)
}
}
Expand Down Expand Up @@ -163,6 +166,10 @@ object StatsFilter {
*
* @param metricsRegistry an optional [MetricBuilderRegistry] set by stack parameter
* for injecting metrics and instrumenting top-line expressions
*
* @param histogramCounterFactoryOpt an optional [HistogramCounterFactory] that, if present, will
* be used to record a stat for requests received over a 100ms
* period. This can be used to see "burstiness" of requests.
*/
class StatsFilter[Req, Rep] private[service] (
statsReceiver: StatsReceiver,
Expand All @@ -171,7 +178,8 @@ class StatsFilter[Req, Rep] private[service] (
timeUnit: TimeUnit,
now: () => Long,
metricsRegistry: Option[CoreMetricsRegistry] = None,
standardStats: StandardStats = Disabled)
standardStats: StandardStats = Disabled,
histogramCounterFactoryOpt: Option[HistogramCounterFactory] = None)
extends SimpleFilter[Req, Rep] {

/**
Expand Down Expand Up @@ -278,6 +286,15 @@ class StatsFilter[Req, Rep] private[service] (
statsReceiver.addGauge(Descriptions.pending, "pending") {
outstandingRequestCount.sum()
}
private[this] val requestsHistogramCounterOpt = histogramCounterFactoryOpt match {
case Some(histogramCounterFactory) =>
Some(
histogramCounterFactory(
Seq("requests"),
StatsFrequency.HundredMilliSecondly,
statsReceiver))
case None => None
}

private[this] def isIgnorableResponse(rep: Try[Rep]): Boolean = rep match {
case Throw(f: FailureFlags[_]) => f.isFlagged(FailureFlags.Ignorable)
Expand All @@ -302,6 +319,12 @@ class StatsFilter[Req, Rep] private[service] (
stats.recordStats(request, response, duration)
case None => // no-op
}

requestsHistogramCounterOpt match {
case Some(requestsHistogramCounter) =>
requestsHistogramCounter.incr()
case None => // no-op
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,42 @@ class StatsFilterTest extends AnyFunSuite {
}
}

test("requests histogram counter stat") {
Time.withCurrentTimeFrozen { tc =>
val timer = new MockTimer
val sr = new InMemoryStatsReceiver()
val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis)
val filter =
new StatsFilter[String, String](
sr,
ResponseClassifier.Default,
StatsFilter.DefaultExceptions,
TimeUnit.SECONDS,
() => Time.now.inSeconds,
histogramCounterFactoryOpt = Some(histogramCounterFactory)
)
var promise = new Promise[String]
val svc = filter.andThen(new Service[String, String] {
def apply(request: String): Promise[String] = promise
})
svc("1")
svc("1")
svc("1")
promise.setValue("done")
tc.advance(100.millis)
timer.tick()

promise = new Promise[String]
svc("1")
svc("1")
promise.setValue("done")
tc.advance(100.millis)
timer.tick()

assert(sr.stats(Seq("requests", "hundredMilliSecondly")) == Seq(3, 2))
}
}

test("report exceptions") {
val (promise, receiver, statsService) = getService()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package com.twitter.finagle.stats

import com.twitter.conversions.DurationOps._
import com.twitter.util.MockTimer
import com.twitter.util.Time
import org.scalatest.funsuite.AnyFunSuite

class HistogramCounterTest extends AnyFunSuite {

test("Records stat at given frequency") {
Time.withCurrentTimeFrozen { tc =>
val timer = new MockTimer()
val statsReceiver = new InMemoryStatsReceiver
val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis)
val histogramCounter =
histogramCounterFactory(
Seq("foo", "bar"),
StatsFrequency.HundredMilliSecondly,
statsReceiver
)
histogramCounter.incr(5)
assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")).isEmpty)
histogramCounter.incr()

tc.advance(100.millis)
timer.tick()

assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6))

tc.advance(100.millis)
timer.tick()

assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6, 0))

histogramCounter.incr(2)

tc.advance(50.millis)
timer.tick()

assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6, 0))
histogramCounter.incr(3)

tc.advance(50.millis)
timer.tick()

assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(6, 0, 5))
}
}

test("Normalizes recorded stat to the elapsed time since last recording") {
Time.withCurrentTimeFrozen { tc =>
val timer = new MockTimer()
val statsReceiver = new InMemoryStatsReceiver
val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis)
val histogramCounter =
histogramCounterFactory(
Seq("foo", "bar"),
StatsFrequency.HundredMilliSecondly,
statsReceiver
)
histogramCounter.incr(5)
histogramCounter.incr(10)
assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")).isEmpty)

// Our task was slow to execute on the timer :(
tc.advance(150.millis)
timer.tick()

// We have 15 requests in 1.5 windows, so 10 requests in 1 window.
assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(10))

histogramCounter.incr(12)

tc.advance(300.millis)
timer.tick()

// We have 12 requests in 3 windows, so 4 requests in 1 window.
assert(statsReceiver.stats(Seq("foo", "bar", "hundredMilliSecondly")) == Seq(10, 4))
}
}

test("Returns the same histogramCounter object for equivalent names") {
val timer = new MockTimer()
val metrics = Metrics.createDetached()
val statsReceiver = new MetricsStatsReceiver(metrics)

val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis)

val histogramCounter1 =
histogramCounterFactory(Seq("foo", "bar"), StatsFrequency.HundredMilliSecondly, statsReceiver)
val histogramCounter2 =
histogramCounterFactory(Seq("foo", "bar"), StatsFrequency.HundredMilliSecondly, statsReceiver)
val histogramCounter3 =
histogramCounterFactory(Seq("foo/bar"), StatsFrequency.HundredMilliSecondly, statsReceiver)

val scopedStatsReceiver = statsReceiver.scope("foo")

val histogramCounter4 =
histogramCounterFactory(Seq("bar"), StatsFrequency.HundredMilliSecondly, scopedStatsReceiver)

assert(histogramCounter1 eq histogramCounter2)
assert(histogramCounter1 eq histogramCounter3)
assert(histogramCounter1 eq histogramCounter4)
}

test("Doesn't schedule recording of stats after close is called") {
Time.withCurrentTimeFrozen { tc =>
val timer = new MockTimer()
assert(timer.tasks.size == 0)
val histogramCounterFactory = new HistogramCounterFactory(timer, () => Time.now.inMillis)
assert(timer.tasks.size == 1)
histogramCounterFactory.close()
tc.advance(100.millis)
timer.tick()
assert(timer.tasks.size == 0)
}
}
}

0 comments on commit bc815d5

Please sign in to comment.