Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[20706] Make reader get_first_untaken_info() coherent with read()/take() (backport #4696) #4708

Merged
merged 2 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion include/fastdds/dds/subscriber/DataReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,9 @@ class DataReader : public DomainEntity
const void* instance) const;

/**
* @brief Returns information about the first untaken sample.
* @brief Returns information about the first untaken sample. This method is meant to be called prior to
* a read() or take() operation as it does not modify the status condition of the entity.
*
*
* @param [out] info Pointer to a SampleInfo_t structure to store first untaken sample information.
*
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/fastdds/subscriber/DataReaderImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ class DataReaderImpl
SampleInfoSeq& sample_infos);

/**
* @brief Returns information about the first untaken sample.
* @brief Returns information about the first untaken sample. This method is meant to be called prior to
* a read() or take() operation as it does not modify the status condition of the entity.
* @param [out] info Pointer to a SampleInfo structure to store first untaken sample information.
* @return true if sample info was returned. false if there is no sample to take.
*/
Expand Down
16 changes: 14 additions & 2 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,21 @@ bool DataReaderHistory::get_first_untaken_info(
for (auto& it : data_available_instances_)
{
auto& instance_changes = it.second->cache_changes;
if (!instance_changes.empty())
for (auto& instance_change : instance_changes)
{
ReadTakeCommand::generate_info(info, *(it.second), instance_changes.front());
WriterProxy* wp = nullptr;
bool is_future_change = false;

if (mp_reader->begin_sample_access_nts(instance_change, wp, is_future_change))
{
mp_reader->end_sample_access_nts(instance_change, wp, false);
if (is_future_change)
{
continue;
}
}

ReadTakeCommand::generate_info(info, *(it.second), instance_change);
return true;
}
}
Expand Down
106 changes: 106 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <gtest/gtest.h>

#include <fastdds/dds/core/StackAllocatedSequence.hpp>
#include <fastrtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "BlackboxTests.hpp"
Expand Down Expand Up @@ -230,6 +232,110 @@ TEST_P(DDSDataReader, ConsistentTotalUnreadAfterGetFirstUntakenInfo)
ASSERT_EQ(result, ReturnCode_t::RETCODE_OK) << "Reader's unread count is: " << reader.get_unread_count();
}

//! Regression test for #20706
//! get_first_untaken_info() returns the first valid change of an instance, not only the first
//! cache change. This implies searching in all the cache changes of the instance.
//! In the scenario of having multiple reliable writers and one reader with history size > 1 in the same topic,
//! it can happen that get_first_untaken_info() returns OK (as it is not currently checking whether the change is in the future)
//! but take() returns NO_DATA because it is waiting for a previous SequenceNumber from the writer.
TEST(DDSDataReader, GetFirstUntakenInfoReturnsTheFirstValidChange)
{
PubSubWriter<HelloWorldPubSubType> writer_1(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldPubSubType> writer_2(TEST_TOPIC_NAME);
// The reader should not take nor read any sample in this test
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME, false, false, false);

auto testTransport_1 = std::make_shared<test_UDPv4TransportDescriptor>();

EntityId_t writer1_id;
EntityId_t reader_id;

testTransport_1->drop_data_messages_filter_ =
[&writer1_id, &reader_id](eprosima::fastrtps::rtps::CDRMessage_t& msg)-> bool
{
uint32_t old_pos = msg.pos;

// see RTPS DDS 9.4.5.3 Data Submessage
EntityId_t readerID;
EntityId_t writerID;
SequenceNumber_t sn;

msg.pos += 2; // flags
msg.pos += 2; // octets to inline quos
CDRMessage::readEntityId(&msg, &readerID);
CDRMessage::readEntityId(&msg, &writerID);
CDRMessage::readSequenceNumber(&msg, &sn);

// restore buffer pos
msg.pos = old_pos;

// Loose Seqnum 1
if (writerID == writer1_id &&
readerID == reader_id &&
(sn == SequenceNumber_t{0, 1}))
{
return true;
}

return false;
};

writer_1.disable_builtin_transport()
.add_user_transport_to_pparams(testTransport_1)
.history_depth(3)
.init();

writer_2.history_depth(3)
.init();

reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS)
.history_depth(3)
.init();

ASSERT_TRUE(writer_1.isInitialized());
ASSERT_TRUE(writer_2.isInitialized());
ASSERT_TRUE(reader.isInitialized());

writer1_id = writer_1.datawriter_guid().entityId;
reader_id = reader.datareader_guid().entityId;

// Wait for discovery.
writer_1.wait_discovery();
writer_2.wait_discovery();
reader.wait_discovery(std::chrono::seconds::zero(), 2);

// Send writer_1 samples
auto data = default_helloworld_data_generator(3);

reader.startReception(data);
writer_1.send(data);

// The reader should have received samples 2,3 but not 1
// get_first_untaken_info() should never return OK since the received changes are all in the future.
// We try it several times in case the reader has not received the samples yet.
eprosima::fastdds::dds::SampleInfo info;
for (size_t i = 0; i < 3; i++)
{
ASSERT_NE(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK, reader.get_native_reader().get_first_untaken_info(
&info));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}

// Now we send data from writer_2 with no drops and all samples shall be received.
data = default_helloworld_data_generator(3);
writer_2.send(data);
reader.block_for_unread_count_of(3);

// get_first_untaken_info() must return OK now
ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK,
reader.get_native_reader().get_first_untaken_info(&info));
eprosima::fastdds::dds::StackAllocatedSequence<HelloWorld, 1> data_values;
eprosima::fastdds::dds::SampleInfoSeq sample_infos{1};
// As get_first_untaken_info() returns OK, take() must return OK too
ASSERT_EQ(eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK,
reader.get_native_reader().take(data_values, sample_infos));
}

//! Regression test for Issues #3822 Github #3875
//! This test needs to late join a reader in the same process.
//! Not setting this test as parametrized since it only makes sense in intraprocess.
Expand Down
Loading