RabbitTemplate.send throws java.lang.ClassCastException: com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk cannot be cast to com.rabbitmq.client.impl.AMQImpl$Confirm$SelectOk #1337

Open
CrackerGit opened this issue May 11, 2021 · 10 comments

Comments

@CrackerGit

spring-rabbit 2.1.13

Question

I use RabbitTemplate to send messages with the cache mode set to CONNECTION and the number of cached connections set lower than the maximum number of connections.
In a stress test with 50 concurrent senders, the following exception is thrown:

java.lang.ClassCastException: com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk cannot be cast to com.rabbitmq.client.impl.AMQImpl$Confirm$SelectOk
@artembilan
Member

Can you please show more of the stack trace, so we can see which Spring AMQP code is involved in the problem?
Is there any chance you could share something that we can run on our side to reproduce it?

@garyrussell
Contributor

garyrussell commented May 11, 2021

Also, 2.1.x is at end of life; consider upgrading to 2.2.16.RELEASE or 2.3.6.

https://spring.io/projects/spring-amqp#learn

@CrackerGit
Author

CrackerGit commented May 12, 2021

This is my code:

package com.wppilu.learn.spring.amqp;

import lombok.extern.slf4j.Slf4j;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

import java.nio.charset.StandardCharsets;

@BenchmarkMode(Mode.Throughput)
@Threads(100)
@State(Scope.Benchmark)
@Warmup(iterations = 3, time = 5)
@Measurement(iterations = 3, time = 30)
@SpringBootApplication
@Slf4j
public class RabbitMqTemplatePerf {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
            .include(RabbitMqTemplatePerf.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(options).run();
    }

    // Consumer that drains the queue; the message itself is intentionally ignored.
    @RabbitListener(queues = {"msg-test"})
    public void handle(Message message) {
    }

    private ConfigurableApplicationContext context;
    private RabbitTemplate rabbitTemplate;

    @Setup
    public void init() {
        context = SpringApplication.run(RabbitMqTemplatePerf.class);
        rabbitTemplate = context.getBean(RabbitTemplate.class);
        // No-op confirm callback.
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        });
    }

    @TearDown
    public void destroy(){
        context.close();
    }

    @Benchmark
    public void send(){
        try{
            rabbitTemplate.send("test","a.b", MessageBuilder.withBody("11".getBytes(StandardCharsets.UTF_8)).build());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

With this configuration:

spring:
    rabbitmq:
        publisher-confirms: true
        cache:
            connection:
                mode: connection
                size: 10
            channel:
                checkout-timeout: 30000
                size: 2000

The resulting exception:

java.lang.ClassCastException: com.rabbitmq.client.impl.AMQImpl$Channel$OpenOk cannot be cast to com.rabbitmq.client.impl.AMQImpl$Confirm$SelectOk
	at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:1552)
	at com.rabbitmq.client.impl.ChannelN.confirmSelect(ChannelN.java:52)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:677)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:668)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:627)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:518)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1700(CachingConnectionFactory.java:102)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1380)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2079)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2047)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:996)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:989)

@garyrussell
Contributor

This appears to be a problem in the amqp-client; the openOk is returned for the open request and it is somehow arriving as the reply to the confirmSelect(). I suggest you ask about it on the rabbitmq-users Google group. Make sure you tell them which amqp-client version you are using.

Have you tried upgrading to a supported Spring AMQP version? They use newer clients.

@CrackerGit
Author

OK, I tried the new version and the problem does not appear. I am going to ask on rabbitmq-users. I am just curious about the cause of the problem. My earlier guess was that when a connection exceeds the cache size it is physically closed, and that while sending messages on the same connection it is possible to get the same Channel twice (one closed and one not closed). CachingConnectionFactory doesn't seem to have this problem.

@CrackerGit
Author

I took a detailed look at the code and found that the problem is still there. There is no concurrency control in 'CachingConnectionFactory.doCreateBareChannel'
and 'CachedChannelInvocationHandler.doReturnToCache'.
When the number of connections is over the cache size and starts to decrease, with 'CachingConnectionFactory.this.active' = false,
the Channel is physically closed.
ChannelManager also has no strict control over Channel creation and destruction,
so one thread can first get a cached Channel, then that Channel is closed, and a new thread gets the same Channel.

@garyrussell
Contributor

It's not clear what you mean; createBareChannel is only called if there are no cached channels or if a channel proxy in the cache has a target that is closed.

Please explain further.
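
The checkout rule described above can be modelled roughly as follows. This is a simplified, hypothetical sketch that uses only the JDK. It is not the actual CachingConnectionFactory code, and `ToyChannelCache`, `ToyChannel`, `checkout`, `giveBack` and `createdCount` are invented names. In particular, the toy simply discards a closed cached channel and creates a new one, which is an assumption made for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: hypothetical names, not Spring AMQP code.
class ToyChannelCache {

    // Stand-in for a channel that can be open or closed.
    static final class ToyChannel {
        private volatile boolean open = true;
        boolean isOpen() { return open; }
        void close() { open = false; }
    }

    private final Deque<ToyChannel> cached = new ArrayDeque<>();
    private int created;

    // Reuse an open cached channel if one exists; otherwise create a new "bare" channel.
    synchronized ToyChannel checkout() {
        ToyChannel candidate;
        while ((candidate = cached.poll()) != null) {
            if (candidate.isOpen()) {
                return candidate;
            }
            // A closed cached channel is dropped and the search continues.
        }
        created++;
        return new ToyChannel();
    }

    // Return a channel to the cache (no size limit in this toy model).
    synchronized void giveBack(ToyChannel channel) {
        cached.push(channel);
    }

    // Number of channels created because the cache had nothing usable.
    synchronized int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        ToyChannelCache cache = new ToyChannelCache();
        ToyChannel first = cache.checkout();
        cache.giveBack(first);
        ToyChannel second = cache.checkout();
        System.out.println("reused cached channel: " + (first == second));
        System.out.println("channels created: " + cache.createdCount());
    }
}
```

In this model both methods are synchronized on the same object, so two threads cannot check out the same cached channel at once; whether the real code offers an equivalent guarantee on every path is the question under discussion here.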

@freshgeek

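I took a detailed look at the code and found that the problem is still there. There is no concurrency control in 'CachingConnectionFactory.doCreateBareChannel'
and 'CachedChannelInvocationHandler.doReturnToCache'.
When the number of connections is over the cache size and starts to decrease, with 'CachingConnectionFactory.this.active' = false,
the Channel is physically closed.
ChannelManager also has no strict control over Channel creation and destruction,
so one thread can first get a cached Channel, then that Channel is closed, and a new thread gets the same Channel.

@CrackerGit May I ask how you solved this in the end? Is this configuration also a problem when running in a multi-threaded environment?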

@CrackerGit
Author

CrackerGit commented Aug 19, 2021

To avoid this problem, I set the 'connectionLimit' property of CachingConnectionFactory to the same value as 'connectionCacheSize'.

But I think the underlying issue still needs to be fixed.
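
A minimal sketch of that workaround as Java configuration, assuming a broker on `localhost` and reusing the connection cache size of 10 from the YAML earlier in the thread. The class name `RabbitConnectionConfig` and the constant are hypothetical. Note that defining your own connection factory bean normally makes Spring Boot's auto-configured one back off, so other settings (channel cache size, checkout timeout, publisher confirms) would also have to be applied to this bean.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConnectionConfig {

    // Assumed value, matching spring.rabbitmq.cache.connection.size above.
    private static final int CONNECTION_CACHE_SIZE = 10;

    @Bean
    public CachingConnectionFactory connectionFactory() {
        // "localhost" is an assumption; use your broker's host.
        CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
        factory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
        factory.setConnectionCacheSize(CONNECTION_CACHE_SIZE);
        // Workaround from this thread: never allow more connections than the cache
        // holds, so no connection is opened beyond the cache size.
        factory.setConnectionLimit(CONNECTION_CACHE_SIZE);
        return factory;
    }
}
```

With the limit equal to the cache size, a sender that cannot get a connection waits (up to the channel checkout timeout) instead of opening an extra connection that would later be closed.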

@garyrussell
Contributor

It is still not clear to me how two threads can get the same channel - I must be missing something - can you elaborate?
