跳至主要內容

RocketMQ-2

wangdx大约 45 分钟

RocketMQclient 基本使用

1、
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client
implementation group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.9.2'
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl
implementation group: 'org.apache.rocketmq', name: 'rocketmq-acl', version: '4.9.2'


2、
ext.versions = [                // 定义全部的依赖库版本号
    rocketmq             : '4.9.2', // RocketMQ的依赖库版本
]
ext.libraries = [            // 依赖库引入配置
    // 以下的配置为RocketMQ相关服务的整合依赖
    'rocketmq-client'                   : "org.apache.rocketmq:rocketmq-client:${versions.rocketmq}",
    'rocketmq-acl'                      : "org.apache.rocketmq:rocketmq-acl:${versions.rocketmq}",
]


3、
project(":rocketmq") {    // 部门微服务
    dependencies {
        implementation(libraries.'rocketmq-client')
        implementation(libraries.'rocketmq-acl')
    }
}

4、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessageConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("【%s】接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


5、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class MessageProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            SendResult result = producer.send(msg); // 发送并接收发送的结果
            System.out.printf("【消息发送】%s %n", result);
        }
        producer.shutdown(); // 关闭生产者
    }
}

消息生产模式

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class MessageProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        final CountDownLatch countDownLatch = new CountDownLatch(3); // 发送等待
        for (int x = 0; x < 3; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            producer.send(msg, new SendCallback() { // 异步处理
                @Override
                public void onSuccess(SendResult result) {
                    System.out.printf("【消息发送】%s %n", result);
                    countDownLatch.countDown(); // 计数减少
                }
                @Override
                public void onException(Throwable e) {
                    System.err.printf("【发送失败】%s %n", e.getMessage());
                }
            });
        }
        countDownLatch.await(); // 等待全部接收完成
        producer.shutdown(); // 关闭生产者
    }
}


2、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MessageOneWayProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            producer.sendOneway(msg); // 单向消息
        }
        producer.shutdown(); // 关闭生产者
    }
}

消费模式

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook);
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        consumer.start(); // 启动消费端
        boolean running = true; // 定义一个轮询的标记
        while (running) {   // 持续抓取数据
            List<MessageExt> messages = consumer.poll(); // 抓取消息
            if (messages != null && messages.size() > 0) {
                for (Message msg : messages) {
                    System.out.printf("%s%n", msg);
                }
            }
        }

        consumer.shutdown(); // 启动消息的消费者
    }
}

业务标签

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MessageTagProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = null; // 定义消息对象
            if (x % 2 == 0) {   // 判断消息标签的配置
                msg = new Message(TOPIC, "dept", ("【部门消息 - " + x + "】沐言科技:www.yootk.com")
                        .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            } else {
                msg = new Message(TOPIC, "emp", ("【雇员消息 - " + x + "】沐言科技:www.yootk.com")
                        .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            }
            SendResult result = producer.send(msg); // 发送并接收发送的结果
            System.out.printf("【消息发送】%s %n", result);
        }
        producer.shutdown(); // 关闭生产者
    }
}

2、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullDeptConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "dept"); // 匹配指定主题的所有消息
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("%s %n", msg);
                }
            }
        }
        consumer.shutdown();
    }
}


3、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullEmpConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-emp"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "emp"); // 匹配指定主题的所有消息
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("%s %n", msg);
                }
            }
        }
        consumer.shutdown();
    }
}


4、
System.out.printf("【%s】%s %n", msg.getTags(), new String(msg.getBody()));

5、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullTagConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-tag"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "emp||dept"); // 匹配两个标签
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("【%s】%s %n", msg.getTags(), new String(msg.getBody()));
                }
            }
        }
        consumer.shutdown();
    }
}

消息识别码

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MessageKeysProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = null; // 定义消息对象
            if (x % 2 == 0) {   // 判断消息标签的配置
                String keys = "yootk-dept-keys-" + Math.random(); // 消息KEY
                msg = new Message(TOPIC, "dept", keys, ("【部门消息 - " + x + "】沐言科技:www.yootk.com")
                        .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            } else {
                String keys = "yootk-emp-keys-" + Math.random(); // 消息KEY
                msg = new Message(TOPIC, "emp", keys,("【雇员消息 - " + x + "】沐言科技:www.yootk.com")
                        .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            }
            SendResult result = producer.send(msg); // 发送并接收发送的结果
            System.out.printf("【消息发送】%s %n", result);
        }
        producer.shutdown(); // 关闭生产者
    }
}

2、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullDeptConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-dept"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "dept"); // 匹配指定主题的所有消息
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("【%s】keys = %s、body = %s %n", msg.getTags(), msg.getKeys(), new String(msg.getBody()));
                }
            }
        }
        consumer.shutdown();
    }
}


3、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullEmpConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-emp"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "emp"); // 匹配指定主题的所有消息
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("【%s】keys = %s、body = %s %n", msg.getTags(), msg.getKeys(), new String(msg.getBody()));
                }
            }
        }
        consumer.shutdown();
    }
}

Namespace

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullTagConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-tag"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "emp||dept"); // 匹配两个标签
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("【%s】keys = %s、namespace = %s、body = %s %n", msg.getTags(), msg.getKeys(), consumer.getNamespace(), new String(msg.getBody()));
                }
            }
        }
        consumer.shutdown();
    }
}


2、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MessageKeysProducer { // 消息生产者
    public static final String NAMESPACE = "MuyanYootk"; // 定义命名空间
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        producer.setNamespace(NAMESPACE); // 设置生产者命名空间
        for (int x = 0; x < 100; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = null; // 定义消息对象
            if (x % 2 == 0) {   // 判断消息标签的配置
                String keys = "yootk-dept-keys-" + Math.random(); // 消息KEY
                msg = new Message(TOPIC, "dept", keys, ("【部门消息 - " + x + "】沐言科技:www.yootk.com")
                        .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            } else {
                String keys = "yootk-emp-keys-" + Math.random(); // 消息KEY
                msg = new Message(TOPIC, "emp", keys,("【雇员消息 - " + x + "】沐言科技:www.yootk.com")
                        .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            }
            SendResult result = producer.send(msg); // 发送并接收发送的结果
            System.out.printf("【消息发送】%s %n", result);
        }
        producer.shutdown(); // 关闭生产者
    }
}

3、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessagePullTagConsumer { // 消息消费者
    public static final String NAMESPACE = "MuyanYootk"; // 定义命名空间
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-tag"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(CONSUMER_GROUP, clientHook); // 使用PULL模式
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "emp||dept"); // 匹配两个标签
        consumer.setNamespace(NAMESPACE); // 配置命名空间
        consumer.start(); // 启动消息的消费者
        boolean running = true; // 接收的状态
        while (running) {   // 处理了一个死循环的概念
            List<MessageExt> messages = consumer.poll(); // 手工的进行消息的拉取
            if (messages != null) { // 因为可能拉取到消息,也可能没有拉取到
                for (Message msg : messages) {
                    System.out.printf("【%s】keys = %s、namespace = %s、body = %s %n", msg.getTags(), msg.getKeys(), consumer.getNamespace(), new String(msg.getBody()));
                }
            }
        }
        consumer.shutdown();
    }
}


4、
nohup /usr/local/rocketmq/bin/mqnamesrv > /usr/local/rocketmq/logs/rocketmq-namesrv.log 2>&1 &
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &
java -jar /usr/local/rocketmq/rocketmq-console-ng-1.0.1.jar > /usr/local/rocketmq/logs/rocketmq-console.log 2>&1 &

消息广播

1、
package com.yootk.rockemq.broadcast;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class BroadcastMessageConsumerA { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "broadcast-group-a"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("【%s】A、接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


2、
package com.yootk.rockemq.broadcast;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class BroadcastMessageConsumerB { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "broadcast-group-b"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("【%s】B、接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


3、
package com.yootk.rockemq.broadcast;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class BroadcastMessageConsumerC { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "broadcast-group-c"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("【%s】C、接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


4、

package com.yootk.rockemq.broadcast;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class BroadcastMessageConsumerD1 { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "broadcast-group-d"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】D1、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}

5、
package com.yootk.rockemq.broadcast;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class BroadcastMessageConsumerD2 { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "broadcast-group-d"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】D2、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}

消息排序

1、
package com.yootk.rockemq.orders;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class OrdersMessageConsumerA { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "orders-group-a"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "tagA||tagC"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】A、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


2、

package com.yootk.rockemq.orders;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class OrdersMessageConsumerB { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "orders-group-b"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "tagB||tagE"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】A、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}

3、
package com.yootk.rockemq.orders;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class OrdersMessageConsumerC { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "orders-group-c"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "tagD"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】C、接收到新的消息:%s %n", Thread.currentThread().getName(), new String(msg.getBody()));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


4、

package com.yootk.rockemq.orders;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OrdersMessageOneWayProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group-orders"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static final String TAGS[] = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; // 业务标签
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        for (int x = 0; x < 10; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = new Message(TOPIC, TAGS[x % TAGS.length], ("【" + String.format("%03d", x) + "】" + "沐言科技:www.yootk.com")
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            producer.sendOneway(msg); // 单向消息
        }
        producer.shutdown(); // 关闭生产者
    }
}
5、
package com.yootk.rockemq.orders;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class OrdersMessageOneWayProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group-orders"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static final String TAGS[] = new String[] {"tagA", "tagB", "tagC", "tagD", "tagE"}; // 业务标签
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        for (int x = 0; x < 10; x++) {// 3、采用循环的方式进行消息的生产
            Message msg = new Message(TOPIC, TAGS[x % TAGS.length], ("【" + String.format("%03d", x) + "】" + "沐言科技:www.yootk.com")
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            producer.sendOneway(msg, new MessageQueueSelector() {// 单向消息
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int index = ((Integer) arg) % mqs.size(); // 计算队列
                    return mqs.get(index);
                }
            }, x % TAGS.length); // 设置一个处理的参数
        }
        producer.shutdown(); // 关闭生产者
    }
}

延迟消息

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class MessageProducerDelay { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
        msg.setDelayTimeLevel(2); // 是里面等级顺序,10秒后
        SendResult result = producer.send(msg); // 发送并接收发送的结果
        System.out.printf("【消息发送】%s %n", result);
        producer.shutdown(); // 关闭生产者
    }
}

消息过滤

1、
vi /usr/local/rocketmq/conf/broker.conf


2、
enablePropertyFilter=true

3、
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &

4、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessageConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, MessageSelector.bySql("level BETWEEN 8 AND 12")); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("【%s】接收到新的消息:%s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


5、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class MessageProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
        msg.putUserProperty("level", "10"); // 设置一个属性的内容
        SendResult result = producer.send(msg); // 发送并接收发送的结果
        System.out.printf("【消息发送】%s %n", result);
        producer.shutdown(); // 关闭生产者
    }
}

消息批处理

1、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessageConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】接收到新的消息:body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


2、
package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class MessageBatchProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        // 3、如果要想进行批处理的分配,一般都是通过List集合保存批量消息数据的
        List<Message> messages = new ArrayList<>(); // 保存批处理消息
        for (int x = 0; x < 100; x++) { // 循环数据的配置
            Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            messages.add(msg); // 将消息保存在List集合之中
        }
        SendResult result = producer.send(messages); // 消息批处理发送
        System.out.printf("【消息批量发送】发送状态:%s %n", result.getSendStatus()); // 获取发送的结果
        producer.shutdown(); // 关闭生产者
    }
}

3、
package com.yootk.rockemq.util;

import org.apache.rocketmq.common.message.Message;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

// 每次拆分的时候会将完整的消息集合分为子集合的形式出现,每个子集合单独限制其大小
public class MessageListSplitter implements Iterator<List<Message>> {
    private static final int SIZE_LIMIT = 1024 * 1024 * 4; // 固定的数据量
    private List<Message> messages; // 定义集合列表
    // 整个的处理过程之中是需要进行集合长度计算的,这个计算是依据当前迭代的结果完成的\
    private int currentIndex; // 当前处理到的索引
    public MessageListSplitter(List<Message> messages) {
        this.messages = messages; // 集合数据的存储
    }
    @Override
    public boolean hasNext() {
        return this.currentIndex < this.messages.size(); // 后面还有数据
    }

    @Override
    public List<Message> next() { // 获取每一次的部分集合数据
        int startIndex = this.getBeginIndex(); // 获取开始的索引
        int nextIndex = startIndex; // 保存下一个索引
        int totalSize = 0; // 要发送数据的总长度
        for (; nextIndex < this.messages.size(); nextIndex ++) {    // 循环获取集合数据
            Message message = this.messages.get(nextIndex); // 获取当前的数据
            int messageSize = this.calcMessageSize(message); // 计算数据长度
            if (messageSize + totalSize > SIZE_LIMIT) { // 分段
                break;
            } else {
                totalSize += messageSize; // 保存的总长度
            }
        }
        List<Message> subMessages = this.messages.subList(startIndex, nextIndex); // 截取子集合
        this.currentIndex = nextIndex; // 当前的索引
        return subMessages;
    }
    // 如果此时发送的消息量过大了,超过了整个的长度的定义,则进行消息的丢弃
    public int getBeginIndex() {    // 消息处理的时候需要配置一个截取索引
        Message currentMessage = this.messages.get(this.currentIndex); // 获取当前的消息内容
        int messageSize = this.calcMessageSize(currentMessage); // 计算长度
        while (messageSize > SIZE_LIMIT) {  // 保存的长度太大了(超大消息)
            this.currentIndex += 1; // 考虑下一个开始的索引
            Message message = this.messages.get(this.currentIndex);
            messageSize = calcMessageSize(message);
        }
        return this.currentIndex; // 返回索引数据
    }
    public int calcMessageSize(Message message) {   // 计算每个消息的长度
        int tempSize = message.getTopic().length() + message.getBody().length; // 消息的长度
        Map<String, String> properties = message.getProperties(); // 附加属性的处理
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            tempSize += entry.getKey().length() + entry.getValue().length(); // 属性的长度计算
        }
        tempSize = tempSize + 20; // 一些元数据或者是日志信息
        return tempSize; // 返回单个消息的长度
    }
}


4、
package com.yootk.rockemq;

import com.yootk.rockemq.util.MessageListSplitter;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class MessageBatchSplitProducer { // 消息生产者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicYootk"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、创建一个消息的生产者
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.start(); // 启动生产者
        // 3、如果要想进行批处理的分配,一般都是通过List集合保存批量消息数据的
        List<Message> messages = new ArrayList<>(); // 保存批处理消息
        for (int x = 0; x < 1000; x++) { // 循环数据的配置
            Message msg = new Message(TOPIC, "沐言科技:www.yootk.com"
                    .getBytes(RemotingHelper.DEFAULT_CHARSET)); // 消息需要二进制
            messages.add(msg); // 将消息保存在List集合之中
        }
        MessageListSplitter messageListSplitter = new MessageListSplitter(messages);
        while(messageListSplitter.hasNext()) { // 每次只取出部分的集合
            SendResult result = producer.send(messageListSplitter.next()); // 消息批处理发送
            System.out.printf("【消息批量发送】发送状态:%s %n", result.getSendStatus()); // 获取发送的结果
        }
        producer.shutdown(); // 关闭生产者
    }
}

日志消息处理

1、
// https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client
implementation group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.9.2'
implementation group: 'org.apache.rocketmq', name: 'rocketmq-acl', version: '4.9.2'
implementation group: 'org.apache.rocketmq', name: 'rocketmq-logappender', version: '4.9.2'

// https://mvnrepository.com/artifact/ch.qos.logback/logback-core
implementation group: 'ch.qos.logback', name: 'logback-core', version: '1.2.6'
// https://mvnrepository.com/artifact/org.slf4j/slf4j-api
implementation group: 'org.slf4j', name: 'slf4j-api', version: '1.7.32'
// https://mvnrepository.com/artifact/ch.qos.logback/logback-classic
testImplementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.6'




2、

ext.versions = [                // 定义全部的依赖库版本号
    springboot           : '2.2.5.RELEASE',      // SpringBoot版本号
    springcloud          : 'Hoxton.SR3', // SpringCloud版本号
    alibabacloud         : '2.2.1.RELEASE', // SpringCloudAlibaba版本号
    lombok               : '1.18.20', // Lombok版本号
    junit                : '5.6.3', // 配置JUnit测试工具的版本编号
    junitPlatformLauncher: '1.6.3',  // JUnit测试工具运行平台版本编号
    mybatisPlus          : '3.4.3', // MyBatisPlus的版本号
    mysql                : '8.0.25', // MySQL数据库驱动版本
    druid                : '1.2.6', // Druid版本号
    swagger              : '3.0.0', // Swagger版本号
    nacos                : '2.0.2', // Nacos版本号
    httpclient           : '4.5.13', // HttpClient版本号
    feignHttpclient      : '11.6', // FeignHttpClient版本号
    sentinel             : '1.8.2', // Sentinel版本号
    caffeine             : '3.0.4', // Caffeine缓存组件版本号
    micrometer           : '1.7.0', // Prometheus相关监控依赖,与服务部署的版本号相同
    servlet              : '4.0.1', // Servlet的依赖库
    commonsCodec         : '1.15', // codec依赖库
    jjwt                 : '0.9.1', // jwt依赖库
    jaxb                 : '2.3.0', // JAXB依赖库
    admin                : '2.3.0', // SpringBootAdmin依赖版本
    rocketmq             : '4.9.2', // RocketMQ的依赖库版本
    logback              : '1.2.6', // logback组件的依赖库版本
    slf4j                : '1.7.32', // SLF4J日志标准的依赖库版本
]
ext.libraries = [            // 依赖库引入配置
     'spring-boot-gradle-plugin'        :
             "org.springframework.boot:spring-boot-gradle-plugin:${versions.springboot}",
     'spring-cloud-dependencies'        :
             "org.springframework.cloud:spring-cloud-dependencies:${versions.springcloud}",
     'spring-cloud-alibaba-dependencies':
             "com.alibaba.cloud:spring-cloud-alibaba-dependencies:${versions.alibabacloud}",
     // 以下的配置为与项目用例测试有关的依赖
     'junit-jupiter-api'                :
             "org.junit.jupiter:junit-jupiter-api:${versions.junit}",
     'junit-vintage-engine'             :
             "org.junit.vintage:junit-vintage-engine:${versions.junit}",
     'junit-jupiter-engine'             :
             "org.junit.jupiter:junit-jupiter-engine:${versions.junit}",
     'junit-platform-launcher'          :
             "org.junit.platform:junit-platform-launcher:${versions.junitPlatformLauncher}",
     'junit-platform-engine'            :
             "org.junit.platform:junit-platform-engine:${versions.junitPlatformLauncher}",
     'junit-jupiter-params'             :
             "org.junit.jupiter:junit-jupiter-params:${versions.junit}",
     'junit-bom'                        : "org.junit:junit-bom:${versions.junit}",
     'junit-platform-commons'           :
             "org.junit.platform:junit-platform-commons:${versions.junitPlatformLauncher}",
     // 以下的配置为Lombok组件有关的依赖
     'lombok'                           : "org.projectlombok:lombok:${versions.lombok}",
     // 以下的配置为数据库开发有关的依赖
     'mybatis-plus-boot-starter'        : "com.baomidou:mybatis-plus-boot-starter:${versions.mybatisPlus}",
     'mysql-connector-java'             : "mysql:mysql-connector-java:${versions.mysql}",
     'druid'                            : "com.alibaba:druid:${versions.druid}",
     // 以下的配置为Swagger有关的依赖库
    'springfox-boot-starter'            : "io.springfox:springfox-boot-starter:${versions.swagger}",
     // 以下的配置为Nacos有关的依赖库
    'nacos-client'                      : "com.alibaba.nacos:nacos-client:${versions.nacos}",
    // 以下的配置为Feign与HttpClient有关的依赖库
    'httpclient'                        : "org.apache.httpcomponents:httpclient:${versions.httpclient}",
    'feign-httpclient'                  : "io.github.openfeign:feign-httpclient:${versions.feignHttpclient}",
    // 以下的配置为Sentinel有关的组件依赖
    'sentinel-datasource-nacos'         : "com.alibaba.csp:sentinel-datasource-nacos:${versions.sentinel}",
    // 以下的配置为LoadBalancer所需要的Caffeine组件有关依赖
    'caffeine'                          : "com.github.ben-manes.caffeine:caffeine:${versions.caffeine}",
    // 以下的配置为Prometheus服务整合
    'micrometer-registry-prometheus': "io.micrometer:micrometer-registry-prometheus:${versions.micrometer}",
    'micrometer-core': "io.micrometer:micrometer-core:${versions.micrometer}",
    // 以下的配置为JWT的服务整合
    'servlet-api'                       : "javax.servlet:javax.servlet-api:${versions.servlet}",
    'commons-codec'                     : "commons-codec:commons-codec:${versions.commonsCodec}",
    'jjwt'                              : "io.jsonwebtoken:jjwt:${versions.jjwt}",
    'jaxb-api'                          : "javax.xml.bind:jaxb-api:${versions.jaxb}",
    'jaxb-impl'                         : "com.sun.xml.bind:jaxb-impl:${versions.jaxb}",
    'jaxb-core'                         : "com.sun.xml.bind:jaxb-core:${versions.jaxb}",
    // 以下的配置为SpringBootAdmin服务整合
    'spring-boot-admin-starter-server'  : "de.codecentric:spring-boot-admin-starter-server:${versions.admin}",
    'spring-boot-admin-starter-client'  : "de.codecentric:spring-boot-admin-starter-client:${versions.admin}",
    // 以下的配置为RocketMQ相关服务的整合依赖
    'rocketmq-client'                   : "org.apache.rocketmq:rocketmq-client:${versions.rocketmq}",
    'rocketmq-acl'                      : "org.apache.rocketmq:rocketmq-acl:${versions.rocketmq}",
    // 以下的配置为RocketMQ与Logback日志组件的服务整合依赖:
    'rocketmq-logappender'              : "org.apache.rocketmq:rocketmq-logappender:${versions.rocketmq}",
    'logback-core'                      : "ch.qos.logback:logback-core:${versions.logback}",
    'logback-classic'                   : "ch.qos.logback:logback-classic:${versions.logback}",
    'slf4j-api'                         : "org.slf4j:slf4j-api:${versions.slf4j}",
]



3、
project(":rocketmq-logback") {    // 部门微服务
    dependencies {
        implementation(libraries.'rocketmq-client')
        implementation(libraries.'rocketmq-acl')
        implementation(libraries.'rocketmq-logappender')
        implementation(libraries.'logback-core')
        implementation(libraries.'logback-classic')
        implementation(libraries.'slf4j-api')
    }
}

4、


<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="c:/log" />			<!-- 日志保存目录 -->
    <appender name="STDOUT"
	class="ch.qos.logback.core.ConsoleAppender">		<!-- 控制台输出 -->
        <Encoding>UTF-8</Encoding> 					<!-- 日志编码 -->
        <layout class="ch.qos.logback.classic.PatternLayout">	<!-- 日志格式 -->
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </layout>
    </appender>
    <appender name="FILE"
              class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 本地日志文件 -->
        <Encoding>UTF-8</Encoding> 					<!-- 日志编码 -->
        <!-- 定义日志文件的生成结构,将每天的日志保存在一个文件之中,最多保留30天日志 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <FileNamePattern>${LOG_HOME}/rocketmq.%d{yyyy-MM-dd}.log</FileNamePattern>
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <layout class="ch.qos.logback.classic.PatternLayout">	<!-- 日志格式 -->
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </layout>
    </appender>
    <!-- 定义RocketMQ连接的相关属性内容,注意:在RocketMQLogback中没有ACL配置支持 -->
    <appender name="RocketMQAppender"
              class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
        <tag>logback</tag> 					<!-- 消息标签 -->
        <topic>TopicLogback</topic> 				<!-- 消息主题 -->
        <producerGroup>logback-group</producerGroup> 		<!-- 消息分组 -->
        <nameServerAddress>rocketmq-server:9876</nameServerAddress> 	<!-- NameServer -->
        <layout><pattern>%date %p %t - %m%n</pattern></layout>	<!-- 消息格式 -->
    </appender>
    <appender name="RocketMQAsyncAppender"
              class="ch.qos.logback.classic.AsyncAppender"> 		<!-- 异步消息 -->
        <queueSize>1024</queueSize> 				<!-- 阻塞队列长度 -->
        <discardingThreshold>80</discardingThreshold> 		<!-- 丢弃阈值 -->
        <maxFlushTime>2000</maxFlushTime> 				<!-- 刷新时间 -->
        <neverBlock>true</neverBlock> 				<!-- 异步处理 -->
        <appender-ref ref="RocketMQAppender"/> 			<!-- 配置引用 -->
    </appender>
    <logger name="com.yootk" level="debug"/> 			<!-- 日志级别 -->
    <root level="DEBUG"> 						<!-- 日志级别 -->
        <appender-ref ref="RocketMQAppender" /> 			<!-- 消息日志 -->
        <appender-ref ref="STDOUT" /> 				<!-- 控制台日志 -->
        <appender-ref ref="FILE" /> 				<!-- 文件日志 -->
    </root>
</configuration>

5、package com.yootk.rockemq;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class MessageLogbackConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group-logback"; // 定义消费组
    public static final String TOPIC = "TopicLogback"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】接收到新的消息:body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}


6、
package com.yootk.rocketmq.logback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogbackProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerFactory.class);
    public static void main(String[] args) {
        LOGGER.info("【INFO】沐言科技:www.yootk.com");
        LOGGER.error("【ERROR】沐言科技:www.yootk.com");
        LOGGER.debug("【DEBUG】沐言科技:www.yootk.com");
    }
}

事务消息

1、
vi /usr/local/rocketmq/conf/broker.conf

transactionCheckMax=15
transactionCheckInterval=10000
transactionTimeOut=6000


2、
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &

3、
package com.yootk.rockemq.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class TransactionListenerImpl implements TransactionListener { // 事务消息监听
    private AtomicInteger transactionIndex = new AtomicInteger(0); // 操作计数
    // 保存每一个事务消息对应的业务的处理状态(KEY = 事务ID、VALUE=业务标记状态)
    private ConcurrentHashMap<String, Integer> localTransMap = new ConcurrentHashMap<>();
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 此时需要在此方法里面编写具体的分布式业务更新方法调用代码
        int value = this.transactionIndex.getAndIncrement(); // 获取一个标记值
        int status = value % 3; // 每次调用此方法都会计算一个status内容
        this.localTransMap.put(msg.getTransactionId(), status); // 保存数据
        System.out.println("【executeLocalTransaction()】调用微服务方法:" + this.localTransMap);
        return LocalTransactionState.UNKNOW; // 执行分布式业务处理的时候不知道是否成功或失败
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.err.println("【checkLocalTransaction()】检查事务状态:" + this.localTransMap);
        Integer status = this.localTransMap.get(msg.getTransactionId()); // 获取事务消息的ID
        if (status != null) {   // 获取到了事务的内容
            switch(status) {
                case 0: // 不知道的状态
                    int value = this.transactionIndex.getAndIncrement(); // 模拟业务处理
                    status = value % 3; // 每次调用此方法都会计算一个status内容
                    this.localTransMap.put(msg.getTransactionId(), status); // 保存数据
                    return LocalTransactionState.UNKNOW;
                case 1: // 成功的状态
                    return LocalTransactionState.COMMIT_MESSAGE; // 整个的事务成功
                case 2: // 表示失败的状态
                    return LocalTransactionState.ROLLBACK_MESSAGE; // 整个的事务需要回滚
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}


4、

package com.yootk.rockemq.transaction;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TransactionMessageProducer { // 消息生产者
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String PRODUCER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicTransaction"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码

    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、事务消息创建的核心的主题在于TransactionListener接口实例的配置
        TransactionListener transactionListener = new TransactionListenerImpl();
        // 3、创建事务消息的生产者
        TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, clientHook);
        producer.setNamesrvAddr(NAME_SERVER_LIST); // NameServer地址
        producer.setExecutorService(Executors.newFixedThreadPool(8)); // 配置线程池
        producer.setTransactionListener(transactionListener); // 事务监听类
        producer.start(); // 启动生产者
        sendDeptMessage(producer); // 发送部门的消息
        sendEmpMessage(producer); // 发送雇员的消息
        TimeUnit.MINUTES.sleep(Long.MAX_VALUE); // 保持运行状态不关闭
        producer.shutdown(); // 关闭生产者
    }

    public static void sendDeptMessage(TransactionMQProducer producer) throws Exception {
        Message msg = new Message(TOPIC, "dept", "【DEPT】增加新部门事务".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送事务消息的时候需要设置一个业务的标记,同时要返回有具体的事务消息处理的结果
        TransactionSendResult result = producer.sendMessageInTransaction(msg, "DeptProvider");
        TimeUnit.MICROSECONDS.sleep(10); // 模拟分布式的业务调用
    }

    public static void sendEmpMessage(TransactionMQProducer producer) throws Exception {
        for (int x = 0; x < 3; x++) { // 循环配置多个消息
            Message msg = new Message(TOPIC, "emp", ("【EMP】增加新雇员事务 - " + x).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送事务消息的时候需要设置一个业务的标记,同时要返回有具体的事务消息处理的结果
            TransactionSendResult result = producer.sendMessageInTransaction(msg, "EmpProvider");
            TimeUnit.MICROSECONDS.sleep(10); // 模拟分布式的业务调用
        }
    }
}
5、
package com.yootk.rockemq.transaction;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class TransactionMessageConsumer { // 消息消费者
    // 如果此时你使用的是集群服务,则每个主机之间使用“,”分割
    public static final String NAME_SERVER_LIST = "rocketmq-server:9876"; // NameServer
    public static final String CONSUMER_GROUP = "muyan-group"; // 定义消费组
    public static final String TOPIC = "TopicTransaction"; // 定义主题名称
    public static final String ACCESS_KEY = "RocketMQMuyan"; // 定义用户名
    public static final String SECRET_KEY = "helloyootk"; // 定义密码
    public static void main(String[] args) throws Exception { // 懒人必备的处理形式
        // 1、由于此时的RocketMQ启动了ACL安全认证的保护机制,所以需要配置相应的回调
        RPCHook clientHook = new AclClientRPCHook(new SessionCredentials(ACCESS_KEY, SECRET_KEY));
        // 2、要在项目之中启动一个消费者的处理程序类,但是这个消费者分为两种形式
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP, clientHook, new AllocateMessageQueueAveragely());
        consumer.setNamesrvAddr(NAME_SERVER_LIST); // 通过NameServer获取相应的Broker数据
        consumer.subscribe(TOPIC, "*"); // 匹配指定主题的所有消息
        // 3、进行消息监听的处理操作,在监听的时候要使用专属的监听接口
        consumer.registerMessageListener(new MessageListenerConcurrently() { // 准备接收消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach((msg)->{
                    System.out.printf("【%s】接收到新的消息:body = %s、level = %s %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getProperty("level"));
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消息消费成功
            }
        });
        consumer.start(); // 启动消息的消费者
    }
}

上次编辑于: