RocketMQ-2
大约 45 分钟
RocketMQclient 基本使用
1、
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client
implementation group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.9.2'
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl
implementation group: 'org.apache.rocketmq', name: 'rocketmq-acl', version: '4.9.2'
2、
ext.versions = [ // 定义全部的依赖库版本号
rocketmq : '4.9.2', // RocketMQ的依赖库版本
]
ext.libraries = [ // 依赖库引入配置
// 以下的配置为RocketMQ相关服务的整合依赖
'rocketmq-client' : "org.apache.rocketmq:rocketmq-client:${versions.rocketmq}",
'rocketmq-acl' : "org.apache.rocketmq:rocketmq-acl:${versions.rocketmq}",
]
3、
project(":rocketmq") { // 部门微服务
dependencies {
implementation(libraries.'rocketmq-client')
implementation(libraries.'rocketmq-acl')
}
}
4、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessageConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("【%s】接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
5、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class MessageProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
SendResult result = producer.send(msg); // 发送并接收发送的结果
System.out.printf("【消息发送】%s %n", result);
}
producer.shutdown(); // 关闭生产者
}
}
消息生产模式
1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class MessageProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
final CountDownLatch countDownLatch = new CountDownLatch(3); // 发送等待
for (int x = 0; x < 3; x++) {// 3、采用循环的方式进行消息的生产
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
producer.send(msg, new SendCallback() { // 异步处理
@Override
public void onSuccess(SendResult result) {
System.out.printf("【消息发送】%s %n", result);
countDownLatch.countDown(); // 计数减少
}
@Override
public void onException(Throwable e) {
System.err.printf("【发送失败】%s %n", e.getMessage());
}
});
}
countDownLatch.await(); // 等待全部接收完成
producer.shutdown(); // 关闭生产者
}
}
2、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class MessageOneWayProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
producer.sendOneway(msg); // 单向消息
}
producer.shutdown(); // 关闭生产者
}
}
消费模式

1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook);
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
consumer.start(); // 启动消费端
boolean running = true; // 定义一个轮询的标记
while (running) { // 持续抓取数据
List<MessageExt> messages = consumer.poll(); // 抓取消息
if (messages != null && messages.size() > 0) {
for (Message msg : messages) {
System.out.printf("%s%n", msg);
}
}
}
consumer.shutdown(); // 启动消息的消费者
}
}
业务标签

1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class MessageTagProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
Message msg = null; // 定义消息对象
if (x % 2 == 0) { // 判断消息标签的配置
msg = new Message(TOPIC, "dept", ("【部门消息 - " + x + "】沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
} else {
msg = new Message(TOPIC, "emp", ("【雇员消息 - " + x + "】沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
}
SendResult result = producer.send(msg); // 发送并接收发送的结果
System.out.printf("【消息发送】%s %n", result);
}
producer.shutdown(); // 关闭生产者
}
}
2、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullDeptConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "dept"); // 匹配指定主题的所有消息
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("%s %n", msg);
}
}
}
consumer.shutdown();
}
}
3、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullEmpConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-emp"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "emp"); // 匹配指定主题的所有消息
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("%s %n", msg);
}
}
}
consumer.shutdown();
}
}
4、
System.out.printf("【%s】%s %n", msg.getTags(), new String(msg.getBody()));
5、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullTagConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-tag"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "emp||dept"); // 匹配两个标签
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("【%s】%s %n", msg.getTags(), new String(msg.getBody()));
}
}
}
consumer.shutdown();
}
}
消息识别码

1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class MessageKeysProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
Message msg = null; // 定义消息对象
if (x % 2 == 0) { // 判断消息标签的配置
String keys = "yootk-dept-keys-" + Math.random(); // 消息KEY
msg = new Message(TOPIC, "dept", keys, ("【部门消息 - " + x + "】沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
} else {
String keys = "yootk-emp-keys-" + Math.random(); // 消息KEY
msg = new Message(TOPIC, "emp", keys,("【雇员消息 - " + x + "】沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
}
SendResult result = producer.send(msg); // 发送并接收发送的结果
System.out.printf("【消息发送】%s %n", result);
}
producer.shutdown(); // 关闭生产者
}
}
2、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullDeptConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-dept"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "dept"); // 匹配指定主题的所有消息
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("【%s】keys = %s、body = %s %n", msg.getTags(), msg.getKeys(), new String(msg.getBody()));
}
}
}
consumer.shutdown();
}
}
3、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullEmpConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-emp"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "emp"); // 匹配指定主题的所有消息
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("【%s】keys = %s、body = %s %n", msg.getTags(), msg.getKeys(), new String(msg.getBody()));
}
}
}
consumer.shutdown();
}
}
Namespace

1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullTagConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-tag"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "emp||dept"); // 匹配两个标签
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("【%s】keys = %s、namespace = %s、body = %s %n", msg.getTags(), msg.getKeys(), consumer.getNamespace(), new String(msg.getBody()));
}
}
}
consumer.shutdown();
}
}
2、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class MessageKeysProducer { // 消息生产者
public static final String NAMESPACE = "MuyanYootk"; // 定义命名空间
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
producer.setNamespace(NAMESPACE); // 设置生产者命名空间
for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
Message msg = null; // 定义消息对象
if (x % 2 == 0) { // 判断消息标签的配置
String keys = "yootk-dept-keys-" + Math.random(); // 消息KEY
msg = new Message(TOPIC, "dept", keys, ("【部门消息 - " + x + "】沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
} else {
String keys = "yootk-emp-keys-" + Math.random(); // 消息KEY
msg = new Message(TOPIC, "emp", keys,("【雇员消息 - " + x + "】沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
}
SendResult result = producer.send(msg); // 发送并接收发送的结果
System.out.printf("【消息发送】%s %n", result);
}
producer.shutdown(); // 关闭生产者
}
}
3、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessagePullTagConsumer { // 消息消费者
public static final String NAMESPACE = "MuyanYootk"; // 定义命名空间
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-tag"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "emp||dept"); // 匹配两个标签
consumer.setNamespace(NAMESPACE); // 配置命名空间
consumer.start(); // 启动消息的消费者
boolean running = true; // 接收的状态
while (running) { // 处理了一个死循环的概念
List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
for (Message msg : messages) {
System.out.printf("【%s】keys = %s、namespace = %s、body = %s %n", msg.getTags(), msg.getKeys(), consumer.getNamespace(), new String(msg.getBody()));
}
}
}
consumer.shutdown();
}
}
4、
nohup /usr/local/rocketmq/bin/mqnamesrv > /usr/local/rocketmq/logs/rocketmq-namesrv.log 2>&1 &
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &
java -jar /usr/local/rocketmq/rocketmq-console-ng-1.0.1.jar > /usr/local/rocketmq/logs/rocketmq-console.log 2>&1 &
消息广播

1、
package com.yootk.rockemq.broadcast;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class BroadcastMessageConsumerA { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "broadcast-group-a"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("【%s】A、接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
2、
package com.yootk.rockemq.broadcast;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class BroadcastMessageConsumerB { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "broadcast-group-b"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("【%s】B、接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
3、
package com.yootk.rockemq.broadcast;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class BroadcastMessageConsumerC { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "broadcast-group-c"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("【%s】C、接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
4、
package com.yootk.rockemq.broadcast;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class BroadcastMessageConsumerD1 { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "broadcast-group-d"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】D1、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
5、
package com.yootk.rockemq.broadcast;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class BroadcastMessageConsumerD2 { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "broadcast-group-d"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】D2、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
消息排序

1、
package com.yootk.rockemq.orders;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class OrdersMessageConsumerA { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "orders-group-a"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "tagA||tagC"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】A、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
2、
package com.yootk.rockemq.orders;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class OrdersMessageConsumerB { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "orders-group-b"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "tagB||tagE"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】A、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
3、
package com.yootk.rockemq.orders;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class OrdersMessageConsumerC { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "orders-group-c"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "tagD"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】C、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
4、
package com.yootk.rockemq.orders;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OrdersMessageOneWayProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group-orders"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static final String TAGS[] = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; // 业务标签
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
for (int x = 0; x < 10; x++) {// 3、采用循环的方式进行消息的生产
Message msg = new Message(TOPIC, TAGS[x % TAGS.length], ("【" + String.format("%03d", x) + "】" + "沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
producer.sendOneway(msg); // 单向消息
}
producer.shutdown(); // 关闭生产者
}
}
5、
package com.yootk.rockemq.orders;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class OrdersMessageOneWayProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group-orders"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static final String TAGS[] = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; // 业务标签
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
for (int x = 0; x < 10; x++) {// 3、采用循环的方式进行消息的生产
Message msg = new Message(TOPIC, TAGS[x % TAGS.length], ("【" + String.format("%03d", x) + "】" + "沐言科技:www.yootk.com")
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
producer.sendOneway(msg, new MessageQueueSelector() {// 单向消息
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int index = ((Integer) arg) % mqs.size(); // 计算队列
return mqs.get(index);
}
}, x % TAGS.length); // 设置一个处理的参数
}
producer.shutdown(); // 关闭生产者
}
}
延迟消息

1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class MessageProducerDelay { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
msg.setDelayTimeLevel(2); // 是里面等级顺序,10秒后
SendResult result = producer.send(msg); // 发送并接收发送的结果
System.out.printf("【消息发送】%s %n", result);
producer.shutdown(); // 关闭生产者
}
}
消息过滤
1、
vi /usr/local/rocketmq/conf/broker.conf
2、
enablePropertyFilter=true
3、
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &
4、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessageConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, MessageSelector.bySql("level BETWEEN 8 AND 12")); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("【%s】接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
5、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class MessageProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
msg.putUserProperty("level", "10"); // 设置一个属性的内容
SendResult result = producer.send(msg); // 发送并接收发送的结果
System.out.printf("【消息发送】%s %n", result);
producer.shutdown(); // 关闭生产者
}
}
消息批处理
1、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessageConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】接收到新的消息:body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
2、
package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.ArrayList;
import java.util.List;
public class MessageBatchProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
// 3、如果要想进行批处理的分配,一般都是通过List集合保存批量消息数据的
List<Message> messages = new ArrayList<>(); // 保存批处理消息
for (int x = 0; x < 100; x++) { // 循环数据的配置
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
messages.add(msg); // 将消息保存在List集合之中
}
SendResult result = producer.send(messages); // 消息批处理发送
System.out.printf("【消息批量发送】发送状态:%s %n", result.getSendStatus()); // 获取发送的结果
producer.shutdown(); // 关闭生产者
}
}
3、
package com.yootk.rockemq.util;
import org.apache.rocketmq.common.message.Message;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
// 每次拆分的时候会将完整的消息集合分为子集合的形式出现,每个子集合单独限制其大小
public class MessageListSplitter implements Iterator<List<Message>> {
private static final int SIZE_LIMIT = 1024 * 1024 * 4; // 固定的数据量
private List<Message> messages; // 定义集合列表
// 整个的处理过程之中是需要进行集合长度计算的,这个计算是依据当前迭代的结果完成的\
private int currentIndex; // 当前处理到的索引
public MessageListSplitter(List<Message> messages) {
this.messages = messages; // 集合数据的存储
}
@Override
public boolean hasNext() {
return this.currentIndex < this.messages.size(); // 后面还有数据
}
@Override
public List<Message> next() { // 获取每一次的部分集合数据
int startIndex = this.getBeginIndex(); // 获取开始的索引
int nextIndex = startIndex; // 保存下一个索引
int totalSize = 0; // 要发送数据的总长度
for (; nextIndex < this.messages.size(); nextIndex ++) { // 循环获取集合数据
Message message = this.messages.get(nextIndex); // 获取当前的数据
int messageSize = this.calcMessageSize(message); // 计算数据长度
if (messageSize + totalSize > SIZE_LIMIT) { // 分段
break;
} else {
totalSize += messageSize; // 保存的总长度
}
}
List<Message> subMessages = this.messages.subList(startIndex, nextIndex); // 截取子集合
this.currentIndex = nextIndex; // 当前的索引
return subMessages;
}
// 如果此时发送的消息量过大了,超过了整个的长度的定义,则进行消息的丢弃
public int getBeginIndex() { // 消息处理的时候需要配置一个截取索引
Message currentMessage = this.messages.get(this.currentIndex); // 获取当前的消息内容
int messageSize = this.calcMessageSize(currentMessage); // 计算长度
while (messageSize > SIZE_LIMIT) { // 保存的长度太大了(超大消息)
this.currentIndex += 1; // 考虑下一个开始的索引
Message message = this.messages.get(this.currentIndex);
messageSize = calcMessageSize(message);
}
return this.currentIndex; // 返回索引数据
}
public int calcMessageSize(Message message) { // 计算每个消息的长度
int tempSize = message.getTopic().length() + message.getBody().length; // 消息的长度
Map<String, String> properties = message.getProperties(); // 附加属性的处理
for (Map.Entry<String, String> entry : properties.entrySet()) {
tempSize += entry.getKey().length() + entry.getValue().length(); // 属性的长度计算
}
tempSize = tempSize + 20; // 一些元数据或者是日志信息
return tempSize; // 返回单个消息的长度
}
}
4、
package com.yootk.rockemq;
import com.yootk.rockemq.util.MessageListSplitter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.ArrayList;
import java.util.List;
public class MessageBatchSplitProducer { // 消息生产者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicYootk"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、创建一个消息的生产者
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.start(); // 启动生产者
// 3、如果要想进行批处理的分配,一般都是通过List集合保存批量消息数据的
List<Message> messages = new ArrayList<>(); // 保存批处理消息
for (int x = 0; x < 1000; x++) { // 循环数据的配置
Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
.getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
messages.add(msg); // 将消息保存在List集合之中
}
MessageListSplitter messageListSplitter = new MessageListSplitter(messages);
while(messageListSplitter.hasNext()) { // 每次只取出部分的集合
SendResult result = producer.send(messageListSplitter.next()); // 消息批处理发送
System.out.printf("【消息批量发送】发送状态:%s %n", result.getSendStatus()); // 获取发送的结果
}
producer.shutdown(); // 关闭生产者
}
}
日志消息处理

1、
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client
implementation group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.9.2'
implementation group: 'org.apache.rocketmq', name: 'rocketmq-acl', version: '4.9.2'
implementation group: 'org.apache.rocketmq', name: 'rocketmq-logappender', version: '4.9.2'
// https://mvnrepository.com/artifact/ch.qos.logback/logback-core
implementation group: 'ch.qos.logback', name: 'logback-core', version: '1.2.6'
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.32'
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.6'
2、
ext.versions = [ // 定义全部的依赖库版本号
springboot : '2.2.5.RELEASE', // SpringBoot版本号
springcloud : 'Hoxton.SR3', // SpringCloud版本号
alibabacloud : '2.2.1.RELEASE', // SpringCloudAlibaba版本号
lombok : '1.18.20', // Lombok版本号
junit : '5.6.3', // 配置JUnit测试工具的版本编号
junitPlatformLauncher: '1.6.3', // JUnit测试工具运行平台版本编号
mybatisPlus : '3.4.3', // MyBatisPlus的版本号
mysql : '8.0.25', // MySQL数据库驱动版本
druid : '1.2.6', // Druid版本号
swagger : '3.0.0', // Swagger版本号
nacos : '2.0.2', // Nacos版本号
httpclient : '4.5.13', // HttpClient版本号
feignHttpclient : '11.6', // FeignHttpClient版本号
sentinel : '1.8.2', // Sentinel版本号
caffeine : '3.0.4', // Caffeine缓存组件版本号
micrometer : '1.7.0', // Prometheus相关监控依赖,与服务部署的版本号相同
servlet : '4.0.1', // Servlet的依赖库
commonsCodec : '1.15', // codec依赖库
jjwt : '0.9.1', // jwt依赖库
jaxb : '2.3.0', // JAXB依赖库
admin : '2.3.0', // SpringBootAdmin依赖版本
rocketmq : '4.9.2', // RocketMQ的依赖库版本
logback : '1.2.6', // logback组件的依赖库版本
slf4j : '1.7.32', // SLF4J日志标准的依赖库版本
]
ext.libraries = [ // 依赖库引入配置
'spring-boot-gradle-plugin' :
"org.springframework.boot:spring-boot-gradle-plugin:${versions.springboot}",
'spring-cloud-dependencies' :
"org.springframework.cloud:spring-cloud-dependencies:${versions.springcloud}",
'spring-cloud-alibaba-dependencies':
"com.alibaba.cloud:spring-cloud-alibaba-dependencies:${versions.alibabacloud}",
// 以下的配置为与项目用例测试有关的依赖
'junit-jupiter-api' :
"org.junit.jupiter:junit-jupiter-api:${versions.junit}",
'junit-vintage-engine' :
"org.junit.vintage:junit-vintage-engine:${versions.junit}",
'junit-jupiter-engine' :
"org.junit.jupiter:junit-jupiter-engine:${versions.junit}",
'junit-platform-launcher' :
"org.junit.platform:junit-platform-launcher:${versions.junitPlatformLauncher}",
'junit-platform-engine' :
"org.junit.platform:junit-platform-engine:${versions.junitPlatformLauncher}",
'junit-jupiter-params' :
"org.junit.jupiter:junit-jupiter-params:${versions.junit}",
'junit-bom' : "org.junit:junit-bom:${versions.junit}",
'junit-platform-commons' :
"org.junit.platform:junit-platform-commons:${versions.junitPlatformLauncher}",
// 以下的配置为Lombok组件有关的依赖
'lombok' : "org.projectlombok:lombok:${versions.lombok}",
// 以下的配置为数据库开发有关的依赖
'mybatis-plus-boot-starter' : "com.baomidou:mybatis-plus-boot-starter:${versions.mybatisPlus}",
'mysql-connector-java' : "mysql:mysql-connector-java:${versions.mysql}",
'druid' : "com.alibaba:druid:${versions.druid}",
// 以下的配置为Swagger有关的依赖库
'springfox-boot-starter' : "io.springfox:springfox-boot-starter:${versions.swagger}",
// 以下的配置为Nacos有关的依赖库
'nacos-client' : "com.alibaba.nacos:nacos-client:${versions.nacos}",
// 以下的配置为Feign与HttpClient有关的依赖库
'httpclient' : "org.apache.httpcomponents:httpclient:${versions.httpclient}",
'feign-httpclient' : "io.github.openfeign:feign-httpclient:${versions.feignHttpclient}",
// 以下的配置为Sentinel有关的组件依赖
'sentinel-datasource-nacos' : "com.alibaba.csp:sentinel-datasource-nacos:${versions.sentinel}",
// 以下的配置为LoadBalancer所需要的Caffeine组件有关依赖
'caffeine' : "com.github.ben-manes.caffeine:caffeine:${versions.caffeine}",
// 以下的配置为Prometheus服务整合
'micrometer-registry-prometheus': "io.micrometer:micrometer-registry-prometheus:${versions.micrometer}",
'micrometer-core': "io.micrometer:micrometer-core:${versions.micrometer}",
// 以下的配置为JWT的服务整合
'servlet-api' : "javax.servlet:javax.servlet-api:${versions.servlet}",
'commons-codec' : "commons-codec:commons-codec:${versions.commonsCodec}",
'jjwt' : "io.jsonwebtoken:jjwt:${versions.jjwt}",
'jaxb-api' : "javax.xml.bind:jaxb-api:${versions.jaxb}",
'jaxb-impl' : "com.sun.xml.bind:jaxb-impl:${versions.jaxb}",
'jaxb-core' : "com.sun.xml.bind:jaxb-core:${versions.jaxb}",
// 以下的配置为SpringBootAdmin服务整合
'spring-boot-admin-starter-server' : "de.codecentric:spring-boot-admin-starter-server:${versions.admin}",
'spring-boot-admin-starter-client' : "de.codecentric:spring-boot-admin-starter-client:${versions.admin}",
// 以下的配置为RocketMQ相关服务的整合依赖
'rocketmq-client' : "org.apache.rocketmq:rocketmq-client:${versions.rocketmq}",
'rocketmq-acl' : "org.apache.rocketmq:rocketmq-acl:${versions.rocketmq}",
// 以下的配置为RocketMQ与Logback日志组件的服务整合依赖:
'rocketmq-logappender' : "org.apache.rocketmq:rocketmq-logappender:${versions.rocketmq}",
'logback-core' : "ch.qos.logback:logback-core:${versions.logback}",
'logback-classic' : "ch.qos.logback:logback-classic:${versions.logback}",
'slf4j-api' : "org.slf4j:slf4j-api:${versions.slf4j}",
]
3、
project(":rocketmq-logback") { // 部门微服务
dependencies {
implementation(libraries.'rocketmq-client')
implementation(libraries.'rocketmq-acl')
implementation(libraries.'rocketmq-logappender')
implementation(libraries.'logback-core')
implementation(libraries.'logback-classic')
implementation(libraries.'slf4j-api')
}
}
4、
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOG_HOME" value="c:/log" /> <!-- 日志保存目录 -->
<appender name="STDOUT"
class="ch.qos.logback.core.ConsoleAppender"> <!-- 控制台输出 -->
<Encoding>UTF-8</Encoding> <!-- 日志编码 -->
<layout class="ch.qos.logback.classic.PatternLayout"> <!-- 日志格式 -->
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</layout>
</appender>
<appender name="FILE"
class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 本地日志文件 -->
<Encoding>UTF-8</Encoding> <!-- 日志编码 -->
<!-- 定义日志文件的生成结构,将每天的日志保存在一个文件之中,最多保留30天日志 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<FileNamePattern>${LOG_HOME}/rocketmq.%d{yyyy-MM-dd}.log</FileNamePattern>
<MaxHistory>30</MaxHistory>
</rollingPolicy>
<layout class="ch.qos.logback.classic.PatternLayout"> <!-- 日志格式 -->
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</layout>
</appender>
<!-- 定义RocketMQ连接的相关属性内容,注意:在RocketMQLogback中没有ACL配置支持 -->
<appender name="RocketMQAppender"
class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
<tag>logback</tag> <!-- 消息标签 -->
<topic>TopicLogback</topic> <!-- 消息主题 -->
<producerGroup>logback-group</producerGroup> <!-- 消息分组 -->
<nameServerAddress>rocketmq-server:9876</nameServerAddress> <!-- NameServer -->
<layout><pattern>%date %p %t - %m%n</pattern></layout> <!-- 消息格式 -->
</appender>
<appender name="RocketMQAsyncAppender"
class="ch.qos.logback.classic.AsyncAppender"> <!-- 异步消息 -->
<queueSize>1024</queueSize> <!-- 阻塞队列长度 -->
<discardingThreshold>80</discardingThreshold> <!-- 丢弃阈值 -->
<maxFlushTime>2000</maxFlushTime> <!-- 刷新时间 -->
<neverBlock>true</neverBlock> <!-- 异步处理 -->
<appender-ref ref="RocketMQAppender"/> <!-- 配置引用 -->
</appender>
<logger name="com.yootk" level="debug"/> <!-- 日志级别 -->
<root level="DEBUG"> <!-- 日志级别 -->
<appender-ref ref="RocketMQAppender" /> <!-- 消息日志 -->
<appender-ref ref="STDOUT" /> <!-- 控制台日志 -->
<appender-ref ref="FILE" /> <!-- 文件日志 -->
</root>
</configuration>
5、package com.yootk.rockemq;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class MessageLogbackConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group-logback"; // 定义消费组
public static final String TOPIC = "TopicLogback"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】接收到新的消息:body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}
6、
package com.yootk.rocketmq.logback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogbackProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerFactory.class);
public static void main(String[] args) {
LOGGER.info("【INFO】沐言科技:www.yootk.com");
LOGGER.error("【ERROR】沐言科技:www.yootk.com");
LOGGER.debug("【DEBUG】沐言科技:www.yootk.com");
}
}
事务消息
1、
vi /usr/local/rocketmq/conf/broker.conf
transactionCheckMax=15
transactionCheckInterval=10000
transactionTimeOut=6000
2、
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &
3、
package com.yootk.rockemq.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener { // 事务消息监听
private AtomicInteger transactionIndex = new AtomicInteger(0); // 操作计数
// 保存每一个事务消息对应的业务的处理状态(KEY = 事务ID、VALUE=业务标记状态)
private ConcurrentHashMap<String, Integer> localTransMap = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 此时需要在此方法里面编写具体的分布式业务更新方法调用代码
int value = this.transactionIndex.getAndIncrement(); // 获取一个标记值
int status = value % 3; // 每次调用此方法都会计算一个status内容
this.localTransMap.put(msg.getTransactionId(), status); // 保存数据
System.out.println("【executeLocalTransaction()】调用微服务方法:" + this.localTransMap);
return LocalTransactionState.UNKNOW; // 执行分布式业务处理的时候不知道是否成功或失败
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.err.println("【checkLocalTransaction()】检查事务状态:" + this.localTransMap);
Integer status = this.localTransMap.get(msg.getTransactionId()); // 获取事务消息的ID
if (status != null) { // 获取到了事务的内容
switch(status) {
case 0: // 不知道的状态
int value = this.transactionIndex.getAndIncrement(); // 模拟业务处理
status = value % 3; // 每次调用此方法都会计算一个status内容
this.localTransMap.put(msg.getTransactionId(), status); // 保存数据
return LocalTransactionState.UNKNOW;
case 1: // 成功的状态
return LocalTransactionState.COMMIT_MESSAGE; // 整个的事务成功
case 2: // 表示失败的状态
return LocalTransactionState.ROLLBACK_MESSAGE; // 整个的事务需要回滚
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
4、
package com.yootk.rockemq.transaction;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class TransactionMessageProducer { // 消息生产者
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicTransaction"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、事务消息创建的核心的主题在于TransactionListener接口实例的配置
TransactionListener transactionListener = new TransactionListenerImpl();
// 3、创建事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, clientHook);
producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
producer.setExecutorService(Executors.newFixedThreadPool(8)); // 配置线程池
producer.setTransactionListener(transactionListener); // 事务监听类
producer.start(); // 启动生产者
sendDeptMessage(producer); // 发送部门的消息
sendEmpMessage(producer); // 发送雇员的消息
TimeUnit.MINUTES.sleep(Long.MAX_VALUE); // 保持运行状态不关闭
producer.shutdown(); // 关闭生产者
}
public static void sendDeptMessage(TransactionMQProducer producer) throws Exception {
Message msg = new Message(TOPIC, "dept", "【DEPT】增加新部门事务".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送事务消息的时候需要设置一个业务的标记,同时要返回有具体的事务消息处理的结果
TransactionSendResult result = producer.sendMessageInTransaction(msg, "DeptProvider");
TimeUnit.MICROSECONDS.sleep(10); // 模拟分布式的业务调用
}
public static void sendEmpMessage(TransactionMQProducer producer) throws Exception {
for (int x = 0; x < 3; x++) { // 循环配置多个消息
Message msg = new Message(TOPIC, "emp", ("【EMP】增加新雇员事务 - " + x).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送事务消息的时候需要设置一个业务的标记,同时要返回有具体的事务消息处理的结果
TransactionSendResult result = producer.sendMessageInTransaction(msg, "EmpProvider");
TimeUnit.MICROSECONDS.sleep(10); // 模拟分布式的业务调用
}
}
}
5、
package com.yootk.rockemq.transaction;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class TransactionMessageConsumer { // 消息消费者
// 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
public static final String TOPIC = "TopicTransaction"; // 定义主题名称
public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
public static final String SECRET_KEY = "helloyootk"; // 定义密码
public static void main(String[] args) throws Exception { // 懒人必备的处理形式
// 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
// 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
// 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
msgs.forEach((msg)->{
System.out.printf("【%s】接收到新的消息:body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
}
});
consumer.start(); // 启动消息的消费者
}
}