diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc index c1b17270..fa11a845 100644 --- a/lib/BatchMessageContainerBase.cc +++ b/lib/BatchMessageContainerBase.cc @@ -27,7 +27,7 @@ namespace pulsar { BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& producer) - : topicName_(producer.topic_), + : topicName_(producer.topic()), producerConfig_(producer.conf_), producerName_(producer.producerName_), producerId_(producer.producerId_), diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h index cd17d6d6..ed42c07c 100644 --- a/lib/BatchMessageContainerBase.h +++ b/lib/BatchMessageContainerBase.h @@ -90,7 +90,7 @@ class BatchMessageContainerBase : public boost::noncopyable { protected: // references to ProducerImpl's fields - const std::shared_ptr topicName_; + const std::string topicName_; const ProducerConfiguration& producerConfig_; const std::string& producerName_; const uint64_t& producerId_; diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index d65fe54c..61def687 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -174,7 +174,7 @@ Future ConsumerImpl::getConsumerCreatedFuture() const std::string& ConsumerImpl::getSubscriptionName() const { return originalSubscriptionName_; } -const std::string& ConsumerImpl::getTopic() const { return *topic_; } +const std::string& ConsumerImpl::getTopic() const { return topic(); } void ConsumerImpl::start() { HandlerBase::start(); @@ -194,7 +194,7 @@ void ConsumerImpl::start() { // Initialize ackGroupingTrackerPtr_ here because the get_shared_this_ptr() was not initialized until the // constructor completed. - if (TopicName::get(*topic_)->isPersistent()) { + if (TopicName::get(topic())->isPersistent()) { if (config_.getAckGroupingTimeMs() > 0) { ackGroupingTrackerPtr_.reset(new AckGroupingTrackerEnabled( connectionSupplier, requestIdSupplier, consumerId_, config_.isAckReceiptEnabled(), @@ -249,7 +249,7 @@ Future ConsumerImpl::connectionOpened(const ClientConnectionPtr& c ClientImplPtr client = client_.lock(); uint64_t requestId = client->newRequestId(); SharedBuffer cmd = Commands::newSubscribe( - *topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_, + topic(), subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_, subscribeMessageId, readCompacted_, config_.getProperties(), config_.getSubscriptionProperties(), config_.getSchema(), getInitialPosition(), config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy(), config_.getPriorityLevel()); @@ -552,7 +552,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: Message m(messageId, brokerEntryMetadata, metadata, payload); m.impl_->cnx_ = cnx.get(); - m.impl_->setTopicName(topic_); + m.impl_->setTopicName(getTopicPtr()); m.impl_->setRedeliveryCount(msg.redelivery_count()); if (metadata.has_schema_version()) { @@ -1243,7 +1243,7 @@ void ConsumerImpl::closeAsync(ResultCallback originalCallback) { return; } - LOG_INFO(getName() << "Closing consumer for topic " << topic_); + LOG_INFO(getName() << "Closing consumer for topic " << topic()); state_ = Closing; incomingMessages_.close(); @@ -1764,7 +1764,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa return; } if (result != ResultOk) { - LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" + LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {" << self->consumerName_ << "} Failed to acknowledge the message {" << originMessageId << "} of the original topic but send to the DLQ successfully : " @@ -1777,7 +1777,7 @@ void ConsumerImpl::processPossibleToDLQ(const MessageId& messageId, ProcessDLQCa } }); } else { - LOG_WARN("{" << self->topic_ << "} {" << self->subscription_ << "} {" + LOG_WARN("{" << self->topic() << "} {" << self->subscription_ << "} {" << self->consumerName_ << "} Failed to send DLQ message to {" << self->deadLetterPolicy_.getDeadLetterTopic() << "} for message id " << "{" << originMessageId << "} : " << res); diff --git a/lib/HandlerBase.cc b/lib/HandlerBase.cc index b9299395..986063e0 100644 --- a/lib/HandlerBase.cc +++ b/lib/HandlerBase.cc @@ -30,8 +30,8 @@ DECLARE_LOG_OBJECT() namespace pulsar { HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff) - : client_(client), - topic_(std::make_shared(topic)), + : topic_(std::make_shared(topic)), + client_(client), executor_(client->getIOExecutorProvider()->get()), mutex_(), creationTimestamp_(TimeUtils::now()), @@ -88,7 +88,7 @@ void HandlerBase::grabCnx() { return; } auto self = shared_from_this(); - client->getConnection(*topic_).addListener([this, self](Result result, const ClientConnectionPtr& cnx) { + client->getConnection(topic()).addListener([this, self](Result result, const ClientConnectionPtr& cnx) { if (result == ResultOk) { LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString()); connectionOpened(cnx).addListener([this, self](Result result, bool) { diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index ad16a220..937b308d 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -87,14 +87,18 @@ class HandlerBase : public std::enable_shared_from_this { virtual const std::string& getName() const = 0; + const std::string& topic() const { return *topic_; } + const std::shared_ptr& getTopicPtr() const { return topic_; } + private: + const std::shared_ptr topic_; + void handleDisconnection(Result result, const ClientConnectionPtr& cnx); void handleTimeout(const boost::system::error_code& ec); protected: ClientImplWeakPtr client_; - const std::shared_ptr topic_; ExecutorServicePtr executor_; mutable std::mutex mutex_; std::mutex pendingReceiveMutex_; diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index dd530381..5162a61c 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -72,7 +72,7 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std interceptors_(interceptors) { std::stringstream consumerStrStream; consumerStrStream << "[Muti Topics Consumer: " - << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]"; + << "TopicName - " << topic() << " - Subscription - " << subscriptionName << "]"; consumerStr_ = consumerStrStream.str(); if (conf.getUnAckedMessagesTimeoutMs() != 0) { @@ -312,7 +312,7 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated( } void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) { - LOG_INFO("[ Topics Consumer " << topic_ << "," << subscriptionName_ << "] Unsubscribing"); + LOG_INFO("[ Topics Consumer " << topic() << "," << subscriptionName_ << "] Unsubscribing"); auto callback = [this, originalCallback](Result result) { if (result == ResultOk) { @@ -483,7 +483,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) { *numberTopicPartitions_ = 0; if (consumers.empty()) { LOG_DEBUG("TopicsConsumer have no consumers to close " - << " topic" << topic_ << " subscription - " << subscriptionName_); + << " topic" << topic() << " subscription - " << subscriptionName_); callback(ResultAlreadyClosed); return; } @@ -518,7 +518,7 @@ void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) { void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { LOG_DEBUG("Received Message from one of the topic - " << consumer.getTopic() << " message:" << msg.getDataAsString()); - msg.impl_->setTopicName(consumer.impl_->topic_); + msg.impl_->setTopicName(consumer.impl_->getTopicPtr()); Lock lock(pendingReceiveMutex_); if (!pendingReceives_.empty()) { @@ -744,7 +744,7 @@ Future MultiTopicsConsumerImpl::getConsumerCrea } const std::string& MultiTopicsConsumerImpl::getSubscriptionName() const { return subscriptionName_; } -const std::string& MultiTopicsConsumerImpl::getTopic() const { return *topic_; } +const std::string& MultiTopicsConsumerImpl::getTopic() const { return topic(); } const std::string& MultiTopicsConsumerImpl::getName() const { return consumerStr_; } diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc index 41595dd9..a66fbfbd 100644 --- a/lib/ProducerImpl.cc +++ b/lib/ProducerImpl.cc @@ -58,7 +58,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, partition_(partition), producerName_(conf_.getProducerName()), userProvidedProducerName_(false), - producerStr_("[" + *topic_ + ", " + producerName_ + "] "), + producerStr_("[" + topic() + ", " + producerName_ + "] "), producerId_(client->newProducerId()), msgSequenceGenerator_(0), batchTimer_(executor_->createDeadlineTimer()), @@ -67,7 +67,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, memoryLimitController_(client->getMemoryLimitController()), chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()), interceptors_(interceptors) { - LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_ + LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic() << " id: " << producerId_); int64_t initialSequenceId = conf.getInitialSequenceId(); @@ -93,7 +93,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName, if (conf_.isEncryptionEnabled()) { std::ostringstream logCtxStream; - logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]"; + logCtxStream << "[" << topic() << ", " << producerName_ << ", " << producerId_ << "]"; std::string logCtx = logCtxStream.str(); msgCrypto_ = std::make_shared(logCtx, true); msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader()); @@ -123,7 +123,7 @@ ProducerImpl::~ProducerImpl() { } } -const std::string& ProducerImpl::getTopic() const { return *topic_; } +const std::string& ProducerImpl::getTopic() const { return topic(); } const std::string& ProducerImpl::getProducerName() const { return producerName_; } @@ -148,7 +148,7 @@ Future ProducerImpl::connectionOpened(const ClientConnectionPtr& c ClientImplPtr client = client_.lock(); int requestId = client->newRequestId(); - SharedBuffer cmd = Commands::newProducer(*topic_, producerId_, producerName_, requestId, + SharedBuffer cmd = Commands::newProducer(topic(), producerId_, producerName_, requestId, conf_.getProperties(), conf_.getSchema(), epoch_, userProvidedProducerName_, conf_.isEncryptionEnabled(), static_cast(conf_.getAccessMode()), @@ -218,7 +218,7 @@ Result ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result cnx->registerProducer(producerId_, shared_from_this()); producerName_ = responseData.producerName; schemaVersion_ = responseData.schemaVersion; - producerStr_ = "[" + *topic_ + ", " + producerName_ + "] "; + producerStr_ = "[" + topic() + ", " + producerName_ + "] "; topicEpoch = responseData.topicEpoch; if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) { @@ -788,7 +788,7 @@ void ProducerImpl::closeAsync(CloseCallback originalCallback) { return; } - LOG_INFO(getName() << "Closing producer for topic " << topic_); + LOG_INFO(getName() << "Closing producer for topic " << topic()); state_ = Closing; ClientConnectionPtr cnx = getCnx().lock();