Skip to content

Commit

Permalink
modified ReaderTest to test Reader for both persistent and non-persis…
Browse files Browse the repository at this point in the history
…tent topics
  • Loading branch information
zliang-min committed Nov 23, 2024
1 parent b91d537 commit 66e51eb
Showing 1 changed file with 49 additions and 42 deletions.
91 changes: 49 additions & 42 deletions tests/ReaderTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,25 @@ using namespace pulsar;
static std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";

class ReaderTest : public ::testing::TestWithParam<bool> {
class ReaderTest : public ::testing::TestWithParam<std::tuple<bool, bool>> {
public:
void initTopic(std::string topicName) {
if (isMultiTopic_) {
// call admin api to make it partitioned
std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
std::string url = adminUrl + "admin/v2/" + (isNonPersistentTopic ? "non-" : "") + "persistent/public/default/" + topicName + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);
}
}

protected:
bool isMultiTopic_ = GetParam();
std::string fullTopicName(const std::string & topicName) {
return (isNonPersistentTopic ? "non-" : "") + "persistent://public/default/" + topicName;
}

bool isNonPersistentTopic = std::get<0>(GetParam());
bool isMultiTopic_ = std::get<1>(GetParam());
};

TEST_P(ReaderTest, testSimpleReader) {
Expand All @@ -62,10 +67,11 @@ TEST_P(ReaderTest, testSimpleReader) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -95,10 +101,11 @@ TEST_P(ReaderTest, testAsyncRead) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -140,7 +147,7 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -150,7 +157,7 @@ TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

for (int i = 0; i < 10; i++) {
Message msg;
Expand All @@ -174,7 +181,7 @@ TEST_P(ReaderTest, testMultipleReaders) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -184,10 +191,10 @@ TEST_P(ReaderTest, testMultipleReaders) {

ReaderConfiguration readerConf;
Reader reader1;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader1));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader1));

Reader reader2;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader2));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader2));

for (int i = 0; i < 10; i++) {
Message msg;
Expand Down Expand Up @@ -221,7 +228,7 @@ TEST_P(ReaderTest, testReaderOnLastMessage) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -231,7 +238,7 @@ TEST_P(ReaderTest, testReaderOnLastMessage) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader));

for (int i = 10; i < 20; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand Down Expand Up @@ -261,7 +268,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -271,7 +278,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

MessageId lastMessageId;

Expand All @@ -287,7 +294,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessage) {
}

// Create another reader starting on msgid4
ASSERT_EQ(ResultOk, client.createReader(topicName, lastMessageId, readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), lastMessageId, readerConf, reader));

for (int i = 5; i < 10; i++) {
Message msg;
Expand Down Expand Up @@ -319,7 +326,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelayMs(1000);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

for (int i = 0; i < 10; i++) {
std::string content = "my-message-" + std::to_string(i);
Expand All @@ -334,7 +341,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {

ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

std::string lastMessageId;

Expand All @@ -352,7 +359,7 @@ TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
// Create another reader starting on msgid4
auto msgId4 = MessageId::deserialize(lastMessageId);
Reader reader2;
ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), msgId4, readerConf, reader2));

for (int i = 5; i < 11; i++) {
Message msg;
Expand Down Expand Up @@ -382,12 +389,12 @@ TEST_P(ReaderTest, testReaderReachEndOfTopic) {
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelayMs(1000);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

// 2. create reader, and expect hasMessageAvailable return false since no message produced.
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader));

bool hasMessageAvailable;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
Expand Down Expand Up @@ -457,12 +464,12 @@ TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
Producer producer;
ProducerConfiguration producerConf;
producerConf.setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

// 2. create reader, and expect hasMessageAvailable return false since no message produced.
ReaderConfiguration readerConf;
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), readerConf, reader));

bool hasMessageAvailable;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
Expand Down Expand Up @@ -511,7 +518,7 @@ TEST(ReaderTest, testPartitionIndex) {
"ReaderTestPartitionIndex-par-topic-" + std::to_string(time(nullptr));

int res = makePutRequest(
adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
adminUrl + "admin/v2/" + (isNonPersistentTopic ? "non-" : "") + "persistent/public/default/" + partitionedTopic + "/partitions", "2");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

const std::string partition0 = partitionedTopic + "-partition-0";
Expand All @@ -520,14 +527,14 @@ TEST(ReaderTest, testPartitionIndex) {
ReaderConfiguration readerConf;
Reader readers[3];
ASSERT_EQ(ResultOk,
client.createReader(nonPartitionedTopic, MessageId::earliest(), readerConf, readers[0]));
ASSERT_EQ(ResultOk, client.createReader(partition0, MessageId::earliest(), readerConf, readers[1]));
ASSERT_EQ(ResultOk, client.createReader(partition1, MessageId::earliest(), readerConf, readers[2]));
client.createReader(fullTopicName(nonPartitionedTopic), MessageId::earliest(), readerConf, readers[0]));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(partition0), MessageId::earliest(), readerConf, readers[1]));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(partition1), MessageId::earliest(), readerConf, readers[2]));

Producer producers[3];
ASSERT_EQ(ResultOk, client.createProducer(nonPartitionedTopic, producers[0]));
ASSERT_EQ(ResultOk, client.createProducer(partition0, producers[1]));
ASSERT_EQ(ResultOk, client.createProducer(partition1, producers[2]));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(nonPartitionedTopic), producers[0]));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(partition0), producers[1]));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(partition1), producers[2]));

for (auto& producer : producers) {
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
Expand Down Expand Up @@ -555,7 +562,7 @@ TEST_P(ReaderTest, testSubscriptionNameSetting) {
ReaderConfiguration readerConf;
readerConf.setInternalSubscriptionName(subName);
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());

Expand All @@ -575,7 +582,7 @@ TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) {
readerConf.setInternalSubscriptionName(subName);
readerConf.setSubscriptionRolePrefix("my-prefix");
Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf, reader));

ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName());

Expand All @@ -594,13 +601,13 @@ TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
ReaderConfiguration readerConf1;
readerConf1.setInternalSubscriptionName(subscriptionName);
Reader reader1;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf1, reader1));

ReaderConfiguration readerConf2;
readerConf2.setInternalSubscriptionName(subscriptionName);
Reader reader2;
ASSERT_EQ(ResultConsumerBusy,
client.createReader(topicName, MessageId::earliest(), readerConf2, reader2));
client.createReader(fullTopicName(topicName), MessageId::earliest(), readerConf2, reader2));

reader1.close();
reader2.close();
Expand All @@ -616,7 +623,7 @@ TEST_P(ReaderTest, testIsConnected) {
Reader reader;
ASSERT_FALSE(reader.isConnected());

ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::earliest(), {}, reader));
ASSERT_TRUE(reader.isConnected());

ASSERT_EQ(ResultOk, reader.close());
Expand All @@ -633,7 +640,7 @@ TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
ProducerConfiguration producerConf;
producerConf.setBatchingMaxMessages(3);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producerConf, producer));

std::vector<MessageId> messageIds;
constexpr int numMessages = 7;
Expand All @@ -657,14 +664,14 @@ TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
bool hasMessageAvailable;

for (size_t i = 0; i < messageIds.size() - 1; i++) {
ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), messageIds[i], {}, reader));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
EXPECT_TRUE(hasMessageAvailable);
}

// The start message ID is exclusive by default, so when we start at the last message, there should be no
// message available.
ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), messageIds.back(), {}, reader));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
EXPECT_FALSE(hasMessageAvailable);
client.close();
Expand All @@ -678,7 +685,7 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
initTopic(topicName);

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ASSERT_EQ(ResultOk, client.createProducer(fullTopicName(topicName), producer));

MessageId seekMessageId;
for (int i = 0; i < 5; i++) {
Expand All @@ -690,7 +697,7 @@ TEST_P(ReaderTest, testReceiveAfterSeek) {
}

Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader));
ASSERT_EQ(ResultOk, client.createReader(fullTopicName(topicName), MessageId::latest(), {}, reader));

reader.seek(seekMessageId);

Expand Down Expand Up @@ -888,5 +895,5 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) {
}
}

INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Combine(::testing::Values(true, false), ::testing::Values(true, false)));
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));

0 comments on commit 66e51eb

Please sign in to comment.