[RabbitMQ]
基本概念
MQ概述
全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

- MQ,消息队列,储存消息的中间件
- 分布式系统通信的两种方式:直接 远程调用 和 借助第三方完成间接通信
- 发送方称为生产者,接收方称为消费者
MQ的优势
应用解耦

使用MQ使得应用间解耦,提升容错性和可维护性
异步提速

提升用户体验和系统吞吐量(单位时间内处理请求的数目)
削峰填谷

MQ的劣势
使用条件
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空。
- 容许短暂的不一致性
- 确实起到了效果。收益大于管理。
RabbitMQ简介
AMQP,高级消息队列,是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间键设计。

相关概念

快速入门
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| ConnectionFactory factory() = new ConnectionFactory;
factory.setHost("xxx"); factory.setPort(5672); factory.setVirtualHost("/xxx"); factory.setUsername("xxx"); factory.setPassword("xxx");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
queueDeclare();
channel.queueDeclare("Hello_world",true,false,false,null);
String body = "hello~"; channel.basicPublish("","Hello_world",null,body.getBytes());
channel.close(); connection.close();
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| ConnectionFactory factory() = new ConnectionFactory;
factory.setHost("xxx"); factory.setPort(5672); factory.setVirtualHost("/xxx"); factory.setUsername("xxx"); factory.setPassword("xxx");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
queueDeclare();
channel.queueDeclare("Hello_world",true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){ 回调方法,收到消息后,自动执行 ... }
|
RabbitMQ工作模式
Work queues工作队列模式

- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争关系
- Workqueues对于任务过重或任务较多情况使用工作队列可以提高任务处理速度
Pub/Sub订阅模式

生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| ConnectionFactory factory() = new ConnectionFactory;
factory.setHost("xxx"); factory.setPort(5672); factory.setVirtualHost("/xxx"); factory.setUsername("xxx"); factory.setPassword("xxx");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangesName = "test_fanout"; channel.exchangeDeclare(exchangesName...);
channel.queueDeclare(...);
channel.queueBind("队列名称1","交换机名称","路由键"); channel.queueBind("队列名称2","交换机名称","路由键");
channel.basicPublish(exchangeName,"",null,body.getbytes());
channel.close(); connection.close();
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| ConnectionFactory factory() = new ConnectionFactory;
factory.setHost("xxx"); factory.setPort(5672); factory.setVirtualHost("/xxx"); factory.setUsername("xxx"); factory.setPassword("xxx");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangesName = "test_fanout"; channel.exchangeDeclare(exchangesName...);
channel.queueDeclare(...);
channel.queueBind("队列名称1","交换机名称","路由键"); channel.queueBind("队列名称2","交换机名称","路由键");
channel.basicPublish(exchangeName,"",null,body.getbytes());
channel.close(); connection.close();
|
Routing路由模式

Routing 模式要求队列在绑定交换机时要指定routing key,消息会自动传发到符合的队列
Topics通配符模式
路径变成通配符
SpringBoot工整合RabbitMQ
生产者
- 引入依赖
1 2 3 4 5 6 7 8 9 10 11
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>xxx</version> </parent> <dependencier> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencier>
|
- RabbitMQConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_queue"; @Bean("bootExchange") public Exchange boot Exchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } @Bean public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue,@Qualifier(bootExchange)Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(boot.#).noargs(); } }
|
消费者
- RabbitMQListener
1 2 3 4 5 6 7
| @Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void ListenerQueue(Message message) { sout(message); } }
|
RabbitMQ高级
RabbitMQ高级特性
- 消息可靠性投递
- Consumer ACK
- 消费端限流
- TTL
- 死信队列
- 延迟队列
- 日志与监控
- 消息可靠性分析与追踪
- 管理
消息的可靠投递
RabbitMQ整个消息投递的路径为
producer -> rabbitmq broker -> exchange -> queue -> consumer
RabbitMQ提供两种方式来控制消息的投递可靠性模式
开启确认模式
- 确认模式开启:ConnectionFactory中开启publisher-confirm=“true”
- 在rabbitTemplate定义回调函数
开启回退模式
- 开启回退模式
- 设置ReturncallBack
- 设置Exchange处理消息的模式

Consumer ACK
ack指acknowledge

确认方式
Consumer ACK机制
- 设置手动签收
- 让监听器实现ChannelAwareMessageListener接口
- 如果消息成功处理,则调用channel的basicAck()签收
- 否则拒绝签收,broker重新发送给Consumer

消费端限流

限流机制
- 确保ack机制为手动确认
- listener-contaioner配置属性
TTL

死信队列

称为死信队列
- 队列消息长度到达限制
- 消费者拒收消费消息
- 原队列存在消息过期设置,到达超时时间未被消费
延迟队列

在RabbitMQ中没有延迟队列功能
但可以用TTL+死信队列组合实现延迟队列效果
日志与监控
默认存放路径: /var/log/rabbitmq/rabbit@xx.log
rabbitmqctl管理和监控

消息追踪

另一种方式:rabbitmq_tracing

RabbitMQ应用问题
消息可靠性保障-消息补偿

息幂等型处理

消息可靠性保障-消息补偿

RabbitMQ集群搭建
集群搭建原理
使用Haproxy