Rocketmq部署+故障演练

RocketMQ集群工作流程

NamServer

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现;

启动****NameServer NameServer启动后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心。

主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息

Broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

生产环境使用自动容灾切换方案: RocketMQ-on-DLedger Group 一组相同名称的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。 RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。 RocketMQ-on-DLedger Group 可以水平扩展 可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。

**启动 Broker **与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。

Topic

Rocketmq在5.0 默认开启了 消息类型校验 一个topic只能发送一种类型的消息

  • TRANSACTION:事务消息
  • DELAY:定时/延时消息
  • FIFO:顺序消息
  • NORMAL:普通消息

创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上、或指定集群名称,也可以在发送消息时自动创建Topic。

生产者发送消息

生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker发消息。

消费者接受消息

消费者接受消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。

部署准备

集群架构图

主机规划

主机名称 主机ip 功能 备注
ks-rocketmq-cluster01-rocketmq01 10.6.28.242 nameserver+broker+控制台
ks-rocketmq-cluster01-rocketmq02 10.6.230.74 nameserver+broker
ks-rocketmq-cluster01-rocketmq03 10.6.174.144 nameserver+broker
ks-rocketmq-cluster02-rocketmq04 10.6.40.117 nameserver+broker
ks-rocketmq-cluster02-rocketmq05 10.6.120.17 nameserver+broker
ks-rocketmq-cluster02-rocketmq06 10.6.119.79 nameserver+broker

ks-rocketmq-cluster01-rocketmq01 主机执行

  1. 配置集群内的主机信息
1
2
3
4
5
6
7
8
cat >> /etc/hosts << EOF
10.6.28.242 ks-rocketmq-cluster01-rocketmq01
10.6.230.74 ks-rocketmq-cluster01-rocketmq02
10.6.174.144 ks-rocketmq-cluster01-rocketmq03
10.6.40.117 ks-rocketmq-cluster02-rocketmq04
10.6.120.17 ks-rocketmq-cluster02-rocketmq05
10.6.119.79 ks-rocketmq-cluster02-rocketmq06
EOF
  1. 设置免密
1
for i in `cat /etc/hosts  |grep ks- |awk '{print $1}' `;do ssh-copy-id -o StrictHostKeyChecking=no  $i ;done
  1. 设置主机名称
1
2
3
4
5
6
ssh 10.6.28.242   hostnamectl set-hostname ks-rocketmq-cluster01-rocketmq01 
ssh 10.6.230.74 hostnamectl set-hostname ks-rocketmq-cluster01-rocketmq02
ssh 10.6.174.144 hostnamectl set-hostname ks-rocketmq-cluster01-rocketmq03
ssh 10.6.40.117 hostnamectl set-hostname ks-rocketmq-cluster02-rocketmq04
ssh 10.6.120.17 hostnamectl set-hostname ks-rocketmq-cluster02-rocketmq05
ssh 10.6.119.79 hostnamectl set-hostname ks-rocketmq-cluster02-rocketmq06

安装jdk8(每台主机都执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
apt-get update  && apt install openjdk-8-jdk -y 

cat >> /etc/profile << EOF
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/
export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar
export PATH=\$PATH:\$JAVA_HOME/bin
EOF


java -version

openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-8u372-ga~us1-0ubuntu1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.362-b09, mixed mode)

Rocketmq****安装

ks-rocketmq-cluster01-rocketmq01 执行 下载****rocketmq

在第一台主机配置好后 批量复制到其他主机

1
wget https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

rocketmq 配置修改

配置文件详细解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
brokerClusterName=KsStoryCluster:指定该 Broker 所属的集群名称。
brokerName=KsNode05:指定该 Broker 的名称。
listenPort=10911:指定 Broker 监听的端口。
namesrvAddr:指定 NameServer 的地址列表,用分号分隔。
storePathRootDir:指定存储根目录。
storePathCommitLog:指定 CommitLog 文件的存储路径。
enableDLegerCommitLog=true:启用 DLedger 模式,支持多副本存储。
dLegerGroup=KsNode05:指定 DLedger 组名。
dLegerPeers:指定 DLedger 集群中的节点及其端口。
dLegerSelfId=n3:指定当前节点的 DLedger ID,必须唯一。
sendMessageThreadPoolNums=32:发送消息的线程池数量。
deleteWhen=04:每天凌晨 4 点删除过期的消息日志文件。
waitTimeMillsInSendQueue=1000:发送队列中等待的时间(毫秒)。
useReentrantLockWhenPutMessage=true:在放置消息时使用可重入锁。
transientStorePoolEnable=true:启用瞬时存储池。
transientStorePoolSize=5:瞬时存储池的大小。
flushDiskType=ASYNC_FLUSH:指定磁盘刷新类型为异步刷新。
fileReservedTime=168:文件保留时间,单位为小时,这里是 168 小时(7 天)。
autoCreateTopicEnable=false:禁用自动创建 Topic。
tracetopicEnable=true:启用消息轨迹功能。
slaveReadEnable=true:允许从节点读取。
warmMapedFileEnable=true:启用内存映射文件的预热。
brokerRole=ASYNC_MASTER:设置 Broker 角色为异步主节点。
osPageCacheBusyTimeOutMills=5000:操作系统页面缓存繁忙超时时间,单位为毫秒。
waitTimeMillsInSendQueue=3000:发送队列中等待的时间(毫秒),这里有两处配置,最后一处会覆盖前一处(如果文件中有多个相同的配置项,以最后一个为准)。

每个主机修改后的配置

第一组集群

ks-rocketmq-cluster01-rocketmq01配置文件

  • /usr/local/rocketmq-4.9.2/conf/dledger/broker-n3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerClusterName = KsStoryClusterYanLian
brokerName=KsNode05-YanLian
listenPort=10911
namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876
storePathRootDir=/data/rmqstore/node03
storePathCommitLog=/data/rmqstore/node03/commitlog
enableDLegerCommitLog=true
dLegerGroup=KsNode05-YanLian
dLegerPeers=n3-10.6.28.242:20911;n4-10.6.230.74:20912;n5-10.6.174.144:20913
## must be unique
dLegerSelfId=n3
sendMessageThreadPoolNums=2

## new add conf
deleteWhen=04
waitTimeMillsInSendQueue=1000
useReentrantLockWhenPutMessage=true
transientStorePoolEnable=true
transientStorePoolSize=1
flushDiskType=ASYNC_FLUSH
fileReservedTime=168
autoCreateTopicEnable=false
tracetopicEnable=true
slaveReadEnable=true
warmMapedFileEnable=true
brokerRole=ASYNC_MASTER
osPageCacheBusyTimeOutMills=5000
waitTimeMillsInSendQueue=3000

配置文件拷贝到其他主机

1
2
3
4
5
6
scp -r  rocketmq-4.9.2 10.6.230.74:/usr/local/
scp -r rocketmq-4.9.2 10.6.174.144:/usr/local/

scp -r rocketmq-4.9.2 10.6.40.117:/usr/local/
scp -r rocketmq-4.9.2 10.6.120.17:/usr/local/
scp -r rocketmq-4.9.2 10.6.119.79:/usr/local/

ks-rocketmq-cluster01-rocketmq02配置文件

  • /usr/local/rocketmq-4.9.2/conf/dledger/broker-n4.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerClusterName = KsStoryClusterYanLian
brokerName=KsNode05-YanLian
listenPort=10921
namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876
storePathRootDir=/data/rmqstore/node04
storePathCommitLog=/data/rmqstore/node04/commitlog
enableDLegerCommitLog=true
dLegerGroup=KsNode05-YanLian
dLegerPeers=n3-10.6.28.242:20911;n4-10.6.230.74:20912;n5-10.6.174.144:20913
## must be unique
dLegerSelfId=n4
sendMessageThreadPoolNums=2

## new add conf
deleteWhen=04
waitTimeMillsInSendQueue=1000
useReentrantLockWhenPutMessage=true
transientStorePoolEnable=true
transientStorePoolSize=1
flushDiskType=ASYNC_FLUSH
fileReservedTime=168
autoCreateTopicEnable=false
tracetopicEnable=true
slaveReadEnable=true
warmMapedFileEnable=true
brokerRole=ASYNC_MASTER
osPageCacheBusyTimeOutMills=5000
waitTimeMillsInSendQueue=3000

ks-rocketmq-cluster01-rocketmq03配置文件

  • /usr/local/rocketmq-4.9.2/conf/dledger/broker-n5.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerClusterName = KsStoryClusterYanLian
brokerName=KsNode05-YanLian
listenPort=10931
namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876
storePathRootDir=/data/rmqstore/node05
storePathCommitLog=/data/rmqstore/node05/commitlog
enableDLegerCommitLog=true
dLegerGroup=KsNode05-YanLian
dLegerPeers=n3-10.6.28.242:20911;n4-10.6.230.74:20912;n5-10.6.174.144:20913
## must be unique
dLegerSelfId=n5
sendMessageThreadPoolNums=2

## new add conf
deleteWhen=04
waitTimeMillsInSendQueue=1000
useReentrantLockWhenPutMessage=true
transientStorePoolEnable=true
transientStorePoolSize=1
flushDiskType=ASYNC_FLUSH
fileReservedTime=168
autoCreateTopicEnable=false
tracetopicEnable=true
slaveReadEnable=true
warmMapedFileEnable=true
brokerRole=ASYNC_MASTER
osPageCacheBusyTimeOutMills=5000
waitTimeMillsInSendQueue=3000

第二组集群

ks-rocketmq-cluster02-rocketmq04配置文件

  • /usr/local/rocketmq-4.9.2/conf/dledger/broker-n3.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerClusterName = KsStoryClusterYanLian
brokerName=KsNode06-YanLian
listenPort=10911
namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876
storePathRootDir=/data/rmqstore/node03
storePathCommitLog=/data/rmqstore/node03/commitlog
enableDLegerCommitLog=true
dLegerGroup=KsNode06-YanLian
dLegerPeers=n3-10.6.40.117:20911;n4-10.6.120.17:20912;n5-10.6.119.79:20913
## must be unique
dLegerSelfId=n3
sendMessageThreadPoolNums=2

## new add conf
deleteWhen=04
waitTimeMillsInSendQueue=1000
useReentrantLockWhenPutMessage=true
transientStorePoolEnable=true
transientStorePoolSize=1
flushDiskType=ASYNC_FLUSH
fileReservedTime=168
autoCreateTopicEnable=false
tracetopicEnable=true
slaveReadEnable=true
warmMapedFileEnable=true
brokerRole=ASYNC_MASTER
osPageCacheBusyTimeOutMills=5000
waitTimeMillsInSendQueue=3000

ks-rocketmq-cluster02-rocketmq05配置文件

  • /usr/local/rocketmq-4.9.2/conf/dledger/broker-n4.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerClusterName = KsStoryClusterYanLian
brokerName=KsNode06-YanLian
listenPort=10921
namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876
storePathRootDir=/data/rmqstore/node04
storePathCommitLog=/data/rmqstore/nod04/commitlog
enableDLegerCommitLog=true
dLegerGroup=KsNode06-YanLian
dLegerPeers=n3-10.6.40.117:20911;n4-10.6.120.17:20912;n5-10.6.119.79:20913
## must be unique
dLegerSelfId=n4
sendMessageThreadPoolNums=2

## new add conf
deleteWhen=04
waitTimeMillsInSendQueue=1000
useReentrantLockWhenPutMessage=true
transientStorePoolEnable=true
transientStorePoolSize=1
flushDiskType=ASYNC_FLUSH
fileReservedTime=168
autoCreateTopicEnable=false
tracetopicEnable=true
slaveReadEnable=true
warmMapedFileEnable=true
brokerRole=ASYNC_MASTER
osPageCacheBusyTimeOutMills=5000
waitTimeMillsInSendQueue=3000

ks-rocketmq-cluster02-rocketmq06配置文件

  • /usr/local/rocketmq-4.9.2/conf/dledger/broker-n5.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
brokerClusterName = KsStoryClusterYanLian
brokerName=KsNode06-YanLian
listenPort=10931
namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876
storePathRootDir=/data/rmqstore/node05
storePathCommitLog=/data/rmqstore/node05/commitlog
enableDLegerCommitLog=true
dLegerGroup=KsNode06-YanLian
dLegerPeers=n3-10.6.40.117:20911;n4-10.6.120.17:20912;n5-10.6.119.79:20913
## must be unique
dLegerSelfId=n5
sendMessageThreadPoolNums=2

## new add conf
deleteWhen=04
waitTimeMillsInSendQueue=1000
useReentrantLockWhenPutMessage=true
transientStorePoolEnable=true
transientStorePoolSize=1
flushDiskType=ASYNC_FLUSH
fileReservedTime=168
autoCreateTopicEnable=false
tracetopicEnable=true
slaveReadEnable=true
warmMapedFileEnable=true
brokerRole=ASYNC_MASTER
osPageCacheBusyTimeOutMills=5000
waitTimeMillsInSendQueue=3000

启动服务

详细配置在进程管理工具中

命令行验证集群状态

1
2
3
4
5
6
7
8
9
10
11
12
# 设置nameserver地址
export NAMESRV_ADDR="10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876"

cd /usr/local/rocketmq-4.9.2/bin/
./mqadmin clusterList
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE
KsStoryClusterYanLian KsNode05-YanLian 0 10.6.230.74:10921 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900
KsStoryClusterYanLian KsNode05-YanLian 4 10.6.28.242:10911 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.1000
KsStoryClusterYanLian KsNode05-YanLian 6 10.6.174.144:10931 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900
KsStoryClusterYanLian KsNode06-YanLian 0 10.6.120.17:10921 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900
KsStoryClusterYanLian KsNode06-YanLian 4 10.6.40.117:10911 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900
KsStoryClusterYanLian KsNode06-YanLian 6 10.6.119.79:10931 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900

Rocketmq 控制台安装

安装****maven

1
wget https://archive.apache.org/dist/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz

配置阿里云源 下载依赖更快

1
2
3
4
5
6
7
8
9
10
11
conf/settings.xml 
<mirrors>
...
新增
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
<mirror>

安装控制台

1
2
git clone https://github.com/muyan-yootk/rocketmq-externals.git
cd rocketmq-externals-master/rocketmq-console

修改配置文件

  • src/main/resources/application.properties (主配置文件)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
server.contextPath=
server.port=8080

### SSL setting
#server.ssl.key-store=classpath:rmqcngkeystore.jks
#server.ssl.key-store-password=rocketmq
#server.ssl.keyStoreType=PKCS12
#server.ssl.keyAlias=rmqcngkey

#spring.application.index=true
spring.application.name=rocketmq-console
spring.http.encoding.charset=UTF-8
spring.http.encoding.enabled=true
spring.http.encoding.force=true
logging.config=classpath:logback.xml
#if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
rocketmq.config.namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 # 修改此处 填写nameserver
#if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
rocketmq.config.isVIPChannel=
#rocketmq-console's data path:dashboard/monitor
rocketmq.config.dataPath=/tmp/rocketmq-console/data # 前端持久化存储文件
#set it false if you don't want use dashboard.default true
rocketmq.config.enableDashBoardCollect=true
#set the message track trace topic if you don't want use the default one
rocketmq.config.msgTrackTopicName=
rocketmq.config.ticketKey=ticket

#Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required
rocketmq.config.loginRequired=true # 开启控制台登录权限认证
  • src/main/resources/users.properties (用户管理)
1
2
3
4
5
6
# 加入一下内容 最后1 代表超管 默认为普通用户0
admin=Tkamc.00!@#123,1

# Define Users
rocketmq1=rocketmq1!@#123
rocketmq2=rocketmq2!@#123

编译

回到pom.xml 目录同级

1
2
cd rocketmq-externals-master/rocketmq-console
/root/apache-maven-3.8.4/bin/mvn clean package -Dmaven.test.skip=true

启动服务

编译完成后 target会生成一个jar包

1
mv target/rocketmq-console-ng-1.0.1.jar /opt/

启动命令详细配置在进程管理工具中

验证

安装进程管理工具(每个主机都执行)

1
apt-get install supervisor

supervisor 主配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
; supervisor config file

[unix_http_server]
file=/var/run/supervisor.sock ; (the path to the socket file)
chmod=0700 ; sockef file mode (default 0700)

[supervisord]
logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log)
pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid)
childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP)
minfds=65535

; the below section must remain in the config file for RPC
; (supervisorctl/web interface) to work, additional interfaces may be
; added by defining them in separate rpcinterface: sections
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket

; The [include] section can just contain the "files" setting. This
; setting can list multiple files (separated by whitespace or
; newlines). It can also contain wildcards. The filenames are
; interpreted as relative to this file. Included files *cannot*
; include files themselves.

[include]
files = /etc/supervisor/conf.d/*.conf

创建目录 存储日志

1
mkdir /data/logs/rocketmq-4.9.2/ -p

启动nameserver配置文件

/etc/supervisor/conf.d/mqnamesrv-4.9.2.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
program:mqnamesrv-4.9.2]
environment=JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",ROCKETMQ_HOME="/usr/local/rocketmq-4.9.2"
directory = /usr/local/rocketmq-4.9.2
command = bash /usr/local/rocketmq-4.9.2/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup
user = rocketmq
autostart = true
autorestart = true
stopsignal=QUIT
stopwaitsecs=10
stopasgroup=true
killasgroup=true
stdout_logfile = /data/logs/rocketmq-4.9.2/mqnamesrv_stdout.log
stderr_logfile = /data/logs/rocketmq-4.9.2/mqnamesrv_stderr.log

启动broker配置文件

这里演练机器配置不高 修改/usr/local/rocketmq-4.9.2/bin/runbroker.sh jvm 内存配置

1
2
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g"
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=3g"

/etc/supervisor/conf.d/mqbroker-4.9.2.conf

这里的broker-n3.conf配置文件 每个主机不一样 按照情况配置

1
2
3
4
5
6
7
8
9
10
11
12
13
[program:mqbroker-4.9.2]
environment=JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",ROCKETMQ_HOME="/usr/local/rocketmq-4.9.2"
directory = /usr/local/rocketmq-4.9.2
command = bash /usr/local/rocketmq-4.9.2/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup -c /usr/local/rocketmq-4.9.2/conf/dledger/broker-n3.conf
user = root
autostart = true
autorestart = true
stopsignal=TERM
stopwaitsecs=10
stopasgroup=true
killasgroup=true
stdout_logfile = /data/logs/rocketmq-4.9.2/mqbroker_stdout.log
stderr_logfile = /data/logs/rocketmq-4.9.2/mqbroker_stderr.log

启动Dashboard配置文件

/etc/supervisor/conf.d/mqdashboard.conf (只有安装控制台的主机才配置)

1
2
3
4
5
6
7
8
9
10
11
12
13
[program:mqdashboard]
environment=JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
directory = /opt/
command = java -jar /opt/rocketmq-console-ng-1.0.1.jar
user = root
autostart = true
autorestart = true
stopsignal=TERM
stopwaitsecs=10
stopasgroup=true
killasgroup=true
stdout_logfile = /data/logs/rocketmq-4.9.2/mqdashboard_stdout.log
stderr_logfile = /data/logs/rocketmq-4.9.2/mqdashboard_stderr.log

进程工具常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 重启守护进程
systemctl restart supervisor.service
supervisorctl reload

# 查看进程状态
supervisorctl status

# 重启单个进程
supervisorctl restart mqdashboard

# 启动单个进程
supervisorctl start mqdashboard

# 关闭单个进程
supervisorctl stop mqdashboard

# 重启所有进程
supervisorctl restart all

验证Rocketmq集群

使用python脚本 模块地址: https://github.com/apache/rocketmq-client-python

使用ubuntu系统启动 生产者 消费者脚本

安装模块

1
2
3
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
pip install rocketmq-client-python

生产者脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# producer.py
from rocketmq.client import Producer, Message
import time

def main():
producer = Producer('producer_group_test')
producer.set_name_server_address('10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876') # 替换为你的 NameServer 地址
producer.start()

num = 0
while True:
msg = Message('myTopic')
msg.set_keys('key%s' % num )
msg.set_tags('tag%s' % num)
msg.set_body('Hello RocketMQ %s' % num)

ret = producer.send_sync(msg)
print(f'Send Status: {ret.status}, Message ID: {ret.msg_id}')
num += 1
time.sleep(1)

producer.shutdown()

if __name__ == "__main__":
main()

消费者脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# consumer.py
from rocketmq.client import PushConsumer, ConsumeStatus

def callback(msg):
print(f'Received message: {msg.body.decode("utf-8")}')
return ConsumeStatus.CONSUME_SUCCESS

def main():
consumer = PushConsumer('consumer_group_test')
consumer.set_name_server_address('10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876') # 替换为你的 NameServer 地址
consumer.subscribe('myTopic', callback)

consumer.start()

print('Consumer started. Waiting for messages...')
import time
while True:
time.sleep(3600) # Keep the consumer running

if __name__ == "__main__":
main()

创建topic

程序默认不会创建 需要手动创建完 执行脚本

验证

启动生产者

启动消费者

生产者详情

消费详情

集群故障演练

模拟KsNode05-YanLian 一个master节点下线

正常状态

下线KsNode05-YanLian的master节点

1
2
ssh 10.6.230.74
supervisorctl stop mqbroker-4.9.2 # 下线

集群状态

消费者消费正常 且id一直在自增 说明生产者也在正常工作 (topic同时注册到了Node5、Node6中)

模拟KsNode05-YanLian 一个master节点下线后 再下线新提权的master

集群状态

下线KsNode05-YanLian的master节点

1
2
ssh 10.6.28.242
supervisorctl stop mqbroker-4.9.2

集群状态

KsNode05-YanLian组的两个master全部下线后,slave不会再提权为master了 这里的slave生产总数不再增加了 说明这组集群已经失去了 写入的能力, 没有了写入 自然也不会有读取的数据 这组集群基本失去工作能力了

消费者消费正常 且id一直在自增 说明生产者也在正常工作(topic同时注册到了Node5、Node6中)

模拟KsNode05-YanLian 一个master节点下线后 再下线一个slave

集群状态

下线KsNode05-YanLian的slave节点

1
2
ssh 10.6.230.74
supervisorctl stop mqbroker-4.9.2

集群状态

10.6.174.144从原来的master 自动被降级到了slave 集群已经失去了 写入的能力, 没有了写入 自然也不会有读取的数据 这组集群基本失去工作能力了

总结

Topic创建时应选择 注册到所有的组中

  • 一组集群中至少需要存活两个broker 当一组中只剩下一个broker 这组集群就丧失了 写入的能力
  • 多组集群同时挂掉2个broker rocketmq不可用 生产者无法再生产数据 由于slave还在线 broker中已有的数据还可正常消费

Topic导出导入

Rocketmq Topic 导出(4.x)

命令导出为集群内所有topic 导入到别的集群中 应删除系统自建的Topic

1
/usr/local/rocketmq-4.9.2/bin/mqadmin topicList

自建集群创建topic(4.x)

  • -c 指定集群名称
  • -n 指定nameserver 地址
  • -t 指定topic名称
  • -r 队列读取数量
  • -w 队列写入数量
1
mqadmin updateTopic -c KsStoryClusterYanLian  -n '10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876' -t gph -r 16 -w 16

阿里云导入topic 调研 (rocketmq 5.x)

阿里云支持一键导入功能 需要准备 topicName****MessageType Remark

默认5.0开始 开启了强制校验功能 每个topic 只允许发送一种消息类型的消息 为兼容4.x 需要关闭校验消息类型

  • Remark Topic备注

Rocketmq部署+故障演练
https://blog.api-bj.top/2024/09/30/Rocketmq部署-故障演练/
作者
郭培华
发布于
2024年9月30日
许可协议