rabbitQM死锁及常见问题

rabbit 消费线程死锁

版本

  • spring-amqp.version=1.7.7.RELEASE
  • spring.boot.version=1.5.13.RELEASE

死锁原因

  • 根据堆栈信息,定位到代码,死锁的线程是去获取channel连接,没有超时时间,此处没有拿带replay
  • 上层调用,是synchronized,所以上层方法阻塞了其他线程
  • 死锁同时还出现:Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?

问题排查

Blocked count     0
Blocked time     -1
Waited count     1
Waited time     -1
Lock name     com.rabbitmq.utility.BlockingValueOrException@f4173fd
Lock owner id     -1
Lock owner name     

java.lang.Object.wait(Object.java:-2) native
java.lang.Object.wait(Object.java:502) 
com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:49) 
com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:91) 
com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) 
com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) 
com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) 
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) 
com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:132) 
com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176) 
com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:533) 
org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:56) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1154) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1143) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:547) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:530) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:500) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:482) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1400(CachingConnectionFactory.java:97) 
org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1159) 
org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1441) 
org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1417) 
org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1393) 
org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:350) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1209) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) 

获取不到锁的tread 堆栈信息

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.redeclareElementsIfNecessary(SimpleMessageListenerContainer.java:1198) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1470) 
java.lang.Thread.run(Thread.java:748) 

解决方案

  • 目前rabbitmq 配置的默认线程池,线程池是可以自动缩融合扩容,猜测是thread缩容、扩容过程中出现死锁的情况
  • max-concurrency=concurrency=10 ,prefetch = 1这样,线程就不会出现缩容、扩容
  • 这样配置是否合理?从两个角度来说明:mq有两个作用,异步,缓冲。假如出现性能问题,横向扩容会比增加线程并发更可行。
  • 假如队列中堆积了很多消息。配置过多线程,并发线程数设置很大,一般情况下容器都是虚拟cpu ,所以真正可以并行处理的线程就是cpu个数,线程间需要切换,浪费了资源
  • mq在秒杀或者并发中,可以当缓存(如景点超过最大承载人数,可以通过排队,为什么不直接增加窗口来增加售票,原因是景区承载有限,很有可能爆满,出现践踏等事故)
  • 最近观察,再无出现此情况。google相关资料,甚少

rabbit mq 重复ack

错误信息

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 2, class-id=60, method-id=120)
  • tag 是针对消息设置的一个标志,自增,确认后减少
  • 出现这个情况是tag 已经被确认过了,又被确认过一次。
  • 由于目前mq消费是manual ack 异常封装会basicAck,但是onError 方法又做了一次ackRequeue,导致出现该错误,消息重复增加的情况

解决方案

  • 删除requeue
  • 出现未知异常,人工介入处理,不做requeue

rabbitMq message ttl 不被消费

现象

  1. 生产环境将异常处理消息,或者需要重试消息,重新投递到DLX 私信队列中,超过message的TTL 自动路由到原有队列
  2. 目前消息ttl时间已到,但是一直没有被重新推送到队列里
  3. 排查了历史所有被消费的信息,message-create 时间 + message-expire 已经远远超过了当前时间,消费被延后
  4. 目前重试次数和mq message ttl 有线性关系,所以TTL大的消息一直占用队列head节点
  5. 查询官方文档后rabbitMq ttl注意,官方有一个注意点,message-TTL 方式,mq只会关注head message 是否到期,所以出现ttl时间已到,但是没有被执行

解决方案

  1. 使用queue ttl 方式,这样ttl 时间都是一样的,不会出现ttl时间到,但是未被执行,这样的话,重试规则不可变,不够灵活