rabbitmq
大约 58 分钟
AMQP 协议简介
消息开发
- 传统的应用开发一般都采用的是快速的“请求-响应”处理模式,即:当用户向服务端发出了一个请求之后,那么服务端会立即对该请求进行处理,如图 1 所示。但是当请求并发量较大时,这样的处理模式就有可能造成服务端程序的崩溃,那么此时的做法,往往是在客户端和服务端之间追加一个数据队列,用户的每一次请求都在队列中依次排列而后服务端依照请求的顺序进行业务处理,如图 2 所示。
AMQP 发展历史
- AMQP 最早在 2003 年由 John O'Hara 在摩根大通提出。
- 初始设计方案在 2004 年至 2006 年由摩根大通发布,由 iMatix 公司编写协议文档和一个 C 语言实现。
- 2005 年摩根大通推动了包括思科系统、红帽公司、iMatix、IONA 技术等公司组成了个工作组。摩根大通和红帽公司合作开发了 Apache Qpid,该客户端最初由 Java 编写后转向 C++。Rabbit 技术公司独立用 Erlang 开发了 RabbitMQ(后来被 Pivotal 收购)
- 早先版本的协议包括版本 0-8,2006 年 6 月发布;版本 0-9,2006 年 12 月发布;版本 0-9-1,2008 年 11 月发布。这些版本与后来的 1.0 系列有很大的不同。
- 2011 年 8 月,高级消息队列协议工作组公布其改组方案,作为 OASIS(Organizationfor the Advancement of Structured Information Standards、结构化信息标准促进组织)成员运作。高级消息队列协议 1.0 版本在 2011 年 10 月 30 日发表。该版本在 2014 年四月成为 ISO(International Organization for Standardization、国际标准化组织)/EC(International Electrotechnical Commission、国际电工委员会)国际标准。
AMQP
- AMQP(Advanced Message Queuing Protocol、高级消息队列协议)提供统一消息服务的应用层标准协议,该协议属于一种二进制协议,具有多通道、异步、安全、语言中立以及高效等特点。不管使用何种开发语言,何种实现组件,只要基于此协议的客户端与消息中间件都可以实现消息的传递,从整体来讲 AMQP 协议可以分为三层
AMQP 核心概念
- 在 AMQP 组成协议中数据的发送与接收操作依靠传输层处理,在数据接收时会将二进制数据流交付给会话层进行处理,而程序的使用者主要关注的是型层的概念定义,在型层中一共定义有三个核心的概念模块:
- Exchange(交换机):实现消息生产者的消息接收,而后按照一定的规则将消息发送到指定的消息队列中;
- Queue(消息队列):实现消息的存储,直到该消息已经被安全的投递给了消息消费者;
- Bindings(绑定):定义了 Exchange 与 Queue 之间的关系,并提供消息路由规则的配置
RabbitMQ
- AMQP 只提供了一个协议的标准,在实际使用时需要依据此标准提供服务组件,其中最为常用的组件为 RabbitMQ,该组件是由 RabbitMQTechnologies Ltd 开发并且提供商业支持的。该公司在 2010 年 4 月被 SpringSource(VMWare 的一个部门)收购,并在 2013 年 5 月被并入 Pivotal,开发者可以通过“www.rabbitmg.com”免费获取该组件,本次使用的版本为“RabbitMQ 3.10.0 release”
配置 wxWidgets 组件库
wxWidgets
- wxWidgets 是一个开源的图形组件,基于 C++语言开发。是在 1992 年由 Julian Smart 在爱丁堡大学所开发,该组件最大的特点是其开发出来的图形界面应用,可以在源代码不更改或者少量更改的情况下,在不同的操作系统平台上编译并执行,同时该组件不与任何的语言绑定,这样就可以轻松与 Python、Java、Lua、Perl、Ruby、JavaScript 等语言进行整合由于 RabbitMQ 依赖于 ErLang 组件,而 ErLang 又依赖于 wxWidgets 组件,所以需要首先在当前的系统中进行该组件的配置,该组件的信息可以通过“wxwidgets.org/”获取,组件的下载地址需要托管在 GITHub 中
1、
wxwidgets.org
2、
mkdir -p /usr/local/wxWidgets
3、
dnf -y install gtk3-devel mesa-libGL-devel mesa-libGLU-devel mesa* freeglut*
4、
tar -jxvf /usr/local/src/wxWidgets-3.0.5.tar.bz2 -C /usr/local/src/
5、
cd /usr/local/src/wxWidgets-3.0.5/
6、
./configure --with-regex=builtin --with-gtk --enable-unicode --disable-shared --prefix=/usr/local/wxWidgets
7、
make -j4 && make install
8、
vi /etc/profile
export WX_HOME=/usr/local/wxWidgets
export PATH=$PATH:$JAVA_HOME/bin:$WX_HOME/bin:
source /etc/profile
配置 ErLang 开发环境
ErLang
- ErLang 是由 Ericsson(爱立信)公司在 1991 年推出的编程语言,是一种通用的面向并发的编程语言,该语言专门针对于大型电信系统设计,拥有三个主要的特点:高并发高容错、软实时,现在主要应用于物联网的开发行业。RabbitMQ 服务运行需要 ErLang 开发环境支持,每一个不同的 RabbitMQ 版本都需要有与之匹配的 ErLang 运行环境,当前使用的 RabbitMQ 3.10.0release 版本对应的 ErLang 版本为 23.2~24.3,开发者可以通过“erlang.org”站点获得所需开发环境的支持
1、
https://www.rabbitmq.com/which-erlang.html
2、
dnf -y install ncurses-devel openssl openssl-devel unixODBC unixODBC-devel kernel-devel m4 tk tc
3、
mkdir -p /usr/local/erlang
4、
tar xzvf /usr/local/src/otp_src_24.3.tar.gz -C /usr/local/src/
5、
cd /usr/local/src/otp_src_24.3/
6、
./configure --without-java --with-ssl --enable-kernel-poll --enable-threads --enable-smp-support --enable-jit --enable-webview --prefix=/usr/local/erlang
7、
make -j4 && make install
8、
vi /etc/profile
export ERLANG_HOME=/usr/local/erlang
export PATH=$PATH:$JAVA_HOME/bin:$WX_HOME/bin:$ERLANG_HOME/bin:
source /etc/profile
9、
erl
io:format("www.yootk.com").
halt().
RabbitMQ 安装与配置
1、
xz -d /usr/local/src/rabbitmq-server-generic-unix-3.10.0.tar.xz
2、
tar xvf /usr/local/src/rabbitmq-server-generic-unix-3.10.0.tar -C /usr/local/
3、
mv /usr/local/rabbitmq_server-3.10.0/ /usr/local/rabbitmq
4、
/usr/local/rabbitmq/sbin/rabbitmq-server start
5、
/usr/local/rabbitmq/sbin/rabbitmq-server start > /dev/null 2>&1 &
6、
netstat -nptl
7、
/usr/local/rabbitmq/sbin/rabbitmqctl add_user yootk hello
8、
/usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags yootk administrator
9、
/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management
10、
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --zone=public --add-port=4369/tcp --permanent
firewall-cmd --reload
11、
C:\Windows\System32\drivers\etc\hosts
192.168.190.128 rabbitmq-server
12、
http://rabbitmq-server:15672/
RabbitMQ 开发核心结构
RabbitMQ 控制台导航项
- RabbitMQ 是 AMQP 协议实现组件,所以当打开 RabbitMQ 控制台时,可以发现在导航位置提供有 Connections(连接)、Channel(通道)、Exchanges (交换机)、Queue(队列)、Admin(管理)等链结信息
配置用户的 vhost 权限
- 按照 AMQP 协议的定义,当用户通过程序连接到 RabbitMQ 服务端之后,会自动在 Connections 以及 Channels 信息中进行注册,在使用时用户也可以依据程序动态的创建 Exchanqes 交换机以及 Queues 队列信息,但是如果要想正常进行程序的开发,需要进入到管理界面为当前的 yootk 账户分配默认的 vhost(虚拟主机)的使用权限,否则无法进行消息的收发处理
1、
// https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.15.0'
// https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.20.0'
RabbitMQ 通道连接
RabbitMQ 通道连接
- 消息生产者主要负责消息数据的生产,所有生产出来的消息在没有被消费端消费前,都会被保留在 RabbitMQ 服务之中,而在进行消息生产时就需要创建 Connection 以及 Channel,为此在 ampq-client 依赖库中提供了 ConnectionFactory 工厂类,通过该类实现 Connection 接口实例的创建
1、
package com.yootk.rabbitmq.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MessageProducer { // 消息生产者
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
long start = System.currentTimeMillis(); // 获取当前的发送时间
CountDownLatch latch = new CountDownLatch(100); // 配置100个线程的延迟
for (int x = 0; x < 100; x++) { // 通过循环进行消息的发送
int temp = x; // 保存临时数据
TimeUnit.MILLISECONDS.sleep(10);
new Thread(()->{
String msg = "【沐言科技 - " + temp + "】www.yootk.com";
try {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
latch.countDown();
} catch (IOException e) { }
}, "消息生产者 - " + x).start();
}
latch.await(); // 等待数据发送完毕
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
创建消息消费者
RabbitMQ 消息消费者
- RabbitMQ 之中如果要进行消息的消费处理则需要启动消费端的处理线程,而后通过-消息处理的标准,指定的消息队列来进行消息的接收,为了统一提供了 Consumer 接口
1、
package com.yootk.rabbitmq.consumer;
import com.rabbitmq.client.*;
import javax.sound.midi.Soundbank;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 在实际的开发之中,在此方法内部需要进行业务接口的方法调用
try {
TimeUnit.SECONDS.sleep(1); // 每隔1秒消费一次
System.err.println("【" + count++ + "】消息的消费处理");
} catch (InterruptedException e) {}
String message = new String(body); // 获取消息内容
System.out.println("【接收消息】" + message);
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
消息应答
消息应答
- 消息服务器的运转流程依然属于传统的 C/S 开发架构,所以在网络通信的过程中就存在有大量的不可靠性,在数据书时难免会出现丢失、、延迟、错误、重复等各类状况,所以为了防止此类情况的出现,在每次消费完成后都要返回给服务端-个 ACK(Acknowledge 简写)回执信息,表示当前的消息处理完成,则消息组件会将当前消息删除。
- 如果接收方一直没有应答,服务端则认为该消息传递错误,则会重复进行投递
RabbitMo 应签处理
- 在 RabbitMQ 之中如果某些消息一直没有收到消费端的 ACK 应答信息,那么 RabbitMO 会将已经消费过的消息重新放回消息队列之中,随着消息的数量越来越多,就有可能带来内存泄漏的致命问题,从而导致 RabbitMQ 服务崩溃,在 RabbitMQ 之中 ACK 的回应处理有自动与手工两种方式:
- ACK 自动应答。ACK 消息的自动应答机制中是在绑定消费端监听时处理的,在 Channel 接口中对于 basicConsumer0)方法进行了重载,其中一个重载的方法中可以进行 Ack 的状态配置,该方法如下
- public String basicConsume(String queue, boolean autoAck, Consumer callback) throwsiOException
- ACK 手动应答。手工应答可以在每一次的消息消费处理中根据需要手工调用,在消息处理完成后可以通过 Envelope 类获取一个传送标签(DeliveryTag),该标签是个标记数字,直接通过 Channel 接口提供的 basicAck()方法即可回应。
- channel.basicAck(envelope.getDeliveryTag(),false); // 对当前消息进行确认
1、
package com.yootk.rabbitmq.consumer;
import com.rabbitmq.client.*;
import javax.sound.midi.Soundbank;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 在实际的开发之中,在此方法内部需要进行业务接口的方法调用
try {
TimeUnit.SECONDS.sleep(1); // 每隔1秒消费一次
System.err.println("【" + count++ + "】消息的消费处理");
} catch (InterruptedException e) {}
String message = new String(body); // 获取消息内容
System.out.println("【接收消息】" + message);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer); // 连接队列与消费者
}
}
2、
package com.yootk.rabbitmq.consumer;
import com.rabbitmq.client.*;
import javax.sound.midi.Soundbank;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 在实际的开发之中,在此方法内部需要进行业务接口的方法调用
try {
TimeUnit.SECONDS.sleep(1); // 每隔1秒消费一次
System.err.println("【" + count++ + "】消息的消费处理");
} catch (InterruptedException e) {}
String message = new String(body); // 获取消息内容
System.out.println("【接收消息】" + message);
// multiple参数,如果设置为false表示只对当前的消息进行回应
// 如果设置为true则表示对所有的消息进行回应
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
消息持久化
瞬时消息队列
- 在 RabbitMQ 之中所有消息数据的存储都是通过 Queue 实现的,而做为一款稳定可靠的消息队列数据,更是要保证在服务出现停止时,未被消费的数据应该被妥善保管,但是到现在为止所创建的消息队列都属于瞬时(Transient)消息队列,即:在 RabbitMQ 服务停止后,所有的消息数据都将消失。
1、
/usr/local/rabbitmq/sbin/rabbitmq-server start > /dev/null 2>&1 &
/usr/local/rabbitmq/sbin/rabbitmq-plugins enable rabbitmq_management
2、
package com.yootk.rabbitmq.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MessageProducer { // 消息生产者
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
long start = System.currentTimeMillis(); // 获取当前的发送时间
CountDownLatch latch = new CountDownLatch(100); // 配置100个线程的延迟
for (int x = 0; x < 100; x++) { // 通过循环进行消息的发送
int temp = x; // 保存临时数据
new Thread(()->{
String msg = "【沐言科技 - " + temp + "】www.yootk.com";
try {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
latch.countDown();
} catch (IOException e) { }
}, "消息生产者 - " + x).start();
}
latch.await(); // 等待数据发送完毕
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
3、
package com.yootk.rabbitmq.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class MessageProducer { // 消息生产者
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
long start = System.currentTimeMillis(); // 获取当前的发送时间
CountDownLatch latch = new CountDownLatch(100); // 配置100个线程的延迟
for (int x = 0; x < 100; x++) { // 通过循环进行消息的发送
int temp = x; // 保存临时数据
new Thread(()->{
String msg = "【沐言科技 - " + temp + "】www.yootk.com";
try {
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化消息
latch.countDown();
} catch (IOException e) { }
}, "消息生产者 - " + x).start();
}
latch.await(); // 等待数据发送完毕
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
虚拟主机
vhost 虚拟主机
- 为了可以区分出不同的开发业务,,在 RabbitMQ 的内部设置有一个逻辑上的虚拟主机划分,一台 RabbitMQ 中可以创建若干个虚拟主机,每一个虚拟主机中都拥有自己独立的交换机、队列,同时每一台虚拟主机也都存在有独立的权限划分。
创建虚拟主机
- 在 RabbitMQ 控制台中,开发者可以直接通过 RabbitMQ 管理界面进行虚拟主机的创建在创建时只需要设置虚拟主机的名称以及标签即可。
/usr/local/rabbitmq/sbin/rabbitmqctl add_vhost YootkVHost
/usr/local/rabbitmq/sbin/rabbitmqctl list_vhosts
/usr/local/rabbitmq/sbin/rabbitmqctl set_permissions -p YootkVHost yootk .* .* .*
/usr/local/rabbitmq/sbin/rabbitmqctl delete_vhost YootkVHost
public class MessageProducer { // 消息生产者
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
long start = System.currentTimeMillis(); // 获取当前的发送时间
CountDownLatch latch = new CountDownLatch(100); // 配置100个线程的延迟
for (int x = 0; x < 100; x++) { // 通过循环进行消息的发送
int temp = x; // 保存临时数据
new Thread(()->{
String msg = "【沐言科技 - " + temp + "】www.yootk.com";
try {
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化消息
latch.countDown();
} catch (IOException e) { }
}, "消息生产者 - " + x).start();
}
latch.await(); // 等待数据发送完毕
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
2、
package com.yootk.rabbitmq.consumer;
import com.rabbitmq.client.*;
import javax.sound.midi.Soundbank;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 在实际的开发之中,在此方法内部需要进行业务接口的方法调用
try {
TimeUnit.SECONDS.sleep(1); // 每隔1秒消费一次
System.err.println("【" + count++ + "】消息的消费处理");
} catch (InterruptedException e) {}
String message = new String(body); // 获取消息内容
System.out.println("【接收消息】" + message);
// multiple参数,如果设置为false表示只对当前的消息进行回应
// 如果设置为true则表示对所有的消息进行回应
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
fanout 广播模式
Exchange 交换机
- 除了基本的消息队列的使用之外,在 RabbitMQ 之中还提供了 Exchange 交换机、RoutingKey 路由 KEY 等核心概念同时在 Channel 接口中提供的 basicPublish()方法之中也提供有对应的参数支持,在本节中将基于 Exchange 实现广播模式(fanout)、直连模式(direct)以及主题模式(topic)的消息收发。
fanout 广播模式
- 消息组件中一般会绑定有多个消费端的应用,如果现在希望所有的消费端都可以消费同一条数据,那么就可以基于广播的模式来实现
- 广播模式的配置主要是依靠 Exchange 实现的,在进行 Exchange 定义时,可以采用"fanout”的配置模式,这样所有绑定在该 Exchange 上的所有消息队列都可以对同一条消息进行投递,以保证每一位消费者可以接收到消息内容,广播的实现需要对生产者与消费者应用同时修改
1、
package com.yootk.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumerA { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费端A - 接收消息】" + message);
// multiple参数,如果设置为false表示只对当前的消息进行回应
// 如果设置为true则表示对所有的消息进行回应
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
2、
package com.yootk.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumerB { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费端B - 接收消息】" + message);
// multiple参数,如果设置为false表示只对当前的消息进行回应
// 如果设置为true则表示对所有的消息进行回应
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
3、
package com.yootk.rabbitmq.producer.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
public class FanoutMessageProducer { // 消息生产者
private static final String QUEUE_NAME = "yootk.queue.msg"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.fanout"; // 自定义的名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 现在仅仅是进行了Exchange的创建,但是此时不再进行队列的声明了
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 创建Exchange
long start = System.currentTimeMillis(); // 获取当前的发送时间
for (int x = 0; x < 5; x++) { // 通过循环进行消息的发送
String msg = "【沐言科技 - " + x + "】〖FANOUT〗www.yootk.com";
try {
channel.basicPublish(EXCHANGE_NAME, "",
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化消息
} catch (IOException e) {}
}
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
4、
package com.yootk.rabbitmq.consumer.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupAMessageConsumerA { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.a.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.fanout"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 路由KEY先为空字符串
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组A-消费者A-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
5、
package com.yootk.rabbitmq.consumer.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupAMessageConsumerB { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.a.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.fanout"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 路由KEY先为空字符串
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组A-消费者B-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
6、
package com.yootk.rabbitmq.consumer.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupAMessageConsumerB { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.a.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.fanout"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 路由KEY先为空字符串
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组A-消费者B-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
7、
package com.yootk.rabbitmq.consumer.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupBMessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.b.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.fanout"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 路由KEY先为空字符串
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组B-消费者-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
8、
package com.yootk.rabbitmq.consumer.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupCMessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String QUEUE_NAME = "yootk.c.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.fanout"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 路由KEY先为空字符串
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组C-消费者-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
direct 直连模式
RoutingKey
- 使用 fanout 的模式,可以将一个消息在所有的消息队列之中进行传递,如果说现在某个消息只允许在特定的消息队列中进行传递,这个时候就可以使用 RoutingKey 进行标记
1、
package com.yootk.rabbitmq.producer.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
public class DirectMessageProducer { // 消息生产者
private static final String ROUTING_KEY = "yootk.routing.key"; // 路由KEY
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.direct"; // 自定义的名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 现在仅仅是进行了Exchange的创建,但是此时不再进行队列的声明了
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 创建Exchange
long start = System.currentTimeMillis(); // 获取当前的发送时间
for (int x = 0; x < 5; x++) { // 通过循环进行消息的发送
String msg = "【沐言科技 - " + x + "】〖DIRECT〗www.yootk.com";
try {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化消息
} catch (IOException e) {}
}
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
2、
package com.yootk.rabbitmq.consumer.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupAMessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String ROUTING_KEY = "muyan.routing.key";
private static final String QUEUE_NAME = "yootk.a.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.direct"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组A-消费者-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
3、
package com.yootk.rabbitmq.consumer.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
public class GroupBMessageConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String ROUTING_KEY = "yootk.routing.key"; // 匹配
private static final String QUEUE_NAME = "yootk.b.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.direct"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【消费组B-消费者-消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
topic 主题模式
Topic 主题模式
- 在一个庞大的系统架构中,有可能会存在有许多类型相同的消息,为了方便消费处理的归类,可以采用 Topic 交换机的处理模式。该模式可以通过 RoutingKey 的模糊匹配,让不同类型的消息通过指定消息队列的消费端进行处理
- 在主题模式中,需要在 RoutingKey 中定义匹配模式,而后不同的消息会依据匹配模式的不同分发给指定队列的消费端,在 RabbitMQ 中提供了两种匹配的占位符
- ”*“匹配 0 个或 1 个单词;
- ”#“ 匹配 0 个、1 个或多个单词
1、
package com.yootk.rabbitmq.producer.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
public class TopicERPMessageProducer { // 消息生产者
private static final String ROUTING_KEY = "muyan.erp.dept.add"; // 路由KEY
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.topic"; // 自定义的名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 现在仅仅是进行了Exchange的创建,但是此时不再进行队列的声明了
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 创建Exchange
long start = System.currentTimeMillis(); // 获取当前的发送时间
for (int x = 0; x < 5; x++) { // 通过循环进行消息的发送
String msg = "【沐言科技 - ERP管理系统 - " + x + "】〖TOPIC〗www.yootk.com";
try {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化消息
} catch (IOException e) {}
}
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
2、
package com.yootk.rabbitmq.producer.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
public class TopicCRMMessageProducer { // 消息生产者
private static final String ROUTING_KEY = "muyan.crm.emp.delete"; // 路由KEY
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.topic"; // 自定义的名称
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
// 现在仅仅是进行了Exchange的创建,但是此时不再进行队列的声明了
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 创建Exchange
long start = System.currentTimeMillis(); // 获取当前的发送时间
for (int x = 0; x < 5; x++) { // 通过循环进行消息的发送
String msg = "【沐言科技 - CRM管理系统 - " + x + "】〖TOPIC〗www.yootk.com";
try {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化消息
} catch (IOException e) {}
}
long end = System.currentTimeMillis(); // 获取结束的发送时间
System.out.println("【消息发送完毕】消息发送的耗时时间:" + (end - start));
channel.close(); // 关闭通道
connection.close(); // 关闭连接
}
}
3、
package com.yootk.rabbitmq.consumer.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicDeptConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String ROUTING_KEY = "#.dept.#"; // 路由匹配
private static final String QUEUE_NAME = "yootk.a.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.topic"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【部门消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
4、
package com.yootk.rabbitmq.consumer.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
public class TopicEmpConsumer { // 消息消费者
// 消费者是依据队列进行消费的,所以与生产者的队列名称必须一致
private static final String ROUTING_KEY = "#.emp.#"; // 路由匹配
private static final String QUEUE_NAME = "yootk.b.group.queue"; // 队列名称
private static final String HOST = "rabbitmq-server"; // 写的是映射名称
private static final int PORT = 5672; // 服务端口
private static final String USERNAME = "yootk"; // 用户名
private static final String PASSWORD = "hello"; // 密码
private static final String VHOST = "MuyanVHost"; // 虚拟主机的名称
private static final String EXCHANGE_NAME = "yootk.exchange.topic"; // 自定义的名称
private static int count = 1; // 消费次数的记录
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 连接工厂
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VHOST); // 配置虚拟主机
Connection connection = factory.newConnection(); // 创建连接
// 在RabbitMQ之中创建完成了连接之后,需要通过连接获取通道的实例
Channel channel = connection.createChannel(); // 创建通道
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 采用同样的Exchange类型
// 所有的消息的内容都保存在队列之中,但是现阶段RabbitMQ没有提供队列
channel.queueDeclare(QUEUE_NAME, true, false, true, null);
// 队列需要与Exchange进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
Consumer consumer = new DefaultConsumer(channel) { // 配置消息的消费者
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body); // 获取消息内容
System.out.println("【雇员消息接收】" + message);
channel.basicAck(envelope.getDeliveryTag(), false); // 手工应答
}
};
channel.basicConsume(QUEUE_NAME, consumer); // 连接队列与消费者
}
}
Spring 整合 RabbitMQ
程序开发结构
为了进一步简化 RabbitMQ 的开发,Spring 提供了“spring-rabbit”实现依赖,开发者可以通过该依赖库实现 RabbitMQ 连接的管理,AmgpTemplate 发送模版、消费端监听的功能,本次将基于此依赖实现 RabbitMQ 的开发,创建三个新的子模块,名称分别为:
amgp-common(公共模块)
amgp-producer(消息生产者模块)
amgp-consumer(消息消费者模块)
// https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit
implementation group: 'org.springframework.amqp', name: 'spring-rabbit', version: '3.1.2'
1、
// https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit
implementation group: 'org.springframework.amqp', name: 'spring-rabbit', version: '2.4.6'
// https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.11.1'
// https://mvnrepository.com/artifact/org.apache.commons/commons-pool2
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
2、
package com.yootk.producer;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan("com.yootk.producer")
public class StartAMQPProducer {
}
3、
package com.yootk.consumer;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan("com.yootk.consumer")
public class StartAMQPConsumer {
}
RabbitMQ 消费端
Spring 整合 RabbitMQ 配置结构
- 在 Spring 整合 RabbitMQ 的处理操作中,需要通过已有的“amqp-client”依赖提供的 ConnectionFactory 实现连接的配置管理,但是此对象实例需要交给 Spring 来管理,可以根据自身的需要选择是否采用对象池的方式管理,以提高 RabbitMQ 的处理性能。而后的配置主要就围绕着 Queue、Exchange、Binding 等核心概念展开
1、
# 【RabbitMQ的配置信息】编写RabbitMQ服务主机,该主机名称已经在hosts文件中进行了配置
amqp.rabbitmq.host=rabbitmq-server
# 【RabbitMQ的配置信息】设置RabbitMQ的连接端口
amqp.rabbitmq.port=5672
# 【RabbitMQ的配置信息】定义RabbitMQ的用户名
amqp.rabbitmq.username=yootk
# 【RabbitMQ的配置信息】用户登录密码
amqp.rabbitmq.password=hello
# 【RabbitMQ的配置信息】配置虚拟主机,此时的用户一定要拥有该虚拟主机的对应权限
amqp.rabbitmq.vhost=MuyanVHost
# 【RabbitMQ的配置信息】配置路由KEY
amqp.rabbitmq.routing.key=muyan.message.key
# 【RabbitMQ的配置信息】本次使用的是fanout的模式(不同的模式实现类上有区别)
amqp.rabbitmq.exchange.name=yootk.exchange.fanout
# 【RabbitMQ的配置信息】定义消费队列的名称
amqp.rabbitmq.queue.name=muyan.consumer.queue
2、
package com.yootk.consumer.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class RabbitMQMessageListener implements MessageListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQMessageListener.class); // 日志对象
@Override
public void onMessage(Message message) { // 接收消息的内容
LOGGER.info("【接收消息】消息内容:{}", new String(message.getBody()));
}
}
3、
package com.yootk.consumer.config;
import com.yootk.consumer.listener.RabbitMQMessageListener;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig { // RabbitMQ配置类
@Value("${amqp.rabbitmq.host}")
private String host;
@Value("${amqp.rabbitmq.port}")
private Integer port;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.routing.key}")
private String queueName;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${amqp.rabbitmq.queue.name}")
private String routingKey;
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.ConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
return factory;
}
@Bean
public org.springframework.amqp.core.Queue queue() { // 队列声明
return new org.springframework.amqp.core.Queue(this.queueName, true, false, true);
}
@Bean
public RabbitMQMessageListener rabbitMQMessageListener() {
return new RabbitMQMessageListener(); // 消息监听的处理类
}
@Bean
public SimpleMessageListenerContainer listenerContainer(
RabbitMQMessageListener rabbitMQMessageListener,
org.springframework.amqp.core.Queue queue,
org.springframework.amqp.rabbit.connection.ConnectionFactory factory
) { // 监听容器
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(factory); // 配置监听容器
container.setConcurrentConsumers(5); // 并行的消费端数量
container.setMaxConcurrentConsumers(10); // 最大的并行消费端数量
container.setMessageListener(rabbitMQMessageListener);
container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 自动应答
container.addQueues(queue); // 追加队列
container.initialize(); // 容器初始化
return container;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean
public Binding binding(Exchange exchange,
org.springframework.amqp.core.Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(this.routingKey).noargs();
}
@Bean
public RabbitAdmin admin(
RetryTemplate retryTemplate,
Binding binding,
Exchange exchange,
org.springframework.amqp.core.Queue queue,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
RabbitAdmin admin = new RabbitAdmin(springConnectionFactory);
admin.setRetryTemplate(retryTemplate); // 配置重试模版
admin.declareQueue(queue); // 声明使用的队列
admin.declareExchange(exchange); // 声明交换空间
admin.declareBinding(binding); // 声明绑定
return admin;
}
}
4、
package com.yootk.test;
import com.yootk.consumer.StartAMQPConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.concurrent.TimeUnit;
@ContextConfiguration(classes = StartAMQPConsumer.class)
@ExtendWith(SpringExtension.class)
public class TestRabbitMQConsumer {
@Test
public void testReceive() throws Exception {
TimeUnit.SECONDS.sleep(Long.MAX_VALUE); // 持续等待
}
}
RabbitMQ 生产端
生产端模型
- 为了简化消息生产的处理过程,Spring 中提供了一个 AmqpTemplate 的模版接口,在该接口中可以直接发送普通消息,也可以将消息封装在 Message 对象实例后再进行发为了规范化本次的消息发送处理,将创建一个 MessageService 业务接口,并在该业务接口的实现类中注入 AmqpTemplate 接口实例
1、
# 【RabbitMQ的配置信息】编写RabbitMQ服务主机,该主机名称已经在hosts文件中进行了配置
amqp.rabbitmq.host=rabbitmq-server
# 【RabbitMQ的配置信息】设置RabbitMQ的连接端口
amqp.rabbitmq.port=5672
# 【RabbitMQ的配置信息】定义RabbitMQ的用户名
amqp.rabbitmq.username=yootk
# 【RabbitMQ的配置信息】用户登录密码
amqp.rabbitmq.password=hello
# 【RabbitMQ的配置信息】配置虚拟主机,此时的用户一定要拥有该虚拟主机的对应权限
amqp.rabbitmq.vhost=MuyanVHost
# 【RabbitMQ的配置信息】配置路由KEY
amqp.rabbitmq.routing.key=muyan.message.key
# 【RabbitMQ的配置信息】本次使用的是fanout的模式(不同的模式实现类上有区别)
amqp.rabbitmq.exchange.name=yootk.exchange.fanout
2、
package com.yootk.producer.config;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig {
@Value("${amqp.rabbitmq.host}")
private String host;
@Value("${amqp.rabbitmq.port}")
private Integer port;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.routing.key}")
private String queueName;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.ConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
return factory;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean // 最终消息的发送处理是由专属的模版类提供支持的
public AmqpTemplate amqpTemplate(
RetryTemplate retryTemplate,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory) {
RabbitTemplate template = new RabbitTemplate(springConnectionFactory);
template.setExchange(this.exchangeName); // 交换空间
template.setRetryTemplate(retryTemplate);
return template;
}
}
3、
package com.yootk.producer.service;
public interface IMessageService {
public void send(String msg); // 消息的发送
}
4、
package com.yootk.producer.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;
@Service
@PropertySource("classpath:config/amqp.properties") // 配置资源文件
public class MessageServiceImpl implements IMessageService {
@Value("${amqp.rabbitmq.routing.key}")
private String routingKey;
@Autowired
private AmqpTemplate amqpTemplate; // 配置的发送模版
@Override
public void send(String msg) {
this.amqpTemplate.convertAndSend(this.routingKey, msg);
}
}
5、
package com.yootk.test;
import com.yootk.producer.StartAMQPProducer;
import com.yootk.producer.service.IMessageService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartAMQPProducer.class)
@ExtendWith(SpringExtension.class)
public class TestMessageService { // 消息的发送测试
@Autowired
private IMessageService messageService;
@Test
public void testSend() {
this.messageService.send("沐言科技:www.yootk.com"); // 消息发送
}
}
消费端注解配置
MessageListener 实现
- 要在 Spring 中启用消息消费端的处理则需要创建 MessageListenerContainer 接囗实例,而后在该接口中绑定消息监听类。不在默认情况下,所有的消息监听类都需要强制性的实现 MessageListener 父接
RabbitListener 注解组成
- MessageListener 提供了消息监听的处理标准,而 SimpleMessageListenerContainer 子类依据 MessageListener 接口实现监听程序的注册,但是这种强制性的接口实现结构并不是 Spring 所提倡的设计,为了解决此类问题,在 Spring 中又提供了@RabbitListener”注解
1、
package com.yootk.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@EnableRabbit // 启用注解配置
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig { // RabbitMQ配置类
@Value("${amqp.rabbitmq.host}")
private String host;
@Value("${amqp.rabbitmq.port}")
private Integer port;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.queue.name}")
private String queueName;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${amqp.rabbitmq.routing.key}")
private String routingKey;
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.ConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
return factory;
}
@Bean
public org.springframework.amqp.core.Queue queue() { // 队列声明
return new org.springframework.amqp.core.Queue(this.queueName, true, false, true);
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(springConnectionFactory);
factory.setConcurrentConsumers(5); // 并行Consumer数量
factory.setMaxConcurrentConsumers(10); // 最大的Consumer数量
factory.setAcknowledgeMode(AcknowledgeMode.AUTO); // 自动应答
return factory;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean
public Binding binding(Exchange exchange,
org.springframework.amqp.core.Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(this.routingKey).noargs();
}
@Bean
public RabbitAdmin admin(
RetryTemplate retryTemplate,
Binding binding,
Exchange exchange,
org.springframework.amqp.core.Queue queue,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
RabbitAdmin admin = new RabbitAdmin(springConnectionFactory);
admin.setRetryTemplate(retryTemplate); // 配置重试模版
admin.declareQueue(queue); // 声明使用的队列
admin.declareExchange(exchange); // 声明交换空间
admin.declareBinding(binding); // 声明绑定
return admin;
}
}
2、
package com.yootk.consumer.listener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component // 扫描注册
public class RabbitMQMessageListener { // 强制性的接口实现一定不是好设计
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQMessageListener.class); // 日志对象
@RabbitListener(
queues = "muyan.consumer.queue",
admin = "admin",
containerFactory = "rabbitListenerContainerFactory"
)
public void handle(String message) { // 接收消息的内容
LOGGER.info("【接收消息】消息内容:{}", message);
}
}
对象消息传输
对象消息传输
RabbitMQ 在进行消息数据传输时,采用的是二进制数据形式,这样除了可以实现普通文本数据的传输之外,也可以基于对象序列化的方式,进行对象数据的传输
未经授权类解决
SPRING_AMQP_DESERIALIZATION_TRUST_ALL=true
1、
package com.yootk.common;
import java.io.Serializable;
public class Dept implements Serializable { // 二进制数据传输
private Long deptno;
private String dname;
private String loc;
public Long getDeptno() {
return deptno;
}
public void setDeptno(Long deptno) {
this.deptno = deptno;
}
public String getDname() {
return dname;
}
public void setDname(String dname) {
this.dname = dname;
}
public String getLoc() {
return loc;
}
public void setLoc(String loc) {
this.loc = loc;
}
}
2、
package com.yootk.consumer.listener;
import com.yootk.common.Dept;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component // 扫描注册
public class RabbitMQMessageListener { // 强制性的接口实现一定不是好设计
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQMessageListener.class); // 日志对象
@RabbitListener(
queues = "muyan.consumer.queue",
admin = "admin",
containerFactory = "rabbitListenerContainerFactory"
)
public void handle(Dept dept) { // 接收消息的内容
LOGGER.info("【接收消息】部门编号:{}、部门名称:{}、部门位置:{}",
dept.getDeptno(), dept.getDname(), dept.getLoc());
}
}
3、
package com.yootk.producer.service;
import com.yootk.common.Dept;
public interface IMessageService {
public void send(Dept dept); // 消息的发送
}
4、
package com.yootk.producer.service.impl;
import com.yootk.common.Dept;
import com.yootk.producer.service.IMessageService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;
@Service
@PropertySource("classpath:config/amqp.properties") // 配置资源文件
public class MessageServiceImpl implements IMessageService {
@Value("${amqp.rabbitmq.routing.key}")
private String routingKey;
@Autowired
private AmqpTemplate amqpTemplate; // 配置的发送模版
@Override
public void send(Dept dept) {
this.amqpTemplate.convertAndSend(this.routingKey, dept);
}
}
5、
package com.yootk.test;
import com.yootk.common.Dept;
import com.yootk.producer.StartAMQPProducer;
import com.yootk.producer.service.IMessageService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@ContextConfiguration(classes = StartAMQPProducer.class)
@ExtendWith(SpringExtension.class)
public class TestMessageService { // 消息的发送测试
@Autowired
private IMessageService messageService;
@Test
public void testSend() {
Dept dept = new Dept();
dept.setDeptno(10L);
dept.setDname("教学研发部");
dept.setLoc("北京");
this.messageService.send(dept); // 消息发送
}
}
消息批处理
消息批量处理
- 传统的消费模型是每一次通过 RabbitMQ 获取单条消息数据,但是如果此时有大批量这样的做法势必会造成 I0 通道的拥挤,所以为了解决此类问题,就的消息传递过来需要实现消息的批处理操作
批量消息发送
- 由于此时的消费端不再是单一的消息接收,所以需要开启批量接收的处理模式。同时生产者也存在有批量消息的发送需求,这时就需要通过 BatchingRabbitTemplate 消息发送者类来完成
1、
package com.yootk.test;
import com.yootk.common.Dept;
import com.yootk.producer.StartAMQPProducer;
import com.yootk.producer.service.IMessageService;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import java.util.concurrent.TimeUnit;
@ContextConfiguration(classes = StartAMQPProducer.class)
@ExtendWith(SpringExtension.class)
public class TestMessageService { // 消息的发送测试
@Autowired
private IMessageService messageService;
@Test
public void testSend() throws Exception {
for (int x = 0; x < 100; x++) { // 模拟高并发
new Thread(()->{
Dept dept = new Dept();
dept.setDeptno(10L);
dept.setDname("教学研发部");
dept.setLoc(Thread.currentThread().getName()); // 线程名称作为位置标记
this.messageService.send(dept); // 消息发送
}, "消息生产者 - " + x).start();
}
TimeUnit.SECONDS.sleep(3L);
}
}
2、
package com.yootk.consumer.listener;
import com.yootk.common.Dept;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component // 扫描注册
public class RabbitMQMessageListener { // 强制性的接口实现一定不是好设计
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQMessageListener.class); // 日志对象
@RabbitListener(
queues = "muyan.consumer.queue",
admin = "admin",
containerFactory = "rabbitListenerContainerFactory"
)
public void handle(Dept dept) { // 接收消息的内容
LOGGER.info("*********************** 接收到新消息 ***********************");
LOGGER.info("【接收消息】部门编号:{}、部门名称:{}、部门位置:{}",
dept.getDeptno(), dept.getDname(), dept.getLoc());
}
}
3、
package com.yootk.consumer.listener;
import com.yootk.common.Dept;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component // 扫描注册
public class RabbitMQMessageListener { // 强制性的接口实现一定不是好设计
private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQMessageListener.class); // 日志对象
@RabbitListener(
queues = "muyan.consumer.queue",
admin = "admin",
containerFactory = "rabbitListenerContainerFactory"
)
public void handle(List<Dept> depts) { // 接收消息的内容
LOGGER.info("*********************** 接收到新消息 ***********************");
for (Dept dept : depts) {
LOGGER.info("【接收消息】部门编号:{}、部门名称:{}、部门位置:{}",
dept.getDeptno(), dept.getDname(), dept.getLoc());
}
}
}
4、
package com.yootk.producer.config;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig {
@Value("${amqp.rabbitmq.host}")
private String host;
@Value("${amqp.rabbitmq.port}")
private Integer port;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Bean
public TaskScheduler batchQueueTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 物理内核 * 2 = 线程池的大小
taskScheduler.setPoolSize(16); // 任务池大小
return taskScheduler;
}
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.ConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
return factory;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean // 最终消息的发送处理是由专属的模版类提供支持的
public AmqpTemplate amqpTemplate(
RetryTemplate retryTemplate,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory,
TaskScheduler batchQueueTaskScheduler) {
int batchSize = 20; // 批量传输的大小
int bufferLimit = 4096; // 4K缓冲区
long timeout = 10000; // 发送延迟
BatchingStrategy strategy = new SimpleBatchingStrategy(
batchSize, bufferLimit, timeout); // 批量发送策略
BatchingRabbitTemplate template = new BatchingRabbitTemplate(
springConnectionFactory, strategy, batchQueueTaskScheduler);
template.setExchange(this.exchangeName); // 交换空间
template.setRetryTemplate(retryTemplate);
return template;
}
}
5、
package com.yootk.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@EnableRabbit // 启用注解配置
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig { // RabbitMQ配置类
@Value("${amqp.rabbitmq.host}")
private String host;
@Value("${amqp.rabbitmq.port}")
private Integer port;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.queue.name}")
private String queueName;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${amqp.rabbitmq.routing.key}")
private String routingKey;
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setHost(this.host);
factory.setPort(this.port);
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.ConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
return factory;
}
@Bean
public org.springframework.amqp.core.Queue queue() { // 队列声明
return new org.springframework.amqp.core.Queue(this.queueName, true, false, true);
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(springConnectionFactory);
factory.setConcurrentConsumers(5); // 并行Consumer数量
factory.setMaxConcurrentConsumers(10); // 最大的Consumer数量
factory.setAcknowledgeMode(AcknowledgeMode.AUTO); // 自动应答
int batchSize = 20; // 批量传输的大小
int bufferLimit = 4096; // 4K缓冲区
long timeout = 10000; // 发送延迟
BatchingStrategy strategy = new SimpleBatchingStrategy(
batchSize, bufferLimit, timeout); // 批量发送策略
factory.setConsumerBatchEnabled(true); // 启用消费批处理
factory.setBatchingStrategy(strategy); // 批处理策略
factory.setBatchListener(true); // 批处理监听
return factory;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean
public Binding binding(Exchange exchange,
org.springframework.amqp.core.Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(this.routingKey).noargs();
}
@Bean
public RabbitAdmin admin(
RetryTemplate retryTemplate,
Binding binding,
Exchange exchange,
org.springframework.amqp.core.Queue queue,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
RabbitAdmin admin = new RabbitAdmin(springConnectionFactory);
admin.setRetryTemplate(retryTemplate); // 配置重试模版
admin.declareQueue(queue); // 声明使用的队列
admin.declareExchange(exchange); // 声明交换空间
admin.declareBinding(binding); // 声明绑定
return admin;
}
}
6、
*********************** 接收到新消息 ***********************
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 39
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 38
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 33
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 37
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 60
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 32
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 31
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 59
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 58
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 54
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 53
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 52
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 48
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 42
【接收消息】部门编号:10、部门名称:教学研发部、部门位置:消息生产者 - 40
RabbitMQ 集群架构
RabbitMQ 集群
- RabbitMQ 组件相比较其他消息组件来讲,其稳定性更高,可以实现可靠的消息传输但是这些都是建立在 RabbitMQ 服务可以正常提供的前提下,但是对于当前的 RabbitMQ 服务来讲属于单节点应用,如果当前的节点出现了服务崩溃、机房断电、网络瘫痪等情况,那么最终就会导致整个的业务应用出现严重问题,从而导致整体项目的瘫痪。
方案一:远程模式
- 该模式又被称为 Shovel(铲子)模式,可以将远程模式实现了一种双活模式的结构并且可以实现跨地域的连接模式,在本地服务访本地消息服务的数据复制到远程主机,问量过高时,远程服务也可以提供支持,如果要配置远程模式需要安装 amap client 与 rabbitmg shovel 插件,并且该架构属于 RabbitMQ 早期集群方案,并且架构实现复杂现在已经不建议使用了,
方案二:主备模式
- 主备模式又称为 Warren 模式,指的是创建两个不同的 RabbitMQ 节点,每个节点都保留有各白的交换机,并且设置公共的数据存储空间。集群中的两个 RabbitMQ 节点都通过 HAProxy 代理组件进行访问,当 Master 服务节点出现问题后,HAProxy 可以自动的切换到 Backup 节点,继续提供消息服务,此种式适合于并发量不高的应用场景。
方案三:多活模式
- 多活模式可以设置不同的 RabbitMQ 集群,同时基于 Federation 插件实现 AMOP 通讯连接双方可以使用不同的账户以及虚拟主机。在该模式中每一个集群除了要提供正常的业务支持外,还要实现部分的消息数据共享
方案四:镜像模式
- 镜像(Mirror)模式是实际开发中最为常用的 RabbitMQ 集群解决方案,并且其实现简单,同时该方案可以实现 100%的数据可靠性存储,在集群中的每一个 RabbitMQ 节点都保存有相同的数据信息,如果其中有一个节点出现了问题,那么其他的节点可以继续
搭建 RabbitMQ 镜像集群
1、
vi /etc/sysconfig/network-scripts/ifcfg-ens33
IPADDR=192.168.190.151
IPADDR=192.168.190.152
IPADDR=192.168.190.153
2、
vi /etc/hostname
rabbitmq-cluster-a
rabbitmq-cluster-b
rabbitmq-cluster-c
3、
vi /etc/hosts
192.168.190.151 rabbitmq-cluster-a
192.168.190.152 rabbitmq-cluster-b
192.168.190.153 rabbitmq-cluster-c
4、
reboot
5、
cat ~/.erlang.cookie
6、
/usr/local/rabbitmq/sbin/rabbitmq-server start > /dev/null 2>&1 &
7、
/usr/local/rabbitmq/sbin/rabbitmqctl cluster_status
8、
/usr/local/rabbitmq/sbin/rabbitmqctl stop_app
9、
/usr/local/rabbitmq/sbin/rabbitmqctl join_cluster rabbit@rabbitmq-cluster-a
10、
/usr/local/rabbitmq/sbin/rabbitmqctl start_app
11、
C:\Windows\System32\drivers\etc\hosts
192.168.190.151 rabbitmq-cluster-a
192.168.190.152 rabbitmq-cluster-b
192.168.190.153 rabbitmq-cluster-c
12、
rabbitmq-cluster-b:15672
13、
/usr/local/rabbitmq/sbin/rabbitmqctl add_user yootk hello
/usr/local/rabbitmq/sbin/rabbitmqctl set_user_tags yootk administrator
RabbitMQ 集群镜像配置
1、
/usr/local/rabbitmq/sbin/rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
RabbitMQ 集群程序开发
1、
# 【RabbitMQ的配置信息】定义RabbitMQ集群服务主机
amqp.rabbitmq.addresses=rabbitmq-cluster-a:5672,rabbitmq-cluster-b:5672,rabbitmq-cluster-c:5672
# 【RabbitMQ的配置信息】定义RabbitMQ的用户名
amqp.rabbitmq.username=yootk
# 【RabbitMQ的配置信息】用户登录密码
amqp.rabbitmq.password=hello
# 【RabbitMQ的配置信息】配置虚拟主机,此时的用户一定要拥有该虚拟主机的对应权限
amqp.rabbitmq.vhost=MuyanVHost
# 【RabbitMQ的配置信息】配置路由KEY
amqp.rabbitmq.routing.key=muyan.message.key
# 【RabbitMQ的配置信息】本次使用的是fanout的模式(不同的模式实现类上有区别)
amqp.rabbitmq.exchange.name=yootk.exchange.fanout
# 【RabbitMQ的配置信息】定义消费队列的名称
amqp.rabbitmq.queue.name=muyan.consumer.queue
2、
package com.yootk.consumer.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
@EnableRabbit // 启用注解配置
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig { // RabbitMQ配置类
@Value("${amqp.rabbitmq.addresses}")
private String addresses;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.queue.name}")
private String queueName;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Value("${amqp.rabbitmq.routing.key}")
private String routingKey;
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
factory.setAddresses(this.addresses); // 设置集群地址
return factory;
}
@Bean
public org.springframework.amqp.core.Queue queue() { // 队列声明
return new org.springframework.amqp.core.Queue(this.queueName, true, false, true);
}
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(springConnectionFactory);
factory.setConcurrentConsumers(5); // 并行Consumer数量
factory.setMaxConcurrentConsumers(10); // 最大的Consumer数量
factory.setAcknowledgeMode(AcknowledgeMode.AUTO); // 自动应答
int batchSize = 20; // 批量传输的大小
int bufferLimit = 4096; // 4K缓冲区
long timeout = 10000; // 发送延迟
BatchingStrategy strategy = new SimpleBatchingStrategy(
batchSize, bufferLimit, timeout); // 批量发送策略
factory.setConsumerBatchEnabled(true); // 启用消费批处理
factory.setBatchingStrategy(strategy); // 批处理策略
factory.setBatchListener(true); // 批处理监听
return factory;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean
public Binding binding(Exchange exchange,
org.springframework.amqp.core.Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with(this.routingKey).noargs();
}
@Bean
public RabbitAdmin admin(
RetryTemplate retryTemplate,
Binding binding,
Exchange exchange,
org.springframework.amqp.core.Queue queue,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory
) {
RabbitAdmin admin = new RabbitAdmin(springConnectionFactory);
admin.setRetryTemplate(retryTemplate); // 配置重试模版
admin.declareQueue(queue); // 声明使用的队列
admin.declareExchange(exchange); // 声明交换空间
admin.declareBinding(binding); // 声明绑定
return admin;
}
}
3、
package com.yootk.producer.config;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@Configuration
@PropertySource("classpath:config/amqp.properties") // 导入配置的资源文件
public class RabbitMQConfig {
@Value("${amqp.rabbitmq.addresses}")
private String addresses;
@Value("${amqp.rabbitmq.username}")
private String username;
@Value("${amqp.rabbitmq.password}")
private String password;
@Value("${amqp.rabbitmq.vhost}")
private String vhost;
@Value("${amqp.rabbitmq.exchange.name}")
private String exchangeName;
@Bean
public TaskScheduler batchQueueTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
// 物理内核 * 2 = 线程池的大小
taskScheduler.setPoolSize(16); // 任务池大小
return taskScheduler;
}
@Bean
public com.rabbitmq.client.ConnectionFactory amqpConnectionFactory() { // 连接工厂类
com.rabbitmq.client.ConnectionFactory factory =
new com.rabbitmq.client.ConnectionFactory();
factory.setUsername(this.username);
factory.setPassword(this.password);
factory.setVirtualHost(this.vhost);
return factory;
}
@Bean
public org.springframework.amqp.rabbit.connection.ConnectionFactory
springConnectionFactory(
com.rabbitmq.client.ConnectionFactory amqpConnectionFactory) {
org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory factory =
new org.springframework.amqp.rabbit.connection.PooledChannelConnectionFactory(
amqpConnectionFactory); // Spring连接工厂
factory.setAddresses(this.addresses); // 设置集群地址
return factory;
}
@Bean
public RetryTemplate retryTemplate() { // 重试配置
RetryTemplate template = new RetryTemplate(); // 重试模版类
ExponentialBackOffPolicy policy = new ExponentialBackOffPolicy();
policy.setInitialInterval(500); // 重试的间隔时间
policy.setMaxInterval(10000); // 重试的最大间隔
policy.setMultiplier(10.0); // 重试的倍数
template.setBackOffPolicy(policy);// 设置重拾策略
return template;
}
@Bean
public Exchange exchange() { // 定义Exchange
// 主题消息使用的Exchange实现类:TopicExchange
// 直连消息使用的Exchange实现类:DirectExchange
return new FanoutExchange(this.exchangeName);
}
@Bean // 最终消息的发送处理是由专属的模版类提供支持的
public AmqpTemplate amqpTemplate(
RetryTemplate retryTemplate,
org.springframework.amqp.rabbit.connection.ConnectionFactory springConnectionFactory,
TaskScheduler batchQueueTaskScheduler) {
int batchSize = 20; // 批量传输的大小
int bufferLimit = 4096; // 4K缓冲区
long timeout = 10000; // 发送延迟
BatchingStrategy strategy = new SimpleBatchingStrategy(
batchSize, bufferLimit, timeout); // 批量发送策略
BatchingRabbitTemplate template = new BatchingRabbitTemplate(
springConnectionFactory, strategy, batchQueueTaskScheduler);
template.setExchange(this.exchangeName); // 交换空间
template.setRetryTemplate(retryTemplate);
return template;
}
}
demo