From e3bbc93b8adbcb26943af898742c7a32a8e83aea Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Aug 2024 20:22:52 +0800 Subject: [PATCH] Fix buffer overflow for non-batched send when the message metadata size exceeds 64KB See https://github.com/apache/pulsar-client-python/issues/223 ### Motivation Currently a shared buffer is used to store serialized message metadata for each send request. However, its capacity is only 64KB, when the metadata size exceeds 64KB, buffer overflow could happen. ### Modifications When the metadata size is too large, allocate a new buffer instead of using the shared buffer. Add `testLargeProperties` to cover it. --- lib/Commands.cc | 15 +++++++++++---- tests/ProducerTest.cc | 31 +++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/lib/Commands.cc b/lib/Commands.cc index 4b10b732..84f272b0 100644 --- a/lib/Commands.cc +++ b/lib/Commands.cc @@ -191,7 +191,7 @@ SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) return buffer; } -PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, ChecksumType checksumType, +PairSharedBuffer Commands::newSend(SharedBuffer& originalHeaders, BaseCommand& cmd, ChecksumType checksumType, const SendArguments& args) { cmd.set_type(BaseCommand::SEND); CommandSend* send = cmd.mutable_send(); @@ -221,9 +221,16 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, Chec int totalSize = headerContentSize + payloadSize; int checksumReaderIndex = -1; - headers.reset(); - assert(headers.writableBytes() >= (4 + headerContentSize)); // totalSize + headerLength - headers.writeUnsignedInt(totalSize); // External frame + // By default, headers refers a static buffer whose capacity is 64KB, which can be reused for headers to + // avoid frequent memory allocation. However, if users configure many properties, the size could be great + // that results a buffer overflow. In this case, we can only allocate a new larger buffer. + originalHeaders.reset(); + auto headers = originalHeaders; + if (headers.writableBytes() < (4 /* header length */ + headerContentSize)) { + headers = SharedBuffer::allocate(4 + headerContentSize); + } + + headers.writeUnsignedInt(totalSize); // External frame // Write cmd headers.writeUnsignedInt(cmdSize); diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc index 21e491de..d0d9eb48 100644 --- a/tests/ProducerTest.cc +++ b/tests/ProducerTest.cc @@ -683,4 +683,35 @@ TEST(ProducerTest, testFailedToCreateNewPartitionProducer) { client.close(); } +TEST(ProducerTest, testLargeProperties) { + const std::string topic = "producer-test-large-properties-" + std::to_string(time(nullptr)); + Client client(serviceUrl); + Producer producer; + ProducerConfiguration conf; + conf.setBatchingEnabled(false); + ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer)); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer)); + + MessageBuilder::StringMap properties; + constexpr int propertyCount = 20000; + auto builder = MessageBuilder().setContent("msg"); + for (int i = 0; i < propertyCount; i++) { + builder.setProperty("key" + std::to_string(i), "value-" + std::to_string(i)); + } + + // ASSERT_EQ(ResultOk, + // producer.send(MessageBuilder().setContent("msg").setProperties(properties).build())); + ASSERT_EQ(ResultOk, producer.send(builder.build())); + + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + ASSERT_EQ(msg.getProperties().size(), propertyCount); + for (int i = 0; i < propertyCount; i++) { + auto it = msg.getProperties().find("key" + std::to_string(i)); + ASSERT_NE(it, msg.getProperties().cend()); + } + client.close(); +} + INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));