Skip to content

Commit

Permalink
Avoid blocking the message listener threads (#332)
Browse files Browse the repository at this point in the history
### Motivation

The message listener thread blocks when the receiver queue of `MultiTopicsConsumerImpl` is full. As message listener threads are used by all consumers in the same `Client`, if one slow consumer blocks the listener threads, all other consuemrs can no longer receive new messages.

### Modifications

1. Modify `MultiTopicsConsumerImpl` to use `UnboundedBlockingQueue` to avoid blocking
2. Modify the permit update logic: Increase permit only after messages consumed from `MultiTopicsConsumerImpl`
  • Loading branch information
erobot authored Oct 22, 2023
1 parent 7cefe0e commit 6daf7a5
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 10 deletions.
14 changes: 13 additions & 1 deletion lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,9 @@ void ConsumerImpl::messageProcessed(Message& msg, bool track) {
return;
}

increaseAvailablePermits(currentCnx);
if (!hasParent_) {
increaseAvailablePermits(currentCnx);
}
if (track) {
trackMessage(msg.getMessageId());
}
Expand Down Expand Up @@ -1089,6 +1091,16 @@ void ConsumerImpl::increaseAvailablePermits(const ClientConnectionPtr& currentCn
}
}

void ConsumerImpl::increaseAvailablePermits(const Message& msg) {
ClientConnectionPtr currentCnx = getCnx().lock();
if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
LOG_DEBUG(getName() << "Not adding permit since connection is different.");
return;
}

increaseAvailablePermits(currentCnx);
}

inline CommandSubscribe_SubType ConsumerImpl::getSubType() {
ConsumerType type = config_.getConsumerType();
switch (type) {
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class ConsumerImpl : public ConsumerImplBase {
void discardCorruptedMessage(const ClientConnectionPtr& cnx, const proto::MessageIdData& messageId,
CommandAck_ValidationError validationError);
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx, int delta = 1);
void increaseAvailablePermits(const Message& msg);
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage,
const BitSet& ackSet, int redeliveryCount);
Expand Down
1 change: 1 addition & 0 deletions lib/MessageImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class MessageImpl {
int redeliveryCount_;
bool hasSchemaVersion_;
const std::string* schemaVersion_;
std::weak_ptr<class ConsumerImpl> consumerPtr_;

const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
Expand Down
16 changes: 9 additions & 7 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic()
<< " message:" << msg.getDataAsString());
msg.impl_->setTopicName(consumer.impl_->getTopicPtr());
msg.impl_->consumerPtr_ = std::static_pointer_cast<ConsumerImpl>(consumer.impl_);

Lock lock(pendingReceiveMutex_);
if (!pendingReceives_.empty()) {
Expand All @@ -530,18 +531,15 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
auto self = weakSelf.lock();
if (self) {
notifyPendingReceivedCallback(ResultOk, msg, callback);
auto consumer = msg.impl_->consumerPtr_.lock();
if (consumer) {
consumer->increaseAvailablePermits(msg);
}
}
});
return;
}

if (incomingMessages_.full()) {
lock.unlock();
}

// add message to block queue.
// when messages queue is full, will block listener thread on ConsumerImpl,
// then will not send permits to broker, will broker stop push message.
incomingMessages_.push(msg);
incomingMessagesSize_.fetch_add(msg.getLength());

Expand Down Expand Up @@ -1072,6 +1070,10 @@ void MultiTopicsConsumerImpl::notifyBatchPendingReceivedCallback(const BatchRece
void MultiTopicsConsumerImpl::messageProcessed(Message& msg) {
incomingMessagesSize_.fetch_sub(msg.getLength());
unAckedMessageTrackerPtr_->add(msg.getMessageId());
auto consumer = msg.impl_->consumerPtr_.lock();
if (consumer) {
consumer->increaseAvailablePermits(msg);
}
}

std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImpl::get_shared_this_ptr() {
Expand Down
4 changes: 2 additions & 2 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <memory>
#include <vector>

#include "BlockingQueue.h"
#include "Commands.h"
#include "ConsumerImplBase.h"
#include "ConsumerInterceptors.h"
Expand All @@ -33,6 +32,7 @@
#include "LookupDataResult.h"
#include "SynchronizedHashMap.h"
#include "TestUtil.h"
#include "UnboundedBlockingQueue.h"

namespace pulsar {
typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr;
Expand Down Expand Up @@ -115,7 +115,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
std::map<std::string, int> topicsPartitions_;
mutable std::mutex mutex_;
std::mutex pendingReceiveMutex_;
BlockingQueue<Message> incomingMessages_;
UnboundedBlockingQueue<Message> incomingMessages_;
std::atomic_int incomingMessagesSize_ = {0};
MessageListener messageListener_;
DeadlineTimerPtr partitionsUpdateTimer_;
Expand Down
66 changes: 66 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1335,4 +1335,70 @@ TEST(ConsumerTest, testRetrySubscribe) {
// milliseconds
}

TEST(ConsumerTest, testNoListenerThreadBlocking) {
Client client{lookupUrl};

const int numPartitions = 2;
const std::string partitionedTopic = "testNoListenerThreadBlocking-" + std::to_string(time(nullptr));
int res =
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

const int receiverQueueSize = 1;
const int receiverQueueSizeAcrossPartitions = receiverQueueSize * numPartitions;

Consumer consumer1, consumer2;
ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(receiverQueueSize);
consumerConfig.setMaxTotalReceiverQueueSizeAcrossPartitions(receiverQueueSizeAcrossPartitions);
Result consumerResult;
consumerResult = client.subscribe(partitionedTopic, "sub1", consumerConfig, consumer1);
ASSERT_EQ(consumerResult, ResultOk);
consumerResult = client.subscribe(partitionedTopic, "sub2", consumerConfig, consumer2);
ASSERT_EQ(consumerResult, ResultOk);

Producer producer;
ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
Result producerResult = client.createProducer(partitionedTopic, producerConfig, producer);
ASSERT_EQ(producerResult, ResultOk);

const int msgCount = receiverQueueSizeAcrossPartitions * 100;

for (int i = 0; i < msgCount; ++i) {
auto msg = MessageBuilder().setContent("test").build();
producer.sendAsync(msg, [](Result code, const MessageId& messageId) {});
}
producer.flush();
producer.close();

waitUntil(std::chrono::seconds(1), [consumer1] {
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
return multiConsumerImpl->getNumOfPrefetchedMessages() == receiverQueueSizeAcrossPartitions;
});

// check consumer1 prefetch num
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer1);
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);

// read consumer2 while consumer1 reaches the prefech limit
for (int i = 0; i < msgCount; ++i) {
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer2);
int prefetchNum = multiConsumerImpl->getNumOfPrefetchedMessages();
ASSERT_LE(prefetchNum, receiverQueueSizeAcrossPartitions);

Message msg;
Result ret = consumer2.receive(msg, 1000);
ASSERT_EQ(ret, ResultOk);
consumer2.acknowledge(msg);
}

consumer2.close();
consumer1.close();
client.close();
}

} // namespace pulsar

0 comments on commit 6daf7a5

Please sign in to comment.