rabbit 消费线程死锁
版本
- spring-amqp.version=1.7.7.RELEASE
- spring.boot.version=1.5.13.RELEASE
死锁原因
- 根据堆栈信息,定位到代码,死锁的线程是去获取channel连接,没有超时时间,此处没有拿带replay
- 上层调用,是synchronized,所以上层方法阻塞了其他线程
- 死锁同时还出现:Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
问题排查
Blocked count 0
Blocked time -1
Waited count 1
Waited time -1
Lock name com.rabbitmq.utility.BlockingValueOrException@f4173fd
Lock owner id -1
Lock owner name
java.lang.Object.wait(Object.java:-2) native
java.lang.Object.wait(Object.java:502)
com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:49)
com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:91)
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32)
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366)
com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229)
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117)
com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:132)
com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)
com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:533)
org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:56)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1154)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1143)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:547)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:530)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:500)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:482)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1400(CachingConnectionFactory.java:97)
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1159)
org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1441)
org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417)
org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393)
org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:350)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1209)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
获取不到锁的tread 堆栈信息
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1198)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470)
java.lang.Thread.run(Thread.java:748)
解决方案
- 目前rabbitmq 配置的默认线程池,线程池是可以自动缩融合扩容,猜测是thread缩容、扩容过程中出现死锁的情况
- max-concurrency=concurrency=10 ,prefetch = 1这样,线程就不会出现缩容、扩容
- 这样配置是否合理?从两个角度来说明:mq有两个作用,异步,缓冲。假如出现性能问题,横向扩容会比增加线程并发更可行。
- 假如队列中堆积了很多消息。配置过多线程,并发线程数设置很大,一般情况下容器都是虚拟cpu ,所以真正可以并行处理的线程就是cpu个数,线程间需要切换,浪费了资源
- mq在秒杀或者并发中,可以当缓存(如景点超过最大承载人数,可以通过排队,为什么不直接增加窗口来增加售票,原因是景区承载有限,很有可能爆满,出现践踏等事故)
- 最近观察,再无出现此情况。google相关资料,甚少
rabbit mq 重复ack
错误信息
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=120)
- tag 是针对消息设置的一个标志,自增,确认后减少
- 出现这个情况是tag 已经被确认过了,又被确认过一次。
- 由于目前mq消费是manual ack 异常封装会basicAck,但是onError 方法又做了一次ackRequeue,导致出现该错误,消息重复增加的情况
解决方案
- 删除requeue
- 出现未知异常,人工介入处理,不做requeue
rabbitMq message ttl 不被消费
现象
- 生产环境将异常处理消息,或者需要重试消息,重新投递到DLX 私信队列中,超过message的TTL 自动路由到原有队列
- 目前消息ttl时间已到,但是一直没有被重新推送到队列里
- 排查了历史所有被消费的信息,message-create 时间 + message-expire 已经远远超过了当前时间,消费被延后
- 目前重试次数和mq message ttl 有线性关系,所以TTL大的消息一直占用队列head节点
- 查询官方文档后rabbitMq ttl注意,官方有一个注意点,message-TTL 方式,mq只会关注head message 是否到期,所以出现ttl时间已到,但是没有被执行
解决方案
- 使用queue ttl 方式,这样ttl 时间都是一样的,不会出现ttl时间到,但是未被执行,这样的话,重试规则不可变,不够灵活