跳转至

RabbitMQ

问题记录

服务雪崩?

RabbitMQ/activeMQ/RoketMQ/Kafka

对比是为了技术选型

事件模型,响应模型

学习重点

spring AMQP【基于AMQP协议】

消息服务的适用场景

适合用通知??

1初识MQ

能支持哪几种通信模型??

1.1同步和异步通信

微服务之间有同步和异步两种方式,常见的同步异步通信模型???

1.1.1同步通讯【阻塞】

之前学习Feign调用就属于同步方式,虽然调用可以实时得到结果,

优点:

  • 时效性较强,可以立即得到结果

缺点:

  • 耦合度高

  • 等待响应,才能执行下一步,导致性能变差

  • 性能和吞吐能力下降
  • 不能释放请求占用的资源,高并发的情况下极度浪费系统资源。
  • 有级联失败的问题

1.1.2异步通讯【非阻塞】

异步调用常见的实现是事件驱动模型【】还有哪些???

image-20230619212440085

Broker

在事件模型中吗,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。订单服务和物流服务是事件订阅者(consumer),订阅支付成功的事件,监听到事件后完成自己的业务即可

为了接触事件发布者与订阅者之间的耦合,两者不是直接通信,而是有一个中间人(Broker),发布者发布事件到Broker,不关心谁来订阅事件,订阅者从Broker订阅事件,不关系谁发来的消息。

解决的问题,解除耦合,提醒性能,故障隔离,流量削峰

优缺点

优点:

  • 耦合度低
  • 吞吐量提升
  • 故障隔离
  • 流浪削峰

缺点

依赖于broker的可靠性,安全性,吞吐能力

结构复杂,业务没有明显的流程线,不好追踪管理。

image-20230619143944127

1.2技术对比

MQ,中文就是消息队列(MessageQueue),管理存放消息的队列,也是事件驱动模型的Borker,

比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

社区活跃度

消息队列涉及的协议类型

RocketMQ与Kafka的技术选型?详细准备

集群与可用性???

image-20230619102616624

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

AMQP是什么协议???这几个协议查一查???

追求可用性:Kafka、RocketMQ、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2快速入门

使用最基础的消息模型

2.1RabbitMQ消息队列模型

image-20230619123151626

RabbitMQ中的一些角色

  • publisher:生产者

  • consumer:消费者

  • exchange:交换机,负责 消息的路由

  • queue:队列,存储消息

  • virtualHost:虚拟主机,隔离不同租户的exchanger、queue、消息的隔离

五种消息模型

不带交换机

  • 基本消息队列(BasicQueue):一个队列挂载一个消费者
  • 工作消息队列(WorkQueue):一个队列挂载两个消费者

带交换机(Publish/Subscribe)

  • Fanout Exchange:广播
  • Direct Exchange:路由
  • Topic Exchange:主题

2.2入门案例

简单队列模式的模型图

image-20230619132435613

最基础的消息队列模型,三个角色

publisher:消息发布者,将消息发送到队列queue

queue:消息队列,负责接收并缓存消息

consumer:订阅队列,处理队列中的消息

2.2.1publisher实现

  • 建立连接
  • 创建channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel
public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.3.130");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();
        // 2.创建通道Channel
        Channel channel = connection.createChannel();
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
        // 4.发送消息
        String message = "hello, rabbitmq123!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");
        // 5.关闭通道和连接
        channel.close();
        connection.close();
    }
}

2.2.2consumer实现


3SpringAMQP

Advanced Message Queuing Protocol:用于在应用程序之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求。

基于AMQP协议定义的API规范,提供了模板来发送和接收消息,包含两部分,其中Spring-amqp是基础抽象,Spring-amqp是底层的默认实现。

示意流程图

3.1BasicQueue【简单队列模型】

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2WorkQueue【工作队列模型】

挂载多个消费者就是工作队列模型,

差异在于消费者有几个??

image-20230619144514041

思路

3.3发布/订阅

带有交换

上边两种,消息是阅后即焚的。。

发布订阅模式与之前的案例区别允许同一消息发送给多个消费者,实现方式加入交换机

3.4Fanout【广播】

image-20230619150511471

FanoutExchange会将接收到的消息广播到每一个与其绑定的queue。

image-20230619150745900

消息在交换机里的生命周期??一直存在???

总结

交换机的作用:

接收publisher发送的消息

将消息按规则路由到与之绑定的队列

不能缓存消息,路由失败则消息丢失

FanoutExchange会将消息路由到每个绑定的队列

声明队列、交换机、绑定关系的BeanBinding=>BindingBuilder.bind(队列).to(交换机)

3.5Direct【路由】

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes-)

另一种绑定交换机与队列的方式。

精准匹配

3.6Topic【主题】

3.7消息转换器【序列化与反序列化】

生产者与消费者需要使用一样的序列化方式

N附录

N1安装RabbitMQ并启动

加载镜像

docker load -i mq.tar

启动MQ

docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

进去容器查看rabbitmq的配置文件

RabbitMQ的管理界面详解

延迟消息实现

惰性队列

消费者开线程池=》设置线程池

image-20230629162842444

image-20230629163005090

image-20230629163217720

正则表达式