Improve Documentation for PooledChannelConnectionFactory #1557

Open
m-ober opened this issue Jan 16, 2023 · 9 comments

m-ober commented Jan 16, 2023

In what version(s) of Spring AMQP are you seeing this issue?

Tested with 2.4.3 and 2.4.8

Describe the bug

We have a Spring Boot application that uses a PooledChannelConnectionFactory and a CachingConnectionFactory inside a SimpleRoutingConnectionFactory (the default connection factory is the PooledChannelConnectionFactory; the other one is only used for a few requests, which were not exercised while the described problem occurred). We noticed that after the application has been running for some time under (HTTP) request load, the container is shut down because the /actuator/health endpoint stops responding.

We traced it down to the channel pool being exhausted, that is, this line (PooledChannelConnectionFactory.java:196) waits forever:

Channel channel = transactional ? this.txChannels.borrowObject() : this.channels.borrowObject();

We are using the default RabbitHealthIndicator provided by the org.springframework.boot.actuate.amqp package, which does the check by executing:

return this.rabbitTemplate
	.execute((channel) -> channel.getConnection().getServerProperties().get("version").toString());

The connection configuration is straightforward:

var pooledChannelConnectionFactory = new PooledChannelConnectionFactory(connectionFactory);
pooledChannelConnectionFactory.setRequestedHeartBeat(configProperties.getRabbitmq().getHeartbeatSeconds());
pooledChannelConnectionFactory.setConnectionNameStrategy(factory -> String.format(
    "%s-%s",
    configProperties.getName(),
    factory.getPublisherConnectionFactory()
));
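
For context, a minimal sketch of how the two factories are wired into the SimpleRoutingConnectionFactory (the lookup key and variable names here are placeholders, not our actual configuration):

// Sketch only - 'cachingConnectionFactory' is the CachingConnectionFactory mentioned above;
// ConnectionFactory here is org.springframework.amqp.rabbit.connection.ConnectionFactory.
var routingConnectionFactory = new SimpleRoutingConnectionFactory();
Map<Object, ConnectionFactory> targetFactories = new HashMap<>();
targetFactories.put("other", cachingConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(targetFactories);
// Everything without an explicit lookup key goes through the pooled factory.
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);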

We have a RabbitListener for incoming messages:

@RabbitListener(
    queues = "${...}",
    ackMode = "NONE",
    messageConverter = "...",
    errorHandler = "..."
)

We send outgoing messages using rabbitTemplate.convertSendAndReceive().
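
Roughly like this (the exchange and routing key names are placeholders):

// Sketch of the request/reply call; exchange and routing key are placeholders.
Object reply = rabbitTemplate.convertSendAndReceive("some.exchange", "some.routing.key", requestPayload);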

This is the state of the pool when it stops working:

GenericObjectPool [maxTotal=8, blockWhenExhausted=true, maxWaitDuration=PT-0.001S, lifo=true, fairness=false, testOnCreate=false, testOnBorrow=false, testOnReturn=false, testWhileIdle=false, durationBetweenEvictionRuns=PT-0.001S, numTestsPerEvictionRun=3, minEvictableIdleTimeDuration=PT30M, softMinEvictableIdleTimeDuration=PT-0.001S, evictionPolicy=org.apache.commons.pool2.impl.DefaultEvictionPolicy@2aa6311a, closeLock=java.lang.Object@61f39bb, closed=false, evictionLock=java.lang.Object@249e0271, evictor=null, evictionIterator=null, factoryClassLoader=java.lang.ref.WeakReference@4893b344, oname=org.apache.commons.pool2:type=GenericObjectPool,name=pool, creationStackTrace=java.lang.Exception
	at org.apache.commons.pool2.impl.BaseGenericObjectPool.<init>(BaseGenericObjectPool.java:407)
	at org.apache.commons.pool2.impl.GenericObjectPool.<init>(GenericObjectPool.java:147)
	at org.apache.commons.pool2.impl.GenericObjectPool.<init>(GenericObjectPool.java:130)
	at org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory$ConnectionWrapper.<init>(PooledChannelConnectionFactory.java:183)
	at org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory.createConnection(PooledChannelConnectionFactory.java:140)
	at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:141)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.checkMismatchedQueues(AbstractMessageListenerContainer.java:1863)
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:1407)
	at org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry.startIfNecessary(RabbitListenerEndpointRegistry.java:289)
	at org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry.start(RabbitListenerEndpointRegistry.java:239)
	at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
	at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
	at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
	at java.base/java.lang.Iterable.forEach(Iterable.java:75)
	at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
	at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:731)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:307)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)
	at ...Application.main(Application.java:10)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
	at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
	at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
, borrowedCount=157, returnedCount=149, createdCount=8, destroyedCount=0, destroyedByEvictorCount=0, destroyedByBorrowValidationCount=0, activeTimes=StatsStore [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], size=100, index=49], idleTimes=StatsStore [[15001, 1, 14996, 29999, 14998, 1, 14998, 29999, 14999, 14999, 14998, 14999, 14999, 0, 14998, 0, 6520, 44999, 0, 14996, 15000, 14998, 14998, 14999, 14999, 14999, 14999, 15000, 0, 15002, 30005, 14994, 14994, 15004, 15004, 791, 14993, 1, 14997, 1, 14993, 0, 14999, 0, 14998, 0, 14999, 1, 14997, 0, 14998, 0, 14998, 0, 14999, 0, 10112]], size=100, index=57], waitTimes=StatsStore [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]], size=100, index=57], maxBorrowWaitDuration=PT0.082S, swallowedExceptionListener=null, factoryType=null, maxIdle=8, minIdle=0, factory=org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory$ConnectionWrapper$ChannelFactory@53a665ad, allObjects={IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,1)]=Object: AMQChannel(amqp://[email protected]:5672/,1), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,6)]=Object: AMQChannel(amqp://[email protected]:5672/,6), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,7)]=Object: AMQChannel(amqp://[email protected]:5672/,7), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,4)]=Object: AMQChannel(amqp://[email protected]:5672/,4), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,8)]=Object: AMQChannel(amqp://[email protected]:5672/,8), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,2)]=Object: AMQChannel(amqp://[email protected]:5672/,2), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,5)]=Object: AMQChannel(amqp://[email protected]:5672/,5), State: ALLOCATED, IdentityWrapper [instance=AMQChannel(amqp://[email protected]:5672/,3)]=Object: AMQChannel(amqp://[email protected]:5672/,3), State: ALLOCATED}, createCount=8, idleObjects=[], abandonedConfig=null]

To Reproduce

I am not yet able to provide a minimum working example to easily reproduce this behaviour. It requires an application that runs for some time and is loaded with requests. At some point, the RabbitHealthIndicator stops working, that is, the liveness/readiness endpoints time out.

Expected behavior

The channel pool should not get exhausted.

Sample

Not yet available.

m-ober (Author) commented Jan 16, 2023

I can call setBlockWhenExhausted(false) on the pool, and then I get a java.util.NoSuchElementException: Pool exhausted when the load reaches a certain threshold - but at that point only 8 channels are borrowed, which seems like a pretty low number.

But even worse, the application never recovers from an exhausted pool, regardless of whether I set setBlockWhenExhausted to true or false. If I set it to true, everything just blocks (as expected...); if I set it to false, the log is spammed with the above message. Even if I stop all load on the application and wait some time, it never recovers from the "exhausted pool" condition.

At this point, I can only kill the application, which will also give the following Exception:

org.springframework.amqp.UncategorizedAmqpException: java.lang.InterruptedException
	at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:82)
	at org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory$ConnectionWrapper.createChannel(PooledChannelConnectionFactory.java:201)

Update: I can observe the same behaviour when I remove the Routing/Caching Connection Factory and just use a single PooledChannelConnectionFactory.

m-ober (Author) commented Jan 16, 2023

I tried using the ThreadChannelConnectionFactory instead of the PooledChannelConnectionFactory; this also yields some error messages / exceptions:

basicConsume failed, scheduling consumer SimpleConsumer [queue=amq.rabbitmq.reply-to, index=3, consumerTag=null identity=5c8b439d] for restart

channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - reply consumer already set, class-id=60, method-id=20)

Consumer canceled - channel closed SimpleConsumer [queue=amq.rabbitmq.reply-to, index=0, consumerTag=amq.ctag-r4GOUDMpfBKtSxOYSWvrJw identity=58f91281]

garyrussell (Contributor) commented:

There is something strange going on; it shouldn't be creating new connections...

at org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory$ConnectionWrapper.<init>(PooledChannelConnectionFactory.java:183)

There is one shared connection...

public synchronized Connection createConnection() throws AmqpException {
    if (this.connection == null || !this.connection.isOpen()) {
        Connection bareConnection = createBareConnection(); // NOSONAR - see destroy()
        this.connection = new ConnectionWrapper(bareConnection.getDelegate(), getCloseTimeout(), // NOSONAR
                this.simplePublisherConfirms, this.poolConfigurer, getChannelListener()); // NOSONAR
        getConnectionListener().onCreate(this.connection);
    }
    return this.connection;
}

Also that stack trace is creating the pool, not trying to check out a channel.

It doesn't look like it has anything to do with runtime...

        // Populate the creation stack trace
        this.creationStackTrace = getStackTrace(new Exception());  <<<<<<<<<<<<<< Line 407

Finally, this connection factory is not really designed for consumers because they have long-lived channels.

If used there, the channel pool size must be large enough for the number of containers multiplied by their concurrency (plus a few more for publishing operations - but it is better to use a different connection for consumers and producers).

You can configure the pool size by adding a configurer.

/**
 * Add a consumer to configure the object pool. The second argument is true when
 * called with the transactional pool.
 * @param poolConfigurer the configurer.
 */
public void setPoolConfigurer(BiConsumer<GenericObjectPool<Channel>, Boolean> poolConfigurer) {
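
For example, a minimal sketch of such a configurer (the sizes are illustrative, not recommendations):

// Sketch only - set the configurer before the first connection is created.
pooledChannelConnectionFactory.setPoolConfigurer((pool, transactional) -> {
    pool.setMaxTotal(50);             // e.g. containers * concurrency plus headroom for publishing
    pool.setBlockWhenExhausted(true); // or false to fail fast instead of waiting
});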

m-ober (Author) commented Jan 17, 2023

Also that stack trace is creating the pool, not trying to check out a channel.

Yes, I'm just monitoring/printing the pool stats to see how many channels are currently borrowed.

Finally, this connection factory is not really designed for consumers because they have long-lived channels.

I can disable the consuming part (the @RabbitListener) of this application completely and still see the same behaviour (update for clarification: the app has two entry points, one via HTTP endpoints and one via the @RabbitListener; additionally, messages are exchanged with other services for both entry points):

  • Application is loaded via HTTP endpoint
  • App communicates with other services via rabbitTemplate.convertSendAndReceive()
  • HTTP Response is returned

As I increase the load, more channels are borrowed from the pool, but they are never returned. If I stop all requests, the borrowed count stays the same. If the borrowed count reaches the maximum (8), the application hangs and never recovers, as the channels are never returned to the pool.

If used there, the channel pool size must be large enough for the number of containers multiplied by their concurrency (plus a few more for publishing operations - but it is better to use a different connection for consumers and producers).

If I increase the pool size, wouldn't the problem still be the same, or am I mistaken? As load increases, channels are borrowed but never returned, and at some point the pool is exhausted. We just tried a pool size of 30, and it is still exhausted at some point.

Is the PooledChannelConnectionFactory not working together with convertSendAndReceive()?

The docs say:

For most use cases, the PooledChannelConnectionFactory should be used. The ThreadChannelConnectionFactory can be used if you want to ensure strict message ordering without the need to use Scoped Operations. The CachingConnectionFactory should be used if you want to use correlated publisher confirmations or if you wish to open multiple connections, via its CacheMode.

garyrussell (Contributor) commented Jan 17, 2023

Yes, I'm just monitoring/printing the pool stats to see how many channels are currently borrowed.

Well, the stack trace in those stats is useless; it is from ancient history; a thread dump would have been better.

borrowed from the pool, but they are never returned.

The template reliably closes the channel (returns it to the pool) in a finally block in doExecute():

    finally {
        cleanUpAfterAction(channel, invokeScope, resourceHolder, connection);
    }
}

private void cleanUpAfterAction(@Nullable Channel channel, boolean invokeScope,
        @Nullable RabbitResourceHolder resourceHolder, @Nullable Connection connection) {

    if (!invokeScope) {
        if (resourceHolder != null) {
            ConnectionFactoryUtils.releaseResources(resourceHolder);
        }
        else {
            RabbitUtils.closeChannel(channel);
            RabbitUtils.closeConnection(connection);
        }
    }
}

However, when using direct reply-to (the default), the template uses a DirectReplyToMessageListenerContainer to handle replies, so that container uses long-lived channels (the number of which increases under load). See doSendAndReceiveWithDirect(). So the pool size needs to be large enough to handle your expected load.

Or, you can set useDirectReplyToContainer to false to use direct reply-to with shorter-lived channels, or set useTemporaryReplyQueues to true to use that mechanism instead of direct reply-to.

https://docs.spring.io/spring-amqp/docs/current/reference/html/#direct-reply-to
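
For reference, a sketch of those two settings on the template (normally only one of them would be set):

// Option: direct reply-to without the long-lived reply container
rabbitTemplate.setUseDirectReplyToContainer(false);
// Or: temporary, exclusive reply queues instead of direct reply-to
rabbitTemplate.setUseTemporaryReplyQueues(true);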

garyrussell (Contributor) commented Jan 17, 2023

With any of these options, each channel is reserved for the entire request/reply duration, so, depending on the volume and responsiveness of the server, a potentially large number of channels may be needed.

m-ober (Author) commented Jan 31, 2023

each channel is reserved for the entire request/reply duration, so, depending on the volume and responsiveness of the server, a potentially large number of channels may be needed.

That would not be a big issue, at least not if it only blocks for the request/reply duration rather than forever (because channels are never returned).

So the pool size needs to be large enough to handle your expected load.

There can always be load spikes, and then the application would just hang forever until restarted. That's really the issue here. Having "only" degraded performance because actual load > expected load would be acceptable.

So, if I understood you correctly:

  • useDirectReplyToContainer=false or useTemporaryReplyQueues=true might still cause issues if the pool is too small, but as the channels will be returned with either of these options, the application will continue to work as soon as a channel is available again?
  • What about using the CachingConnectionFactory (without modifying any of the other options)? Actually, that's what we did, and we are no longer observing any issues.

What solution would you prefer?

At this point, as it looks like everything is working as intended, I think the documentation should be updated to "warn" about this behaviour. I'd guess the use case I have shown is not that exotic and others might run into the same issue.

garyrussell (Contributor) commented:

but as the channels will be returned with either of these options

Correct.

What solution would you prefer?

It's up to you.

In order of best performance...

  • direct reply-to container with a large enough pool (or CachingConnectionFactory with its default settings)
  • useDirectReplyToContainer=false
  • useTemporaryReplyQueues=true

CachingConnectionFactory is fine too - its cache is unbounded by default.
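
A minimal sketch of that alternative, assuming the same connectionFactory variable as in the configuration shown earlier:

// Sketch: CachingConnectionFactory with defaults; when the cache is empty, additional channels are
// created on demand and closed on return, so callers do not block waiting for a channel.
var cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
cachingConnectionFactory.setChannelCacheSize(25); // the default; only controls how many idle channels are kept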

m-ober (Author) commented Feb 2, 2023

Thanks for the clarification!

I would leave it up to you whether to close this issue, since it's not a real bug, or to leave it open and change its type to "documentation".

@garyrussell garyrussell changed the title Object pool of PooledChannelConnectionFactory is exhausted, blocks indefinitely Improve Documentation for PooledChannelConnectionFactory Feb 2, 2023
@garyrussell garyrussell added this to the Backlog milestone Feb 2, 2023