diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h
index 1b4653d496c6d..40e3906e1ad4c 100644
--- a/Framework/Core/include/Framework/DataProcessingDevice.h
+++ b/Framework/Core/include/Framework/DataProcessingDevice.h
@@ -83,6 +83,7 @@ class DataProcessingDevice : public FairMQDevice
   uint64_t mLastMetricFlushedTimestamp = 0; /// The timestamp of the last time we actually flushed metrics
   uint64_t mBeginIterationTimestamp = 0;    /// The timestamp of when the current ConditionalRun was started
   DataProcessingStats mStats;               /// Stats about the actual data processing.
+  int mCurrentBackoff = 0;                  /// The current exponential backoff value.
 };

 } // namespace o2::framework
diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx
index 0030b911b304d..2f9569d35129e 100644
--- a/Framework/Core/src/DataProcessingDevice.cxx
+++ b/Framework/Core/src/DataProcessingDevice.cxx
@@ -38,6 +38,7 @@
 #include
 #include
+#include
 #include
 #include
@@ -51,6 +52,13 @@ using DataHeader = o2::header::DataHeader;

 constexpr unsigned int MONITORING_QUEUE_SIZE = 100;
 constexpr unsigned int MIN_RATE_LOGGING = 60;

+// This should result in a minimum of 10Hz, which should guarantee we do not use
+// much time when idle. We do not sleep at all when we are at less than 100us,
+// because that is what the default rate enforces in any case.
+constexpr int MAX_BACKOFF = 10;
+constexpr int MIN_BACKOFF_DELAY = 100;
+constexpr int BACKOFF_DELAY_STEP = 100;
+
 namespace o2::framework
 {

@@ -172,6 +180,7 @@ bool DataProcessingDevice::ConditionalRun()
                               &stats = mStats,
                               &lastSent = mLastSlowMetricSentTimestamp,
                               &currentTime = mBeginIterationTimestamp,
+                              &currentBackoff = mCurrentBackoff,
                               &monitoring = mServiceRegistry.get<Monitoring>()]() -> void {
     if (currentTime - lastSent < 5000) {
@@ -200,6 +209,7 @@
                       .addTag(Key::Subsystem, Value::DPL));
     monitoring.send(Metric{(stats.lastTotalProcessedSize / (stats.lastLatency.maxLatency ? stats.lastLatency.maxLatency : 1) / 1000), "input_rate_mb_s"}
                       .addTag(Key::Subsystem, Value::DPL));
+    monitoring.send(Metric{(int)currentBackoff, "current_backoff"}.addTag(Key::Subsystem, Value::DPL));

     lastSent = currentTime;
     O2_SIGNPOST_END(MonitoringStatus::ID, MonitoringStatus::SEND, 0, 0, O2_SIGNPOST_BLUE);
@@ -267,7 +277,7 @@ bool DataProcessingDevice::ConditionalRun()
   if (active == false) {
     mServiceRegistry.get<CallbackService>()(CallbackService::Id::Idle);
   }
-  mRelayer.processDanglingInputs(mExpirationHandlers, mServiceRegistry);
+  active |= mRelayer.processDanglingInputs(mExpirationHandlers, mServiceRegistry);

   this->tryDispatchComputation();
   sendRelayerMetrics();
@@ -308,6 +318,23 @@ bool DataProcessingDevice::ConditionalRun()
     switchState(StreamingState::Idle);
     return true;
   }
+  // Update the backoff factor.
+  //
+  // In principle we should use 1/rate for MIN_BACKOFF_DELAY and
+  // (1/maxRate - 1/minRate) / 2^MAX_BACKOFF for BACKOFF_DELAY_STEP.
+  // We hardcode the values to sensible defaults for the moment.
+  if (active) {
+    mCurrentBackoff = std::max(0, mCurrentBackoff - 1);
+  } else {
+    mCurrentBackoff = std::min(MAX_BACKOFF, mCurrentBackoff + 1);
+  }
+
+  if (mCurrentBackoff != 0) {
+    auto delay = (rand() % ((1 << mCurrentBackoff) - 1)) * BACKOFF_DELAY_STEP;
+    if (delay > MIN_BACKOFF_DELAY) {
+      WaitFor(std::chrono::microseconds(delay - MIN_BACKOFF_DELAY));
+    }
+  }

   return true;
 }
diff --git a/Framework/Core/test/test_CallbackService.cxx b/Framework/Core/test/test_CallbackService.cxx
index 232965daa57de..b8953577f1b4f 100644
--- a/Framework/Core/test/test_CallbackService.cxx
+++ b/Framework/Core/test/test_CallbackService.cxx
@@ -48,7 +48,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
   };
   ic.services().get<CallbackService>().set(CallbackService::Id::ClockTick, callback);
   return [count](ProcessingContext& ctx) {
-    if (*count > 1000) {
+    if (*count > 10) {
       ctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
     }
   };
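For reference, the standalone sketch below is not part of the patch: it only mirrors the backoff constants and the delay expression introduced in ConditionalRun() to show the sleep range at each non-zero backoff level. With MAX_BACKOFF = 10 and BACKOFF_DELAY_STEP = 100us the worst-case sleep is roughly 100ms, which matches the ~10Hz minimum mentioned in the added comment; the randomisation presumably also keeps many idle devices from waking up in lockstep.

// Illustrative sketch only, not part of the patch. All names are local to
// this example; the constants copy the values hardcoded in the diff above.
#include <chrono>
#include <cstdio>
#include <cstdlib>

constexpr int MAX_BACKOFF = 10;         // same values as in the patch
constexpr int MIN_BACKOFF_DELAY = 100;  // microseconds
constexpr int BACKOFF_DELAY_STEP = 100; // microseconds

int main()
{
  for (int backoff = 1; backoff <= MAX_BACKOFF; ++backoff) {
    // rand() % ((1 << backoff) - 1) picks between 0 and 2^backoff - 2 steps,
    // exactly as the delay computation in ConditionalRun() does.
    int delay = (rand() % ((1 << backoff) - 1)) * BACKOFF_DELAY_STEP;
    int maxDelay = ((1 << backoff) - 2) * BACKOFF_DELAY_STEP;
    // The device only sleeps for the part of the delay exceeding MIN_BACKOFF_DELAY.
    auto sleep = delay > MIN_BACKOFF_DELAY ? std::chrono::microseconds(delay - MIN_BACKOFF_DELAY)
                                           : std::chrono::microseconds(0);
    printf("backoff %2d: sampled %6d us (max %6d us), sleep %6lld us\n",
           backoff, delay, maxDelay, static_cast<long long>(sleep.count()));
  }
  return 0;
}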