Skip to content

Commit

Permalink
DPL: fix dropped obsolete messages with faster rates (#5462)
Browse files Browse the repository at this point in the history
This should reduce the amount of messages being dropped if the
rate becomes to high.
  • Loading branch information
ktf authored Feb 16, 2021
1 parent 8d63de6 commit 01b7ce7
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 28 deletions.
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ foreach(w
Forwarding
ParallelPipeline
ParallelProducer
SlowConsumer
SimpleDataProcessingDevice01
SimpleRDataFrameProcessing
SimpleStatefulProcessing01
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/ChannelInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ enum struct InputChannelState {
/// updated by Control or by the by the incoming flow of messages.
struct InputChannelInfo {
InputChannelState state = InputChannelState::Running;
uint32_t hasPendingEvents = 0;
};

} // namespace o2::framework
Expand Down
58 changes: 30 additions & 28 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -489,42 +489,44 @@ void DataProcessingDevice::doPrepare(DataProcessorContext& context)
int64_t result = -2;
auto& fairMQChannel = context.device->GetChannel(channel.name, 0);
auto& socket = fairMQChannel.GetSocket();
uint32_t events;
socket.Events(&events);
if ((events & 1) == 0) {
continue;
// If we have pending events from a previous iteration,
// we do receive in any case.
// Otherwise we check if there is any pending event and skip
// this channel in case there is none.
if (info.hasPendingEvents == 0) {
socket.Events(&info.hasPendingEvents);
// If we do not read, we can continue.
if ((info.hasPendingEvents & 1) == 0) {
continue;
}
}
// Notice that there seems to be a difference between the documentation
// of zeromq and the observed behavior. The fact that ZMQ_POLLIN
// is raised does not mean that a message is immediately available to
// read, just that it will be available soon, so the receive can
// still return -2. To avoid this we keep receiving on the socket until
// we get a message, consume all the consecutive messages, and then go back
// to the usual loop.
do {
if (events & 1) {
bool oneMessage = false;
while (true) {
FairMQParts parts;
result = fairMQChannel.Receive(parts, 0);
if (result >= 0) {
// Receiving data counts as activity now, so that
// We can make sure we process all the pending
// messages without hanging on the uv_run.
*context.wasActive = true;
DataProcessingDevice::handleData(context, parts, info);
oneMessage = true;
} else {
if (oneMessage) {
break;
}
}
}
} else {
// we get a message. In order not to overflow the DPL queue we process
// one message at the time and we keep track of wether there were more
// to process.
while (true) {
FairMQParts parts;
result = fairMQChannel.Receive(parts, 0);
if (result >= 0) {
DataProcessingDevice::handleData(context, parts, info);
// Receiving data counts as activity now, so that
// We can make sure we process all the pending
// messages without hanging on the uv_run.
break;
}
socket.Events(&events);
} while (events & 1);
}
// We check once again for pending events, keeping track if this was the
// case so that we can immediately repeat this loop and avoid remaining
// stuck in uv_run. This is because we will not get notified on the socket
// if more events are pending due to zeromq level triggered approach.
socket.Events(&info.hasPendingEvents);
if (info.hasPendingEvents) {
*context.wasActive |= true;
}
}
}

Expand Down
57 changes: 57 additions & 0 deletions Framework/Core/test/test_SlowConsumer.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/ConfigParamSpec.h"
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/RawDeviceService.h"
#include "Framework/ControlService.h"
#include <FairMQDevice.h>
#include <InfoLogger/InfoLogger.hxx>

#include <chrono>
#include <thread>
#include <vector>

#include "Framework/runDataProcessing.h"
using namespace o2::framework;

// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
return WorkflowSpec{
{"A",
Inputs{},
{OutputSpec{{"a"}, "TST", "A"}},
AlgorithmSpec{adaptStateful([]() { return adaptStateless(
[](DataAllocator& outputs, RawDeviceService& device, ControlService& control) {
static int count = 0;
auto& aData = outputs.make<int>(OutputRef{"a"});
LOG(info) << count;
aData = count++;
if (count > 3000) {
control.endOfStream();
control.readyToQuit(QuitRequest::Me);
}
}); })}},
{"B",
{InputSpec{"x", "TST", "A", Lifetime::Timeframe}},
{},
AlgorithmSpec{adaptStateful([]() { return adaptStateless(
[](InputRecord& inputs, RawDeviceService& device, ControlService& control) {
static int expected = 0;
device.device()->WaitFor(std::chrono::milliseconds(3));
auto& count = inputs.get<int>("x");
if (expected != count) {
LOGP(ERROR, "Missing message. Expected: {}, Found {}.", expected, count);
control.readyToQuit(QuitRequest::All);
}
expected++;
}); })}}};
}

0 comments on commit 01b7ce7

Please sign in to comment.