1. 1. 核心工作模型
    1. 1.1. 长连接(虚拟通道)
    2. 1.2. 启动脚本和控制脚本
    3. 1.3. 交换机(Exchange)与消息发送
      1. 1.3.1. Fanout Exchange(扇形交换机)
      2. 1.3.2. Direct Exchange(直连交换机)
      3. 1.3.3. Topic Exchange(主题交换机)
      4. 1.3.4. Headers Exchange(头部交换机)(使用较少)
    4. 1.4. 消息接收
    5. 1.5. 消息的过期时间(TTL)
    6. 1.6. 死信队列(Dead-Letter-Exchange)
      1. 1.6.1. 设置方法
      2. 1.6.2. 消息进入死信交换机的情况
    7. 1.7. 消息的确认
    8. 1.8. 延迟队列
      1. 1.8.1. 数据库定时任务
      2. 1.8.2. JDK延迟队列(DelayedQueue)
      3. 1.8.3. 延迟消息投递
    9. 1.9. 消息的可靠性
      1. 1.9.1. 启用交换机手动确认(生产者->交换机)
        1. 1.9.1.1. 单独实现Callback接口
        2. 1.9.1.2. 在RabbitConfig中直接实现Callback接口
        3. 1.9.1.3. 使用匿名内部类
        4. 1.9.1.4. 使用lambda函数
      2. 1.9.2. 启用队列手动确认(交换机->队列)
        1. 1.9.2.1. 直接实现callback接口
        2. 1.9.2.2. 直接实现接口、匿名内部类、lambda表达式实现方式同上
    10. 1.10. 交换机的持久化和自动删除
    11. 1.11. 备用交换机
    12. 1.12. 队列详细属性
      1. 1.12.1. 自动删除
      2. 1.12.2. 持久化
      3. 1.12.3. 排他队列
      4. 1.12.4. 其他属性
    13. 1.13. 消息可靠性投递
      1. 1.13.1. 生产者确认模式
      2. 1.13.2. return模式
      3. 1.13.3. 消费者手动确认
    14. 1.14. 幂等性及其实现

Rabbit Message Queue

核心工作模型

消息生产者->TCP长连接(虚拟通道)->虚拟主机交换机->消息队列->TCP长连接(虚拟通道)->消息消费者

长连接(虚拟通道)

内含很多通道(Channel),以实现发送效率的提升

启动脚本和控制脚本

设置环境变量之后使用

1
2
rabbitmq-server -detached
# detached为后台静默启动

之后可以使用rabbitmqctl管理mq信息

交换机(Exchange)与消息发送

Fanout Exchange(扇形交换机)

消息会被发送到交换机,交换机会投递到所有绑定的队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//定义交换机
public FanoutExchange fanoutExchange(){
return new FanoutExchange("exchange.fanout");
}
//定义队列
public Queue queueA(){
return new Queue("queue.fanout.a");
}
public Queue queueB(){
return new Queue("queue.fanout.b");
}
//绑定交换机和队列
public Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
public Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
//发送端
Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
rabbitTemplate.convertAndSend(Exchange,"",message);
//扇形交换机,key留空即可

Direct Exchange(直连交换机)

根据路由键匹配,传递到相应的消息队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@EnableConfigurationProperties("name")
String exchangeName;
String queueAName;
//定义交换机(建造者模式)
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(exchangeName).build();
}
//定义队列
public Queue queueA(){
return QueueBuilder.durable(queueAName).build();
}
//绑定交换机和队列
public Binding bindingB3(DirectExchange directExchange,Queue queueA){
return BindingBuilder.bind(queueA).to(directExchange).with(key);
}
//发送端
Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
rabbitTemplate.convertAndSend(Exchange,Key,message);

Topic Exchange(主题交换机)

类似通配符匹配,#表示多个单词(0个或多个),*表示一个单词(必须有一个),用.对单词和通配符进行分隔
一个信息只会进入一个队列一次,无论有多少条匹配
没有匹配的消息会被丢弃

1
2
3
4
5
//定义交换机和队列同上
//绑定交换机和队列
public Binding bindingB3(TopicExchange topicExchange,Queue queueA){
return BindingBuilder.bind(queueA).to(topicExchange).with(*.qwq.*);
}

Headers Exchange(头部交换机)(使用较少)

基于消息内容的头部进行匹配,暂时没写

消息接收

1
2
3
4
5
6
@RabbitListener(queues={"queue.fanout.a","queue.fanout.b"})//接收这两个队列的消息
public void receiveMsg(Message message){
byte[] body=message.getBody();
String msg=new String(body);
System.out.println(msg);
}

消息的过期时间(TTL)

可以为消息设置过期时间,过期的消息会被丢弃

1
2
3
4
5
//为消息设置过期时间
MessageProperties messageProperties=new MessageProperties();
messageProperties.setExpiration("5000");//单位为毫秒
Message message = MessageBuilder.withBody("This is a message".getBytes()).addProperties(MessageProperties).build();
rabbitTemplate.convertAndSend(Exchange,Key,message);
1
2
3
4
5
6
7
//为队列设置过期时间
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-message-ttl",5000);
//new queue的方式
new Queue(queueName,true,false,false,arguments);
//建造者模式
return QueueBuilder.durable(queueName).withArguments(arguments).build();

如果消息和队列都有过期时间,以较小者为准

死信队列(Dead-Letter-Exchange)

消息在交换机因为各种原因被丢弃后,进入的交换机被称作死信交换机,死信交换机连接的队列叫做死信队列

设置方法

1
2
3
4
5
6
7
8
Map<String,Object> arguments=new HashMap<>();
//方法一:设置arguments
arguments.put("x-dead-letter-exchange",deadExchangeName);
//设置死信路由key,要和死信交换机和死信队列绑定的key一致
arguments.put("x-dead-letter-routing-key",error);
new Queue(queueName,true,false,false,arguments);
//方法二:建造时设置
return QueueBuilder.durable(queueName).deadRoutingKey("error").deadLetterExchange(deadExchangeName).build();

消息进入死信交换机的情况

  • 消息过期
  • 消息进入时没有匹配的key
  • 队列达到长度上限(队列头的消息会被发送到死信交换机)
  • 消费者拒绝信息并不重新投递

消息的确认

1
2
3
listener:
simple:
acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RabbitListener(queues={"queue.fanout.a","queue.fanout.b"})//接收这两个队列的消息
public void receiveMsg(Message message,Channel channel){
MessageProperties messageProperties=message=getMessageProperties();
long deliveryTag=messageProperties.getDeliveryTag();
try{
byte[] body=message.getBody();
String msg=new String(body);
System.out.println(msg);
channel.basicAck(deliveryTag,false);
//参数2为true:批量确认 false:只确认当前
}catch(Exception e){
log.error("出现问题");
//参数3为true:重新放回队列,不进入死信队列,为false:进入死信队列
channel.basicNack(deliveryTag,false,true);
throw new RuntimeException(e);
}

}

延迟队列

假设有一种任务需要定时对现有业务进行扫描

数据库定时任务

优点:简单
缺点:存在延迟,性能较差

如果依靠被动查询才进行任务呢?
优点:服务器压力小
缺点:如果一直没有查询,永远不会停止,且查询时效率较差

JDK延迟队列(DelayedQueue)

优点:实现简单,任务延迟低
缺点

  • 若服务器重启或宕机,数据库会丢失
  • 不适合集群
  • 订单量大可能会爆内存

延迟消息投递

对于RabbitMQ,可以使用TTL结合DLX实现延迟投递,定时任务可以通过监听DLQ来实现定时任务
存在的问题:
如果队头消息过期时间大于下一条消息的过期时间,第二条消息会被第一条消息阻塞

解决方法1:不同过期时间的消息放在不同的队列中
缺点:队列过多,实现复杂

解决方法2:使用延迟消息插件(rabbitmq_delayed_message_exchange)
原理:
消息生产者发送消息之后,消息被发送到延迟交换机(x-delayed-message Exchange)
消息会被存储到Mnesia数据库,检查消息的X-delay信息,时间到了之后才会被投递到队列(队列中不再重复计算TTL)
使用方法如下:

1
2
3
4
5
wget # 地址,去github上找
unzip # 下载下来的源文件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 启用插件
./rabbitmq-plugins list # 查询安装好的所有插件
# 重启
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//建立交换机
public CustomExchange customExchange(){
//CustomExchange(name,type,durable,autoDelete,arguments);
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-delayed-type","direct");//必须放这个
return new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);
}
//定义队列
public Queue queueDlx(){
//不需要指定死信交换机和死信队列
return QueueBuilder.durable(queueDlxName).build();
}
//绑定交换机和队列
public Binding bindingB3(CustomExchangecdirectExchange,Queue queueDlx){
//一定要写noargs();
return BindingBuilder.bind(queueDlx).to(customExchange).with("plugin").noargs();
}
//发送消息
public void sendMsg(){
MessageProperties messageProperties=new MessageProperties();
messageProperties.setHeader("x-delay,""5000");
//注意不用setExpiration
Message message = MessageBuilder.withBody("This is a message".getBytes()).addProperties(MessageProperties).build();
rabbitTemplate.convertAndSend(Exchange,Key,message);
}
//接收消息的方法,相较于之前,不用改动

消息的可靠性

消息的发送可以分为四个过程

  1. 消息从生产者被投递到交换机
  2. 消息从交换机被投递到队列
  3. 消息在队列中被正常的存储(持久化,宕机后稳定性)
  4. 消息从队列被发送到消费者

启用交换机手动确认(生产者->交换机)

单独实现Callback接口

  1. 在yml中开启手动确认模式
    1
    spring.rabbitmq.publisher-confirm-type=correlated
  2. 写一个类实现implements RabbitTemplate.ConfitmCallback 用于判断成功和失败的ack,如果ack为false,则对消息进行重新发送或记录日志等
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    class confirmClass implements RabbitTemplate.ConfitmCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean ack,String cause){
    if(ack){//消息到达交换机
    log.info("到达交换机");
    return;
    }else{
    log.error("未到达交换机,原因为"+cause);
    return;
    }
    }
    }
    3.在发送消息处进行如下更改
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    class MessageService{
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Resource
    private ConfirmClass confirmClass;

    @PostConstrust//构造后执行
    public void init(){
    rabbitTemplate.setConfirmCallback(confirmClass);
    }

    public void sendMsg(){
    CorrelationData correlationData=new CorrelationData();
    correlationData.setId("123456");
    Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
    rabbitTemplate.convertAndSend(Exchange,"",message,correlationData);
    //参数4为设置回调接口,设置之后,当交换机收到消息时会自动回调实现了RabbitTemplate.ConfitmCallback的类中的confirm方法
    }
    }

在RabbitConfig中直接实现Callback接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class MessageService implements RabbitTemplate.ConfitmCallback{
@Resource
private RabbitTemplate rabbitTemplate;

@PostConstrust//构造后执行
public void init(){
rabbitTemplate.setConfirmCallback(this);
//这里直接使用this指针就可以
}
@Override
public void confirm(CorrelationData correlationData, boolean ack,String cause){
if(ack){//消息到达交换机
log.info("到达交换机");
return;
}else{
log.error("未到达交换机,原因为"+cause);
return;
}
}
public void sendMsg(){
CorrelationData correlationData=new CorrelationData();
correlationData.setId("123456");
Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
rabbitTemplate.convertAndSend(Exchange,"",message,correlationData);
}
}

使用匿名内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class MessageService{
@Resource
private RabbitTemplate rabbitTemplate;

@PostConstrust//构造后执行
public void init(){
rabbitTemplate.setConfirmCallback(
new RabbitTemplate.ConfitmCallback(){//实现一个匿名内部类
@Override
public void confirm(CorrelationData correlationData, boolean ack,String cause){
if(ack){//消息到达交换机
log.info("到达交换机");
return;
}else{
log.error("未到达交换机,原因为"+cause);
return;
}
}
}
);
}

public void sendMsg(){
CorrelationData correlationData=new CorrelationData();
correlationData.setId("123456");
Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
rabbitTemplate.convertAndSend(Exchange,"",message,correlationData);
}
}

使用lambda函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MessageService{
@Resource
private RabbitTemplate rabbitTemplate;

@PostConstrust//构造后执行
public void init(){
rabbitTemplate.setConfirmCallback(
//因为原来的RabbitTemplate.ConfitmCallback接口只有一个方法,是一个函数式接口,所以可以使用lambda表达式,java会自动将这个lambda表达式视为confirm函数
(correlationData,ack,cause)->{
if(ack){//消息到达交换机
log.info("到达交换机");
return;
}else{
log.error("未到达交换机,原因为"+cause);
return;
}
}
);
}

public void sendMsg(){
CorrelationData correlationData=new CorrelationData();
correlationData.setId("123456");
Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
rabbitTemplate.convertAndSend(Exchange,"",message,correlationData);
}
}

启用队列手动确认(交换机->队列)

在yml中开启手动确认模式

1
spring.rabbitmq.publisher-returns=true

直接实现callback接口

1
2
3
4
5
6
7
class ReturnClass implements RabbitTemplate.ReturnsCallback{
@Override
public void returnedMessage(ReturnedMessage returnedMessage){
//如果该函数被回调,一定没有到达队列
log.info("消息没有被投递到队列,原因为:"+returnedMessage.getReplyText());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MessageService{
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private ReturnClass returnClass;

@PostConstrust//构造后执行
public void init(){
rabbitTemplate.setReturnsCallback(returnClass);
}

public void sendMsg(){
CorrelationData correlationData=new CorrelationData();
correlationData.setId("123456");
Message message = MessageBuilder.withBody("This is a message".getBytes()).build();
rabbitTemplate.convertAndSend(Exchange,"",message,correlationData);
//参数4为设置回调接口,设置之后,当交换机收到消息时会自动回调实现了RabbitTemplate.ConfitmCallback的类中的confirm方法
}
}

直接实现接口、匿名内部类、lambda表达式实现方式同上

交换机的持久化和自动删除

交换机参数:

  • Name:交换机名称
  • Type 交换机类型
  • Durability 持久化,代表交换机在服务器重启之后是否还存在
  • Auto Delete 是否自动删除,绑定到该交换机的队列数量为0时,交换机会被自动删除
  • Internal 内部使用交换机,如果是yes,客户端无法直接访问该交换机,消息只能来源于其他交换机

备用交换机

可以在绑定交换机时设置一个备用交换机和备用队列,如果主交换机宕机,备用交换机可以替代接收消息,备用队列同理,消费者可以同时监听两个队列

1
2
3
4
//新建备用交换机和备用队列的方式和普通交换机一致
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(exchangeName).alternate(alternateExchange).build();
}

队列详细属性

1
new Queue(queueName,是否持久化,是否排他,是否自动删除,其他属性)

自动删除

如果没有消费者监听该队列则自动删除(第一次初始化后除外)

持久化

消息是否会被保存到硬盘中,即关机重启后消息是否仍然存在

排他队列

只对首次声明它的连接可见,并且会在连接断开时自动删除

其他属性

如死信交换机等其他属性

名字 参数 解释
x-overflow reject-publish等 丢弃后行为(抛弃队头/不入队/不入队-死信)
x-max-length int 队列长度
x-single-active-consumer bool 只允许有一个消费者
x-max-priority int 设置队列允许的优先级范围

消息可靠性投递

保证消息投递的每一个环节都成功

生产者确认模式

1
rabbitmq:publisher-confirm-type: correlated
1
2
3
4
5
6
7
8
9
10
11
12
//在init方法内
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(
(correlationData,ack,cause)->{
if(!ack){
log.error("消息未到达,原因:"+cause);
//TODO 重新发送消息或其他操作
}
}
)
}

return模式

消息在没有正确投递时返回生产者

1
rabbitmq:publisher-returns: true
1
2
3
4
5
6
rabbitTemplate.setReturnsCallback(
returnMessage->{
log.error("消息没有从交换机正确的投递到队列");
//TODO 重新发送消息或其他操作
}
)

消费者手动确认

1
rabbitmq:listener:simple:acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import con.rabbitmq.client;

@Slf4j
@Component
public class ReceiveMessage{
@RabbitListener(queues={"queueA"})
public void receiveMsg(Message message, Channel channel){
try{
log.info("接收到了"+new String(message.getBody()));
//TODO 消息及其影响
//获取消息唯一标识
long deliveryTag=message.getMessageProperties().getDeliveryTag();
//确认并只确认第一条(第二个参数的false)
channel.bacisAck(deliveryTag,false);
}catch(Exception e){
log.error("消息出现问题");
//不确定,只处理一条,重新入队
channel.basicNack(deliceryTag,false,true);
e.prinkstack();
}
}
}

幂等性及其实现

对于同一资源,多次发出请求,对该资源造成的影响应该是相同的,即避免重复消费问题