Rabbit Message Queue
核心工作模型 消息生产者->TCP长连接(虚拟通道)->虚拟主机交换机->消息队列->TCP长连接(虚拟通道)->消息消费者
长连接(虚拟通道) 内含很多通道(Channel),以实现发送效率的提升
启动脚本和控制脚本 设置环境变量之后使用
1 2 rabbitmq-server -detached # detached为后台静默启动
之后可以使用rabbitmqctl管理mq信息
交换机(Exchange)与消息发送 Fanout Exchange(扇形交换机) 消息会被发送到交换机,交换机会投递到所有绑定的队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public FanoutExchange fanoutExchange () { return new FanoutExchange ("exchange.fanout" ); } public Queue queueA () { return new Queue ("queue.fanout.a" ); } public Queue queueB () { return new Queue ("queue.fanout.b" ); } public Binding bindingA (FanoutExchange fanoutExchange,Queue queueA) { return BindingBuilder.bind(queueA).to(fanoutExchange); } public Binding bindingB (FanoutExchange fanoutExchange,Queue queueB) { return BindingBuilder.bind(queueB).to(fanoutExchange); } Message message = MessageBuilder.withBody("This is a message" .getBytes()).build();rabbitTemplate.convertAndSend(Exchange,"" ,message);
Direct Exchange(直连交换机) 根据路由键匹配,传递到相应的消息队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @EnableConfigurationProperties("name") String exchangeName; String queueAName; public DirectExchange directExchange () { return ExchangeBuilder.directExchange(exchangeName).build(); } public Queue queueA () { return QueueBuilder.durable(queueAName).build(); } public Binding bindingB3 (DirectExchange directExchange,Queue queueA) { return BindingBuilder.bind(queueA).to(directExchange).with(key); } Message message = MessageBuilder.withBody("This is a message" .getBytes()).build();rabbitTemplate.convertAndSend(Exchange,Key,message);
Topic Exchange(主题交换机) 类似通配符匹配,#表示多个单词(0个或多个),*表示一个单词(必须有一个),用.对单词和通配符进行分隔 一个信息只会进入一个队列一次,无论有多少条匹配 没有匹配的消息会被丢弃
1 2 3 4 5 public Binding bindingB3 (TopicExchange topicExchange,Queue queueA) { return BindingBuilder.bind(queueA).to(topicExchange).with(*.qwq.*); }
基于消息内容的头部进行匹配,暂时没写
消息接收 1 2 3 4 5 6 @RabbitListener(queues={"queue.fanout.a","queue.fanout.b"}) public void receiveMsg (Message message) { byte [] body=message.getBody(); String msg=new String (body); System.out.println(msg); }
消息的过期时间(TTL) 可以为消息设置过期时间,过期的消息会被丢弃
1 2 3 4 5 MessageProperties messageProperties=new MessageProperties (); messageProperties.setExpiration("5000" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).addProperties(MessageProperties).build();rabbitTemplate.convertAndSend(Exchange,Key,message);
1 2 3 4 5 6 7 Map<String,Object> arguments=new HashMap <>(); arguments.put("x-message-ttl" ,5000 ); new Queue (queueName,true ,false ,false ,arguments);return QueueBuilder.durable(queueName).withArguments(arguments).build();
如果消息和队列都有过期时间,以较小者为准
死信队列(Dead-Letter-Exchange) 消息在交换机因为各种原因被丢弃后,进入的交换机被称作死信交换机,死信交换机连接的队列叫做死信队列
设置方法 1 2 3 4 5 6 7 8 Map<String,Object> arguments=new HashMap <>(); arguments.put("x-dead-letter-exchange" ,deadExchangeName); arguments.put("x-dead-letter-routing-key" ,error); new Queue (queueName,true ,false ,false ,arguments);return QueueBuilder.durable(queueName).deadRoutingKey("error" ).deadLetterExchange(deadExchangeName).build();
消息进入死信交换机的情况
消息过期
消息进入时没有匹配的key
队列达到长度上限(队列头的消息会被发送到死信交换机)
消费者拒绝信息并不重新投递
消息的确认 1 2 3 listener: simple: acknowledge-mode: manual
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @RabbitListener(queues={"queue.fanout.a","queue.fanout.b"}) public void receiveMsg (Message message,Channel channel) { MessageProperties messageProperties=message=getMessageProperties(); long deliveryTag=messageProperties.getDeliveryTag(); try { byte [] body=message.getBody(); String msg=new String (body); System.out.println(msg); channel.basicAck(deliveryTag,false ); }catch (Exception e){ log.error("出现问题" ); channel.basicNack(deliveryTag,false ,true ); throw new RuntimeException (e); } }
延迟队列 假设有一种任务需要定时对现有业务进行扫描
数据库定时任务 优点:简单 缺点:存在延迟,性能较差
如果依靠被动查询才进行任务呢? 优点:服务器压力小 缺点:如果一直没有查询,永远不会停止,且查询时效率较差
JDK延迟队列(DelayedQueue) 优点:实现简单,任务延迟低 缺点
若服务器重启或宕机,数据库会丢失
不适合集群
订单量大可能会爆内存
延迟消息投递 对于RabbitMQ,可以使用TTL结合DLX实现延迟投递,定时任务可以通过监听DLQ来实现定时任务 存在的问题: 如果队头消息过期时间大于下一条消息的过期时间,第二条消息会被第一条消息阻塞
解决方法1:不同过期时间的消息放在不同的队列中 缺点:队列过多,实现复杂
解决方法2:使用延迟消息插件(rabbitmq_delayed_message_exchange) 原理: 消息生产者发送消息之后,消息被发送到延迟交换机(x-delayed-message Exchange) 消息会被存储到Mnesia数据库,检查消息的X-delay信息,时间到了之后才会被投递到队列(队列中不再重复计算TTL) 使用方法如下:
1 2 3 4 5 wget # 地址,去github上找 unzip # 下载下来的源文件 ./rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 启用插件 ./rabbitmq-plugins list # 查询安装好的所有插件 # 重启
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public CustomExchange customExchange () { Map<String,Object> arguments=new HashMap <>(); arguments.put("x-delayed-type" ,"direct" ); return new CustomExchange (exchangeName,"x-delayed-message" ,true ,false ,arguments); } public Queue queueDlx () { return QueueBuilder.durable(queueDlxName).build(); } public Binding bindingB3 (CustomExchangecdirectExchange,Queue queueDlx) { return BindingBuilder.bind(queueDlx).to(customExchange).with("plugin" ).noargs(); } public void sendMsg () { MessageProperties messageProperties=new MessageProperties (); messageProperties.setHeader("x-delay," "5000" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).addProperties(MessageProperties).build(); rabbitTemplate.convertAndSend(Exchange,Key,message); }
消息的可靠性 消息的发送可以分为四个过程
消息从生产者被投递到交换机
消息从交换机被投递到队列
消息在队列中被正常的存储(持久化,宕机后稳定性)
消息从队列被发送到消费者
启用交换机手动确认(生产者->交换机) 单独实现Callback接口
在yml中开启手动确认模式1 spring.rabbitmq.publisher-confirm-type=correlated
写一个类实现implements RabbitTemplate.ConfitmCallback 用于判断成功和失败的ack,如果ack为false,则对消息进行重新发送或记录日志等1 2 3 4 5 6 7 8 9 10 11 12 class confirmClass implements RabbitTemplate .ConfitmCallback{ @Override public void confirm (CorrelationData correlationData, boolean ack,String cause) { if (ack){ log.info("到达交换机" ); return ; }else { log.error("未到达交换机,原因为" +cause); return ; } } }
3.在发送消息处进行如下更改1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class MessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource private ConfirmClass confirmClass; @PostConstrust public void init () { rabbitTemplate.setConfirmCallback(confirmClass); } public void sendMsg () { CorrelationData correlationData=new CorrelationData (); correlationData.setId("123456" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).build(); rabbitTemplate.convertAndSend(Exchange,"" ,message,correlationData); } }
在RabbitConfig中直接实现Callback接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class MessageService implements RabbitTemplate .ConfitmCallback{ @Resource private RabbitTemplate rabbitTemplate; @PostConstrust public void init () { rabbitTemplate.setConfirmCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack,String cause) { if (ack){ log.info("到达交换机" ); return ; }else { log.error("未到达交换机,原因为" +cause); return ; } } public void sendMsg () { CorrelationData correlationData=new CorrelationData (); correlationData.setId("123456" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).build(); rabbitTemplate.convertAndSend(Exchange,"" ,message,correlationData); } }
使用匿名内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class MessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstrust public void init () { rabbitTemplate.setConfirmCallback( new RabbitTemplate .ConfitmCallback(){ @Override public void confirm (CorrelationData correlationData, boolean ack,String cause) { if (ack){ log.info("到达交换机" ); return ; }else { log.error("未到达交换机,原因为" +cause); return ; } } } ); } public void sendMsg () { CorrelationData correlationData=new CorrelationData (); correlationData.setId("123456" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).build(); rabbitTemplate.convertAndSend(Exchange,"" ,message,correlationData); } }
使用lambda函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class MessageService { @Resource private RabbitTemplate rabbitTemplate; @PostConstrust public void init () { rabbitTemplate.setConfirmCallback( (correlationData,ack,cause)->{ if (ack){ log.info("到达交换机" ); return ; }else { log.error("未到达交换机,原因为" +cause); return ; } } ); } public void sendMsg () { CorrelationData correlationData=new CorrelationData (); correlationData.setId("123456" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).build(); rabbitTemplate.convertAndSend(Exchange,"" ,message,correlationData); } }
启用队列手动确认(交换机->队列) 在yml中开启手动确认模式
1 spring.rabbitmq.publisher-returns=true
直接实现callback接口 1 2 3 4 5 6 7 class ReturnClass implements RabbitTemplate .ReturnsCallback{ @Override public void returnedMessage (ReturnedMessage returnedMessage) { log.info("消息没有被投递到队列,原因为:" +returnedMessage.getReplyText()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class MessageService { @Resource private RabbitTemplate rabbitTemplate; @Resource private ReturnClass returnClass; @PostConstrust public void init () { rabbitTemplate.setReturnsCallback(returnClass); } public void sendMsg () { CorrelationData correlationData=new CorrelationData (); correlationData.setId("123456" ); Message message = MessageBuilder.withBody("This is a message" .getBytes()).build(); rabbitTemplate.convertAndSend(Exchange,"" ,message,correlationData); } }
直接实现接口、匿名内部类、lambda表达式实现方式同上 交换机的持久化和自动删除 交换机参数:
Name:交换机名称
Type 交换机类型
Durability 持久化,代表交换机在服务器重启之后是否还存在
Auto Delete 是否自动删除,绑定到该交换机的队列数量为0时,交换机会被自动删除
Internal 内部使用交换机,如果是yes,客户端无法直接访问该交换机,消息只能来源于其他交换机
备用交换机 可以在绑定交换机时设置一个备用交换机和备用队列,如果主交换机宕机,备用交换机可以替代接收消息,备用队列同理,消费者可以同时监听两个队列
1 2 3 4 public DirectExchange directExchange () { return ExchangeBuilder.directExchange(exchangeName).alternate(alternateExchange).build(); }
队列详细属性 1 new Queue (queueName,是否持久化,是否排他,是否自动删除,其他属性)
自动删除 如果没有消费者监听该队列则自动删除(第一次初始化后除外)
持久化 消息是否会被保存到硬盘中,即关机重启后消息是否仍然存在
排他队列 只对首次声明它的连接可见,并且会在连接断开时自动删除
其他属性 如死信交换机等其他属性
名字
参数
解释
x-overflow
reject-publish等
丢弃后行为(抛弃队头/不入队/不入队-死信)
x-max-length
int
队列长度
x-single-active-consumer
bool
只允许有一个消费者
x-max-priority
int
设置队列允许的优先级范围
消息可靠性投递 保证消息投递的每一个环节都成功
生产者确认模式 1 rabbitmq:publisher-confirm-type: correlated
1 2 3 4 5 6 7 8 9 10 11 12 @PostConstruct public void init () { rabbitTemplate.setConfirmCallback( (correlationData,ack,cause)->{ if (!ack){ log.error("消息未到达,原因:" +cause); } } ) }
return模式 消息在没有正确投递时返回生产者
1 rabbitmq:publisher-returns: true
1 2 3 4 5 6 rabbitTemplate.setReturnsCallback( returnMessage->{ log.error("消息没有从交换机正确的投递到队列" ); } )
消费者手动确认 1 rabbitmq:listener:simple:acknowledge-mode: manual
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import con.rabbitmq.client;@Slf4j @Component public class ReceiveMessage { @RabbitListener(queues={"queueA"}) public void receiveMsg (Message message, Channel channel) { try { log.info("接收到了" +new String (message.getBody())); long deliveryTag=message.getMessageProperties().getDeliveryTag(); channel.bacisAck(deliveryTag,false ); }catch (Exception e){ log.error("消息出现问题" ); channel.basicNack(deliceryTag,false ,true ); e.prinkstack(); } } }
幂等性及其实现 对于同一资源,多次发出请求,对该资源造成的影响应该是相同的,即避免重复消费问题