1.了解的知识点
1.1 AMQP和JMS
AMQP(Advanced Message Queuing Protocol)高级消息队列
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
1.2 两者的联系与区别
JMS是定义统一接口,对消息操作进行统一;AMQP通过规定协议统一数据交互的格式
JMS限定了必须使用Java语言,AMQP只是协议,不规定实现方式,因此是跨语言的
JMS规定了两种消息模型;而AMQP的消息模型更加丰富
1.3 常见的MQ产品
ActiveMQ:基于JMS
RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
RocketMQ:基于JMS,阿里产品,目前Apache管理
Kafka:分布式消息队列,高吞吐量,处理日志,Scala和Java编写的,Apache
2.RabbitMQ
2.1 业务需求
微服务中对于数据的持久化以及搜索的系统分为两个服务系统,数据持久化系统与搜索服务系统之间数据的同步与一致性问题。
2.2 消息队列介绍
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据机构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
2.2.1 消息队列的作用
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构
1.解耦:一个业务需要多个模块共同实现,或者一条消息有多个系统需要对应处理,只需要主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
2.异步:主业务执行结束后从属业务通过MQ,异步执行,减低业务的响应时间,提高用户体验。
3.削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。
2.2.2 缺点
1、系统可用性降低。依赖服务也多,服务越容易挂掉。需要考虑MQ瘫痪的情况
2、系统复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性
3、业务一致性。主业务和从属业务一致性的处理
2.2.3 RabbitMQ特点
基于AMQP协议的Erlang实现
可靠性(Reliability):使用一些机制来保证可靠性,例如持久化、传输确认、发布确认
灵活的路由(Flwxible Routing):在消息进入队列之前,通过Exchange来路由消息的。
消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker(整体消息平台)
高可用(Highly Available Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列任然可用
多种协议(Multi-Protocol):Rabbit支持多种消息队列协议。
多语言客户端(Many Client):RabbitMQ几乎支持所有常用语言,例如Java、.Net
管理界面: RabbitMQ提供了一个易用用户界面
跟踪机制、支持插件
2.3 相关的概念
组成图如下:
1.Message:消息,消息是不具名的,有消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key组成(路由键)、priority(消息的优先权)、delivery-model(消息可能需要持久性存储)等。
2.Publisher生产者:消息的生产者,也是一个向交换器发布消息的客户端应用程序
3.Exchange:交换器,用来接收生产者发送消息并将这些消息路由给服务器中的队列
4.Queue:消息队列,用来保存消息直到发送给消费者。
5.Connection:网络连接,比如TCP连接
6.Channel:信道,多路复用连接中的一条独立的双向数据流通道,信道是建立在真实TCP连接内地虚拟连接,AMQP命令都是通过信道发送消息的,无论是发布消息,还是订阅队列、接受消息,这些动作都是通过信道完成。
7.Consumer消费者:表示从一个消息队列中获取消息的客户端应用程序
8.Virtural Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个虚拟主机本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定、权限机制。默认的虚拟主机是/
9.Broker:表示消息队列服务器实体
2.4 下载与安装
2.4.1 查看版本对应关系
RabbitMQ版本与Erlang之间的对应关系查看版本对应关系
linux版本与rabbitMQ版本的对比
将下载的rpm安装包上传到linux上,本次使用的是CentOS7
安装命令 rpm -ivh XXX.rpm
2.4.2 安装后台管理插件
rabbitmq-plugins enable rabbitmq_management
2.4.3 重启RabbitMQ
CentOs7启动MQ
systemctl start rabbitmq-server.service
查看启动后的状态
systemctl status rabbitmq-service.service
重新启动
systemctl restart rabbitmq-server.service
停止服务
systemctl stop rabbitmq-server.service
查看进程
ps -ef | grep rabbitmq
2.4.4 测试
首先需要关闭防火墙:
systemctl stop firewalled.service
在Web浏览器中输入http://虚拟机ip:15672/
输入默认的账号密码:guest:guest,guest用户默认不允许远程连接
自定义账号以及设置远程访问
添加管理员账号
rabbitmqctrl add_user admin admin
分配账号的角色
rabbitmqctl set_user_tags admin administrator
修改密码
rabbitmqctl change_password admin root123
查看用户列表
rabbitmqctl list_users
使用admin用户远程访问
粗心大意输错了命令,导致不能远程访问,错误提示为
修改如下
进入界面
2.5 管理界面
标签页
overview:概览
connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消费和生产
channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道
Exchanges:交换机,用来实现消息的路由
Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列
端口:
5672:MQ编程语言客户端连接的端口
15672:MQ管理界面的端口
25672:MQ集群的端口
3.五种消息类型
3.1 基本消息模型
RabbitMQ是一个消息代理:它接受和发送消息。对消息不进行处理,对消息只进行存储和转发二进制数据。
P:生产者,一个发送消息的用户应用程序
C:消费者,主要是用来等待接收消息的用户应用程序
队列:类似于邮箱的概念
简单的测试:访问IP:15672后登录界面后,输入自己的admin,以及密码root123,自己的虚拟主机shop
Java操作RabbitMQ发送消息。
1.类似于连接数据库,抽取出一个连接的工具类
public class ConnectionUtil {
//连接RabbitMQ的工具类,利用ConnectionFactory创建连接对象
public static Connection getConnection(){
try {
Connection connection = null;
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器地址
factory.setHost("192.168.199.159");
//设置端口,代码连接服务器的端口信息5672
factory.setPort(5672);
//设置虚拟主机
factory.setVirtualHost("/shop");
//设置用户名
factory.setUsername("admin");
//设置密码
factory.setPassword("root123");
connection = factory.newConnection();
return connection;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
2.发送端/生产者端
public class Send {
private final static String Queue_Name = "simple_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
//声明队列 不支持扩展 自动删除 相关参数
channel.queueDeclare(Queue_Name,false,false,false,null);
//消息内容
String message = "Hello world";
//向指定的队列中发送消息,基本的消息传递,发送者直接将消息放在消息队列中不需要路由的转发
channel.basicPublish("",Queue_Name,null,message.getBytes());
//
System.out.println("[X] sent '"+message+"'");
//关闭信道和连接
channel.close();
connection.close();
}
}
3.消费者
public class Recv {
private final static String Queue_Name = "simple_queue";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明队列 (对于消费者来说,只需要在队列中获取数据,如果队列存在,拿取数据,如果队列不存在创建队列)
channel.queueDeclare(Queue_Name,false,false,false,null);
//定义队列的消费者,继承消费者接口
DefaultConsumer consumer = new DefaultConsumer(channel){
//获取消息,并且处理这个方法类似事件监听,如果有消息,或被自动调用
//参数为
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("[X] received"+msg+"!");
}
};
//监听队列,第二个参数:是否自动进行消息确认
channel.basicConsume(Queue_Name,true,consumer);
}
}
消息的确认模型:ACK
在数据通信中,接收端发送给发送端一种传输类型的控制字符,表示消息的确认接收,C/S端一般会使用Socket来进行连接,该过程中有一个三次握手的过程,ACK作为发送与接受两端之间的一个确认信息,RabbitMq分为手动确认和自动确认。
上面的消费者采用的是自动消费的方式,下面是一个手动消费
public class Recv {
private final static String Queue_Name = "simple_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Queue_Name,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取消息体
String msg = new String(body);
System.out.println("[X] received"+msg+"!");
//第二个参数,表示是否同时确认多个消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//监听队列,第二个参数:是否自动进行消息确认
// channel.basicConsume(Queue_Name,true,consumer);
//手动ACK,参数为false表示手动确认
channel.basicConsume(Queue_Name,false,consumer);
}
}
3.2 work消息模型
Work模型针对生产者产生的消息过多,消费者消费速度慢,因此需要多个消费者来处理队列中的消息,这个模式下解决了消息积压的问题。
3.3 订阅模型—Fanout
前两种模型,消费者是对同一队列的消息进行消费,订阅模式解决的问题是将同一个消息如何交给不同的消费者进行消费。
Fanout中存在一个问题就是,因为生产者将消息交给路由器进行处理,路由器不能进行消息的存储工作,消息会丢失掉。如果先启动消费者,消费者创建的消息队列需要绑定路由器,则没有路由器时会产生异常。
Fanout模式下的生产者端的程序,需要创建路由,直接执行后,消息丢失,在Exchange中或产生一个新的路由信息
public class FanoutSend {
//fanout模式,会向路由器发送
public static final String EXCHAGE_NAME = "fanout_change";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明exchange,指定类型fanout 指定路由类型
channel.exchangeDeclare(EXCHAGE_NAME,"fanout");
//消息内容
String message = "Hello everyone";
//发布消息到Exchange
channel.basicPublish(EXCHAGE_NAME,"",null,message.getBytes());
System.out.println("[生产者] Sent "+message+"'");
channel.close();
connection.close();
}
}
消费者端程序,多个消费者程序类似,只是对消息的处理不同:
public class FanoutRecv {
private final static String QUEUE_NAME = "fanout_exchange_queue1";
private final static String EXCHANGE_NAME = "fanout_change";
public static void main(String[] args) throws Exception{
//获取到连接
Connection connection = ConnectionUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
//定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("[消费者]收到消息:"+message+"'");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
总结:多个消费者收到相同的消息,相当于一个电视订阅的过程,客户端对自己订阅的电视节目能够进行观看。
3.4 订阅模型—Direct
消息定向发送,Exchange会对消息进行处理,将消息进行一个匹配的过程,对消息进行不同的队列中去分发。
Direct生产者
public class DirectSend {
//定义路由的名称呢过
private static final String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//获取通道
Channel channel = connection.createChannel();
//创建路由,并确定路由的类型
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//消息内容
String message = "查找商品的消息";
//在绑定数据时指定其向哪一个通道发送消息,key值作为选择,得到相对应的通道
channel.basicPublish(EXCHANGE_NAME,"select",null,message.getBytes());
System.out.println("[商品服务]:Sent"+message+"...");
channel.close();
connection.close();
}
}
Dire模型的消费者端
public class DirectRecv1 {
public static final String QUEUE_NAME="direct_queue1";
//通道二绑定一个名字
//public static final String QUEUE_NAME="direct_queue1";
public static final String EXCHANGE_NAME="direct_exchange_test";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete");
//channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert");
// channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select");
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("[DirectRecv1] Recv "+message+"...");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
3.5 订阅模型—Topic
通配符匹配
‘#’匹配0或多个词(含0个)
‘*’匹配不多不少恰好1个词(不含0个)
例如三个绑定 1.*.orange.*、2.*.*.rabbit、3.lazy.#
发送如下字符:
quick.orange.rabbit 进入的是Q1和Q2
lazy.orange.elephant 进入的是Q1和Q2
quick.orange.fox 进入的是Q1
lazy.pink.rabbit 进入的是Q2
quick.orange.male.rabbit 都得不到,不满足消息结构
orange 不能进入消息队列
了解RPC消息模型:
4.Spring AMQP
SpringBoot与RabbitMQ进行整合
4.1 服务器的相关配置
spring:
rabbitmq:
host: 192.168.199.159
username: admin
password: root123
virtual-host:/shoping
port: 5672
4.2 需要引入的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
4.3 消费者监听
创建一个消费者,也就是一个监听器
@Component
public class Listener {
//使用注解声明队列、是否持久化、绑定路由,选择模式(一般为TOPIC模式)、设置匹配格式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue",durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),key = {"#.#"}))
public void listen(String msg){ //处理消息的业务逻辑程序
System.out.println("接收到的消息是 "+msg);
}
}
@Component注解是spring中的,意思是将该类交给spring管理
@RabbitListener注解是整合包中的注解,用来描述中间件的部分信息
@QueueBinding队列绑定注解
@Queue队列的相关参数注解
@Exchange 交换机/路由的相关注解
4.4 相关注解的源码
RabbitListener注解的源码
public @interface RabbitListener {
String id() default "";
String containerFactory() default "";
String[] queues() default {};
Queue[] queuesToDeclare() default {};
boolean exclusive() default false;
String priority() default "";
String admin() default "";
QueueBinding[] bindings() default {};
String group() default "";
String returnExceptions() default "";
String errorHandler() default "";
String concurrency() default "";
String autoStartup() default "";
String executor() default "";
String ackMode() default "";
String replyPostProcessor() default "";
}
QueueBinding的源码
public @interface QueueBinding {
Queue value();
Exchange exchange();
String[] key() default {};
String ignoreDeclarationExceptions() default "false";
Argument[] arguments() default {};
String declare() default "true";
String[] admins() default {};
}
Queue的源码
public @interface Queue {
@AliasFor("name")
String value() default "";
@AliasFor("value")
String name() default "";
String durable() default "";
String exclusive() default "";
String autoDelete() default "";
String ignoreDeclarationExceptions() default "false";
Argument[] arguments() default {};
String declare() default "true";
String[] admins() default {};
}
Exchange源码
public @interface Exchange {
String TRUE = "true";
String FALSE = "false";
@AliasFor("name")
String value() default "";
@AliasFor("value")
String name() default "";
String type() default "direct";
String durable() default "true";
String autoDelete() default "false";
String internal() default "false";
String ignoreDeclarationExceptions() default "false";
String delayed() default "false";
Argument[] arguments() default {};
String declare() default "true";
String[] admins() default {};
}
4.5 测试
@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqApplicationTests {
//依赖注入,从spring中获取amqp模板
@Autowired
private AmqpTemplate amqpTemplate;
@Test
void testSend() throws InterruptedException {
//客户端发送的消息
String message = "hello,spring boot amqp";
//模板发送匹配的交换机,以及匹配的形式,以及发送的消息
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b",message);
Thread.sleep(1000);
}
}