Skip to content

Commit

Permalink
DPL: introduce exponential backoff when idle
Browse files Browse the repository at this point in the history
This should reduce the amount of CPU used by data processors when they are idle
while retaining the ability to increase back the rate, should it be needed.
  • Loading branch information
ktf committed Feb 25, 2020
1 parent 174e54b commit f800483
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class DataProcessingDevice : public FairMQDevice
uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
uint64_t mBeginIterationTimestamp = 0; /// The timestamp of when the current ConditionalRun was started
DataProcessingStats mStats; /// Stats about the actual data processing.
int mCurrentBackoff = 0; /// The current exponential backoff value.
};

} // namespace o2::framework
Expand Down
29 changes: 28 additions & 1 deletion Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <TMessage.h>
#include <TClonesArray.h>

#include <algorithm>
#include <vector>
#include <memory>

Expand All @@ -51,6 +52,13 @@ using DataHeader = o2::header::DataHeader;
constexpr unsigned int MONITORING_QUEUE_SIZE = 100;
constexpr unsigned int MIN_RATE_LOGGING = 60;

// This should result in a minimum of 10Hz which should guarantee we do not use
// much time when idle. We do not sleep at all when we are at less then 100us,
// because that's what the default rate enforces in any case.
constexpr int MAX_BACKOFF = 10;
constexpr int MIN_BACKOFF_DELAY = 100;
constexpr int BACKOFF_DELAY_STEP = 100;

namespace o2::framework
{

Expand Down Expand Up @@ -172,6 +180,7 @@ bool DataProcessingDevice::ConditionalRun()
&stats = mStats,
&lastSent = mLastSlowMetricSentTimestamp,
&currentTime = mBeginIterationTimestamp,
&currentBackoff = mCurrentBackoff,
&monitoring = mServiceRegistry.get<Monitoring>()]()
-> void {
if (currentTime - lastSent < 5000) {
Expand Down Expand Up @@ -200,6 +209,7 @@ bool DataProcessingDevice::ConditionalRun()
.addTag(Key::Subsystem, Value::DPL));
monitoring.send(Metric{(stats.lastTotalProcessedSize / (stats.lastLatency.maxLatency ? stats.lastLatency.maxLatency : 1) / 1000), "input_rate_mb_s"}
.addTag(Key::Subsystem, Value::DPL));
monitoring.send(Metric{(int)currentBackoff, "current_backoff"}.addTag(Key::Subsystem, Value::DPL));

lastSent = currentTime;
O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);
Expand Down Expand Up @@ -267,7 +277,7 @@ bool DataProcessingDevice::ConditionalRun()
if (active == false) {
mServiceRegistry.get<CallbackService>()(CallbackService::Id::Idle);
}
mRelayer.processDanglingInputs(mExpirationHandlers, mServiceRegistry);
active |= mRelayer.processDanglingInputs(mExpirationHandlers, mServiceRegistry);
this->tryDispatchComputation();

sendRelayerMetrics();
Expand Down Expand Up @@ -308,6 +318,23 @@ bool DataProcessingDevice::ConditionalRun()
switchState(StreamingState::Idle);
return true;
}
// Update the backoff factor
//
// In principle we should use 1/rate for MIN_BACKOFF_DELAY and (1/maxRate -
// 1/minRate)/ 2^MAX_BACKOFF for BACKOFF_DELAY_STEP. We hardcode the values
// for the moment to some sensible default.
if (active) {
mCurrentBackoff = std::max(0, mCurrentBackoff - 1);
} else {
mCurrentBackoff = std::min(MAX_BACKOFF, mCurrentBackoff + 1);
}

if (mCurrentBackoff != 0) {
auto delay = (rand() % ((1 << mCurrentBackoff) - 1)) * BACKOFF_DELAY_STEP;
if (delay > MIN_BACKOFF_DELAY) {
WaitFor(std::chrono::microseconds(delay - MIN_BACKOFF_DELAY));
}
}
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/test/test_CallbackService.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
};
ic.services().get<CallbackService>().set(CallbackService::Id::ClockTick, callback);
return [count](ProcessingContext& ctx) {
if (*count > 1000) {
if (*count > 10) {
ctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
}
};
Expand Down

0 comments on commit f800483

Please sign in to comment.