左璞凡的博客

日出之美便在于它脱胎于最深的黑暗

0%

RabbitMQ

[RabbitMQ]

基本概念

MQ概述

全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

image-20231018192602623

  • MQ,消息队列,储存消息的中间件
  • 分布式系统通信的两种方式:直接 远程调用 和 借助第三方完成间接通信
  • 发送方称为生产者,接收方称为消费者

MQ的优势

应用解耦

image-20231018192817077

使用MQ使得应用间解耦,提升容错性和可维护性

异步提速

image-20231018192844548

提升用户体验和系统吞吐量(单位时间内处理请求的数目)

削峰填谷

image-20231018192904065

MQ的劣势

  • 可用性降低
  • 复杂度提高
  • 一致性

使用条件

  1. 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空。
  2. 容许短暂的不一致性
  3. 确实起到了效果。收益大于管理。

RabbitMQ简介

AMQP,高级消息队列,是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间键设计。

image-20231018193111404

相关概念

image-20231018193143382

快速入门

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建队列
queueDeclare();
/*
参数:
1.queue 队列名称
2.durable 是否持久化
3.exclusive:
是否独占.只有一个消费者能够监听
当connection关闭时,是否删除队列
4.autoDelete 是否自动删除,当没有Consumer时自动删除
5.arguments 删除参数
*/
//如果有相同名字的则不创建
channel.queueDeclare("Hello_world",true,false,false,null);
//发送消息
/*
basicPublish
参数:
1.exchange 交换机名称 简单默认自动
2.routingKey 路由名称
3.props 配置信息
4.body 发送消息数据
*/
String body = "hello~";
channel.basicPublish("","Hello_world",null,body.getBytes());
//释放资源
channel.close();
connection.close();

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建队列
queueDeclare();
/*
参数:
1.queue 队列名称
2.durable 是否持久化
3.exclusive:
是否独占.只有一个消费者能够监听
当connection关闭时,是否删除队列
4.autoDelete 是否自动删除,当没有Consumer时自动删除
5.arguments 删除参数
*/
//如果有相同名字的则不创建
channel.queueDeclare("Hello_world",true,false,false,null);
//接收消息
/*
basicComsume
参数:
1.queue 队列名称
2.autoAck 是否自动确认
3.callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
回调方法,收到消息后,自动执行
...
}
//不要关闭

RabbitMQ工作模式

Work queues工作队列模式

image-20231018193257384

  1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争关系
  2. Workqueues对于任务过重或任务较多情况使用工作队列可以提高任务处理速度

Pub/Sub订阅模式

image-20231018193337641

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
/*
参数
1.exchange 交换机名称
2.type 交换机类型
DIRECT 定向
FANOUT 扇形
TOPIC 通配符
HEADERS 参数匹配
3.durable 是否持久化
4.autoDelete 自动删除
5.internal 内部使用
6.argumrnts 参数
*/
//创建交换机
String exchangesName = "test_fanout";
channel.exchangeDeclare(exchangesName...);
//创建队列
channel.queueDeclare(...);
//绑定队列和交换机
channel.queueBind("队列名称1","交换机名称","路由键");
channel.queueBind("队列名称2","交换机名称","路由键");
//发送消息
channel.basicPublish(exchangeName,"",null,body.getbytes());
//释放资源
channel.close();
connection.close();

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
/*
参数
1.exchange 交换机名称
2.type 交换机类型
DIRECT 定向
FANOUT 扇形
TOPIC 通配符
HEADERS 参数匹配
3.durable 是否持久化
4.autoDelete 自动删除
5.internal 内部使用
6.argumrnts 参数
*/
//创建交换机
String exchangesName = "test_fanout";
channel.exchangeDeclare(exchangesName...);
//创建队列
channel.queueDeclare(...);
//绑定队列和交换机
channel.queueBind("队列名称1","交换机名称","路由键");
channel.queueBind("队列名称2","交换机名称","路由键");
//发送消息
channel.basicPublish(exchangeName,"",null,body.getbytes());
//释放资源
channel.close();
connection.close();

Routing路由模式

image-20231018193438681

Routing 模式要求队列在绑定交换机时要指定routing key,消息会自动传发到符合的队列

Topics通配符模式

路径变成通配符

SpringBoot工整合RabbitMQ

生产者

  1. 引入依赖
1
2
3
4
5
6
7
8
9
10
11
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>xxx</version>
</parent>
<dependencier>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencier>
  1. RabbitMQConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class RabbitMQConfig {

public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//交换机
@Bean("bootExchange")
public Exchange boot Exchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}

//队列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}

//队列和交换机绑定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue,@Qualifier(bootExchange)Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(boot.#).noargs();
}
}

消费者

  1. RabbitMQListener
1
2
3
4
5
6
7
@Component
public class RabbitMQListener {
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message) {
sout(message);
}
}

RabbitMQ高级

RabbitMQ高级特性

  • 消息可靠性投递
  • Consumer ACK
  • 消费端限流
  • TTL
  • 死信队列
  • 延迟队列
  • 日志与监控
  • 消息可靠性分析与追踪
  • 管理

消息的可靠投递

RabbitMQ整个消息投递的路径为

producer -> rabbitmq broker -> exchange -> queue -> consumer

RabbitMQ提供两种方式来控制消息的投递可靠性模式

  • confirm 确认模式

    消息从preducer到exchange则会返回一个confirmCallback

  • return 返回模式

    消息exchange到queue投递失败返回returnCallback

开启确认模式

  1. 确认模式开启:ConnectionFactory中开启publisher-confirm=“true”
  2. 在rabbitTemplate定义回调函数

开启回退模式

  1. 开启回退模式
  2. 设置ReturncallBack
  3. 设置Exchange处理消息的模式

image-20231018193829188

Consumer ACK

ack指acknowledge

image-20231018193847490

确认方式

  • 自动确认
  • 手动确认
  • 根据异常情况确认

Consumer ACK机制

  1. 设置手动签收
  2. 让监听器实现ChannelAwareMessageListener接口
  3. 如果消息成功处理,则调用channel的basicAck()签收
  4. 否则拒绝签收,broker重新发送给Consumer

image-20231018193912665

消费端限流

image-20231018193944109

限流机制

  1. 确保ack机制为手动确认
  2. listener-contaioner配置属性

TTL

image-20231018194009964

死信队列

image-20231018194054888

称为死信队列

  1. 队列消息长度到达限制
  2. 消费者拒收消费消息
  3. 原队列存在消息过期设置,到达超时时间未被消费

延迟队列

image-20231018194124945

在RabbitMQ中没有延迟队列功能

但可以用TTL+死信队列组合实现延迟队列效果

日志与监控

默认存放路径: /var/log/rabbitmq/rabbit@xx.log

rabbitmqctl管理和监控

image-20231018194215098

消息追踪

image-20231018194235598

另一种方式:rabbitmq_tracing

image-20231018194253662

RabbitMQ应用问题

  • 消息可靠性保障
  • 消息幂等型处理

消息可靠性保障-消息补偿

image-20231018194334963

息幂等型处理

image-20231018194351546

消息可靠性保障-消息补偿

image-20231018194407195

RabbitMQ集群搭建

  • 高可用集群搭建

集群搭建原理

使用Haproxy