Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: provide a more precise detection of empty state in MPSCIntrusiveQueue #337

Merged
merged 2 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions base/mpsc_intrusive_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,18 @@ template <typename T> class MPSCIntrusiveQueue {
void Push(T* item) noexcept {
  // item becomes a new tail; its `next` pointer must be null before it is published.
  MPSC_intrusive_store_next(item, nullptr);

  // The exchange needs at least `release` MO, so that this write won't be reordered
  // before resetting item.next above. Otherwise, another producer could push its own item
  // after this exchange, and that producer's write to item.next would be overridden.
  // (The `acquire` half presumably pairs with the release of the previous producer's
  // exchange — NOTE(review): confirm against the linked article.)
  T* prev = tail_.exchange(item, std::memory_order_acq_rel);

  // link the previous tail to the new tail.
  // (*) Also a potential blocking point: if this producer is preempted here,
  // the list is temporarily split.
  // For more details see the linked article above!
  // Until (*) completes, the chain is cut at `prev` and Pop can not reach the item
  // and its subsequent items.
  MPSC_intrusive_store_next(prev, item);  // (*)
}

// Pops the first item at the head or returns nullptr if the queue is empty.
Expand Down Expand Up @@ -91,9 +97,30 @@ template <typename T> std::pair<T*, bool> MPSCIntrusiveQueue<T>::PopWeak() noexc
T* next = MPSC_intrusive_load_next(*head); // load(std::memory_order_acquire)
if (stub() == head) {
if (nullptr == next) {
// empty
return {nullptr, true};
// Empty state.
// Caveat: if Push() called on an empty queue but has not crossed the blocking point yet,
// we may reach this condition because head_ is a stub and stub.next is nullptr.
// Unfortunately it may lead to a bizarre scenario where an arbitrary number of
// subsequent pushes will fully complete, but the queue will still be observed
// as empty by the consumer because the chain will be cut by the Push that is stuck updating
// the stub.
//
// More comments: if we had a single Push that is not completed yet, then returning
// an empty state is fine. The problem arises when we have multiple pushes in parallel,
// the first one has not completed yet, others completed but they are absolutely
// invisible to the consumer.
//
// To disambiguate, we load the tail_ and check whether it is the same as the head.
// To sum up:
// 1. If tail is not head, it is guaranteed that the queue is not empty.
// 2. Otherwise, it is most likely empty, due to the eventually consistent semantics of
//    load/store operations.
// 3. If the store is guaranteed to be visible due to external conditions, (2) becomes exact.
T* tail = tail_.load(std::memory_order_relaxed);
return {nullptr, tail == head};
}

// skip the stub if needed and continue.
head_ = next;
head = next;
next = MPSC_intrusive_load_next(*next);
Expand All @@ -105,16 +132,20 @@ template <typename T> std::pair<T*, bool> MPSCIntrusiveQueue<T>::PopWeak() noexc
return {head, false};
}

T* tail = tail_.load(std::memory_order_acquire);
T* tail = tail_.load(std::memory_order_relaxed);
romange marked this conversation as resolved.
Show resolved Hide resolved
if (tail != head) {
// non-empty, we are in the middle of push - see a blocking point above.
return {nullptr, false};
}

// tail and head are the same, pointing to the last element in the queue.
// Link stub to the tail to introduce an empty state.
// Link stub to the tail to introduce an empty state. Before the tail_.load above:
// head -> item, tail -> item
Push(stub());

// Unless we had concurrent pushes, now:
// head->item, item.next=stub
// tail->stub, stub.next = nullptr
next = MPSC_intrusive_load_next(*head);
if (nullptr != next) {
head_ = next;
Expand Down
2 changes: 1 addition & 1 deletion base/mpsc_intrusive_queue_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct TestNode {
};

// Publishes `next_node` as the successor of `dest`.
// `release` ordering makes the writes performed on the node before it was linked
// visible to the consumer, which loads `next` with `acquire`
// (see MPSC_intrusive_load_next below).
void MPSC_intrusive_store_next(TestNode* dest, TestNode* next_node) {
  dest->next.store(next_node, std::memory_order_release);
}

TestNode* MPSC_intrusive_load_next(const TestNode& src) {
Expand Down
2 changes: 0 additions & 2 deletions util/fibers/detail/fiber_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ class FiberInterface {
#endif
}

uint64_t DEBUG_remote_epoch = 0;

protected:
static constexpr uint16_t kTerminatedBit = 0x1;
static constexpr uint16_t kBusyBit = 0x2;
Expand Down
7 changes: 1 addition & 6 deletions util/fibers/detail/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ void Scheduler::ScheduleFromRemote(FiberInterface* cntx) {
// revert the flags.
cntx->flags_.fetch_and(~FiberInterface::kScheduleRemote, memory_order_release);
} else {
cntx->DEBUG_remote_epoch = remote_epoch_.fetch_add(1, memory_order_relaxed);
remote_ready_queue_.Push(cntx);

DVLOG(2) << "ScheduleFromRemote " << cntx->name() << " " << cntx->use_count_.load();
Expand Down Expand Up @@ -324,7 +323,6 @@ bool Scheduler::WaitUntil(chrono::steady_clock::time_point tp, FiberInterface* m
bool Scheduler::ProcessRemoteReady(FiberInterface* active) {
bool res = false;
unsigned iteration = 0;
uint64_t epoch = active ? remote_epoch_.load(memory_order_relaxed) : 0;

while (true) {
auto [fi, qempty] = remote_ready_queue_.PopWeak();
Expand All @@ -335,9 +333,7 @@ bool Scheduler::ProcessRemoteReady(FiberInterface* active) {
FiberInterface* next = active->remote_next_.load(std::memory_order_acquire);
bool qempty = remote_ready_queue_.Empty();
LOG(ERROR) << "Failed to pull active fiber from remote_ready_queue, iteration "
<< iteration << " remote_empty: " << qempty << ", current_epoch: " << epoch
<< ", push_epoch: " << active->DEBUG_remote_epoch
<< ", next:" << (uint64_t)next;
<< iteration << " remote_empty: " << qempty << ", next:" << (uint64_t)next;
LOG(ERROR) << "Stacktrace: " << GetStacktrace();
if (next != (FiberInterface*)FiberInterface::kRemoteFree) {
if (iteration < 100) {
Expand All @@ -357,7 +353,6 @@ bool Scheduler::ProcessRemoteReady(FiberInterface* active) {

// Marks as free.
fi->remote_next_.store((FiberInterface*)FiberInterface::kRemoteFree, memory_order_relaxed);
fi->DEBUG_remote_epoch = 0;

// clear the bit after we pulled from the queue.
fi->flags_.fetch_and(~FiberInterface::kScheduleRemote, memory_order_release);
Expand Down
Loading