RocketMQ集群工作流程 NamServer NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现;
启动****NameServer NameServer启动后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心。
主要包括两个功能:
Broker管理 ,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
路由信息管理 ,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息
Broker Broker主要负责消息的存储、投递和查询以及服务高可用保证。
生产环境使用自动容灾切换方案: RocketMQ-on-DLedger Group 一组 相同名称 的 Broker,至少需要 3 个节点,通过 Raft 自动选举出一个 Leader,其余节点 作为 Follower,并在 Leader 和 Follower 之间复制数据以保证高可用。 RocketMQ-on-DLedger Group 能自动容灾切换,并保证数据一致。 RocketMQ-on-DLedger Group 可以水平扩展 可以部署任意多个 RocketMQ-on-DLedger Group 同时对外提供服务。
**启动 Broker **与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
Topic Rocketmq在5.0 默认开启了 消息类型校验 一个topic只能发送一种类型的消息
TRANSACTION:事务消息
DELAY:定时/延时消息
FIFO:顺序消息
NORMAL:普通消息
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上、或指定集群名称,也可以在发送消息时自动创建Topic。
生产者发送消息 生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker发消息。
消费者接受消息 消费者接受消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。
部署准备 集群架构图
主机规划
主机名称
主机ip
功能
备注
ks-rocketmq-cluster01-rocketmq01
10.6.28.242
nameserver+broker+控制台
ks-rocketmq-cluster01-rocketmq02
10.6.230.74
nameserver+broker
ks-rocketmq-cluster01-rocketmq03
10.6.174.144
nameserver+broker
ks-rocketmq-cluster02-rocketmq04
10.6.40.117
nameserver+broker
ks-rocketmq-cluster02-rocketmq05
10.6.120.17
nameserver+broker
ks-rocketmq-cluster02-rocketmq06
10.6.119.79
nameserver+broker
ks-rocketmq-cluster01-rocketmq01 主机执行
配置集群内的主机信息
1 2 3 4 5 6 7 8 cat >> /etc/hosts << EOF 10.6.28.242 ks-rocketmq-cluster01-rocketmq01 10.6.230.74 ks-rocketmq-cluster01-rocketmq02 10.6.174.144 ks-rocketmq-cluster01-rocketmq03 10.6.40.117 ks-rocketmq-cluster02-rocketmq04 10.6.120.17 ks-rocketmq-cluster02-rocketmq05 10.6.119.79 ks-rocketmq-cluster02-rocketmq06 EOF
设置免密
1 for i in `cat /etc/hosts |grep ks- |awk '{print $1}' `;do ssh-copy-id -o StrictHostKeyChecking=no $i ;done
设置主机名称
1 2 3 4 5 6 ssh 10.6.28.242 hostnamectl set-hostname ks-rocketmq-cluster01-rocketmq01 ssh 10.6.230.74 hostnamectl set-hostname ks-rocketmq-cluster01-rocketmq02 ssh 10.6.174.144 hostnamectl set-hostname ks-rocketmq-cluster01-rocketmq03 ssh 10.6.40.117 hostnamectl set-hostname ks-rocketmq-cluster02-rocketmq04 ssh 10.6.120.17 hostnamectl set-hostname ks-rocketmq-cluster02-rocketmq05 ssh 10.6.119.79 hostnamectl set-hostname ks-rocketmq-cluster02-rocketmq06
安装jdk8(每台主机都执行) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 apt-get update && apt install openjdk-8-jdk -y cat >> /etc/profile << EOF export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/ export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar export PATH=\$PATH:\$JAVA_HOME/bin EOF java -version openjdk version "1.8.0_362" OpenJDK Runtime Environment (build 1.8.0_362-8u372-ga~us1-0ubuntu1~18.04-b09) OpenJDK 64-Bit Server VM (build 25.362-b09, mixed mode)
Rocketmq****安装 ks-rocketmq-cluster01-rocketmq01 执行 下载****rocketmq
在第一台主机配置好后 批量复制到其他主机
1 wget https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
rocketmq 配置修改 配置文件详细解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 brokerClusterName=KsStoryCluster:指定该 Broker 所属的集群名称。 brokerName=KsNode05:指定该 Broker 的名称。 listenPort=10911:指定 Broker 监听的端口。 namesrvAddr:指定 NameServer 的地址列表,用分号分隔。 storePathRootDir:指定存储根目录。 storePathCommitLog:指定 CommitLog 文件的存储路径。 enableDLegerCommitLog=true:启用 DLedger 模式,支持多副本存储。 dLegerGroup=KsNode05:指定 DLedger 组名。 dLegerPeers:指定 DLedger 集群中的节点及其端口。 dLegerSelfId=n3:指定当前节点的 DLedger ID,必须唯一。 sendMessageThreadPoolNums=32:发送消息的线程池数量。 deleteWhen=04:每天凌晨 4 点删除过期的消息日志文件。 waitTimeMillsInSendQueue=1000:发送队列中等待的时间(毫秒)。 useReentrantLockWhenPutMessage=true:在放置消息时使用可重入锁。 transientStorePoolEnable=true:启用瞬时存储池。 transientStorePoolSize=5:瞬时存储池的大小。 flushDiskType=ASYNC_FLUSH:指定磁盘刷新类型为异步刷新。 fileReservedTime=168:文件保留时间,单位为小时,这里是 168 小时(7 天)。 autoCreateTopicEnable=false:禁用自动创建 Topic。 tracetopicEnable=true:启用消息轨迹功能。 slaveReadEnable=true:允许从节点读取。 warmMapedFileEnable=true:启用内存映射文件的预热。 brokerRole=ASYNC_MASTER:设置 Broker 角色为异步主节点。 osPageCacheBusyTimeOutMills=5000:操作系统页面缓存繁忙超时时间,单位为毫秒。 waitTimeMillsInSendQueue=3000:发送队列中等待的时间(毫秒),这里有两处配置,最后一处会覆盖前一处(如果文件中有多个相同的配置项,以最后一个为准)。
每个主机修改后的配置 第一组集群
ks-rocketmq-cluster01-rocketmq01配置文件
/usr/local/rocketmq-4.9.2/conf/dledger/broker-n3.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 brokerClusterName = KsStoryClusterYanLian brokerName=KsNode05-YanLian listenPort=10911 namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 storePathRootDir=/data/rmqstore/node03 storePathCommitLog=/data/rmqstore/node03/commitlog enableDLegerCommitLog=true dLegerGroup=KsNode05-YanLian dLegerPeers=n3-10.6.28.242:20911;n4-10.6.230.74:20912;n5-10.6.174.144:20913 ## must be unique dLegerSelfId=n3 sendMessageThreadPoolNums=2 ## new add conf deleteWhen=04 waitTimeMillsInSendQueue=1000 useReentrantLockWhenPutMessage=true transientStorePoolEnable=true transientStorePoolSize=1 flushDiskType=ASYNC_FLUSH fileReservedTime=168 autoCreateTopicEnable=false tracetopicEnable=true slaveReadEnable=true warmMapedFileEnable=true brokerRole=ASYNC_MASTER osPageCacheBusyTimeOutMills=5000 waitTimeMillsInSendQueue=3000
配置文件拷贝到其他主机
1 2 3 4 5 6 scp -r rocketmq-4.9.2 10.6.230.74:/usr/local/ scp -r rocketmq-4.9.2 10.6.174.144:/usr/local/ scp -r rocketmq-4.9.2 10.6.40.117:/usr/local/ scp -r rocketmq-4.9.2 10.6.120.17:/usr/local/ scp -r rocketmq-4.9.2 10.6.119.79:/usr/local/
ks-rocketmq-cluster01-rocketmq02配置文件
/usr/local/rocketmq-4.9.2/conf/dledger/broker-n4.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 brokerClusterName = KsStoryClusterYanLian brokerName=KsNode05-YanLian listenPort=10921 namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 storePathRootDir=/data/rmqstore/node04 storePathCommitLog=/data/rmqstore/node04/commitlog enableDLegerCommitLog=true dLegerGroup=KsNode05-YanLian dLegerPeers=n3-10.6.28.242:20911;n4-10.6.230.74:20912;n5-10.6.174.144:20913 ## must be unique dLegerSelfId=n4 sendMessageThreadPoolNums=2 ## new add conf deleteWhen=04 waitTimeMillsInSendQueue=1000 useReentrantLockWhenPutMessage=true transientStorePoolEnable=true transientStorePoolSize=1 flushDiskType=ASYNC_FLUSH fileReservedTime=168 autoCreateTopicEnable=false tracetopicEnable=true slaveReadEnable=true warmMapedFileEnable=true brokerRole=ASYNC_MASTER osPageCacheBusyTimeOutMills=5000 waitTimeMillsInSendQueue=3000
ks-rocketmq-cluster01-rocketmq03配置文件
/usr/local/rocketmq-4.9.2/conf/dledger/broker-n5.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 brokerClusterName = KsStoryClusterYanLian brokerName=KsNode05-YanLian listenPort=10931 namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 storePathRootDir=/data/rmqstore/node05 storePathCommitLog=/data/rmqstore/node05/commitlog enableDLegerCommitLog=true dLegerGroup=KsNode05-YanLian dLegerPeers=n3-10.6.28.242:20911;n4-10.6.230.74:20912;n5-10.6.174.144:20913 ## must be unique dLegerSelfId=n5 sendMessageThreadPoolNums=2 ## new add conf deleteWhen=04 waitTimeMillsInSendQueue=1000 useReentrantLockWhenPutMessage=true transientStorePoolEnable=true transientStorePoolSize=1 flushDiskType=ASYNC_FLUSH fileReservedTime=168 autoCreateTopicEnable=false tracetopicEnable=true slaveReadEnable=true warmMapedFileEnable=true brokerRole=ASYNC_MASTER osPageCacheBusyTimeOutMills=5000 waitTimeMillsInSendQueue=3000
第二组集群
ks-rocketmq-cluster02-rocketmq04配置文件
/usr/local/rocketmq-4.9.2/conf/dledger/broker-n3.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 brokerClusterName = KsStoryClusterYanLian brokerName=KsNode06-YanLian listenPort=10911 namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 storePathRootDir=/data/rmqstore/node03 storePathCommitLog=/data/rmqstore/node03/commitlog enableDLegerCommitLog=true dLegerGroup=KsNode06-YanLian dLegerPeers=n3-10.6.40.117:20911;n4-10.6.120.17:20912;n5-10.6.119.79:20913 ## must be unique dLegerSelfId=n3 sendMessageThreadPoolNums=2 ## new add conf deleteWhen=04 waitTimeMillsInSendQueue=1000 useReentrantLockWhenPutMessage=true transientStorePoolEnable=true transientStorePoolSize=1 flushDiskType=ASYNC_FLUSH fileReservedTime=168 autoCreateTopicEnable=false tracetopicEnable=true slaveReadEnable=true warmMapedFileEnable=true brokerRole=ASYNC_MASTER osPageCacheBusyTimeOutMills=5000 waitTimeMillsInSendQueue=3000
ks-rocketmq-cluster02-rocketmq05配置文件
/usr/local/rocketmq-4.9.2/conf/dledger/broker-n4.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 brokerClusterName = KsStoryClusterYanLian brokerName=KsNode06-YanLian listenPort=10921 namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 storePathRootDir=/data/rmqstore/node04 storePathCommitLog=/data/rmqstore/nod04/commitlog enableDLegerCommitLog=true dLegerGroup=KsNode06-YanLian dLegerPeers=n3-10.6.40.117:20911;n4-10.6.120.17:20912;n5-10.6.119.79:20913 ## must be unique dLegerSelfId=n4 sendMessageThreadPoolNums=2 ## new add conf deleteWhen=04 waitTimeMillsInSendQueue=1000 useReentrantLockWhenPutMessage=true transientStorePoolEnable=true transientStorePoolSize=1 flushDiskType=ASYNC_FLUSH fileReservedTime=168 autoCreateTopicEnable=false tracetopicEnable=true slaveReadEnable=true warmMapedFileEnable=true brokerRole=ASYNC_MASTER osPageCacheBusyTimeOutMills=5000 waitTimeMillsInSendQueue=3000
ks-rocketmq-cluster02-rocketmq06配置文件
/usr/local/rocketmq-4.9.2/conf/dledger/broker-n5.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 brokerClusterName = KsStoryClusterYanLian brokerName=KsNode06-YanLian listenPort=10931 namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 storePathRootDir=/data/rmqstore/node05 storePathCommitLog=/data/rmqstore/node05/commitlog enableDLegerCommitLog=true dLegerGroup=KsNode06-YanLian dLegerPeers=n3-10.6.40.117:20911;n4-10.6.120.17:20912;n5-10.6.119.79:20913 ## must be unique dLegerSelfId=n5 sendMessageThreadPoolNums=2 ## new add conf deleteWhen=04 waitTimeMillsInSendQueue=1000 useReentrantLockWhenPutMessage=true transientStorePoolEnable=true transientStorePoolSize=1 flushDiskType=ASYNC_FLUSH fileReservedTime=168 autoCreateTopicEnable=false tracetopicEnable=true slaveReadEnable=true warmMapedFileEnable=true brokerRole=ASYNC_MASTER osPageCacheBusyTimeOutMills=5000 waitTimeMillsInSendQueue=3000
启动服务 详细配置在进程管理工具中
命令行验证集群状态 1 2 3 4 5 6 7 8 9 10 11 12 # 设置nameserver地址 export NAMESRV_ADDR="10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876" cd /usr/local/rocketmq-4.9.2/bin/ ./mqadmin clusterList #Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #PCWait(ms) #Hour #SPACE KsStoryClusterYanLian KsNode05-YanLian 0 10.6.230.74:10921 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900 KsStoryClusterYanLian KsNode05-YanLian 4 10.6.28.242:10911 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.1000 KsStoryClusterYanLian KsNode05-YanLian 6 10.6.174.144:10931 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900 KsStoryClusterYanLian KsNode06-YanLian 0 10.6.120.17:10921 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900 KsStoryClusterYanLian KsNode06-YanLian 4 10.6.40.117:10911 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900 KsStoryClusterYanLian KsNode06-YanLian 6 10.6.119.79:10931 V4_9_2 0.00(0,0ms) 0.00(0,0ms) 0 478442.56 0.0900
Rocketmq 控制台安装 安装****maven 1 wget https://archive.apache.org/dist/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz
配置阿里云源 下载依赖更快
1 2 3 4 5 6 7 8 9 10 11 conf/settings.xml <mirrors> ... 新增 <mirror> <id>aliyunmaven</id> <mirrorOf>*</mirrorOf> <name>阿里云公共仓库</name> <url>https://maven.aliyun.com/repository/public</url> </mirror> <mirror>
安装控制台 1 2 git clone https://github.com/muyan-yootk/rocketmq-externals.git cd rocketmq-externals-master/rocketmq-console
修改配置文件
src/main/resources/application.properties (主配置文件)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 server.contextPath= server.port=8080 ### SSL setting #server.ssl.key-store=classpath:rmqcngkeystore.jks #server.ssl.key-store-password=rocketmq #server.ssl.keyStoreType=PKCS12 #server.ssl.keyAlias=rmqcngkey #spring.application.index=true spring.application.name=rocketmq-console spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true spring.http.encoding.force=true logging.config=classpath:logback.xml #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876 rocketmq.config.namesrvAddr=10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876 # 修改此处 填写nameserver #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true rocketmq.config.isVIPChannel= #rocketmq-console's data path:dashboard/monitor rocketmq.config.dataPath=/tmp/rocketmq-console/data # 前端持久化存储文件 #set it false if you don't want use dashboard.default true rocketmq.config.enableDashBoardCollect=true #set the message track trace topic if you don't want use the default one rocketmq.config.msgTrackTopicName= rocketmq.config.ticketKey=ticket #Must create userInfo file: ${rocketmq.config.dataPath}/users.properties if the login is required rocketmq.config.loginRequired=true # 开启控制台登录权限认证
src/main/resources/users.properties (用户管理)
1 2 3 4 5 6 # 加入一下内容 最后1 代表超管 默认为普通用户0 admin=Tkamc.00!@#123,1 # Define Users rocketmq1=rocketmq1!@#123 rocketmq2=rocketmq2!@#123
编译
回到pom.xml 目录同级
1 2 cd rocketmq-externals-master/rocketmq-console /root/apache-maven-3.8.4/bin/mvn clean package -Dmaven.test.skip=true
启动服务
编译完成后 target会生成一个jar包
1 mv target/rocketmq-console-ng-1.0.1.jar /opt/
启动命令详细配置在进程管理工具中
验证
安装进程管理工具(每个主机都执行) 1 apt-get install supervisor
supervisor 主配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 ; supervisor config file [unix_http_server] file=/var/run/supervisor.sock ; (the path to the socket file) chmod=0700 ; sockef file mode (default 0700) [supervisord] logfile=/var/log/supervisor/supervisord.log ; (main log file;default $CWD/supervisord.log) pidfile=/var/run/supervisord.pid ; (supervisord pidfile;default supervisord.pid) childlogdir=/var/log/supervisor ; ('AUTO' child log dir, default $TEMP) minfds=65535 ; the below section must remain in the config file for RPC ; (supervisorctl/web interface) to work, additional interfaces may be ; added by defining them in separate rpcinterface: sections [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL for a unix socket ; The [include] section can just contain the "files" setting. This ; setting can list multiple files (separated by whitespace or ; newlines). It can also contain wildcards. The filenames are ; interpreted as relative to this file. Included files *cannot* ; include files themselves. [include] files = /etc/supervisor/conf.d/*.conf
创建目录 存储日志
1 mkdir /data/logs/rocketmq-4.9.2/ -p
启动nameserver 配置文件 /etc/supervisor/conf.d/mqnamesrv-4.9.2.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 program:mqnamesrv-4.9.2] environment=JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",ROCKETMQ_HOME="/usr/local/rocketmq-4.9.2" directory = /usr/local/rocketmq-4.9.2 command = bash /usr/local/rocketmq-4.9.2/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup user = rocketmq autostart = true autorestart = true stopsignal=QUIT stopwaitsecs=10 stopasgroup=true killasgroup=true stdout_logfile = /data/logs/rocketmq-4.9.2/mqnamesrv_stdout.log stderr_logfile = /data/logs/rocketmq-4.9.2/mqnamesrv_stderr.log
启动broker配置文件 这里演练机器配置不高 修改/usr/local/rocketmq-4.9.2/bin/runbroker.sh jvm 内存配置
1 2 JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g" JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=3g"
/etc/supervisor/conf.d/mqbroker-4.9.2.conf
这里的broker-n3.conf配置文件 每个主机不一样 按照情况配置
1 2 3 4 5 6 7 8 9 10 11 12 13 [program:mqbroker-4.9.2] environment=JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",ROCKETMQ_HOME="/usr/local/rocketmq-4.9.2" directory = /usr/local/rocketmq-4.9.2 command = bash /usr/local/rocketmq-4.9.2/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup -c /usr/local/rocketmq-4.9.2/conf/dledger/broker-n3.conf user = root autostart = true autorestart = true stopsignal=TERM stopwaitsecs=10 stopasgroup=true killasgroup=true stdout_logfile = /data/logs/rocketmq-4.9.2/mqbroker_stdout.log stderr_logfile = /data/logs/rocketmq-4.9.2/mqbroker_stderr.log
启动Dashboard配置文件 /etc/supervisor/conf.d/mqdashboard.conf (只有安装控制台的主机才配置)
1 2 3 4 5 6 7 8 9 10 11 12 13 [program:mqdashboard] environment=JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64" directory = /opt/ command = java -jar /opt/rocketmq-console-ng-1.0.1.jar user = root autostart = true autorestart = true stopsignal=TERM stopwaitsecs=10 stopasgroup=true killasgroup=true stdout_logfile = /data/logs/rocketmq-4.9.2/mqdashboard_stdout.log stderr_logfile = /data/logs/rocketmq-4.9.2/mqdashboard_stderr.log
进程工具常用命令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 # 重启守护进程 systemctl restart supervisor.service supervisorctl reload # 查看进程状态 supervisorctl status # 重启单个进程 supervisorctl restart mqdashboard # 启动单个进程 supervisorctl start mqdashboard # 关闭单个进程 supervisorctl stop mqdashboard # 重启所有进程 supervisorctl restart all
验证Rocketmq集群 使用python脚本 模块地址: https://github.com/apache/rocketmq-client-python
使用ubuntu系统启动 生产者 消费者脚本
安装模块 1 2 3 wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb pip install rocketmq-client-python
生产者脚本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # producer.py from rocketmq.client import Producer, Message import time def main(): producer = Producer('producer_group_test') producer.set_name_server_address('10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876') # 替换为你的 NameServer 地址 producer.start() num = 0 while True: msg = Message('myTopic') msg.set_keys('key%s' % num ) msg.set_tags('tag%s' % num) msg.set_body('Hello RocketMQ %s' % num) ret = producer.send_sync(msg) print(f'Send Status: {ret.status}, Message ID: {ret.msg_id}') num += 1 time.sleep(1) producer.shutdown() if __name__ == "__main__": main()
消费者脚本 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # consumer.py from rocketmq.client import PushConsumer, ConsumeStatus def callback(msg): print(f'Received message: {msg.body.decode("utf-8")}') return ConsumeStatus.CONSUME_SUCCESS def main(): consumer = PushConsumer('consumer_group_test') consumer.set_name_server_address('10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876') # 替换为你的 NameServer 地址 consumer.subscribe('myTopic', callback) consumer.start() print('Consumer started. Waiting for messages...') import time while True: time.sleep(3600) # Keep the consumer running if __name__ == "__main__": main()
创建topic
程序默认不会创建 需要手动创建完 执行脚本
验证 启动生产者
启动消费者
生产者详情
消费详情
集群故障演练 模拟KsNode05-YanLian 一个master节点下线 正常状态
下线KsNode05-YanLian的master 节点
1 2 ssh 10.6.230.74 supervisorctl stop mqbroker-4.9.2 # 下线
集群状态
消费者消费正常 且id一直在自增 说明生产者也在正常工作 (topic同时注册到了Node5、Node6中)
模拟KsNode05-YanLian 一个master节点下线后 再下线新提权的master 集群状态
下线KsNode05-YanLian的master 节点
1 2 ssh 10.6.28.242 supervisorctl stop mqbroker-4.9.2
集群状态
KsNode05-YanLian组的两个master 全部下线后,slave 不会再提权为master了 这里的slave生产总数不再增加了 说明这组集群已经失去了 写入的能力, 没有了写入 自然也不会有读取的数据 这组集群基本失去工作能力了
消费者消费正常 且id一直在自增 说明生产者也在正常工作(topic同时注册到了Node5、Node6中)
模拟KsNode05-YanLian 一个master节点下线后 再下线一个slave 集群状态
下线KsNode05-YanLian的slave 节点
1 2 ssh 10.6.230.74 supervisorctl stop mqbroker-4.9.2
集群状态
10.6.174.144从原来的master 自动被降级到了slave 集群已经失去了 写入的能力, 没有了写入 自然也不会有读取的数据 这组集群基本失去工作能力了
总结 Topic创建时应选择 注册到所有的组中
一组集群中至少需要存活两个broker 当一组中只剩下一个broker 这组集群就丧失了 写入的能力
多组集群同时挂掉2个broker rocketmq不可用 生产者无法再生产数据 由于slave还在线 broker中已有的数据还可正常消费
Topic导出导入 Rocketmq Topic 导出(4.x) 命令导出为集群内所有topic 导入到别的集群中 应删除系统自建的Topic
1 /usr/local/rocketmq-4.9.2/bin/mqadmin topicList
自建集群创建topic(4.x)
-c 指定集群名称
-n 指定nameserver 地址
-t 指定topic名称
-r 队列读取数量
-w 队列写入数量
1 mqadmin updateTopic -c KsStoryClusterYanLian -n '10.6.28.242:9876;10.6.230.74:9876;10.6.174.144:9876;10.6.40.117:9876;10.6.120.17:9876;10.6.119.79:9876' -t gph -r 16 -w 16
阿里云导入topic 调研 (rocketmq 5.x) 阿里云支持一键导入功能 需要准备 topicName****MessageType Remark
默认5.0开始 开启了强制校验功能 每个topic 只允许发送一种消息类型的消息 为兼容4.x 需要关闭校验消息类型