Fix possible deadlock of Future when adding a listener after completed
### Motivation

A `Future` can deadlock in the following case. Assume there is a
`Promise` and its `Future`.

1. Call `Future::addListener` to add a listener that tries to acquire a
   user-provided mutex (`lock`).
2. Thread 1: Acquire `lock` first.
3. Thread 2: Call `Promise::setValue`. The listener is triggered
   before `InternalState::future_` is completed. Since `lock` is held
   by Thread 1, the listener is blocked.
4. Thread 1: Call `Future::addListener`. Since it detects that
   `InternalState::completed_` is true, it calls `get` to retrieve
   the result and value.

Then, deadlock happens:
- Thread 2 waits for `lock` to be released before it can complete
  `InternalState::future_`.
- Thread 1 holds `lock` but waits for `InternalState::future_` to be
  completed.
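
The ordering problem can be reproduced outside the client with plain standard-library types. The following is only a minimal sketch: `oldOrderingStalls` is a hypothetical name, `std::promise`/`std::shared_future` stand in for `InternalState::future_`, and a bounded `wait_for` replaces the unbounded `get` so that the demo terminates instead of hanging.

```c++
#include <cassert>
#include <chrono>
#include <future>
#include <mutex>
#include <thread>

// Hypothetical demo, not part of the Pulsar code base.
// Returns true if the thread that holds `lock` timed out waiting for the future.
bool oldOrderingStalls() {
    std::mutex lock;  // stands in for the user-provided mutex
    std::promise<int> promise;
    std::shared_future<int> future = promise.get_future().share();

    // "Thread 1" (the calling thread) acquires `lock` first.
    std::unique_lock<std::mutex> guard{lock};

    // "Thread 2": old ordering, i.e. run the listener first, then set the value.
    std::thread thread2{[&lock, &promise] {
        { std::lock_guard<std::mutex> listenerGuard{lock}; }  // the listener blocks here
        promise.set_value(42);
    }};

    // "Thread 1": `addListener` on a "completed" future would call `get()`.
    // A bounded wait is used here so the demo cannot hang forever.
    const bool stalled =
        future.wait_for(std::chrono::milliseconds(200)) == std::future_status::timeout;

    guard.unlock();  // only now can the listener run and the value be set
    thread2.join();
    assert(future.get() == 42);
    return stalled;
}
```

With an unbounded `get()` in place of `wait_for`, neither thread could make progress, which is the deadlock described above.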

In a real-world case, suppose we acquire a lock before calling
`ProducerImpl::closeAsync`. If another thread then calls `setValue` in
`ClientConnection::handleSuccess` and the callback of
`createProducerAsync` tries to acquire the same lock, `handleSuccess`
will be blocked. Then, in `closeAsync`, the current thread will be
blocked in:

```c++
    cnx->sendRequestWithId(Commands::newCloseProducer(producerId_, requestId), requestId)
        .addListener([self, callback](Result result, const ResponseData&) { callback(result); });
```

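The stacks (here Thread 1 is the thread that calls `setValue` and
Thread 2 is the thread that calls `addListener`, which is the reverse
of the numbering in the steps above):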

```
Thread 1:
#11 0x00007fab80da2173 in pulsar::InternalState<...>::complete (this=0x3d53e7a10, result=..., value=...) at lib/Future.h:61
#13 pulsar::ClientConnection::handleSuccess (this=this@entry=0x2214bc000, success=...) at lib/ClientConnection.cc:1552

Thread 2:
#8  get (result=..., this=0x3d53e7a10) at lib/Future.h:69
#9  pulsar::InternalState<...>::addListener (this=this@entry=0x3d53e7a10, listener=...) at lib/Future.h:51
#11 0x00007fab80e8dc4e in pulsar::ProducerImpl::closeAsync at lib/ProducerImpl.cc:794
```

Two things cause the deadlock:
1. We use `completed_` to represent whether the future is completed.
   However, after it becomes true, the future might not be completed
   yet because the value has not been set and the listeners have not
   finished.
2. If `addListener` is called after completion, we still push the
   listener to `listeners_` so that previous listeners are executed
   before the new listener. This guarantee is unnecessarily strong.

### Modifications

First, complete the future before calling the listeners.

Then, use an enum to represent the status:
- INITIAL: `complete` has not been called
- COMPLETING: the first time `complete` is called, the status changes
  from INITIAL to COMPLETING
- COMPLETED: the future is completed.
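
As a rough illustration of these two changes, here is a simplified, self-contained sketch. `MiniState` and `runMiniStateDemo` are hypothetical names used only for this example; the real `InternalState` in `lib/Future.h` differs in details (for instance, it keeps listeners in a `std::forward_list` and also supports a blocking `get`).

```c++
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for InternalState<Result, Type>.
template <typename Result, typename Type>
class MiniState {
   public:
    using Listener = std::function<void(Result, const Type&)>;

    void addListener(Listener listener) {
        std::unique_lock<std::mutex> lock{mutex_};
        if (status_.load() == COMPLETED) {
            // Already completed: copy the result and call the listener directly,
            // without waiting for previously registered listeners.
            auto result = result_;
            auto value = value_;
            lock.unlock();
            listener(result, value);
        } else {
            listeners_.push_back(std::move(listener));
        }
    }

    bool complete(Result result, const Type& value) {
        Status expected = INITIAL;
        if (!status_.compare_exchange_strong(expected, COMPLETING)) {
            return false;  // another call already started completing
        }
        // Publish the result first, then run the listeners outside the lock.
        std::unique_lock<std::mutex> lock{mutex_};
        result_ = result;
        value_ = value;
        status_ = COMPLETED;
        std::vector<Listener> listeners;
        listeners.swap(listeners_);
        lock.unlock();
        for (auto& listener : listeners) {
            listener(result, value);
        }
        return true;
    }

    bool completed() const noexcept { return status_.load() == COMPLETED; }

   private:
    enum Status : std::uint8_t { INITIAL, COMPLETING, COMPLETED };

    std::mutex mutex_;
    std::vector<Listener> listeners_;
    Result result_{};
    Type value_{};
    std::atomic<Status> status_{INITIAL};
};

// Usage: one listener added before completion, one added after.
std::vector<std::string> runMiniStateDemo() {
    MiniState<int, std::string> state;
    std::vector<std::string> seen;
    state.addListener([&seen](int, const std::string& v) { seen.push_back("a:" + v); });
    assert(!state.completed());
    assert(state.complete(0, "hello"));
    assert(!state.complete(1, "again"));  // only the first call wins
    state.addListener([&seen](int, const std::string& v) { seen.push_back("b:" + v); });
    return seen;
}
```

Because the status becomes COMPLETED before any listener runs, a late `addListener` never has to wait for a listener that is blocked on a user lock.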

Besides, the implementation of `Future` is simplified. #299 fixes a
possible mutex crash by introducing `std::future`. However, the root
cause is that the condition variable was not used correctly:

> Even if the shared variable is atomic, it must be modified while owning the mutex to correctly publish the modification to the waiting thread.

See https://en.cppreference.com/w/cpp/thread/condition_variable

The simplest way to fix #298 is just adding `lock.lock()` before
`state->condition.notify_all();`.
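
The rule quoted above can be sketched as follows. This is a generic illustration rather than the code from #298 or #299; `Flag`, `waitReady`, `setReady`, and `runFlagDemo` are hypothetical names. The key point is that the shared flag is modified while the mutex is owned, even though the flag is atomic.

```c++
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical shared state: an atomic flag guarded by a mutex/condition pair.
struct Flag {
    std::mutex mutex;
    std::condition_variable cond;
    std::atomic_bool ready{false};
};

// Waiter: blocks until `ready` becomes true.
void waitReady(Flag& flag) {
    std::unique_lock<std::mutex> lock{flag.mutex};
    flag.cond.wait(lock, [&flag] { return flag.ready.load(); });
}

// Notifier: modifies the flag while owning the mutex, so a waiter cannot miss
// the update between checking the predicate and going to sleep.
void setReady(Flag& flag) {
    {
        std::lock_guard<std::mutex> lock{flag.mutex};
        flag.ready = true;
    }
    flag.cond.notify_all();
}

bool runFlagDemo() {
    Flag flag;
    std::thread waiter{[&flag] { waitReady(flag); }};
    setReady(flag);
    waiter.join();  // returns only because the waiter observed `ready == true`
    assert(flag.ready.load());
    return true;
}
```

If `ready` were set without holding `mutex`, the waiter could evaluate the predicate as false, then miss the notification before it starts waiting, and sleep indefinitely.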
BewareMyPower committed Oct 26, 2023
1 parent 6daf7a5 commit 335a7d1
Showing 5 changed files with 84 additions and 70 deletions.
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
@@ -23,6 +23,7 @@

#include <boost/optional.hpp>
#include <functional>
#include <list>
#include <memory>
#include <utility>

94 changes: 45 additions & 49 deletions lib/Future.h
@@ -20,14 +20,11 @@
#define LIB_FUTURE_H_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <forward_list>
#include <functional>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>

namespace pulsar {

@@ -38,71 +35,70 @@ class InternalState {
using Pair = std::pair<Result, Type>;
using Lock = std::unique_lock<std::mutex>;

enum Status : uint8_t
{
INITIAL,
COMPLETING,
COMPLETED
};

// NOTE: Add the constructor explicitly just to be compatible with GCC 4.8
InternalState() {}

void addListener(Listener listener) {
Lock lock{mutex_};
listeners_.emplace_back(listener);
lock.unlock();

if (completed()) {
Type value;
Result result = get(value);
triggerListeners(result, value);
auto result = result_;
auto value = value_;
lock.unlock();
listener(result, value);
} else {
tailListener_ = listeners_.emplace_after(tailListener_, std::move(listener));
}
}

bool complete(Result result, const Type &value) {
bool expected = false;
if (!completed_.compare_exchange_strong(expected, true)) {
Status expected = Status::INITIAL;
if (!status_.compare_exchange_strong(expected, Status::COMPLETING)) {
return false;
}
triggerListeners(result, value);
promise_.set_value(std::make_pair(result, value));
return true;
}

bool completed() const noexcept { return completed_; }

Result get(Type &result) {
const auto &pair = future_.get();
result = pair.second;
return pair.first;
}
// Ensure if another thread calls `addListener` at the same time, that thread can get the value by
// `get` before the existing listeners are executed
Lock lock{mutex_};
result_ = result;
value_ = value;
status_ = COMPLETED;
cond_.notify_all();

// Only public for test
void triggerListeners(Result result, const Type &value) {
while (true) {
Lock lock{mutex_};
if (listeners_.empty()) {
return;
if (!listeners_.empty()) {
auto listeners = std::move(listeners_);
lock.unlock();
for (auto &&listener : listeners) {
listener(result, value);
}
}

bool expected = false;
if (!listenerRunning_.compare_exchange_strong(expected, true)) {
// There is another thread that polled a listener that is running, skip polling and release
// the lock. Here we wait for some time to avoid busy waiting.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
continue;
}
auto listener = std::move(listeners_.front());
listeners_.pop_front();
lock.unlock();
return true;
}

listener(result, value);
listenerRunning_ = false;
}
bool completed() const noexcept { return status_.load() == COMPLETED; }

Result get(Type &value) const {
Lock lock{mutex_};
cond_.wait(lock, [this] { return completed(); });
value = value_;
return result_;
}

private:
std::atomic_bool completed_{false};
std::promise<Pair> promise_;
std::shared_future<Pair> future_{promise_.get_future()};

std::list<Listener> listeners_;
mutable std::mutex mutex_;
std::atomic_bool listenerRunning_{false};
mutable std::condition_variable cond_;
std::forward_list<Listener> listeners_;
decltype(listeners_.before_begin()) tailListener_{listeners_.before_begin()};
Result result_;
Type value_;
std::atomic<Status> status_;
};

template <typename Result, typename Type>
1 change: 1 addition & 0 deletions lib/ProducerImpl.h
@@ -20,6 +20,7 @@
#define LIB_PRODUCERIMPL_H_

#include <boost/optional.hpp>
#include <list>
#include <memory>

#include "Future.h"
53 changes: 34 additions & 19 deletions tests/PromiseTest.cc
@@ -19,10 +19,13 @@
#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "WaitUtils.h"
#include "lib/Future.h"
#include "lib/LogUtils.h"

@@ -88,26 +91,38 @@ TEST(PromiseTest, testListeners) {
ASSERT_EQ(values, (std::vector<std::string>(2, "hello")));
}

TEST(PromiseTest, testTriggerListeners) {
InternalState<int, int> state;
state.addListener([](int, const int&) {
LOG_INFO("Start task 1...");
std::this_thread::sleep_for(std::chrono::seconds(1));
LOG_INFO("Finish task 1...");
TEST(PromiseTest, testListenerDeadlock) {
Promise<int, int> promise;
auto future = promise.getFuture();
auto mutex = std::make_shared<std::mutex>();
auto done = std::make_shared<std::atomic_bool>(false);

future.addListener([mutex, done](int, int) {
LOG_INFO("Listener-1 before acquiring the lock");
std::lock_guard<std::mutex> lock{*mutex};
LOG_INFO("Listener-1 after acquiring the lock");
done->store(true);
});
state.addListener([](int, const int&) {
LOG_INFO("Start task 2...");

std::thread t1{[mutex, &future] {
std::lock_guard<std::mutex> lock{*mutex};
// Make it a great chance that `t2` executes `promise.setValue` first
std::this_thread::sleep_for(std::chrono::seconds(2));

// Since the future is completed, `Future::get` will be called in `addListener` to get the result
LOG_INFO("Before adding Listener-2 (acquired the mutex)")
future.addListener([](int, int) { LOG_INFO("Listener-2 is triggered"); });
LOG_INFO("After adding Listener-2 (releasing the mutex)");
}};
t1.detach();
std::thread t2{[mutex, promise] {
// Make there a great chance that `t1` acquires `mutex` first
std::this_thread::sleep_for(std::chrono::seconds(1));
LOG_INFO("Finish task 2...");
});
LOG_INFO("Before setting value");
promise.setValue(0); // the 1st listener is called, which is blocked at acquiring `mutex`
LOG_INFO("After setting value");
}};
t2.detach();

auto start = std::chrono::high_resolution_clock::now();
auto future1 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
auto future2 = std::async(std::launch::async, [&state] { state.triggerListeners(0, 0); });
future1.wait();
future2.wait();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start)
.count();
ASSERT_TRUE(elapsed > 2000) << "elapsed: " << elapsed << "ms";
ASSERT_TRUE(waitUntil(std::chrono::seconds(5000), [done] { return done->load(); }));
}
5 changes: 3 additions & 2 deletions tests/WaitUtils.h
@@ -25,20 +25,21 @@
namespace pulsar {

template <typename Rep, typename Period>
inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
inline bool waitUntil(std::chrono::duration<Rep, Period> timeout, const std::function<bool()>& condition,
long durationMs = 10) {
auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
while (timeoutMs > 0) {
auto now = std::chrono::high_resolution_clock::now();
if (condition()) {
break;
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - now)
.count();
timeoutMs -= elapsed;
}
return false;
}

} // namespace pulsar
