
SHM transport: the longer it runs, the more latency increases (Buffer Full Exception) #789

Open
fujitatomoya opened this issue Nov 20, 2024 · 10 comments


@fujitatomoya (Collaborator)

Description

tf / tf_static topics start generating the "dropping message" warning: "the timestamp on the message is earlier than all the data in the transform cache".
Once it starts happening, the frequency of the warning increases linearly over time.
This significantly increases the delay and latency between publishers and subscribers.

System Information

XML configuration

<?xml version="1.0" encoding="UTF-8"?>
<dds xmlns="http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles">
    <profiles>
        <transport_descriptors>
            <!-- Create a descriptor for the new transport -->
            <transport_descriptor>
                <transport_id>shm_transport</transport_id>
                <type>SHM</type>
                <segment_size>10485760</segment_size> 
                <port_queue_capacity>256</port_queue_capacity>
            </transport_descriptor>
        </transport_descriptors>
        <participant profile_name="SHMParticipant" is_default_profile="true">
            <rtps>
                <!-- Link the Transport Layer to the Participant -->
                <userTransports>
                  <transport_id>shm_transport</transport_id>
                </userTransports>
                <!-- <useBuiltinTransports>false</useBuiltinTransports> -->
            </rtps>
        </participant>
        <publisher profile_name="service">
            <qos>
                <reliability>
                    <max_blocking_time>
                        <sec>10</sec>
                    </max_blocking_time>
                </reliability>
            </qos>
        </publisher>
    </profiles>
</dds>
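
For reference, a minimal sketch of loading such a profile file programmatically (assuming the Fast DDS 2.x DomainParticipantFactory API and a hypothetical file name shm_profile.xml; under ROS 2 the file is typically picked up via the FASTRTPS_DEFAULT_PROFILES_FILE environment variable instead):

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>

using namespace eprosima::fastdds::dds;

int main()
{
    DomainParticipantFactory* factory = DomainParticipantFactory::get_instance();

    // Load the XML profiles file (hypothetical path).
    if (ReturnCode_t::RETCODE_OK != factory->load_XML_profiles_file("shm_profile.xml"))
    {
        return 1;
    }

    // Create a participant from the "SHMParticipant" profile defined above.
    DomainParticipant* participant =
            factory->create_participant_with_profile(0, "SHMParticipant");
    if (nullptr == participant)
    {
        return 1;
    }

    // ... create publishers / subscribers here ...

    factory->delete_participant(participant);
    return 0;
}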

Additional Information

@fujitatomoya (Collaborator, Author)

@MiguelCompany we have detected this issue in a production environment. We will try falling back to udp_transport to see if the problem still happens. Do you happen to know of any known issue related to this?

@Barry-Xu-2018 (Contributor)

I have some questions about the code in class MultiProducerConsumerRingBuffer. If my understanding is incorrect, please correct me.

Assuming there is currently only one free cell (that is, free_cells is 1):

At this moment, a piece of data is pushed to a cell and a "Buffer full" exception is thrown. At that point, the value of free_cells changes from 1 to 0. So, even though no cell was actually used, free_cells is still decreased by 1.

https://github.com/eProsima/Fast-DDS/blob/b1e4707ad3cfe4cad7e5100318402206e0bd5e78/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp#L219-L232

    bool push(
            const T& data)
    {
        // If no listeners the buffer is dropped
        if (node_->registered_listeners_ == 0)
        {
            return false;
        }

        auto pointer = node_->pointer_.load(std::memory_order_relaxed);

        // if free cells, increase the write pointer and decrease the free cells
        while (pointer.ptr.free_cells > 0 &&
                !node_->pointer_.compare_exchange_weak(pointer,
                { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
                std::memory_order_release,
                std::memory_order_relaxed))
        {
        }

        if (pointer.ptr.free_cells == 0)
        {
            throw std::runtime_error("Buffer full");
        }
  ...
 }

If a listener pops the cell, and it is the last listener on this cell, the cell can be reused (free_cells should be increased).
At this point, free_cells becomes 1.
https://github.com/eProsima/Fast-DDS/blob/b1e4707ad3cfe4cad7e5100318402206e0bd5e78/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp#L120-L131

        bool pop()
        {
            ...

            // If all the listeners have read the cell
            if (counter == 1)
            {
                // Increase the free cells => increase the global read pointer
                auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
                while (!buffer_.node_->pointer_.compare_exchange_weak(pointer,
                        { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } },
                        std::memory_order_release,
                        std::memory_order_relaxed))
                {
                }
            }

Then, when new data is pushed to a cell, we face the same issue: the push does not succeed, but free_cells is reduced to 0 again.
This causes available cells never to be used, and the buffer stays permanently in a "buffer full" state.

@MiguelCompany (Collaborator)

At this moment, a piece of data is pushed to a cell and a "Buffer full" exception is thrown.

The exception would never be thrown if the current thread successfully updated the counter from 1 to 0.
In that case, the local value of pointer.ptr.free_cells would be 1.
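
To illustrate (a minimal standalone sketch of the std::atomic::compare_exchange_weak contract this relies on, using a plain std::atomic<int> as a stand-in for free_cells, not the actual Fast-DDS types): on failure the expected value is reloaded, and on success it keeps the pre-exchange value, so the free_cells == 0 check after the loop only fires when the last observed value really was 0.

#include <atomic>
#include <cassert>

int main()
{
    // Stand-in for pointer.ptr.free_cells: one free cell left.
    std::atomic<int> free_cells{1};

    // 'expected' plays the role of the locally loaded 'pointer' copy.
    int expected = free_cells.load(std::memory_order_relaxed);

    // Same loop shape as push(): try to consume a free cell.
    while (expected > 0 &&
            !free_cells.compare_exchange_weak(expected, expected - 1,
            std::memory_order_release, std::memory_order_relaxed))
    {
        // On failure (including spurious failure), compare_exchange_weak
        // reloads the current value into 'expected', so the check below
        // always reflects the last value actually observed.
    }

    // On success, 'expected' keeps the pre-exchange value (1 here), so the
    // "Buffer full" branch (expected == 0) is not taken even though the
    // shared counter is now 0.
    assert(expected == 1);
    assert(free_cells.load(std::memory_order_relaxed) == 0);
    return 0;
}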

You might want to try upgrading all node_->pointer_.load() calls to std::memory_order_acquire, and check whether the situation improves

@fujitatomoya (Collaborator, Author)

@Barry-Xu-2018 are you saying this if statement should be moved up, as below?

diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
index 51e559155..52e2be7f4 100644
--- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
+++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
@@ -216,6 +216,11 @@ public:

         auto pointer = node_->pointer_.load(std::memory_order_relaxed);

+        if (pointer.ptr.free_cells == 0)
+        {
+            throw std::runtime_error("Buffer full");
+        }
+
         // if free cells, increase the write pointer and decrease the free cells
         while (pointer.ptr.free_cells > 0 &&
                 !node_->pointer_.compare_exchange_weak(pointer,
@@ -225,11 +230,6 @@ public:
         {
         }

-        if (pointer.ptr.free_cells == 0)
-        {
-            throw std::runtime_error("Buffer full");
-        }
-
         auto& cell = cells_[get_pointer_value(pointer.ptr.write_p)];

         cell.data(data);

@fujitatomoya (Collaborator, Author)

You might want to try upgrading all node_->pointer_.load() calls to std::memory_order_acquire, and check whether the situation improves

Okay, that guarantees that write operations performed by other threads before their release stores are visible when this thread loads the std::atomic<PtrType>. The expectation is that other threads are writing to this atomic field but the writes are not yet synchronized here, and that could lead to the buffer-full situation when many threads are running?

diff --git a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
index 51e559155..d2be85ada 100644
--- a/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
+++ b/src/cpp/rtps/transport/shared_mem/MultiProducerConsumerRingBuffer.hpp
@@ -85,7 +85,7 @@ public:
          */
         Cell* head()
         {
-            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
+            auto pointer = buffer_.node_->pointer_.load(std::memory_order_acquire);

             // If local read_pointer and write_pointer are equal => buffer is empty for this listener
             if (read_p_ == pointer.ptr.write_p )
@@ -121,7 +121,7 @@ public:
             if (counter == 1)
             {
                 // Increase the free cells => increase the global read pointer
-                auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
+                auto pointer = buffer_.node_->pointer_.load(std::memory_order_acquire);
                 while (!buffer_.node_->pointer_.compare_exchange_weak(pointer,
                         { { pointer.ptr.write_p, pointer.ptr.free_cells + 1 } },
                         std::memory_order_release,
@@ -214,7 +214,7 @@ public:
             return false;
         }

-        auto pointer = node_->pointer_.load(std::memory_order_relaxed);
+        auto pointer = node_->pointer_.load(std::memory_order_acquire);

         // if free cells, increase the write pointer and decrease the free cells
         while (pointer.ptr.free_cells > 0 &&
@@ -240,12 +240,12 @@ public:

     bool is_buffer_full()
     {
-        return (node_->pointer_.load(std::memory_order_relaxed).ptr.free_cells == 0);
+        return (node_->pointer_.load(std::memory_order_acquire).ptr.free_cells == 0);
     }

     bool is_buffer_empty()
     {
-        return (node_->pointer_.load(std::memory_order_relaxed).ptr.free_cells == node_->total_cells_);
+        return (node_->pointer_.load(std::memory_order_acquire).ptr.free_cells == node_->total_cells_);
     }

     /**
@@ -261,7 +261,7 @@ public:
         // The new listener's read pointer is the current write pointer
         auto listener = std::unique_ptr<Listener>(
             new Listener(
-                *this, node_->pointer_.load(std::memory_order_relaxed).ptr.write_p));
+                *this, node_->pointer_.load(std::memory_order_acquire).ptr.write_p));

         node_->registered_listeners_++;

@@ -293,7 +293,7 @@ public:
     {
         if (node_->registered_listeners_ > 0)
         {
-            auto pointer = node_->pointer_.load(std::memory_order_relaxed);
+            auto pointer = node_->pointer_.load(std::memory_order_acquire);

             uint32_t p = pointer_to_head(pointer);

@Barry-Xu-2018 (Contributor)

Barry-Xu-2018 commented Nov 21, 2024

@MiguelCompany

The exception would never be thrown if the current thread successfully updated the counter from 1 to 0.
In that case, the local value of pointer.ptr.free_cells would be 1.

Thanks. I misunderstood.

You might want to try upgrading all node_->pointer_.load() calls to std::memory_order_acquire, and check whether the situation improves

        // if free cells, increase the write pointer and decrease the free cells
        while (pointer.ptr.free_cells > 0 &&
                !node_->pointer_.compare_exchange_weak(pointer,
                { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
                std::memory_order_release,
                std::memory_order_relaxed))
        {
        }

        if (pointer.ptr.free_cells == 0)
        {
            throw std::runtime_error("Buffer full");
        }

No. I think std::memory_order_relaxed is unsuitable as the failure ordering of compare_exchange_weak.
When compare_exchange_weak fails, node_->pointer_ is reloaded into pointer, and the following comparisons "pointer.ptr.free_cells > 0" and "pointer.ptr.free_cells == 0" depend on that updated pointer (I am not sure whether the reads are guaranteed to happen after the pointer is updated). So perhaps the memory order on the failure path of compare_exchange_weak should be memory_order_acquire.
That is:

        while (pointer.ptr.free_cells > 0 &&
                !node_->pointer_.compare_exchange_weak(pointer,
                { { inc_pointer(pointer.ptr.write_p), pointer.ptr.free_cells - 1 } },
                std::memory_order_release,
                std::memory_order_acquire))

CC: @fujitatomoya This is my thought.

@MiguelCompany (Collaborator)

Looking at the following example from the std::memory_order documentation makes me think that we only need the changes suggested by @fujitatomoya in #789 (comment)

#include <atomic>
#include <cassert>
#include <thread>
#include <vector>
 
std::vector<int> data;
std::atomic<int> flag = {0};
 
void thread_1()
{
    data.push_back(42);
    flag.store(1, std::memory_order_release);
}
 
void thread_2()
{
    int expected = 1;
    // memory_order_relaxed is okay because this is an RMW,
    // and RMWs (with any ordering) following a release form a release sequence
    while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed))
    {
        expected = 1;
    }
}
 
void thread_3()
{
    while (flag.load(std::memory_order_acquire) < 2)
        ;
    // if we read the value 2 from the atomic flag, we see 42 in the vector
    assert(data.at(0) == 42); // will never fire
}
 
int main()
{
    std::thread a(thread_1);
    std::thread b(thread_2);
    std::thread c(thread_3);
    a.join(); b.join(); c.join();
}

@Barry-Xu-2018 (Contributor)

        Cell* head()
        {
            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

            // If local read_pointer and write_pointer are equal => buffer is empty for this listener
            if (read_p_ == pointer.ptr.write_p )
            {
                return nullptr;
            }

            auto cell = &buffer_.cells_[get_pointer_value(read_p_)];

            return cell->ref_counter() != 0 ? cell : nullptr;
        }

If the memory order is memory_order_acquire, it makes sure that reading operations (such as read_p_ == pointer.ptr.write_p) happen after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

If the memory order is memory_order_relaxed, reading operations may be done before auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);
However, I think read_p_ == pointer.ptr.write_p has to be evaluated after the load anyway, since pointer is a local temporary. But auto cell = &buffer_.cells_[get_pointer_value(read_p_)]; may be executed before the load. Even if it is executed in that order, the final output is not affected.

Of course, to ensure the ordering, I agree with changing to memory_order_acquire.

@fujitatomoya (Collaborator, Author)

@Barry-Xu-2018

I may be mistaken on some points...

If the memory order is memory_order_acquire, it makes sure that reading operations (such as read_p_ == pointer.ptr.write_p) happen after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

I am not really sure what this means here.
I think it just makes sure that all the store operations performed by other threads before their release stores are visible to this thread.

If the memory order is memory_order_relaxed, reading operations may be done before auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

This should not happen, and the compiler should not do that. If it did, it would break the program's sequencing.

@Barry-Xu-2018 (Contributor)

@fujitatomoya

If the memory order is memory_order_acquire, it makes sure that reading operations (such as read_p_ == pointer.ptr.write_p) happen after auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

I am not really sure what this means here.
I think it just makes sure that all the store operations performed by other threads before their release stores are visible to this thread.

What I meant is that no reads or writes in the current thread can be reordered before this load.
Refer to the explanation of memory_order_acquire in https://en.cppreference.com/w/cpp/atomic/memory_order.

If the memory order is memory_order_relaxed, reading operations may be done before auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

This should not happen, and the compiler should not do that. If it did, it would break the program's sequencing.

memory_order_relaxed only guarantees that the operation itself is atomic. Due to compiler or CPU reordering, it is possible that "auto cell = &buffer_.cells_[get_pointer_value(read_p_)];" is moved ahead of "buffer_.node_->pointer_.load()".
memory_order_acquire prevents this reordering.

        Cell* head()
        {
            auto pointer = buffer_.node_->pointer_.load(std::memory_order_relaxed);

            // If local read_pointer and write_pointer are equal => buffer is empty for this listener
            if (read_p_ == pointer.ptr.write_p )
            {
                return nullptr;
            }

            auto cell = &buffer_.cells_[get_pointer_value(read_p_)];

            return cell->ref_counter() != 0 ? cell : nullptr;
        }
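
For completeness, a minimal standalone sketch (hypothetical, simplified names, not the actual Fast-DDS types) of the pattern head() relies on: the producer writes the cell and then publishes the pointer with a release store, and the acquire load on the consumer side both makes that write visible and prevents the subsequent cell read from being hoisted above the load.

#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>

// Hypothetical, simplified model of one ring-buffer cell and the shared write pointer.
struct Cell
{
    int data = 0;
};

Cell cells[4];
std::atomic<uint32_t> write_p{0};

void producer()
{
    cells[0].data = 42;                           // fill the cell first
    write_p.store(1, std::memory_order_release);  // then publish the new write pointer
}

void consumer()
{
    const uint32_t read_p = 0;
    // Acquire load: once we observe write_p != read_p, the producer's write to
    // cells[0].data is guaranteed to be visible, and the cell read below cannot
    // be reordered before this load.
    while (write_p.load(std::memory_order_acquire) == read_p)
    {
        // buffer still empty for this listener, keep polling
    }
    assert(cells[read_p].data == 42);  // never fires
}

int main()
{
    std::thread a(producer);
    std::thread b(consumer);
    a.join();
    b.join();
    return 0;
}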
