diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 6f4eb701fd29a..404f4cf9e2d47 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -266,6 +266,7 @@ foreach(w Forwarding ParallelPipeline ParallelProducer + SlowConsumer SimpleDataProcessingDevice01 SimpleRDataFrameProcessing SimpleStatefulProcessing01 diff --git a/Framework/Core/include/Framework/ChannelInfo.h b/Framework/Core/include/Framework/ChannelInfo.h index 45779f3866a74..40c5dbf31fea4 100644 --- a/Framework/Core/include/Framework/ChannelInfo.h +++ b/Framework/Core/include/Framework/ChannelInfo.h @@ -31,6 +31,7 @@ enum struct InputChannelState { /// updated by Control or by the by the incoming flow of messages. struct InputChannelInfo { InputChannelState state = InputChannelState::Running; + uint32_t hasPendingEvents = 0; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index cf70758e97c8a..a18cb26597c75 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -489,42 +489,44 @@ void DataProcessingDevice::doPrepare(DataProcessorContext& context) int64_t result = -2; auto& fairMQChannel = context.device->GetChannel(channel.name, 0); auto& socket = fairMQChannel.GetSocket(); - uint32_t events; - socket.Events(&events); - if ((events & 1) == 0) { - continue; + // If input (bit 0, ZMQ_POLLIN) is still pending from a previous iteration, + // we do receive in any case. + // Otherwise we poll the socket again and skip this channel in case + // there is nothing to read. + if ((info.hasPendingEvents & 1) == 0) { + socket.Events(&info.hasPendingEvents); + // If we do not read, we can continue. + if ((info.hasPendingEvents & 1) == 0) { + continue; + } } // Notice that there seems to be a difference between the documentation // of zeromq and the observed behavior. 
The fact that ZMQ_POLLIN // is raised does not mean that a message is immediately available to // read, just that it will be available soon, so the receive can // still return -2. To avoid this we keep receiving on the socket until - // we get a message, consume all the consecutive messages, and then go back - // to the usual loop. - do { - if (events & 1) { - bool oneMessage = false; - while (true) { - FairMQParts parts; - result = fairMQChannel.Receive(parts, 0); - if (result >= 0) { - // Receiving data counts as activity now, so that - // We can make sure we process all the pending - // messages without hanging on the uv_run. - *context.wasActive = true; - DataProcessingDevice::handleData(context, parts, info); - oneMessage = true; - } else { - if (oneMessage) { - break; - } - } - } - } else { + // we get a message. In order not to overflow the DPL queue we process + // one message at a time and we keep track of whether there were more + // to process. + while (true) { + FairMQParts parts; + result = fairMQChannel.Receive(parts, 0); + if (result >= 0) { + DataProcessingDevice::handleData(context, parts, info); + // Activity is not flagged here: whether this loop has to be + // repeated right away is decided after the receive, from the + // input events still pending on the socket. break; } - socket.Events(&events); - } while (events & 1); + } + // We check once again for pending events, keeping track if this was the + // case so that we can immediately repeat this loop and avoid remaining + // stuck in uv_run. This is because we will not get notified on the socket + // if more events are pending due to zeromq edge triggered approach. 
+ socket.Events(&info.hasPendingEvents); + if (info.hasPendingEvents & 1) { + *context.wasActive = true; + } } } diff --git a/Framework/Core/test/test_SlowConsumer.cxx b/Framework/Core/test/test_SlowConsumer.cxx new file mode 100644 index 0000000000000..960573ce3e400 --- /dev/null +++ b/Framework/Core/test/test_SlowConsumer.cxx @@ -0,0 +1,57 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/ConfigParamSpec.h" +#include "Framework/CompletionPolicyHelpers.h" +#include "Framework/DeviceSpec.h" +#include "Framework/RawDeviceService.h" +#include "Framework/ControlService.h" +#include +#include + +#include +#include +#include + +#include "Framework/runDataProcessing.h" +using namespace o2::framework; + +// Fast producer A feeds slow consumer B, which checks that no message is lost or reordered +WorkflowSpec defineDataProcessing(ConfigContext const& specs) +{ + return WorkflowSpec{ + {"A", + Inputs{}, + {OutputSpec{{"a"}, "TST", "A"}}, + AlgorithmSpec{adaptStateful([]() { return adaptStateless( + [](DataAllocator& outputs, RawDeviceService& device, ControlService& control) { + static int count = 0; + auto& aData = outputs.make<int>(OutputRef{"a"}); + LOG(info) << count; + aData = count++; + if (count > 3000) { + control.endOfStream(); + control.readyToQuit(QuitRequest::Me); + } + }); })}}, + {"B", + {InputSpec{"x", "TST", "A", Lifetime::Timeframe}}, + {}, + AlgorithmSpec{adaptStateful([]() { return adaptStateless( + [](InputRecord& inputs, RawDeviceService& device, ControlService& control) { + static int expected = 0; 
+ device.device()->WaitFor(std::chrono::milliseconds(3)); + auto& count = inputs.get<int>("x"); + if (expected != count) { + LOGP(error, "Missing message. Expected: {}, Found {}.", expected, count); + control.readyToQuit(QuitRequest::All); + } + expected++; + }); })}}}; +}