RocketMQ

wangdx

Introduction to RocketMQ

Traditional synchronous request-response development

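  • In typical project development and design, the most common processing pattern is for the client to send an operation request to the server and then wait until the server has finished the business processing and returned the data. The most serious consequence of this approach is that when the server performs poorly, or the business logic takes a long time, the client keeps occupying a server thread, and subsequent requests cannot be handled normally. Take an online shopping system as an example: a user sends a transaction request to the store's trading system, but a complete transaction also involves operations in several related systems, such as the order system, the shopping cart system, the warehouse system, and the logistics system. The whole business process may take several minutes or even tens of hours to finish, and the user cannot stay waiting for a response all that time. During periods of highly concurrent access in particular, this processing logic can directly bring the service down.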

Processing with a message component

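  • To solve this kind of problem, the recommended approach is to introduce a messaging system into the project: the user submits transaction data through the trading system, and the system responds to the request immediately. The transaction data is not processed right away; it enters the messaging system and waits there to be handled by the other related subsystems. This greatly reduces server-side processing time and raises the throughput of the project by buffering business operations, which is the main purpose of a message component.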
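
The "respond immediately, process later" idea can be sketched with nothing but the JDK. This is a minimal illustration, not RocketMQ code: the class name `TradeGateway`, the `ACCEPTED:` reply format, and the 50 ms simulated delay are all assumptions made for the example, and an in-memory `LinkedBlockingQueue` stands in for the messaging system.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TradeGateway {
    // In-memory stand-in for the messaging system (hypothetical, not a real broker).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = new CopyOnWriteArrayList<>();
    private volatile CountDownLatch done = new CountDownLatch(0);

    /** Accepts a trade request and answers at once; the real work happens later. */
    public String submit(String orderId) {
        queue.offer(orderId);
        return "ACCEPTED:" + orderId;
    }

    /** Starts a background worker that handles exactly `expected` orders, then exits. */
    public void startWorker(int expected) {
        CountDownLatch latch = new CountDownLatch(expected);
        done = latch;
        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < expected; i++) {
                    String orderId = queue.take(); // blocks until a message arrives
                    Thread.sleep(50);              // simulated slow downstream systems
                    processed.add(orderId);
                    latch.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Waits until the worker has handled all expected orders or the timeout expires. */
    public boolean awaitDone(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        TradeGateway gateway = new TradeGateway();
        gateway.startWorker(3);
        for (String id : new String[] {"order-1", "order-2", "order-3"}) {
            System.out.println(gateway.submit(id)); // returns without waiting for processing
        }
        gateway.awaitDone(5000);
        System.out.println("processed: " + gateway.processed());
    }
}
```

The caller of `submit` never waits for the slow work, so a request thread is released as soon as the message has been queued.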

Role of the message component

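  • A message component is a server-side application based on the queue processing model, with two kinds of endpoints: message producers and message consumers. However many messages the producers generate, they are all stored in the message component, which smooths out load peaks (peak shaving and valley filling) and avoids service outages caused by large bursts of user requests. The consumer side can process the stored messages in order according to its own business logic, which keeps the overall business processing reliable and the service stable. Message-based processing also makes it easier to integrate third-party platforms.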
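
Peak shaving can be made concrete with a small simulation. The sketch below is only an illustration under assumed numbers: `arrivals` holds how many messages are produced in each tick, `capacityPerTick` is a hypothetical fixed consumer rate, and the method reports how many messages remain queued after every tick.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PeakShavingDemo {
    /**
     * Simulates a FIFO queue in front of a consumer with fixed capacity.
     * Returns the backlog left in the queue after each tick, continuing
     * with zero arrivals until the queue has been drained.
     */
    public static List<Integer> backlogPerTick(int[] arrivals, int capacityPerTick) {
        if (capacityPerTick <= 0) {
            throw new IllegalArgumentException("capacityPerTick must be positive");
        }
        Deque<Integer> queue = new ArrayDeque<>();
        List<Integer> backlog = new ArrayList<>();
        int nextId = 0;
        for (int tick = 0; tick < arrivals.length || !queue.isEmpty(); tick++) {
            int produced = tick < arrivals.length ? arrivals[tick] : 0;
            for (int i = 0; i < produced; i++) {
                queue.addLast(nextId++);   // producers only enqueue, they never wait
            }
            for (int i = 0; i < capacityPerTick && !queue.isEmpty(); i++) {
                queue.pollFirst();         // the consumer takes messages in arrival order
            }
            backlog.add(queue.size());
        }
        return backlog;
    }

    public static void main(String[] args) {
        // A burst of 50 messages, then 5, then nothing; the consumer handles 20 per tick.
        System.out.println(backlogPerTick(new int[] {50, 5, 0}, 20)); // prints [30, 15, 0]
    }
}
```

The burst never reaches the consumer faster than it can work; the queue absorbs the peak and is drained during the quieter ticks.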

Flushing and persisting to disk

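  • Because message middleware has to store large amounts of message data, it provides two mechanisms for storing that data: "flushing" and "persisting to disk".
  • Persisting to disk guarantees durability: even if the server crashes, no data is lost, but I/O limits make it relatively slow. Flushing writes the data to an in-memory buffer first and leaves it to the operating system to decide when to write it to disk for persistent storage. If the service crashes, data that is still in memory and has not yet been written may be lost, but because reads and writes mostly go to memory, performance is higher.
    • Synchronous flush (SYNC_FLUSH): the data is flushed to disk as soon as it has been written to the cache, and the client receives a response only after the flush succeeds. This mode offers high data reliability.
    • Asynchronous flush (ASYNC_FLUSH): writes are fast and throughput is high; cached data is written to disk in a batch once enough has accumulated, but data reliability cannot be guaranteed.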
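
The difference between the two flush modes can be illustrated with `java.nio`. This is a rough sketch of the general technique, not RocketMQ's store implementation: `FlushDemo` and its `flushEvery` parameter are invented for the example, and `FileChannel.force` is used as the point where data is pushed from the operating system cache to the storage device.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushDemo implements AutoCloseable {
    private final FileChannel channel;
    private final boolean syncFlush;
    private final int flushEvery; // async mode: force after this many appends (hypothetical knob)
    private int unflushed = 0;

    public FlushDemo(Path file, boolean syncFlush, int flushEvery) throws IOException {
        this.channel = FileChannel.open(file,
            StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        this.syncFlush = syncFlush;
        this.flushEvery = flushEvery;
    }

    public void append(String message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap((message + "\n").getBytes(StandardCharsets.UTF_8));
        while (buffer.hasRemaining()) {
            channel.write(buffer); // data reaches the OS page cache, not necessarily the disk
        }
        unflushed++;
        if (syncFlush || unflushed >= flushEvery) {
            channel.force(false);  // ask the OS to write the file content to the storage device
            unflushed = 0;
        }
    }

    /** Number of appends that have not yet been forced to the storage device. */
    public int unflushedCount() {
        return unflushed;
    }

    @Override
    public void close() throws IOException {
        channel.force(false);
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-demo", ".log");
        try (FlushDemo sync = new FlushDemo(file, true, 1)) {
            sync.append("forced immediately");
        }
        try (FlushDemo async = new FlushDemo(file, false, 3)) {
            async.append("buffered 1");
            async.append("buffered 2");
            System.out.println("pending appends: " + async.unflushedCount());
        }
        System.out.println(Files.readAllLines(file));
    }
}
```

In synchronous mode every append pays for a `force` call before returning; in the batched mode the caller returns sooner, and the appends counted by `unflushedCount` are the ones at risk if the machine loses power.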

History of RocketMQ

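  • RocketMQ is an open-source, distributed message middleware based on a queue model, released by Alibaba. Inside Alibaba it kept delivering high performance and good stability through ten years of Double 11 shopping promotions. It is now maintained by the Apache Software Foundation and became an Apache top-level project on September 25, 2017.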

Setting up the RocketMQ service

1. Download the RocketMQ 4.9.1 binary release:
cd /var/ftp/
wget https://dlcdn.apache.org/rocketmq/4.9.1/rocketmq-all-4.9.1-bin-release.zip

2. Unpack the archive into /usr/local/:
unzip /var/ftp/rocketmq-all-4.9.1-bin-release.zip -d /usr/local/

3. Rename the extracted directory:
mv /usr/local/rocketmq-all-4.9.1-bin-release/ /usr/local/rocketmq


4. Create a directory for the log files:
mkdir /usr/local/rocketmq/logs

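5. Move the existing JDK aside (assumed here to live in /usr/local/jdk), install JDK 8, and check the version:
mv /usr/local/jdk /usr/local/jdk-11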
tar xzvf /var/ftp/jdk-8u191-linux-x64.tar.gz -C /usr/local/
mv /usr/local/jdk1.8.0_191/ /usr/local/jdk
javac -version

6. Try starting the NameServer:
/usr/local/rocketmq/bin/mqnamesrv

7. Open the NameServer startup script to adjust the JVM memory settings:
vim /usr/local/rocketmq/bin/runserver.sh

8. Set the JVM options in runserver.sh:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m"

9. Start the NameServer in the background and write its output to a log file:
nohup /usr/local/rocketmq/bin/mqnamesrv > /usr/local/rocketmq/logs/rocketmq-namesrv.log 2>&1 &

10. Create the storage directory for the commit log:
mkdir -p /usr/local/rocketmq/store/commitlog

11. Edit the broker configuration and add the following settings:
vim /usr/local/rocketmq/conf/broker.conf

namesrvAddr=rocketmq-server:9876
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitlog


12. Edit the broker startup script and set the JVM options:
vim /usr/local/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"

13. Start the broker in the background with this configuration:
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &

14. Add the NameServer address to the environment in /etc/profile:
vi /etc/profile
export NAMESRV_ADDR=rocketmq-server:9876

15. Open the required ports in the firewall:
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=10912/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --reload


16. Run the bundled quickstart producer and consumer classes to test the service:
org.apache.rocketmq.example.quickstart.Producer
org.apache.rocketmq.example.quickstart.Consumer

/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
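
Before running clients from another machine, it can help to confirm that the ports opened in step 15 are actually reachable. The helper below is a hypothetical JDK-only check, not part of RocketMQ; the host name `rocketmq-server` comes from the configuration above, and the port roles in the comment are the usual defaults (9876 NameServer, 10911 broker, 10909 the broker port minus 2, 10912 the broker HA port), which should be treated as assumptions for any customized setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host name not resolvable
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "rocketmq-server";
        // Assumed default roles: 9876 NameServer, 10909/10911 broker, 10912 broker HA.
        int[] ports = {9876, 10909, 10911, 10912};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable=" + isReachable(host, port, 2000));
        }
    }
}
```

A `false` result only says that no TCP connection could be opened; it does not distinguish a firewall rule from a process that is not running.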

ACL (access control list)

1. Enable ACL in the broker configuration:
vi /usr/local/rocketmq/conf/broker.conf
aclEnable=true

2. Open the ACL configuration file:
vi /usr/local/rocketmq/conf/plain_acl.yml

3. Define the global whitelist and the accounts:
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*

accounts:
- accessKey: RocketMQMuyan
  secretKey: helloyootk
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: PUB|SUB
  defaultGroupPerm: PUB|SUB
  topicPerms:
  - TopicMuyan=DENY
  - TopicYootk=PUB|SUB
  - TopicBenchmark=PUB|SUB
  groupPerms:
  # the group should convert to retry topic
  - yootk-group=DENY
  - muyan-group=PUB|SUB
  - happy-group=SUB

- accessKey: RocketMQAdmin
  secretKey: hello123456
  whiteRemoteAddress: 192.168.1.*
  # if it is admin, it could access all resources
  admin: true


4. Start the broker with the updated configuration:
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/broker.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &

5. Run the quickstart producer and consumer again:
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
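
To make the meaning of the entries in plain_acl.yml easier to picture, here is a small sketch of how a whitelist pattern such as `192.168.0.*` and a permission string such as `PUB|SUB` could be evaluated. It is an illustration written for this note, not RocketMQ's ACL code: the class `AclSketch` and both methods are hypothetical, and only the simple "literal or `*` per segment" pattern form is handled.

```java
public class AclSketch {
    /** Matches an IPv4 address against a pattern whose segments may be "*", e.g. "192.168.0.*". */
    public static boolean matches(String pattern, String ip) {
        String[] p = pattern.split("\\.");
        String[] a = ip.split("\\.");
        if (p.length != 4 || a.length != 4) {
            return false;
        }
        for (int i = 0; i < 4; i++) {
            if (!p[i].equals("*") && !p[i].equals(a[i])) {
                return false;
            }
        }
        return true;
    }

    /** Returns true if a permission string ("PUB|SUB", "PUB", "SUB", "DENY") allows the action. */
    public static boolean allows(String perm, String action) {
        if (perm == null || perm.equals("DENY")) {
            return false;
        }
        for (String part : perm.split("\\|")) {
            if (part.trim().equals(action)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("192.168.0.*", "192.168.0.15")); // true
        System.out.println(matches("10.10.103.*", "192.168.0.15")); // false
        System.out.println(allows("PUB|SUB", "PUB"));               // true
        System.out.println(allows("SUB", "PUB"));                   // false
        System.out.println(allows("DENY", "SUB"));                  // false
    }
}
```

Read this way, `TopicMuyan=DENY` blocks both sending and subscribing for that account, while `happy-group=SUB` permits consuming only.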

RocketMQ console

1. The console source code is hosted in the rocketmq-externals repository:
https://github.com/apache/rocketmq-externals

2. Clone the repository, either the Apache one or the muyan-yootk copy:
git clone git@github.com:apache/rocketmq-externals.git

git clone git@github.com:muyan-yootk/rocketmq-externals.git

3. Set the NameServer address and the ACL credentials in the console configuration:
#if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=rocketmq-server:9876
rocketmq.config.accessKey=RocketMQMuyan
rocketmq.config.secretKey=helloyootk


4. Require login:
#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=true


5. Define the accounts in users.properties:
# Define Admin, 1: administrator, 0: ordinary user
admin=admin,1

# Define Users
muyan=yootk

6. Start the console:

java -jar /mnt/project/boot/rocketmq-console/rocketmq-dashboard-1.0.1-SNAPSHOT.jar > /usr/local/rocketmq/logs/rocketmq-console.log 2>&1 &

7. Open port 8080 in the firewall:
firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --reload

8. Open the console in a browser:
rocketmq-server:8080

9. Run the quickstart consumer and producer:
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
/usr/local/rocketmq/bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

RocketMQ admin commands

Documentation

1. Run mqadmin without arguments to list the available commands:
/usr/local/rocketmq/bin/mqadmin

2. Create or update a topic on a broker:
/usr/local/rocketmq/bin/mqadmin updateTopic -b rocketmq-server:10911 -t TopicYootk

3. List the topics:
/usr/local/rocketmq/bin/mqadmin topicList

4. Delete a topic from the cluster:
/usr/local/rocketmq/bin/mqadmin deleteTopic -c DefaultCluster -n rocketmq-server:9876 -t YootkTopic

5. Create or update a subscription (consumer) group:
/usr/local/rocketmq/bin/mqadmin updateSubGroup -c DefaultCluster -n rocketmq-server:9876 -g muyan-group

6. Delete a subscription group:
/usr/local/rocketmq/bin/mqadmin deleteSubGroup -c DefaultCluster -n rocketmq-server:9876 -g muyan-group

7. Update a broker configuration item, here the disk flush type:
/usr/local/rocketmq/bin/mqadmin updateBrokerConfig -c DefaultCluster -n rocketmq-server:9876 -k flushDiskType -v SYNC_FLUSH

8. Show the routing information of a topic:
/usr/local/rocketmq/bin/mqadmin topicRoute -n rocketmq-server:9876 -t TopicTest

Benchmark stress test

1. Make the benchmark scripts executable:
chmod 777 -R /usr/local/rocketmq/benchmark/

2. Start the benchmark consumer:

/usr/local/rocketmq/benchmark/consumer.sh -t TopicBenchmark -n rocketmq-server:9876 -g muyan-group

3. Start the benchmark producer:
/usr/local/rocketmq/benchmark/producer.sh -t TopicBenchmark -n rocketmq-server:9876

4. Stop the benchmark producer:
/usr/local/rocketmq/benchmark/shutdown.sh producer

5. Stop the benchmark consumer:
/usr/local/rocketmq/benchmark/shutdown.sh consumer

RocketMQ core concepts

Remoting communication module

1. Dependency declarations from RocketMQ module POM files that reference the remoting module:
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>rocketmq-remoting</artifactId>
</dependency>
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>rocketmq-srvutil</artifactId>
</dependency>
<dependency>
    <groupId>${project.groupId}</groupId>
    <artifactId>rocketmq-remoting</artifactId>
</dependency>


2. Dependencies declared by the remoting module:
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
        <groupId>${project.groupId}</groupId>
        <artifactId>rocketmq-logging</artifactId>
    </dependency>
</dependencies>


3. The RemotingService interface:
package org.apache.rocketmq.remoting;
public interface RemotingService {
    void start();                           // start the service
    void shutdown();                        // shut the service down
    void registerRPCHook(RPCHook rpcHook);  // register an RPC callback hook
}


4. The RPCHook interface:
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);
    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}
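
The before/after callback pattern of RPCHook can be tried out without the RocketMQ libraries. The sketch below is a self-contained imitation: `Command`, `Hook`, and `Client` are simplified stand-ins invented for this example (the real `RemotingCommand` carries much more, and the real remoting client performs an actual network call), so only the calling order of the two hook methods should be taken from it.

```java
import java.util.ArrayList;
import java.util.List;

public class HookSketch {
    /** Simplified stand-in for RemotingCommand (hypothetical). */
    static final class Command {
        final int code;
        final String remark;

        Command(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }
    }

    /** Same shape as the RPCHook interface above, using the stand-in Command type. */
    interface Hook {
        void doBeforeRequest(String remoteAddr, Command request);

        void doAfterResponse(String remoteAddr, Command request, Command response);
    }

    /** Hypothetical client that calls every registered hook around a request. */
    static final class Client {
        private final List<Hook> hooks = new ArrayList<>();

        void registerHook(Hook hook) {
            hooks.add(hook);
        }

        Command invoke(String remoteAddr, Command request) {
            for (Hook hook : hooks) {
                hook.doBeforeRequest(remoteAddr, request);
            }
            Command response = new Command(0, "echo:" + request.remark); // fake network call
            for (Hook hook : hooks) {
                hook.doAfterResponse(remoteAddr, request, response);
            }
            return response;
        }
    }

    /** Registers a logging hook, sends one request, and returns what the hook recorded. */
    public static List<String> demo() {
        final List<String> log = new ArrayList<>();
        Client client = new Client();
        client.registerHook(new Hook() {
            @Override
            public void doBeforeRequest(String remoteAddr, Command request) {
                log.add("before " + remoteAddr + " code=" + request.code);
            }

            @Override
            public void doAfterResponse(String remoteAddr, Command request, Command response) {
                log.add("after " + remoteAddr + " -> " + response.remark);
            }
        });
        client.invoke("rocketmq-server:10911", new Command(10, "hello"));
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

A hook of this shape is a natural place for cross-cutting concerns such as logging or attaching credentials to outgoing requests.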

Message structure

1. Create the topic and send a test message:
/usr/local/rocketmq/bin/mqadmin updateTopic -t TopicYootk -n rocketmq-server:9876 -c DefaultCluster

/usr/local/rocketmq/bin/mqadmin sendMessage -t TopicYootk -n rocketmq-server:9876 -p www.yootk.com

2. Consume the message:
/usr/local/rocketmq/bin/mqadmin consumeMessage -t TopicYootk -n rocketmq-server:9876 -g muyan-group

3. Source of the Message class:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.common.message;

import org.apache.rocketmq.common.sysflag.MessageSysFlag;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;

    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

    public Message() {
    }

    public Message(String topic, byte[] body) {
        this(topic, "", "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }

    public Message(String topic, String tags, byte[] body) {
        this(topic, tags, "", 0, body, true);
    }

    public Message(String topic, String tags, String keys, byte[] body) {
        this(topic, tags, keys, 0, body, true);
    }

    public void setKeys(String keys) {
        this.putProperty(MessageConst.PROPERTY_KEYS, keys);
    }

    void putProperty(final String name, final String value) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        this.properties.put(name, value);
    }

    void clearProperty(final String name) {
        if (null != this.properties) {
            this.properties.remove(name);
        }
    }

    public void putUserProperty(final String name, final String value) {
        if (MessageConst.STRING_HASH_SET.contains(name)) {
            throw new RuntimeException(String.format(
                "The Property<%s> is used by system, input another please", name));
        }

        if (value == null || value.trim().isEmpty()
            || name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "The name or value of property can not be null or blank string!"
            );
        }

        this.putProperty(name, value);
    }

    public String getUserProperty(final String name) {
        return this.getProperty(name);
    }

    public String getProperty(final String name) {
        if (null == this.properties) {
            this.properties = new HashMap<String, String>();
        }

        return this.properties.get(name);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String getTags() {
        return this.getProperty(MessageConst.PROPERTY_TAGS);
    }

    public void setTags(String tags) {
        this.putProperty(MessageConst.PROPERTY_TAGS, tags);
    }

    public String getKeys() {
        return this.getProperty(MessageConst.PROPERTY_KEYS);
    }

    public void setKeys(Collection<String> keys) {
        StringBuffer sb = new StringBuffer();
        for (String k : keys) {
            sb.append(k);
            sb.append(MessageConst.KEY_SEPARATOR);
        }

        this.setKeys(sb.toString().trim());
    }

    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }

    public boolean isWaitStoreMsgOK() {
        String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
        if (null == result)
            return true;

        return Boolean.parseBoolean(result);
    }

    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
        this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
    }

    public void setInstanceId(String instanceId) {
        this.putProperty(MessageConst.PROPERTY_INSTANCE_ID, instanceId);
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public byte[] getBody() {
        return body;
    }

    public void setBody(byte[] body) {
        this.body = body;
    }

    public Map<String, String> getProperties() {
        return properties;
    }

    void setProperties(Map<String, String> properties) {
        this.properties = properties;
    }

    public String getBuyerId() {
        return getProperty(MessageConst.PROPERTY_BUYER_ID);
    }

    public void setBuyerId(String buyerId) {
        putProperty(MessageConst.PROPERTY_BUYER_ID, buyerId);
    }

    public String getTransactionId() {
        return transactionId;
    }

    public void setTransactionId(String transactionId) {
        this.transactionId = transactionId;
    }

    @Override
    public String toString() {
        return "Message{" +
            "topic='" + topic + '\'' +
            ", flag=" + flag +
            ", properties=" + properties +
            ", body=" + Arrays.toString(body) +
            ", transactionId='" + transactionId + '\'' +
            '}';
    }
}


4. Source of the MessageExt class:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.common.message;

import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;

public class MessageExt extends Message {
    private static final long serialVersionUID = 5720810158625748049L;

    private String brokerName;

    private int queueId;

    private int storeSize;

    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;

    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;

    private long preparedTransactionOffset;

    public MessageExt() {
    }

    public MessageExt(int queueId, long bornTimestamp, SocketAddress bornHost, long storeTimestamp,
        SocketAddress storeHost, String msgId) {
        this.queueId = queueId;
        this.bornTimestamp = bornTimestamp;
        this.bornHost = bornHost;
        this.storeTimestamp = storeTimestamp;
        this.storeHost = storeHost;
        this.msgId = msgId;
    }

    public static TopicFilterType parseTopicFilterType(final int sysFlag) {
        if ((sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG) {
            return TopicFilterType.MULTI_TAG;
        }

        return TopicFilterType.SINGLE_TAG;
    }

    public static ByteBuffer socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        InetAddress address = inetSocketAddress.getAddress();
        if (address instanceof Inet4Address) {
            byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
        } else {
            byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16);
        }
        byteBuffer.putInt(inetSocketAddress.getPort());
        byteBuffer.flip();
        return byteBuffer;
    }

    public static ByteBuffer socketAddress2ByteBuffer(SocketAddress socketAddress) {
        InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
        InetAddress address = inetSocketAddress.getAddress();
        ByteBuffer byteBuffer;
        if (address instanceof Inet4Address) {
            byteBuffer = ByteBuffer.allocate(4 + 4);
        } else {
            byteBuffer = ByteBuffer.allocate(16 + 4);
        }
        return socketAddress2ByteBuffer(socketAddress, byteBuffer);
    }

    public ByteBuffer getBornHostBytes() {
        return socketAddress2ByteBuffer(this.bornHost);
    }

    public ByteBuffer getBornHostBytes(ByteBuffer byteBuffer) {
        return socketAddress2ByteBuffer(this.bornHost, byteBuffer);
    }

    public ByteBuffer getStoreHostBytes() {
        return socketAddress2ByteBuffer(this.storeHost);
    }

    public ByteBuffer getStoreHostBytes(ByteBuffer byteBuffer) {
        return socketAddress2ByteBuffer(this.storeHost, byteBuffer);
    }

    public String getBrokerName() {
        return brokerName;
    }

    public void setBrokerName(String brokerName) {
        this.brokerName = brokerName;
    }

    public int getQueueId() {
        return queueId;
    }

    public void setQueueId(int queueId) {
        this.queueId = queueId;
    }

    public long getBornTimestamp() {
        return bornTimestamp;
    }

    public void setBornTimestamp(long bornTimestamp) {
        this.bornTimestamp = bornTimestamp;
    }

    public SocketAddress getBornHost() {
        return bornHost;
    }

    public void setBornHost(SocketAddress bornHost) {
        this.bornHost = bornHost;
    }

    public String getBornHostString() {
        if (null != this.bornHost) {
            InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();

            return null != inetAddress ? inetAddress.getHostAddress() : null;
        }

        return null;
    }

    public String getBornHostNameString() {
        if (null != this.bornHost) {
            InetAddress inetAddress = ((InetSocketAddress) this.bornHost).getAddress();

            return null != inetAddress ? inetAddress.getHostName() : null;
        }

        return null;
    }

    public long getStoreTimestamp() {
        return storeTimestamp;
    }

    public void setStoreTimestamp(long storeTimestamp) {
        this.storeTimestamp = storeTimestamp;
    }

    public SocketAddress getStoreHost() {
        return storeHost;
    }

    public void setStoreHost(SocketAddress storeHost) {
        this.storeHost = storeHost;
    }

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public int getSysFlag() {
        return sysFlag;
    }

    public void setSysFlag(int sysFlag) {
        this.sysFlag = sysFlag;
    }

    public void setStoreHostAddressV6Flag() { this.sysFlag = this.sysFlag | MessageSysFlag.STOREHOSTADDRESS_V6_FLAG; }

    public void setBornHostV6Flag() { this.sysFlag = this.sysFlag | MessageSysFlag.BORNHOST_V6_FLAG; }

    public int getBodyCRC() {
        return bodyCRC;
    }

    public void setBodyCRC(int bodyCRC) {
        this.bodyCRC = bodyCRC;
    }

    public long getQueueOffset() {
        return queueOffset;
    }

    public void setQueueOffset(long queueOffset) {
        this.queueOffset = queueOffset;
    }

    public long getCommitLogOffset() {
        return commitLogOffset;
    }

    public void setCommitLogOffset(long physicOffset) {
        this.commitLogOffset = physicOffset;
    }

    public int getStoreSize() {
        return storeSize;
    }

    public void setStoreSize(int storeSize) {
        this.storeSize = storeSize;
    }

    public int getReconsumeTimes() {
        return reconsumeTimes;
    }

    public void setReconsumeTimes(int reconsumeTimes) {
        this.reconsumeTimes = reconsumeTimes;
    }

    public long getPreparedTransactionOffset() {
        return preparedTransactionOffset;
    }

    public void setPreparedTransactionOffset(long preparedTransactionOffset) {
        this.preparedTransactionOffset = preparedTransactionOffset;
    }

    @Override
    public String toString() {
        return "MessageExt [brokerName=" + brokerName + ", queueId=" + queueId + ", storeSize=" + storeSize + ", queueOffset=" + queueOffset
            + ", sysFlag=" + sysFlag + ", bornTimestamp=" + bornTimestamp + ", bornHost=" + bornHost
            + ", storeTimestamp=" + storeTimestamp + ", storeHost=" + storeHost + ", msgId=" + msgId
            + ", commitLogOffset=" + commitLogOffset + ", bodyCRC=" + bodyCRC + ", reconsumeTimes="
            + reconsumeTimes + ", preparedTransactionOffset=" + preparedTransactionOffset
            + ", toString()=" + super.toString() + "]";
    }
}
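
The `socketAddress2ByteBuffer` methods in MessageExt pack a host as the raw address bytes (4 for IPv4, 16 for IPv6) followed by a 4-byte port. The sketch below reproduces that layout with the JDK only so it can be run in isolation; the class `HostBytesSketch` and the `decode` helper are additions for illustration and do not exist in RocketMQ.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

public class HostBytesSketch {
    /** Encodes the address bytes followed by a 4-byte port, mirroring socketAddress2ByteBuffer. */
    public static ByteBuffer encode(InetSocketAddress socketAddress) {
        byte[] ip = socketAddress.getAddress().getAddress(); // 4 bytes for IPv4, 16 for IPv6
        ByteBuffer buffer = ByteBuffer.allocate(ip.length + 4);
        buffer.put(ip);
        buffer.putInt(socketAddress.getPort());
        buffer.flip(); // switch the buffer from writing to reading
        return buffer;
    }

    /** Decodes a buffer produced by encode(); illustrative helper, not part of MessageExt. */
    public static InetSocketAddress decode(ByteBuffer buffer) throws UnknownHostException {
        byte[] ip = new byte[buffer.remaining() - 4];
        buffer.get(ip);
        int port = buffer.getInt();
        return new InetSocketAddress(InetAddress.getByAddress(ip), port);
    }

    public static void main(String[] args) throws UnknownHostException {
        InetAddress address = InetAddress.getByAddress(new byte[] {(byte) 192, (byte) 168, 0, 10});
        ByteBuffer encoded = encode(new InetSocketAddress(address, 10911));
        System.out.println("encoded length: " + encoded.remaining()); // 8 for an IPv4 address
        InetSocketAddress decoded = decode(encoded);
        System.out.println(decoded.getAddress().getHostAddress() + ":" + decoded.getPort());
    }
}
```

Because the address length is not stored in the buffer, a reader has to know from elsewhere whether 4 or 16 address bytes follow, which is the role of the V6 flags set by `setBornHostV6Flag` and `setStoreHostAddressV6Flag`.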

Heartbeat check

1. BrokerController.initialize() creates the broker thread pools, including the heartbeat pool:
package org.apache.rocketmq.broker;
public class BrokerController {
    public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load(); // result of loading the configuration data
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();
        if (result) {
            try {
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); // message store setup
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(
                        this, (DefaultMessageStore) messageStore);
                    ((DLedgerCommitLog) ((DefaultMessageStore) messageStore).getCommitLog())
                        .getdLedgerServer().getdLedgerLeaderElector()
                        .addRoleChangeHandler(roleChangeHandler);
                }
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                MessageStorePluginContext context = new MessageStorePluginContext(
                messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(
                        new CommitLogDispatcherCalcBitMap(this.brokerConfig,
                                this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        result = result && this.messageStore.load();
        if (result) {
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
                        this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            this.fastRemotingServer = new NettyRemotingServer(fastConfig,
                        this.clientHousekeepingService);
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));
            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));
            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.replyThreadPoolQueue,
                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));
            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl("AdminBrokerThread_"));
            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));
            this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( // thread pool that handles heartbeats
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                this.brokerConfig.getHeartbeatThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.heartbeatThreadPoolQueue,
                new ThreadFactoryImpl("HeartbeatThread_", true));
            this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                this.brokerConfig.getEndTransactionThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.endTransactionThreadPoolQueue,
                new ThreadFactoryImpl("EndTransactionThread_"));
            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig
                  .getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));
            this.registerProcessor();
            final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(),
                  TimeUnit.MILLISECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(
                  this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }
            if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                        this.updateMasterHAServerAddrPeriodically = false;
                    } else {
                        this.updateMasterHAServerAddrPeriodically = true;
                    }
                } else {
                    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                BrokerController.this.printMasterAndSlaveDiff();
                            } catch (Throwable e) {
                                log.error("schedule printMasterAndSlaveDiff error.", e);
                            }
                        }
                    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
                }
            }
            if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                // Register a listener to reload SslContext
                try {
                    fileWatchService = new FileWatchService(
                        new String[] {
                            TlsSystemConfig.tlsServerCertPath,
                            TlsSystemConfig.tlsServerKeyPath,
                            TlsSystemConfig.tlsServerTrustCertPath
                        },
                        new FileWatchService.Listener() {
                            boolean certChanged, keyChanged = false;

                            @Override
                            public void onChanged(String path) {
                                if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                    log.info("The trust certificate changed, reload the ssl context");
                                    reloadServerSslContext();
                                }
                                if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                    certChanged = true;
                                }
                                if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                    keyChanged = true;
                                }
                                if (certChanged && keyChanged) {
                                    log.info("The certificate and private key changed, reload the ssl context");
                                    certChanged = keyChanged = false;
                                    reloadServerSslContext();
                                }
                            }

                            private void reloadServerSslContext() {
                                ((NettyRemotingServer) remotingServer).loadSslContext();
                                ((NettyRemotingServer) fastRemotingServer).loadSslContext();
                            }
                        });
                } catch (Exception e) {
                    log.warn("FileWatchService created error, can't load the certificate dynamically");
                }
            }
            initialTransaction();
            initialAcl();
            initialRpcHooks();
        }
        return result;
    }
    public void start() throws Exception {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }
}


2、
package org.apache.rocketmq.common;
public class BrokerConfig {
    /**
     * This configurable item defines the interval at which the broker registers its topics with the name server. Allowed values are between 10,000 and 60,000 milliseconds.
     */
    private int registerNameServerPeriod = 1000 * 30; // Interval between heartbeats sent from the Broker to the NameServer
}
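
The start() method in the previous listing does not pass registerNameServerPeriod to the scheduler unchanged; it wraps it in Math.max(10000, Math.min(..., 60000)). A minimal sketch of that clamping follows. The class and method names are hypothetical; only the arithmetic is taken from the listing.

```java
// Hypothetical helper: only the Math.max/Math.min expression comes from BrokerController.start().
class RegisterPeriodSketch {
    static final int MIN_PERIOD_MS = 10_000;
    static final int MAX_PERIOD_MS = 60_000;

    // Clamp the configured period into the range [10 s, 60 s].
    static int effectivePeriod(int configuredMs) {
        return Math.max(MIN_PERIOD_MS, Math.min(configuredMs, MAX_PERIOD_MS));
    }

    public static void main(String[] args) {
        System.out.println(effectivePeriod(30_000));  // default value, inside the range
        System.out.println(effectivePeriod(1_000));   // too small, raised to the lower bound
        System.out.println(effectivePeriod(300_000)); // too large, lowered to the upper bound
    }
}
```

So a value outside the documented range is not rejected; it is silently pulled back to the nearest bound.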


3、
public synchronized void registerBrokerAll(final boolean checkOrderConfig,
        boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager()
            .buildTopicConfigSerializeWrapper(); // Collect the configuration of every topic
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) // Broker not writable?
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { // Broker not readable?
        // The broker permission is restricted, so every TopicConfig sent to the NameServer is rebuilt with the broker-level permission
        ConcurrentHashMap<String, TopicConfig> topicConfigTable =
            new ConcurrentHashMap<String, TopicConfig>(); // Holds the rebuilt topic configurations
        for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
            TopicConfig tmp =
                new TopicConfig(topicConfig.getTopicName(),
                    topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission()); // Copy the topic, overriding its permission
            topicConfigTable.put(topicConfig.getTopicName(), tmp); // Store the rebuilt topic
        }
        topicConfigWrapper.setTopicConfigTable(topicConfigTable);
    }
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), // Is registration needed?
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); // Perform the actual registration
    }
}


4、
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
    TopicConfigSerializeWrapper topicConfigWrapper) {     // Carries the configuration of every topic
    List<RegisterBrokerResult> registerBrokerResultList =  // One result per NameServer that answered
        this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(), // Broker cluster name
        this.getBrokerAddr(), // Broker address
        this.brokerConfig.getBrokerName(), // Broker name
        this.brokerConfig.getBrokerId(), // Broker ID
        this.getHAServerAddr(), // HA server address
        topicConfigWrapper, // Topic configuration
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills(),
        this.brokerConfig.isCompressedRegister());
    if (registerBrokerResultList.size() > 0) {     // At least one NameServer returned a result
        RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); // Use the first result
        if (registerBrokerResult != null) { // A result is available
            if (this.updateMasterHAServerAddrPeriodically &&
                    registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(
                    registerBrokerResult.getHaServerAddr());    // Update the master's HA address
            }
            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(
                        registerBrokerResult.getKvTable());    // Update the ordered-topic configuration
            }
        }
    }
}


5、
package org.apache.rocketmq.broker.out;
public class BrokerOuterAPI {
    public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {
        final List<RegisterBrokerResult> registerBrokerResultList =
            new CopyOnWriteArrayList<>(); // Thread-safe list that collects the results
        // Registering a Broker requires NameServer addresses, which are obtained from the RemotingClient
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // Any NameServer known?
            // At least one NameServer exists, so build the request header
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr); // Broker address
            requestHeader.setBrokerId(brokerId); // Broker ID
            requestHeader.setBrokerName(brokerName); // Broker name
            requestHeader.setClusterName(clusterName); // Cluster name
            requestHeader.setHaServerAddr(haServerAddr); // HA server address
            requestHeader.setCompressed(compressed); // Whether the body is compressed
            RegisterBrokerBody requestBody = new RegisterBrokerBody(); // Request body
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); // Topic configuration
            requestBody.setFilterServerList(filterServerList);
            // The heartbeat body is encoded here, and compressed only when the compressed flag is set
            final byte[] body = requestBody.encode(compressed); // Encode (and optionally compress) the body
            final int bodyCrc32 = UtilAll.crc32(body); // CRC32 checksum of the body
            requestHeader.setBodyCrc32(bodyCrc32);
            // Every NameServer keeps the same data, so register with all of them and wait until all are done
            final CountDownLatch countDownLatch = new
                CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() { // One registration task per NameServer
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway,
                                timeoutMills, requestHeader, body); // Register the Broker
                            if (result != null) {
                                registerBrokerResultList.add(result); // Keep the registration result
                            }
                            log.info("register broker[{}]to name server {} OK", brokerId,
                                namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }
            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {}
        }
        return registerBrokerResultList;
    }
}
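
The listing above registers with every NameServer in parallel and then waits on a CountDownLatch with a timeout. The same pattern can be sketched in isolation as below. Everything here is hypothetical scaffolding: registerOne() merely fakes the remote call, and the thread-pool size is an arbitrary choice. Unlike the listing, this sketch restores the interrupt flag instead of swallowing InterruptedException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for BrokerOuterAPI; no RocketMQ classes are used.
class FanOutRegisterSketch {
    // Fake remote call: an empty address simulates a failed registration.
    static String registerOne(String namesrvAddr) {
        if (namesrvAddr.isEmpty()) {
            throw new IllegalArgumentException("empty name server address");
        }
        return "registered@" + namesrvAddr;
    }

    static List<String> registerAll(List<String> namesrvAddrs, long timeoutMillis) {
        List<String> results = new CopyOnWriteArrayList<>(); // safe for concurrent add()
        CountDownLatch latch = new CountDownLatch(namesrvAddrs.size());
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (String addr : namesrvAddrs) {
                executor.execute(() -> {
                    try {
                        results.add(registerOne(addr));
                    } catch (RuntimeException e) {
                        System.err.println("register failed: " + addr);
                    } finally {
                        latch.countDown(); // always count down, success or failure
                    }
                });
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS); // bounded wait for all tasks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> results = registerAll(
            Arrays.asList("nameserver-cluster-a:9876", "nameserver-cluster-b:9876", ""), 3000);
        System.out.println(results.size() + " registrations succeeded");
    }
}
```

Counting down in the finally block matters: if a failed task skipped it, the caller would always wait for the full timeout.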


6、
private RegisterBrokerResult registerBroker( // Helper that performs the registration with one NameServer
    final String namesrvAddr, // NameServer address
    final boolean oneway, // Send without waiting for a response?
    final int timeoutMills, // Timeout in milliseconds
    final RegisterBrokerRequestHeader requestHeader, // Registration request header
    final byte[] body // Encoded (optionally compressed) registration data
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
    InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.REGISTER_BROKER, requestHeader); // Create the request
    request.setBody(body); // Attach the registration body
    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {}
        return null;
    }
    RemotingCommand response = this.remotingClient.invokeSync(
        namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: { // Registration succeeded
            RegisterBrokerResponseHeader responseHeader =
                (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); // Response header
            RegisterBrokerResult result = new RegisterBrokerResult(); // Registration result
            result.setMasterAddr(responseHeader.getMasterAddr()); // Master address
            result.setHaServerAddr(responseHeader.getHaServerAddr()); // HA server address
            if (response.getBody() != null) { // The response body is not empty
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}


7、
package org.apache.rocketmq.namesrv;
public class NamesrvController {
    public boolean initialize() {
        // Other code omitted for now…
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() { // Scan for inactive Brokers
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
        return true;
    }
}


8、
public void scanNotActiveBroker() {
    // Start from the table of live Brokers (it is updated on every heartbeat)
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) { // Iterate over the table
        Entry<String, BrokerLiveInfo> next = it.next(); // Current table entry
        // Every Broker periodically sends its configuration to the NameServer, so the last-update time keeps moving forward
        long last = next.getValue().getLastUpdateTimestamp(); // Timestamp of the last update
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { // Expired?
            RemotingUtil.closeChannel(next.getValue().getChannel()); // Close the channel
            it.remove(); // Remove the Broker
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
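
The expiry rule in scanNotActiveBroker() can be tried out without a NameServer. The sketch below is a simplified, hypothetical live table that stores only a timestamp per broker address. The 120-second constant is an assumption meant to mirror BROKER_CHANNEL_EXPIRED_TIME, and the current time is passed in explicitly so the behaviour is easy to test.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified live table: broker address -> last heartbeat timestamp (ms).
class BrokerLiveTableSketch {
    // Assumed to mirror BROKER_CHANNEL_EXPIRED_TIME (2 minutes).
    static final long EXPIRED_MS = 1000L * 60 * 2;

    private final Map<String, Long> lastUpdate = new ConcurrentHashMap<>();

    // Record a heartbeat from a broker.
    void heartbeat(String brokerAddr, long nowMs) {
        lastUpdate.put(brokerAddr, nowMs);
    }

    // Remove every broker whose last heartbeat is older than EXPIRED_MS; return the number removed.
    int scanNotActive(long nowMs) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() + EXPIRED_MS < nowMs) { // same comparison as in the listing
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastUpdate.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        BrokerLiveTableSketch table = new BrokerLiveTableSketch();
        table.heartbeat("192.168.190.180:30911", 0L);
        table.heartbeat("192.168.190.181:30911", 100_000L);
        System.out.println("removed: " + table.scanNotActive(130_000L));
        System.out.println("180 alive: " + table.isAlive("192.168.190.180:30911"));
        System.out.println("181 alive: " + table.isAlive("192.168.190.181:30911"));
    }
}
```

With the default 30-second registration period, a Broker has to miss several consecutive heartbeats before a scan removes it.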

Data Storage

1、
tree /usr/local/rocketmq/store/


2、
http://rocketmq.apache.org/docs/rmq-deployment/

3、
package org.apache.rocketmq.store.index;
public class IndexFile {
    private static int hashSlotSize = 4;             // Size of one hash slot, in bytes
    private static int indexSize = 20;               // Size of one index entry, in bytes
    private static int invalidIndex = 0;             // Value that marks an invalid (empty) index
    private final int hashSlotNum;                   // Number of hash slots
    private final int indexNum;                      // Number of index entries
    private final MappedFile mappedFile;             // Memory-mapped file (NIO)
    private final FileChannel fileChannel;           // NIO file channel
    private final MappedByteBuffer mappedByteBuffer; // NIO memory-mapped buffer
    private final IndexHeader indexHeader;           // Index header
}
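
The two byte sizes in the listing determine the on-disk layout of an index file: a header, then the hash slots, then the index entries. The sketch below computes the resulting offsets. It is a hypothetical calculator, not RocketMQ code; the 40-byte header and the slot and entry counts used in main() are assumptions about the defaults and should be checked against the version in use.

```java
// Hypothetical calculator for the IndexFile layout: header, then hash slots, then index entries.
class IndexFileLayoutSketch {
    static final int HEADER_SIZE = 40;   // assumed size of the index header, in bytes
    static final int HASH_SLOT_SIZE = 4; // bytes per hash slot (hashSlotSize in the listing)
    static final int INDEX_SIZE = 20;    // bytes per index entry (indexSize in the listing)

    // Total size of a file with the given number of slots and entries.
    static long totalFileSize(int hashSlotNum, int indexNum) {
        return HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) indexNum * INDEX_SIZE;
    }

    // Byte offset of the hash slot at position slotPos.
    static long slotOffset(int slotPos) {
        return HEADER_SIZE + (long) slotPos * HASH_SLOT_SIZE;
    }

    // Byte offset of the index entry at position indexPos, which lies after all hash slots.
    static long indexOffset(int hashSlotNum, int indexPos) {
        return HEADER_SIZE + (long) hashSlotNum * HASH_SLOT_SIZE + (long) indexPos * INDEX_SIZE;
    }

    public static void main(String[] args) {
        int slots = 5_000_000;    // assumed default slot count
        int entries = 20_000_000; // assumed default entry count
        System.out.println("file size in bytes: " + totalFileSize(slots, entries));
        System.out.println("offset of slot 1: " + slotOffset(1));
        System.out.println("offset of entry 0: " + indexOffset(slots, 0));
    }
}
```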


4、
package org.apache.rocketmq.store;
public interface MessageStore {
    default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) { // Asynchronous storage of a message batch
        return CompletableFuture.completedFuture(putMessages(messageExtBatch));
    }
    PutMessageResult putMessage(final MessageExtBrokerInner msg); // Synchronous storage of a single message
}


5、
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    PutMessageStatus checkStoreStatus = this.checkStoreStatus(); // Check the state of the store
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {  // Store is not ready: return the status immediately
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    PutMessageStatus msgCheckStatus = this.checkMessage(msg); // Validate the message
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { // Illegal message
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    long beginTime = this.getSystemClock().now(); // Start timestamp
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    putResultFuture.thenAccept((result) -> { // Runs once the write has completed
        long elapsedTime = this.getSystemClock().now() - beginTime; // Elapsed time
        if (elapsedTime > 500) { // Took too long: log a warning
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); // Record the statistics
        if (null == result || !result.isOk()) { // The write failed
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); // Count the failure
        }
    });
    return putResultFuture;
}
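
The method above attaches its statistics callback with thenAccept() but returns the original future, so callers still receive the PutMessageResult. A stripped-down sketch of that shape is shown below. The class is hypothetical and uses a Boolean in place of PutMessageResult.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a Boolean stands in for PutMessageResult.isOk().
class AsyncPutSketch {
    private final AtomicLong failedTimes = new AtomicLong();

    // Attach a side-effect callback, then hand back the original future.
    CompletableFuture<Boolean> track(CompletableFuture<Boolean> storeFuture) {
        storeFuture.thenAccept(ok -> {
            if (ok == null || !ok) {
                failedTimes.incrementAndGet(); // count failed writes
            }
        });
        return storeFuture; // the caller still sees the write result itself
    }

    long getFailedTimes() {
        return failedTimes.get();
    }

    public static void main(String[] args) {
        AsyncPutSketch sketch = new AsyncPutSketch();
        CompletableFuture<Boolean> pending = new CompletableFuture<>();
        CompletableFuture<Boolean> returned = sketch.track(pending);
        pending.complete(false); // simulate a failed write finishing later
        System.out.println("result: " + returned.join());
        System.out.println("failed writes: " + sketch.getFailedTimes());
    }
}
```

Returning the future produced by thenAccept() instead would hand callers a CompletableFuture<Void> and lose the result.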

Flushing Data to Disk

1、
private boolean mmapOperation() {
    boolean isSuccess = false; // Success flag
    AllocateRequest req = null; // Allocation request
    try { // Allocation requests are buffered in a request queue before they are served
        req = this.requestQueue.take(); // Take the next request from the queue
        AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
        if (null == expectedRequest) { // No entry in the request table: the request has expired
            log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize());
            return true; // Nothing to allocate, keep the service running
        }
        if (expectedRequest != req) { // The table holds a different request for this path
            log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
            return true;
        } // Start the MappedFile allocation
        if (req.getMappedFile() == null) { // The request has no MappedFile yet
            long beginTime = System.currentTimeMillis();
            MappedFile mappedFile; // The MappedFile to create
            if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                try { // Load a MappedFile implementation through ServiceLoader
                    mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                    mappedFile.init(req.getFilePath(), req.getFileSize(),
                        messageStore.getTransientStorePool()); // Initialize the MappedFile
                } catch (RuntimeException e) {
                    log.warn("Use default implementation.");
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(),
                        messageStore.getTransientStorePool()); // Fall back to the default implementation
                }
            } else { // No transient store pool: create a plain MappedFile
                mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
            }
            long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
            if (elapsedTime > 10) {
                int queueSize = this.requestQueue.size();
                log.warn("create mappedFile spent time(ms) " +
                    elapsedTime + " queue size " + queueSize
                    + " " + req.getFilePath() + " " + req.getFileSize());
            }
            // Prepare the file for writing (pre-warming)
            if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                .getMappedFileSizeCommitLog() &&
                this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { // Warm-up enabled?
                mappedFile.warmMappedFile(
                    this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                    this.messageStore.getMessageStoreConfig()
                        .getFlushLeastPagesWhenWarmMapedFile());
            }
            req.setMappedFile(mappedFile); // Hand the allocated MappedFile back to the request
            this.hasException = false;
            isSuccess = true;
        }
    } catch (InterruptedException e) {
        log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
        this.hasException = true;
        return false;
    } catch (IOException e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.hasException = true;
        if (null != req) {
            requestQueue.offer(req);
            try {
                Thread.sleep(1);
            } catch (InterruptedException ignored) {
            }
        }
    } finally {
        if (req != null && isSuccess)
            req.getCountDownLatch().countDown();
    }
    return true;
}

RocketMQ Cluster Service Overview

1、
192.168.190.176 nameserver-cluster-a
192.168.190.177 nameserver-cluster-b
192.168.190.180 broker-cluster-a-node-1
192.168.190.181 broker-cluster-a-node-2
192.168.190.182 broker-cluster-a-node-3
192.168.190.183 broker-cluster-b-node-1
192.168.190.184 broker-cluster-b-node-2
192.168.190.185 broker-cluster-b-node-3
192.168.190.186 broker-cluster-c-node-1
192.168.190.187 broker-cluster-c-node-2
192.168.190.188 broker-cluster-c-node-3

2、
mv jdk jdk-11

3、
tar xzvf /var/ftp/jdk-8u191-linux-x64.tar.gz -C /usr/local/

4、
mv /usr/local/jdk1.8.0_191/ /usr/local/jdk

5、
vi /etc/sysconfig/network-scripts/ifcfg-ens33
IPADDR=192.168.190.176

6、
vi /etc/hostname

7、
vi /etc/hosts

192.168.190.176 nameserver-cluster-a
192.168.190.177 nameserver-cluster-b
192.168.190.180 broker-cluster-a-node-1
192.168.190.181 broker-cluster-a-node-2
192.168.190.182 broker-cluster-a-node-3
192.168.190.183 broker-cluster-b-node-1
192.168.190.184 broker-cluster-b-node-2
192.168.190.185 broker-cluster-b-node-3
192.168.190.186 broker-cluster-c-node-1
192.168.190.187 broker-cluster-c-node-2
192.168.190.188 broker-cluster-c-node-3

NameServer Cluster

1、
unzip /var/ftp/rocketmq-all-4.9.1-bin-release.zip -d /usr/local/
mv /usr/local/rocketmq-all-4.9.1-bin-release/ /usr/local/rocketmq

2、
mkdir /usr/local/rocketmq/logs

3、
vi /usr/local/rocketmq/bin/runserver.sh

4、
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

5、
scp -r /usr/local/rocketmq nameserver-cluster-b:/usr/local/

6、
nohup /usr/local/rocketmq/bin/mqnamesrv > /usr/local/rocketmq/logs/rocketmq-namesrv.log 2>&1 &

7、
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --reload

Broker Cluster

1、
mkdir -p /usr/data/rocketmq/store/commitlog

2、
unzip /var/ftp/rocketmq-all-4.9.1-bin-release.zip -d /usr/local/

3、
mv /usr/local/rocketmq-all-4.9.1-bin-release/ /usr/local/rocketmq

4、
mkdir -p /usr/local/rocketmq/logs

5、
vi /usr/local/rocketmq/bin/runbroker.sh

6、
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"

7、
cp /usr/local/rocketmq/conf/dledger/broker-n0.conf /usr/local/rocketmq/conf/dledger/broker-cluster.conf

8、
vi /usr/local/rocketmq/conf/dledger/broker-cluster.conf

brokerClusterName = YootkRocketMQCluster
brokerName=Yootk-A
listenPort=30911
namesrvAddr=nameserver-cluster-a:9876;nameserver-cluster-b:9876
storePathRootDir=/usr/data/rocketmq/store
storePathCommitLog=/usr/data/rocketmq/store/commitlog
enableDLegerCommitLog=true
dLegerGroup=Yootk-A
dLegerPeers=n10-192.168.190.180:40911;n11-192.168.190.181:40911;n12-192.168.190.182:40911
dLegerSelfId=n10
sendMessageThreadPoolNums=16
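
In this configuration, dLegerPeers is a semicolon-separated list of id-host:port entries, and dLegerSelfId has to name one of those ids. The hypothetical checker below illustrates the format and that consistency rule. It is only a sketch written for this walkthrough and is not how DLedger itself parses the value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sanity check for the dLegerPeers / dLegerSelfId pair.
class DLegerPeersSketch {
    // Split "n10-host:port;n11-host:port" into an ordered map of id -> address.
    static Map<String, String> parsePeers(String peers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String peer : peers.split(";")) {
            String trimmed = peer.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            int dash = trimmed.indexOf('-'); // the first dash separates the id from the address
            if (dash <= 0 || dash == trimmed.length() - 1) {
                throw new IllegalArgumentException("bad peer entry: " + trimmed);
            }
            result.put(trimmed.substring(0, dash), trimmed.substring(dash + 1));
        }
        return result;
    }

    // True when selfId is one of the ids listed in peers.
    static boolean selfIdIsListed(String selfId, String peers) {
        return parsePeers(peers).containsKey(selfId);
    }

    public static void main(String[] args) {
        String peers = "n10-192.168.190.180:40911;n11-192.168.190.181:40911;n12-192.168.190.182:40911";
        System.out.println(parsePeers(peers));
        System.out.println("n10 listed: " + selfIdIsListed("n10", peers));
        System.out.println("n20 listed: " + selfIdIsListed("n20", peers));
    }
}
```

Running such a check on each node before starting the broker catches a copied configuration whose self id was never changed.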

9、
scp -r /usr/local/rocketmq/ broker-cluster-a-node-2:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-a-node-3:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-b-node-1:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-b-node-2:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-b-node-3:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-c-node-1:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-c-node-2:/usr/local
scp -r /usr/local/rocketmq/ broker-cluster-c-node-3:/usr/local

10、
vi /usr/local/rocketmq/conf/dledger/broker-cluster.conf

11、
brokerName=Yootk-B
dLegerGroup=Yootk-B
dLegerPeers=n20-192.168.190.183:40911;n21-192.168.190.184:40912;n22-192.168.190.185:40913
dLegerSelfId=n20

12、
nohup /usr/local/rocketmq/bin/mqbroker -c /usr/local/rocketmq/conf/dledger/broker-cluster.conf > /usr/local/rocketmq/logs/rocketmq-broker.log 2>&1 &

13、
firewall-cmd --zone=public --add-port=30909/tcp --permanent
firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=40911/tcp --permanent
firewall-cmd --zone=public --add-port=40912/tcp --permanent
firewall-cmd --zone=public --add-port=40913/tcp --permanent
firewall-cmd --reload

14、
rocketmq.config.namesrvAddr=nameserver-cluster-a:9876;nameserver-cluster-b:9876

Transactional Messages

demo

