提问5要素
主题 + 数量 + 细节 + 方式 + 格式
有用的提示语
给我一点惊喜
一步一步思考
输出格式
1 | 用简洁的,通俗易懂的语言告诉新手: |
语法汇总:
| ^word | 以word为行首的那一行 |
|---|---|
| word$ | 以word为行尾的那一行 |
| . | 代表一个任意字符 |
| * | 代表前面的字符重复0次或多次,注意可以是0次 |
| \ | 转义 |
| [多个字符] | 代表多个字符中的一个,或的关系 |
| [a-z] | -代表返回,比如a-z表示全部小写字母,A-Z代表所有大写字母。 |
| [^字符] | 取反,不包含这个字符。注意^在[]里和[]外的不同含义 |
\{n,m\} |
{}里加次数,表示重复指定次数。\{n,m\}是重复n到m次。\{n,\}是重复n次以上 |
另外,注意区分正则表达式和通配符的区别。强调*这个字符
在正则中表示重复前面的字符0次或多次
在通配符中表示任意字符
举例来说。(ls命令是不支持正则的,grep,sed支持)
1 | ls -l a* |
是取出以a开头的所有文件
1 | grep 'a*' 某文件 |
会把文件中所有的数据全部取出,因为正则中*可以表示0次,所以a*的含义就是所有字符,而不是以a开头的意思
上面是正则的基本语法,除此之外,正则还有延申的特殊参数
要使用的话用grep -E,不过更常用的是egrep
| + | 代表前面的字符重复1次或多次,注意和*的区别 |
|---|---|
| ? | 0次或1次 |
| | | 或的含义 |
| () | 分组 |
| ()+ | 多个重复分组 |
sed当管道使用
sed是一个管道命令。
sed后接动作要用单引号括住
1 | sed '2,5d' |
2,5表示对第二行到第五行进行动作。d表示删除动作
可以接的动作如下
d删除。前面加上行号。如上所示,如果要实现从某一行删除到文件结束sed '2,$d'
a 新增。后面加上内容。在当前行的下一行新增内容
1 | [api@kfxqtyglpt ~]$ cat test.txt |
sed也是管道操作,所以直接输出到了控制台上
i在前面新增。接的内容插入在上一行
1 | [api@kfxqtyglpt ~]$ cat test.txt | sed '1i helphelp' |
p 显示
我们要显示一个文件的第10到20行,可以用
1 | head -n 20 | tail -n 10 |
现在有了sed,可以直接
1 | sed -n '10,20p' |
s 搜索并替换,这是sed很常用的功能
1 | sed 's/old/new/g' |
old部分可以用正则表达式
c按行取代
1 | [api@kfxqtyglpt ~]$ cat test.txt |
比较危险,使用时先用管道场景进行测试
1 | sed -i '$a the last line' test.txt |
加上-i参数就是直接修改文件,为不是输出到console了
$是最后一行,$a就是在最后以后后新增的意思了
将一行数据分成多个段来处理
格式
1 | awk '条件1{动作1} 条件2{动作2}...' 文件名 |
awk的处理流程是:
变量$0是一整行的数据,$1是第一分段数据
awk的内建变量:
NF:每一行($0)拥有的分段总数
NR:当前awk是在处理第几行
FS:当前的分隔符
通过FS=”:”可以设置分隔符为:
如
1 | $ last -n 5 |egrep -v '^$|wtmp' | awk '{FS=":"} {print $1 "\t all:" NF "\t now: " NR}' |
但是这样有个问题,就是第一行在执行的时候,分隔符还是空格,所以第一行的结果有问题
解决办法是使用关键字BEGIN
1 | [api@kfxqtyglpt ~]$ last -n 5 |egrep -v '^$|wtmp' | awk 'BEGIN {FS=":"} {print $1 "\t all:" NF "\t now: " NR}' |
awk中可以使用条件运算符:>,<,>=,<=,==,!=
1 | [api@kfxqtyglpt ~]$ last -n 5 |egrep -v '^$|wtmp' | awk 'BEGIN {FS=":"} {printf $1 "\t all:" NF "\t now: " NR} NR>3 {printf "\t more than 3 \n"} NR<=3 {printf "\n"}' |
对大于3的行添加了一个输出。注意我这里使用了printf,要自己加上\n换行
1 | vim hello.sh |
1 | #!/bin/bash |
执行
1 | sh hello.sh |
注意,现在用
1 | ./hello.sh |
会执行失败。
因为我们创建的文件并没有x可执行权限。而sh hello.sh中我们的sh是作为sh的参数传入,因此只要hello.sh有r读权限即可
1 | [api@kfxqtyglpt bin]$ ll |
可以加上x权限,再执行
1 | chmod a+x hello.sh |
定义变量
使用变量
(见上一篇文章)
sh脚本中也是使用$()来执行linux的命令
1 | $(seq 1 100) |
执行seq命令,生产1到100的数组,可用在for循环中
很多场景下需要用户输入一些信息,此时可使用read命令
1 | #!/bin/bash |
根据当天的日期创建文件,使用到date命令
1 | #!/bin/bash |
$()用于执行命令
我们需要根据一个文件是否存在,从而执行不同的动作
用到test命令
1 | #!/bin/bash |
test命令有很多参数,只记录一些常用的,
| -e | (文件或目录)是否存在 |
|---|---|
| -f | 是否存在并且是文件 |
| -d | 是否存在并且是目录 |
| -r | 是否存在并且有读权限 |
| -w | 是否存在并且有写权限 |
| -x | 是否存在并且有可执行权限 |
| -s | 是否存在并且为非空白文件 |
| -nt | newer than,test file1 -nt file2 判断file1是否比file2新 |
| -ot | older than, test file1 -ot file2 判断file1是否比file2旧 |
| -ef | 判断两个文件是否是同一个文件,最终是根据两个文件是否指向同一个inode |
| -eq | 判断两个整数相等 |
| -gt | 大于 |
| -lt | 小于 |
| -ge | 大于等于 |
| -le | 小于等于 |
| -z | 判断是否是空字符串,是空字符串,返回true |
| -n | 判断是否是空字符串,不是空字符串,返回true |
| == | test str1 == str2,判断两个字符串相等 |
| != | 判断字符串不相等 |
| ! | 取反,test ! -e file,如果文件不存在返回true |
| -a | and同时成立,test -r file -a -w file,同时有读和写权限 |
| -o | or或者,test -r file -a -w file,同时有读或写权限都返回true |
上面的test命令也可以用判断符[]来代替。bash中的判断符就是中括号
1 | [ "${param}" == "xxx" ] |
格式如上,表示判断param变量和xxx字符串是否相等
需要特别注意的是[]里必须跟着空格,==这类判断符号两边也必须跟着空格
变量${}要用双引号包裹
常量最好用单引号或双引号包裹
[] 判断符常用在if…then…if的条件语法中
这基本是必用的功能,我们执行脚本时经常在脚本后直接跟上参数,那我们自己编写的脚本怎么接收参数?
默认参数变量
在shell脚本中已经默认使用如$0,$1作为接收的参数变量,具体如下
| $0 | 代表执行的脚本文件名。 |
|---|---|
| $1 | 后接的第一个参数,同理$2就是第二个参数,依次类推 |
| $# | 代表一共的参数个数 |
| $@ | 全部变量内容 |
$0,$1等也可以加上大括号${0},${1}
1 | [api@kfxqtyglpt bin]$ cat param.sh |
1 | [api@kfxqtyglpt bin]$ ./param.sh one two |
sh脚本中条件判断的写法
1 | if [ 判断表达式 ]; then |
这是虽简单的单层判断
[ 判断表达式 ] 可以使用&&或||多个相连
1 | if [ 判断表达式 ]; then |
也是使用else关键字
再复杂一点
1 | if [ 判断表达式1 ]; then |
sh脚本中也有case语法
1 | case $变量 in |
注意,
sh脚本中的循环也有两种形式,while和for
语法格式
1 | while [ 条件 ] |
sh脚本里还有一个until…do…done的语法,
while的含义是当条件成立就do
until的含义是do直到条件成立,也就是说当条件成立了就不做
for in的语法
1 | for var in 多个变量,比如数组 |
举个例子
1 | #!/bin/bash |
效果
1 | [api@kfxqtyglpt bin]$ ./for.sh |
上面的$(seq 1 10)也可以用{1..10}来代替。这是bash的内置机制。
两个..表示连续出现的意思
{a..d}就表示a,b,c,d
sh脚本中只要是列表的都可进行for循环
1 | for finename in $(ls 目录) |
和其他编程语言一样,除了for in,还有
for 数字
1 | for ((初始值;限制条件;步进)) |
举个例子
1 | #!/bin/bash |
$(())双括号是数值计算
定义函数的语法
1 | function 函数名() { |
很标准的语法。
需要注意的是:
函数的定义必须放在函数的使用之前,否则找不到
1 | #!/bin/bash |
使用函数时直接用函数名,不需要加括号
1 | [api@kfxqtyglpt bin]$ ./func.sh one |
myprint后面的;,只是为了分开两个命令,后面的echo是另一个命令
完全也可以将echo语句换一行,效果是一样的
1 | #!/bin/bash |
函数也可以接收参数,参数也是$0,$1,$2等,$0代表函数名,$1是第一个参数。
注意这个参数和整个sh脚本接收的参数不是一回事。
这个是函数的参数。
比如将上面的例子改一下
1 | #!/bin/bash |
编写sh脚本经常会遇到语法问题,有一个参数可以帮我们进行语法校验
1 | $sh -n xxx.sh |
sh使用-n参数可以进行sh脚本的语法校验
还有其他参数
1 | sh -x xxx.sh |
-x参数可以将执行过程全部打印出来
[TOC]
一共有10种,常用的有5种
String字符串
Hash 字典
List 列表
Set集合
ZSet 有序集合
Pubsub 发布订阅 (不推荐使用,坑很多)
Bitmap 位图
GEO 地理位置 (有限使用,附近的人)
Stream 流(5.0) (与Kafka非常像)
Hyperloglog 基数统计,HyperLogLog 提供不精确的去重计数方案
redis提供了非常丰富的集群模式:主从、哨兵、cluster,满足服务高可用的需求。
其中cluster模式为目前大多数公司所采用的方式。redis cluster是redis亲生的集群方案,它的主要特点就是去中心化,无需proxy代理。其中一个主要设计目标就是达到线性可扩展性。

不再需要额外的Sentinel集群,为使用者提供了一致的方案,减少了学习成本。
去中心架构,节点对等,集群可支持上千个节点。
副本功能能够实现自动故障转移,大部分情况下无需人工介入。
可水平线性扩展
redis cluster采用虚拟槽的概念,把所有的key映射到 0~16383一共16384个整数槽内,当需要在其中存取一个key时,redis客户端会首先对这个key采用crc16算法算出一个值,然后对这个值进行mod操作。
1 | crc16(key)mod 16384 |

redis cluster所有的集群操作都是围绕slot进行。所以必须存储slot的存储关系。
redis节点发送心跳包时,需要把所有的槽信息放在这个心跳包里,所以必须优化使数据量最小。
使用使用bitmap来存储是最节省空间的。这个数组的长度为 16384/8=2048 Byte,2K字节。

请求随机落到一个主节点,该节点检查key是否在自己负责的slot上,是就处理。不是就根据信息将请求转发到对应的节点上。所以,客户端连接集群中的任意一台机器,都能够完成操作。
当数据库中的16384个槽都有节点在处理时,集群处于上线状态(ok);相反地,如果数据库中有任何一个槽没有得到处理,那么集群处于下线状态(fail)。(可以配置,使集群强制可读)
redis-cluster可以自动完成一定程度的故障转移。
集群中的每个节点都会定期地向集群中的其他节点发送ping消息,以此来检测对方是否在线,如果接收ping消息的节点没有在规定的时间内返回pong消息,那么发送ping消息的节点就会将接收ping消息的节点标记为疑似下线(PFAIL)。
如果在一个集群里面,半数以上节点都将某个主节点x报告为疑似下线,那么这个主节点x将被标记为已下线(FAIL),将x标记为FAIL的节点会向集群广播一条关于x的FAIL消息,所有收到这条FAIL消息的节点都会立即将x标记为FAIL。
大家可以注意到这个过程,与es和zk的节点判断类似,都是半数以上才进行判断,所以主节点的数量一般都是奇数。
当一个节点发现自己的主节点进入fail状态,将会从这个主节点的从节点当中,选出一台,执行slaveof no one命令,变身为主节点。
新的节点完成自己的槽指派以后,会向集群广播一条pong消息,以便让其他节点立即知道自己的这些变化。它告诉别人:我已经是主节点了,我已经接管了有问题的节点,成为了新的主节点。
当一台从机连接到master之后,会发送一个sync指令。master在收到这个指令后,会在后台启动存盘进程。执行完毕后,master将整个数据库文件传输到slave,这样就完成了第一次全量同步。
接下来,master会把自己收到的变更指令,依次传送给slave,从而达到数据的最终同步。从redis 2.8开始,就支持主从复制的断点续传,如果主从复制过程中,网络连接断掉了,那么可以接着上次复制的地方,继续复制下去,而不是从头开始复制一份。
redis cluster中节点之间使用异步复制,并没有类似kafka这种ack的概念。节点之间通过gossip协议交换状态信息,用投票机制完成Slave到Master的角色提升,完成这个过程注定了需要时间。在发生故障的过程中就容易存在窗口,导致丢失写入的数据。比如以下两种情况。
一、命令已经到到master,此时数据并没有同步到slave,master会对客户端回复ok。如果这个时候主节点宕机,那么这条数据将会丢失。redis这样做会避免很多问题,但对一个对数据可靠性要求较高的系统,是不可忍受的。
二、由于路由表是在客户端存放的,存在一个时效问题。如果分区导致一个节点不可达,提升了某个从节点,但原来的主节点在这个时候又可以用了(并未完成failover)。这个时候一旦客户端的路由表并没有更新,那么它将会把数据写到错误的节点,造成数据丢失。
所以redis cluster在通常情况下运行的很好,在极端情况下某些值丢失问题,目前无解。
1、redis cluster号称能够支持1k个节点,但你最好不要这么做。当节点数量增加到10,就能够感受到集群的一些抖动。
2、一定要避免产生热点,如果流量全部打到了某个节点,后果一般很严重。
3、大key不要放redis,它会产生大量的慢查询,影响正常的查询。
4、如果你不是作为存储,缓存一定要设置过期时间。占着茅坑不拉屎的感觉是非常讨厌的。
5、大流量,不要开aof,开rdb即可。
6、redis cluster的操作,少用pipeline,少用multi-key,它们会产生大量不可预料的结果。

redis最早支持的,就是M-S模式,也就是一主多从。redis单机qps可达到10w+,但是在某些高访问量场景下,依然不太够用。一般通过读写分离来增加slave,减少主机的压力。
数据同步问题

先进行全量同步,再进行增量同步。主从同步是异步进行的。
要注意的一个点是:
run_id是master唯一标示,slave连接master时会传runid,master每次重启runid都发生变化,当slave发现master的runid变化时都会触发全量复制流程。

哨兵模式是主从的升级版,因为主从的出现故障后,不会自动恢复,需要人为干预。所以在主从的基础上,实现哨兵模式就是为了监控主从的运行状况,对主从的健壮进行监控。并且当master出现故障的时候,会自动选举一个slave作为master顶上去。
当master被认为客观下线后,首先需要在哨兵中选出一个老大哨兵进行故障恢复。选举老大哨兵的算法还是Raft算法
选出大佬哨兵后,大佬哨兵就会对故障进行自动恢复,即从slave中选出一名slave作为master
slave-priority优先级最高的会被选中。
代理模式在redis cluster出现之前,非常流行,比如codis,predixy。
redis提供了两种持久化方式:aof和rdb,常用的是rdb。RDB 和 AOF 两种方式也可以同时使用,也可以都不用,也可以只用一个。
RDB
Redis DataBase。简而言之,就是在不同的时间点,将 redis 存储的数据生成快照并存储到磁盘上;
AOF
则是换了一个角度来实现持久化,那就是将 redis 执行过的所有写指令记录下来,在下次 redis 重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了
对于 RDB 方式,redis 会单独创建(fork)一个子进程来进行持久化,而主进程是不会进行任何 IO 操作的,这样就确保了 redis 极高的性能。
RDB 有不少优点,但它的缺点也是不容忽视的。
即使你每 5 分钟都持久化一次,当 redis 故障时,仍然会有近 5 分钟的数据丢失
默认的 AOF 持久化策略是每秒钟 fsync 一次(fsync 是指把缓存中的写指令记录到磁盘中),
因为在这种情况下,redis 仍然可以保持很好的处理性能,即使 redis 故障,也只会丢失最近 1 秒钟的数据。
也可以配置每次执行命令都fsync一次。
因为采用了追加方式,如果不做任何处理的话,AOF 文件会变得越来越大,为此,redis 提供了 AOF 文件重写(rewrite)机制,即当 AOF 文件的大小超过所设定的阈值时,redis 就会启动 AOF 文件的内容压缩
rdb的优点是文件小,恢复快。但是存在数据丢失较多的风险。
aof的有点是数据丢失风险较小,缺点是文件较大。
为什么有一致性问题?
建议使用:Cache Aside Pattern
读请求:
变更操作:
不要更新缓存,直接淘汰缓存
先操作数据库,再 淘汰 缓存
涉及到复杂的事务和回滚操作,可以把淘汰放在finally里。
影响,轻微。
高流量下 大量请求读取一个失效的Key -> Redis Miss -> 穿透到DB
解决方式:采用分布式锁,只有拿到锁的第一个线程去请求数据库,然后插入缓存
影响,一般。
故意访问一个不存在的Key(恶意攻击)-> Redis Miss -> 穿透到DB
解决方式:
影响:严重。
大量Key同时失效 | Redis当机 -> Redis Miss -> 压力打到DB
解决方式:
monitor指令 回显所有执行的指令。可以使用grep配合过滤keyspace-events 订阅某些Key的事件。比如,删除某条数据的事件,底层实现基于pubsubslow log 顾名思义,慢查询,非常有用--bigkeys启动参数 Redis大Key健康检查。使用的是scan的方式执行, 不用担心阻塞memory usage key、memory stats 指令info指令,关注instantaneous_ops_per_sec、used_memory_human、connected_clientsredis-rdb-tools rdb线下分析使用 keys * 把库堵死,——使用别名把这个命令改名
rename
超过最大内存(maxmemory)后,部分数据被删除——这个有删除策略的,选择适合自己的即可
volatile-lru 从设置过期数据集里查找最近最少使用
volatile-ttl 从设置过期的数据集里面优先删除剩余时间短的Key
volatile-random 从设置过期的数据集里面任意选择数据淘汰
volatile-lfu 从过期的数据集里删除 最近不常使用 的数据淘汰
allkeys-lru
allkeys-lfu
allkeys-random
no-enviction 禁止删除,默认策略
没开持久化,却重启了实例,数据全掉
记得非缓存的信息需要打开持久化
RDB 的持久化需要 vm.overcommit_memory=1,否则有可能会持久化失败
没有持久化情况下,主从,主重启太快,从还没认为主挂的情况下,从会清空自己的数据——人为重启主节点前,先关闭从节点的同步
maxmemory,Redis将一直使用内存,直到触发操作系统的OOM-KILLER。虽然 Redis支持持久化,但将所有数据存储在 Redis 中,成本非常昂贵。建议将热数据 (如 QPS超过 5k) 的数据加载到 Redis 中。低频数据可存储在 Mysql、 ElasticSearch中。
不要将不相关的数据业务都放到一个 Redis中。一方面避免业务相互影响,另一方面避免单实例膨胀,并能在故障时降低影响面,快速恢复。
由于 Redis 是单线程服务,消息过大会阻塞并拖慢其他操作。保持消息内容在 1KB 以下是个好的习惯。严禁超过 50KB 的单条记录。消息过大还会引起网络带宽的高占用,持久化到磁盘时的 IO 问题。
连接的频繁创建和销毁,会浪费大量的系统资源,极限情况会造成宿主机当机。请确保使用了正确的 Redis 客户端连接池配置。
作为缓存使用的 Key,必须要设置失效时间。失效时间并不是越长越好,请根据业务性质进行设置。注意,失效时间的单位有的是秒,有的是毫秒,这个很多同学不注意容易搞错。
缓存应该仅作缓存用,去掉后业务逻辑不应发生改变,万不可切入到业务里。第一,缓存的高可用会影响业务;第二,产生深耦合会发生无法预料的效果;第三,会对维护行产生负效果。
小应用就算了
如单 redis 集群并不能为你的数据服务,不要着急扩大你的 redis 集群(包括 M/S 和 Cluster),集群越大,在状态同步和持久化方面的性能越差。 优先使用客户端 hash 进行集群拆分。如:根据用户 id 分 10 个集群,用户尾号为 0 的落在第一个集群。
Redis 的 Key 一定要规范,这样在遇到问题时,能够进行方便的定位。Redis 属于无 scheme 的 KV 数据库,所以,我们靠约定来建立其 scheme 语义。其好处:
1、能够根据某类 key 进行数据清理
2、能够根据某类 key 进行数据更新
3、能够方面了解到某类 key 的归属方和应用场景
4、为统一化、平台化做准备,减少技术变更
一般,一个 key 需要带以下维度:业务、key 用途、变量等,各个维度使用 : 进行分隔
Keys 命令效率极低,属于 O(N)操作,会阻塞其他正常命令,在 cluster 上,会是灾难性的操作。严禁使用,DBA 应该 rename 此命令,从根源禁用。
flush 命令会清空所有数据,属于高危操作。严禁使用,DBA 应该 rename 此命令,从根源禁用,仅 DBA 可操作。
如没有非常特殊的需求,严禁将 Redis 当作消息队列使用。Redis 当作消息队列使用,会有容量、网络、效率、功能方面的多种问题。如需要消息队列,可使用高吞吐的 Kafka 或者高可靠的 RocketMQ。
redis 那么快,慢查询除了网络延迟,就属于这些批量操作函数。大多数线上问题都是由于这些函数引起。
1、[zset] 严禁对 zset 的不设范围操作
ZRANGE、 ZRANGEBYSCORE等多个操作 ZSET 的函数,严禁使用 ZRANGE myzset 0 -1 等这种不设置范围的操作。请指定范围,如 ZRANGE myzset 0 100。如不确定长度,可使用 ZCARD 判断长度
2、[hash] 严禁对大数据量 Key 使用 HGETALL
HGETALL会取出相关 HASH 的所有数据,如果数据条数过大,同样会引起阻塞,请确保业务可控。如不确定长度,可使用 HLEN 先判断长度
3、[key] Redis Cluster 集群的 mget 操作
Redis Cluster 的 MGET 操作,会到各分片取数据聚合,相比传统的 M/S架构,性能会下降很多,请提前压测和评估
4、[其他] 严禁使用 sunion, sinter, sdiff等一些聚合操作
select函数用来切换 database,对于使用方来说,这是很容易发生问题的地方,cluster 模式也不支持多个 database,且没有任何收益,禁用。
redis 本身已经很快了,如无大的必要,建议捕获异常进行回滚,不要使用事务函数,很少有人这么干。
lua 脚本虽然能做很多看起来很 cool 的事情,但它就像是 SQL 的存储过程,会引入性能和一些难以维护的问题,禁用。
monitor函数可以快速看到当前 redis 正在执行的数据流,但是当心,高峰期长时间阻塞在 monitor 命令上,会严重影响 redis 的性能。此命令不禁止使用,但使用一定要特别特别注意。
基于内存
单线程,没有cpu的上下文切换,无锁
io多路复用
##1.1 为什么要用MQ
消息队列是一种“先进先出”的数据结构

其应用场景主要包含以下3个方面
系统的耦合性越高,容错性就越低。以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验。

使用消息队列解耦合,系统的耦合性就会提高了。比如物流系统发生故障,需要几分钟才能来修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统回复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障。


应用系统如果遇到系统请求流量的瞬间猛增,有可能会将系统压垮。有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验。

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,这会影响用户体验,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这样总不能下单体验要好。
处于经济考量目的:
业务系统正常时段的QPS如果是1000,流量最高峰是10000,为了应对流量高峰配置高性能的服务器显然不划算,这时可以使用消息队列对峰值流量削峰

通过消息队列可以让数据在多个系统更加之间进行流通。数据的产生方不需要关心谁来使用数据,只需要将数据发送到消息队列,数据使用方直接在消息队列中直接获取数据即可

优点:解耦、削峰、数据分发
缺点包含以下几点:
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
如何保证MQ的高可用?
系统复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败。
如何保证消息数据处理的一致性?
常见的MQ产品包括Kafka、ActiveMQ、RabbitMQ、RocketMQ。

RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,RocketMQ承接了例如“双11”等高并发场景的消息流转,能够处理万亿级别的消息。
RocketMQ最新版本:4.5.1
Linux64位系统
JDK1.8(64位)
源码安装需要安装Maven 3.2.x
本教程以二进制包方式安装
1 | # 1.启动NameServer |
1 | # 1.启动Broker |
问题描述:
RocketMQ默认的虚拟机内存较大,启动Broker如果因为内存不足失败,需要编辑如下两个配置文件,修改JVM内存大小
1 | # 编辑runbroker.sh和runserver.sh修改默认JVM大小 |
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
1 | # 1.设置环境变量 |
1 | # 1.设置环境变量 |
1 | # 1.关闭NameServer |

NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下:
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
消息高可用采用2m-2s(同步双写)方式

| 序号 | IP | 角色 | 架构模式 |
|---|---|---|---|
| 1 | 192.168.25.135 | nameserver、brokerserver | Master1、Slave2 |
| 2 | 192.168.25.138 | nameserver、brokerserver | Master2、Slave1 |
1 | vim /etc/hosts |
配置如下:
1 | # nameserver |
配置完成后, 重启网卡
1 | systemctl restart network |
宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单粗暴的方式是直接关闭防火墙
1 | # 关闭防火墙 |
或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:
nameserver 默认使用 9876 端口master 默认使用 10911 端口slave 默认使用11011 端口执行以下命令:
1 | # 开放name server默认端口 |
1 | vim /etc/profile |
在profile文件的末尾加入如下命令
1 | #set rocketmq |
输入:wq! 保存并退出, 并使得配置立刻生效:
1 | source /etc/profile |
1 | mkdir /usr/local/rocketmq/store |
服务器:192.168.25.135
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties |
修改配置如下:
1 | #所属集群名字 |
服务器:192.168.25.135
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties |
修改配置如下:
1 | #所属集群名字 |
服务器:192.168.25.138
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties |
修改配置如下:
1 | #所属集群名字 |
服务器:192.168.25.138
1 | vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties |
修改配置如下:
1 | #所属集群名字 |
1 | vi /usr/local/rocketmq/bin/runbroker.sh |
需要根据内存大小进行适当的对JVM参数进行调整:
1 | #=================================================== |
####2)runserver.sh
1 | vim /usr/local/rocketmq/bin/runserver.sh |
1 | JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" |
分别在192.168.25.135和192.168.25.138启动NameServer
1 | cd /usr/local/rocketmq/bin |
master1:
1 | cd /usr/local/rocketmq/bin |
slave2:
1 | cd /usr/local/rocketmq/bin |
master2
1 | cd /usr/local/rocketmq/bin |
slave1
1 | cd /usr/local/rocketmq/bin |
启动后通过JPS查看启动进程

1 | # 查看nameServer日志 |
进入RocketMQ安装位置,在bin目录下执行./mqadmin {command} {args}
###3.4.2 命令介绍
####1)Topic相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateTopic | 创建更新Topic配置 | -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port |
| -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询) | ||
| -h- | 打印帮助 | ||
| -n | NameServer服务地址,格式 ip:port | ||
| -p | 指定新topic的读写权限( W=2|R=4|WR=6 ) | ||
| -r | 可读队列数(默认为 8) | ||
| -w | 可写队列数(默认为 8) | ||
| -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
| deleteTopic | 删除Topic | -c | cluster 名称,表示删除某集群下的某个 topic (集群 可通过 clusterList 查询) |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic 名称(名称只能使用字符 ^[a-zA-Z0-9_-]+$ ) | ||
| topicList | 查看 Topic 列表信息 | -h | 打印帮助 |
| -c | 不配置-c只返回topic列表,增加-c返回clusterName, topic, consumerGroup信息,即topic的所属集群和订阅关系,没有参数 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicRoute | 查看 Topic 路由信息 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicStatus | 查看 Topic 消息队列offset | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| topicClusterList | 查看 Topic 所在集群列表 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| updateTopicPerm | 更新 Topic 读写权限 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -b | Broker 地址,表示 topic 所在 Broker,只支持单台Broker,地址为ip:port | ||
| -p | 指定新 topic 的读写权限( W=2|R=4|WR=6 ) | ||
| -c | cluster 名称,表示 topic 所在集群(集群可通过 clusterList 查询),-b优先,如果没有-b,则对集群中所有Broker执行命令 | ||
| updateOrderConf | 从NameServer上创建、删除、获取特定命名空间的kv配置,目前还未启用 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic,键 | ||
| -v | orderConf,值 | ||
| -m | method,可选get、put、delete | ||
| allocateMQ | 以平均负载算法计算消费者列表负载消息队列的负载结果 | -t | topic 名称 |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | ipList,用逗号分隔,计算这些ip去负载Topic的消息队列 | ||
| statsAll | 打印Topic订阅关系、TPS、积累量、24h读写总量等信息 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -a | 是否只打印活跃topic | ||
| -t | 指定topic |
####2)集群相关
| 名称 | 含义 | 命令选项 | 说明 |
| clusterList | 查看集群信息,集群、BrokerName、BrokerId、TPS等信息 | -m | 打印更多信息 (增加打印出如下信息 #InTotalYest, #OutTotalYest, #InTotalToday ,#OutTotalToday) |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | 打印间隔,单位秒 | ||
| clusterRT | 发送消息检测集群各Broker RT。消息发往${BrokerName} Topic。 | -a | amount,每次探测的总数,RT = 总时间 / amount |
| -s | 消息大小,单位B | ||
| -c | 探测哪个集群 | ||
| -p | 是否打印格式化日志,以|分割,默认不打印 | ||
| -h | 打印帮助 | ||
| -m | 所属机房,打印使用 | ||
| -i | 发送间隔,单位秒 | ||
| -n | NameServer 服务地址,格式 ip:port |
####3)Broker相关
| 名称 | 含义 | 命令选项 | 说明 |
| updateBrokerConfig | 更新 Broker 配置文件,会修改Broker.conf | -b | Broker 地址,格式为ip:port |
| -c | cluster 名称 | ||
| -k | key 值 | ||
| -v | value 值 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| brokerStatus | 查看 Broker 统计信息、运行状态(你想要的信息几乎都在里面) | -b | Broker 地址,地址为ip:port |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| brokerConsumeStats | Broker中各个消费者的消费情况,按Message Queue维度返回Consume Offset,Broker Offset,Diff,TImestamp等信息 | -b | Broker 地址,地址为ip:port |
| -t | 请求超时时间 | ||
| -l | diff阈值,超过阈值才打印 | ||
| -o | 是否为顺序topic,一般为false | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| getBrokerConfig | 获取Broker配置 | -b | Broker 地址,地址为ip:port |
| -n | NameServer 服务地址,格式 ip:port | ||
| wipeWritePerm | 从NameServer上清除 Broker写权限 | -b | Broker 地址,地址为ip:port |
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| cleanExpiredCQ | 清理Broker上过期的Consume Queue,如果手动减少对列数可能产生过期队列 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker 地址,地址为ip:port | ||
| -c | 集群名称 | ||
| cleanUnusedTopic | 清理Broker上不使用的Topic,从内存中释放Topic的Consume Queue,如果手动删除Topic会产生不使用的Topic | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker 地址,地址为ip:port | ||
| -c | 集群名称 | ||
| sendMsgStatus | 向Broker发消息,返回发送状态和RT | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | BrokerName,注意不同于Broker地址 | ||
| -s | 消息大小,单位B | ||
| -c | 发送次数 |
####4)消息相关
| 名称 | 含义 | 命令选项 | 说明 |
| queryMsgById | 根据offsetMsgId查询msg,如果使用开源控制台,应使用offsetMsgId,此命令还有其他参数,具体作用请阅读QueryMsgByIdSubCommand。 | -i | msgId |
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByKey | 根据消息 Key 查询消息 | -k | msgKey |
| -t | Topic 名称 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByOffset | 根据 Offset 查询消息 | -b | Broker 名称,(这里需要注意 填写的是 Broker 的名称,不是 Broker 的地址,Broker 名称可以在 clusterList 查到) |
| -i | query 队列 id | ||
| -o | offset 值 | ||
| -t | topic 名称 | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| queryMsgByUniqueKey | 根据msgId查询,msgId不同于offsetMsgId,区别详见常见运维问题。-g,-d配合使用,查到消息后尝试让特定的消费者消费消息并返回消费结果 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -i | uniqe msg id | ||
| -g | consumerGroup | ||
| -d | clientId | ||
| -t | topic名称 | ||
| checkMsgSendRT | 检测向topic发消息的RT,功能类似clusterRT | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -a | 探测次数 | ||
| -s | 消息大小 | ||
| sendMessage | 发送一条消息,可以根据配置发往特定Message Queue,或普通发送。 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -p | body,消息体 | ||
| -k | keys | ||
| -c | tags | ||
| -b | BrokerName | ||
| -i | queueId | ||
| consumeMessage | 消费消息。可以根据offset、开始&结束时间戳、消息队列消费消息,配置不同执行不同消费逻辑,详见ConsumeMessageCommand。 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -b | BrokerName | ||
| -o | 从offset开始消费 | ||
| -i | queueId | ||
| -g | 消费者分组 | ||
| -s | 开始时间戳,格式详见-h | ||
| -d | 结束时间戳 | ||
| -c | 消费多少条消息 | ||
| printMsg | 从Broker消费消息并打印,可选时间段 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -c | 字符集,例如UTF-8 | ||
| -s | subExpress,过滤表达式 | ||
| -b | 开始时间戳,格式参见-h | ||
| -e | 结束时间戳 | ||
| -d | 是否打印消息体 | ||
| printMsgByQueue | 类似printMsg,但指定Message Queue | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -t | topic名称 | ||
| -i | queueId | ||
| -a | BrokerName | ||
| -c | 字符集,例如UTF-8 | ||
| -s | subExpress,过滤表达式 | ||
| -b | 开始时间戳,格式参见-h | ||
| -e | 结束时间戳 | ||
| -p | 是否打印消息 | ||
| -d | 是否打印消息体 | ||
| -f | 是否统计tag数量并打印 | ||
| resetOffsetByTime | 按时间戳重置offset,Broker和consumer都会重置 | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -g | 消费者分组 | ||
| -t | topic名称 | ||
| -s | 重置为此时间戳对应的offset | ||
| -f | 是否强制重置,如果false,只支持回溯offset,如果true,不管时间戳对应offset与consumeOffset关系 | ||
| -c | 是否重置c++客户端offset |
| 名称 | 含义 | 命令选项 | 说明 |
| consumerProgress | 查看订阅组消费状态,可以查看具体的client IP的消息积累量 | -g | 消费者所属组名 |
| -s | 是否打印client IP | ||
| -h | 打印帮助 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| consumerStatus | 查看消费者状态,包括同一个分组中是否都是相同的订阅,分析Process Queue是否堆积,返回消费者jstack结果,内容较多,使用者参见ConsumerStatusSubCommand | -h | 打印帮助 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -g | consumer group | ||
| -i | clientId | ||
| -s | 是否执行jstack | ||
| getConsumerStatus | 获取 Consumer 消费进度 | -g | 消费者所属组名 |
| -t | 查询主题 | ||
| -i | Consumer 客户端 ip | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| updateSubGroup | 更新或创建订阅关系 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker地址 | ||
| -c | 集群名称 | ||
| -g | 消费者分组名称 | ||
| -s | 分组是否允许消费 | ||
| -m | 是否从最小offset开始消费 | ||
| -d | 是否是广播模式 | ||
| -q | 重试队列数量 | ||
| -r | 最大重试次数 | ||
| -i | 当slaveReadEnable开启时有效,且还未达到从slave消费时建议从哪个BrokerId消费,可以配置备机id,主动从备机消费 | ||
| -w | 如果Broker建议从slave消费,配置决定从哪个slave消费,配置BrokerId,例如1 | ||
| -a | 当消费者数量变化时是否通知其他消费者负载均衡 | ||
| deleteSubGroup | 从Broker删除订阅关系 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -b | Broker地址 | ||
| -c | 集群名称 | ||
| -g | 消费者分组名称 | ||
| cloneGroupOffset | 在目标群组中使用源群组的offset | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -s | 源消费者组 | ||
| -d | 目标消费者组 | ||
| -t | topic名称 | ||
| -o | 暂未使用 |
| 名称 | 含义 | 命令选项 | 说明 |
| consumerConnec tion | 查询 Consumer 的网络连接 | -g | 消费者所属组名 |
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| producerConnec tion | 查询 Producer 的网络连接 | -g | 生产者所属组名 |
| -t | 主题名称 | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 |
| 名称 | 含义 | 命令选项 | 说明 |
| updateKvConfig | 更新NameServer的kv配置,目前还未使用 | -s | 命名空间 |
| -k | key | ||
| -v | value | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| deleteKvConfig | 删除NameServer的kv配置 | -s | 命名空间 |
| -k | key | ||
| -n | NameServer 服务地址,格式 ip:port | ||
| -h | 打印帮助 | ||
| getNamesrvConfig | 获取NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| updateNamesrvConfig | 修改NameServer配置 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 | ||
| -k | key | ||
| -v | value |
| 名称 | 含义 | 命令选项 | 说明 |
| startMonitoring | 开启监控进程,监控消息误删、重试队列消息数等 | -n | NameServer 服务地址,格式 ip:port |
| -h | 打印帮助 |
RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-console,这个便是管理控制台项目了,先将incubator-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console进行编译打包运行。

1 | git clone https://github.com/apache/rocketmq-externals |
注意:打包前在rocketmq-console中配置namesrv集群地址:
1 | rocketmq.config.namesrvAddr=192.168.25.135:9876;192.168.25.138:9876 |
启动rocketmq-console:
1 | java -jar rocketmq-console-ng-1.0.0.jar |
启动成功后,我们就可以通过浏览器访问http://localhost:8080进入控制台界面了,如下图:

集群状态:

1 | <dependency> |
1 | 1.创建消息生产者producer,并制定生产者组名 |
1 | 1.创建消费者Consumer,制定消费者组名 |
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
1 | public class SyncProducer { |
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
1 | public class AsyncProducer { |
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
1 | public class OnewayProducer { |
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同
1 | public static void main(String[] args) throws Exception { |
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的
1 | public static void main(String[] args) throws Exception { |
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
1 | /** |
1 | /** |
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
1 | public class ScheduledMessageConsumer { |
1 | public class ScheduledMessageProducer { |
###4.3.3 验证
您将会看到消息的消费比存储时间晚10秒
1 | // org/apache/rocketmq/store/config/MessageStoreConfig.java |
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
1 | String topic = "BatchTest"; |
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
1 | public class ListSplitter implements Iterator<List<Message>> { |
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); |
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
1 | ------------ |
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
常量支持类型为:
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
1 | public void subscribe(finalString topic, final MessageSelector messageSelector) |
发送消息时,你能通过putUserProperty来设置消息的属性
1 | DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); |
用MessageSelector.bySql来使用sql筛选消息
1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); |
###4.6.1 流程分析

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息共有三种状态,提交状态、回滚状态、中间状态:
使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
1 | public class Producer { |
当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
1 | public class TransactionListenerImpl implements TransactionListener { |
transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。transactionMsgTimeout 参数。模拟电商网站购物场景中的【下单】和【支付】业务
###1)下单

###2)支付

用户提交订单后,扣减库存成功、扣减优惠券成功、使用余额成功,但是在确认订单操作失败,需要对库存、库存、余额进行回退。
如何保证数据的完整性?

使用MQ保证在下单失败后系统数据的完整性
.png)
###问题2
用户通过第三方支付平台(支付宝、微信)支付成功后,第三方支付平台要通过回调API异步通知商家支付系统用户支付结果,支付系统根据支付结果修改订单状态、记录支付日志和给用户增加积分。
商家支付系统如何保证在收到第三方支付平台的异步通知时,如何快速给第三方支付凭条做出回应?

通过MQ进行数据分发,提高系统处理性能


下载rocketmq-spring项目
将rocketmq-spring安装到本地仓库
1 | mvn install -Dmaven.skip.test=true |
1 | <parent> |
1 | # application.properties |
1 | @SpringBootApplication |
1 | @RunWith(SpringRunner.class) |
同消息生产者
同消息生产者
1 | @SpringBootApplication |
1 | @Slf4j |
下载dubbo-spring-boot-starter依赖包
将dubbo-spring-boot-starter安装到本地仓库
1 | mvn install -Dmaven.skip.test=true |

/user/local/zookeeper-cluster,将解压后的Zookeeper复制到以下三个目录1 | /usr/local/zookeeper-cluster/zookeeper-1 |
配置每一个 Zookeeper 的 dataDir(zoo.cfg) clientPort 分别为 2181 2182 2183
修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
1 | clientPort=2181 |
修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
1 | clientPort=2182 |
修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
1 | clientPort=2183 |
在每个 zookeeper 的 data 目录下创建一个 myid 文件,内容分别是 1、2、3 。这个文件就是记录每个服务器的 ID
在每一个 zookeeper 的 zoo.cfg 配置客户端访问端口(clientPort)和集群服务器 IP 列表。
集群服务器 IP 列表如下
1 | server.1=192.168.25.140:2881:3881 |
解释:server.服务器 ID=服务器 IP 地址:服务器之间通信端口:服务器之间投票选举端口
启动集群就是分别启动每个实例。

1 | public interface IUserService { |
1 | <parent> |
1 | # application.properties |
1 | @EnableDubboConfiguration |
1 | @Component |
1 | <parent> |
1 | # application.properties |
1 | @EnableDubboConfiguration |
1 | @RestController |
| Field | Type | Comment |
|---|---|---|
| coupon_id | bigint(50) NOT NULL | 优惠券ID |
| coupon_price | decimal(10,2) NULL | 优惠券金额 |
| user_id | bigint(50) NULL | 用户ID |
| order_id | bigint(32) NULL | 订单ID |
| is_used | int(1) NULL | 是否使用 0未使用 1已使用 |
| used_time | timestamp NULL | 使用时间 |
| Field | Type | Comment |
|---|---|---|
| goods_id | bigint(50) NOT NULL | 主键 |
| goods_name | varchar(255) NULL | 商品名称 |
| goods_number | int(11) NULL | 商品库存 |
| goods_price | decimal(10,2) NULL | 商品价格 |
| goods_desc | varchar(255) NULL | 商品描述 |
| add_time | timestamp NULL | 添加时间 |
| Field | Type | Comment |
|---|---|---|
| order_id | bigint(50) NOT NULL | 订单ID |
| user_id | bigint(50) NULL | 用户ID |
| order_status | int(1) NULL | 订单状态 0未确认 1已确认 2已取消 3无效 4退款 |
| pay_status | int(1) NULL | 支付状态 0未支付 1支付中 2已支付 |
| shipping_status | int(1) NULL | 发货状态 0未发货 1已发货 2已退货 |
| address | varchar(255) NULL | 收货地址 |
| consignee | varchar(255) NULL | 收货人 |
| goods_id | bigint(50) NULL | 商品ID |
| goods_number | int(11) NULL | 商品数量 |
| goods_price | decimal(10,2) NULL | 商品价格 |
| goods_amount | decimal(10,0) NULL | 商品总价 |
| shipping_fee | decimal(10,2) NULL | 运费 |
| order_amount | decimal(10,2) NULL | 订单价格 |
| coupon_id | bigint(50) NULL | 优惠券ID |
| coupon_paid | decimal(10,2) NULL | 优惠券 |
| money_paid | decimal(10,2) NULL | 已付金额 |
| pay_amount | decimal(10,2) NULL | 支付金额 |
| add_time | timestamp NULL | 创建时间 |
| confirm_time | timestamp NULL | 订单确认时间 |
| pay_time | timestamp NULL | 支付时间 |
| Field | Type | Comment |
|---|---|---|
| goods_id | int(11) NOT NULL | 商品ID |
| order_id | varchar(32) NOT NULL | 订单ID |
| goods_number | int(11) NULL | 库存数量 |
| log_time | datetime NULL | 记录时间 |
| Field | Type | Comment |
|---|---|---|
| user_id | bigint(50) NOT NULL | 用户ID |
| user_name | varchar(255) NULL | 用户姓名 |
| user_password | varchar(255) NULL | 用户密码 |
| user_mobile | varchar(255) NULL | 手机号 |
| user_score | int(11) NULL | 积分 |
| user_reg_time | timestamp NULL | 注册时间 |
| user_money | decimal(10,0) NULL | 用户余额 |
| Field | Type | Comment |
|---|---|---|
| user_id | bigint(50) NOT NULL | 用户ID |
| order_id | bigint(50) NOT NULL | 订单ID |
| money_log_type | int(1) NOT NULL | 日志类型 1订单付款 2 订单退款 |
| use_money | decimal(10,2) NULL | 操作金额 |
| create_time | timestamp NULL | 日志时间 |
| Field | Type | Comment |
|---|---|---|
| pay_id | bigint(50) NOT NULL | 支付编号 |
| order_id | bigint(50) NULL | 订单编号 |
| pay_amount | decimal(10,2) NULL | 支付金额 |
| is_paid | int(1) NULL | 是否已支付 1否 2是 |
| Field | Type | Comment |
|---|---|---|
| id | varchar(100) NOT NULL | 主键 |
| group_name | varchar(100) NULL | 生产者组名 |
| msg_topic | varchar(100) NULL | 消息主题 |
| msg_tag | varchar(100) NULL | Tag |
| msg_key | varchar(100) NULL | Key |
| msg_body | varchar(500) NULL | 消息内容 |
| msg_status | int(1) NULL | 0:未处理;1:已经处理 |
| create_time | timestamp NOT NULL | 记录时间 |
###9)MQ消息消费表
| Field | Type | Comment |
|---|---|---|
| msg_id | varchar(50) NULL | 消息ID |
| group_name | varchar(100) NOT NULL | 消费者组名 |
| msg_tag | varchar(100) NOT NULL | Tag |
| msg_key | varchar(100) NOT NULL | Key |
| msg_body | varchar(500) NULL | 消息体 |
| consumer_status | int(1) NULL | 0:正在处理;1:处理成功;2:处理失败 |
| consumer_times | int(1) NULL | 消费次数 |
| consumer_timestamp | timestamp NULL | 消费时间 |
| remark | varchar(500) NULL | 备注 |
shop系统基于Maven进行项目管理

共12个系统

使用Mybatis逆向工程针对数据表生成CURD持久层代码
ID生成器
IDWorker:Twitter雪花算法
异常处理类
CustomerException:自定义异常类
CastException:异常抛出类
常量类
ShopCode:系统状态类
响应实体类
Result:封装响应状态和响应信息
.png)
1 | public interface IOrderService { |
1 | @Slf4j |
.png)
1 | private void checkOrder(TradeOrder order) { |
###4)生成预订单

1 | private Long savePreOrder(TradeOrder order) { |
###5)扣减库存
1 | private void reduceGoodsNum(TradeOrder order) { |
1 | @Override |
###6)扣减优惠券
1 | private void changeCoponStatus(TradeOrder order) { |
1 | @Override |
###7)扣减用户余额
1 | private void reduceMoneyPaid(TradeOrder order) { |

1 | @Override |
###8)确认订单
1 | private void updateOrderStatus(TradeOrder order) { |
1 | @Override |
1 | rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 |
1 | @Autowired |
1 | @Override |
1 | private void sendMessage(String topic, String tags, String keys, String body) throws Exception { |
1 | rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 |
1 | @Slf4j |

1 | @Slf4j |
1 | @Slf4j |
1 | @Slf4j |
1 | @Override |
1 | @RunWith(SpringRunner.class) |
###1)准备测试数据
###2)测试下单成功流程
1 | @Test |
执行完毕后,查看数据库中用户的余额、优惠券数据,及订单的状态数据
###3)测试下单失败流程
代码同上。
执行完毕后,查看用户的余额、优惠券数据是否发生更改,订单的状态是否为取消。

1 | public Result createPayment(TradePay tradePay) { |

1 | public Result callbackPayment(TradePay tradePay) { |
1 | @Bean |
1 | @Autowired |
支付成功后,支付服务payService发送MQ消息,订单服务、用户服务、日志服务需要订阅消息进行处理
以下用订单服务为例说明消息的处理情况
1 | mq.pay.topic=payTopic |
1 | public class BaseConsumer { |
1 | @Slf4j |
通过Rest客户端请求shop-order-web和shop-pay-web完成下单和支付操作
1 | @Configuration |
1 | server.host=http://localhost |
1 | server.host=http://localhost |
1 | @RunWith(SpringRunner.class) |
1 | @RunWith(SpringRunner.class) |
分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

Apache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障

文件系统
目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘挂了,否则一般是不会出现无法持久化的故障问题。

###1.1.2 性能对比
文件系统>关系型数据库DB
磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。
Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。
一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:
1)read;读取本地文件内容;
2)write;将读取的内容通过网络发送出去。
这两个看似简单的操作,实际进行了4 次数据复制,分别是:
通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的
RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。
这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成 的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
####3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

RocketMQ分布式集群是通过Master和Slave的配合达到高可用性的。
Master和Slave的区别:在Broker的配置文件中,参数 brokerId的值为0表明这个Broker是Master,大于0表明这个Broker是 Slave,同时brokerRole参数也会说明这个Broker是Master还是Slave。
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。有了自动切换Consumer这种机制,当一个Master角色的机器出现故障后,Consumer仍然可以从Slave读取消息,不影响Consumer程序。这就达到了消费端的高可用性。
在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可 用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
####1)同步复制
同步复制方式是等Master和Slave均写 成功后才反馈给客户端写成功状态;
在同步复制方式下,如果Master出故障, Slave上有全部的备份数据,容易恢复,但是同步复制会增大数据写入 延迟,降低系统吞吐量。
####2)异步复制
异步复制方式是只要Master写成功 即可反馈给客户端写成功状态。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写 入Slave,有可能会丢失;
####3)配置
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个。
####4)总结

实际应用中要结合业务场景,合理设置刷盘方式和主从复制方式, 尤其是SYNC_FLUSH方式,由于频繁地触发磁盘写动作,会明显降低 性能。通常情况下,应该把Master和Save配置成ASYNC_FLUSH的刷盘 方式,主从之间配置成SYNC_MASTER的复制方式,这样即使有一台 机器出故障,仍然能保证数据不丢,是个不错的选择。
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:

图中箭头线条上的标号代表顺序,发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:

还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:

需要注意的是,集群模式下,queue都是只允许分配只一个实例,这是由于如果多个实例同时消费一个queue的消息,由于拉取哪些消息是consumer主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue。
通过增加consumer实例去分摊queue的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue将分配到其他实例上继续消费。
但是如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
####2)广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,所有consumer都分到所有的queue。

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
| 第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
|---|---|---|---|
| 1 | 10 秒 | 9 | 7 分钟 |
| 2 | 30 秒 | 10 | 8 分钟 |
| 3 | 1 分钟 | 11 | 9 分钟 |
| 4 | 2 分钟 | 12 | 10 分钟 |
| 5 | 3 分钟 | 13 | 20 分钟 |
| 6 | 4 分钟 | 14 | 30 分钟 |
| 7 | 5 分钟 | 15 | 1 小时 |
| 8 | 6 分钟 | 16 | 2 小时 |
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
1 | public class MessageListenerImpl implements MessageListener { |
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。
1 | public class MessageListenerImpl implements MessageListener { |
自定义消息最大重试次数
消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
1 | Properties properties = new Properties(); |
注意:
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
1 | public class MessageListenerImpl implements MessageListener { |
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信消息具有以下特性
死信队列具有以下特性:


一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
1 | Message message = new Message(); |
订阅方收到消息时可以根据消息的 Key 进行幂等处理:
1 | consumer.subscribe("ons_test", "*", new MessageListener() { |
依赖工具
从官方仓库 https://github.com/apache/rocketmq clone或者download源码。

源码目录结构:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
common :公共包
dev :开发者信息(非源代码)
distribution :部署实例文件夹(非源代码)
example: RocketMQ 例代码
filter :消息过滤相关基础类
filtersrv:消息过滤服务器实现相关类(Filter启动进程)
logappender:日志实现相关类
namesrv:NameServer实现相关类(NameServer启动进程)
openmessageing:消息开放标准
remoting:远程通信模块,给予Netty
srcutil:服务工具类
store:消息存储实现相关类
style:checkstyle相关实现
test:测试相关类
tools:工具类,监控命令相关实现类
###2.1.2 导入IDEA

执行安装
1 | clean install -Dmaven.test.skip=true |
创建conf配置文件夹,从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml




重新启动
控制台打印结果
1 | The Name Server boot success. serializeType=JSON |
broker.conf配置文件内容1 | brokerClusterName = DefaultCluster |
dataDirBrokerStartup,配置broker.conf和ROCKETMQ_HOME

org.apache.rocketmq.example.quickstart1 | DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); |
main方法,发送消息org.apache.rocketmq.example.quickstart1 | DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); |
main方法,消费消息消息中间件的设计思路一般是基于主题订阅发布的机制,消息生产者(Producer)发送某一个主题到消息服务器,消息服务器负责将消息持久化存储,消息消费者(Consumer)订阅该兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(Push模式)或者消费者主动向消息服务器拉去(Pull模式),从而实现消息生产者与消息消费者解耦。为了避免消息服务器的单点故障导致的整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那消息生产者如何知道消息要发送到哪台消息服务器呢?如果某一台消息服务器宕机了,那么消息生产者如何在不重启服务情况下感知呢?
NameServer就是为了解决以上问题设计的。

Broker消息服务器在启动的时向所有NameServer注册,消息生产者(Producer)在发送消息时之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送。NameServer与每台Broker保持长连接,并间隔30S检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中删除。但是路由变化不会马上通知消息生产者。这样设计的目的是为了降低NameServer实现的复杂度,在消息发送端提供容错机制保证消息发送的可用性。
NameServer本身的高可用是通过部署多台NameServer来实现,但彼此之间不通讯,也就是NameServer服务器之间在某一个时刻的数据并不完全相同,但这对消息发送并不会造成任何影响,这也是NameServer设计的一个亮点,总之,RocketMQ设计追求简单高效。

启动类:org.apache.rocketmq.namesrv.NamesrvStartup
####步骤一
解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController
代码:NamesrvController#createNamesrvController
1 | //创建NamesrvConfig |
NamesrvConfig属性
1 | private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); |
**rocketmqHome:**rocketmq主目录
**kvConfig:**NameServer存储KV配置属性的持久化路径
**configStorePath:**nameServer默认配置文件路径
**orderMessageEnable:**是否支持顺序消息
NettyServerConfig属性
1 | private int listenPort = 8888; |
**listenPort:**NameServer监听端口,该值默认会被初始化为9876
**serverWorkerThreads:**Netty业务线程池线程个数
**serverCallbackExecutorThreads:**Netty public任务线程池线程个数,Netty网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型未注册线程池,则由public线程池执行。
**serverSelectorThreads:**IO线程池个数,主要是NameServer、Broker端解析请求、返回相应的线程个数,这类线程主要是处理网路请求的,解析请求包,然后转发到各个业务线程池完成具体的操作,然后将结果返回给调用方;
**serverOnewaySemaphoreValue:**send oneway消息请求并发读(Broker端参数);
**serverAsyncSemaphoreValue:**异步消息发送最大并发度;
**serverChannelMaxIdleTimeSeconds :**网络连接最大的空闲时间,默认120s。
**serverSocketSndBufSize:**网络socket发送缓冲区大小。
serverSocketRcvBufSize: 网络接收端缓存区大小。
**serverPooledByteBufAllocatorEnable:**ByteBuffer是否开启缓存;
**useEpollNativeSelector:**是否启用Epoll IO模型。
根据启动属性创建NamesrvController实例,并初始化该实例。NameServerController实例为NameServer核心控制器
代码:NamesrvController#initialize
1 | public boolean initialize() { |
在JVM进程关闭之前,先将线程池关闭,及时释放资源
代码:NamesrvStartup#start
1 | //注册JVM钩子函数代码 |
NameServer的主要作用是为消息的生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要管理Broker节点,包括路由注册、路由删除等。
代码:RouteInfoManager
1 | private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; |

**topicQueueTable:**Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
**brokerAddrTable:**Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
**clusterAddrTable:**Broker集群信息,存储集群中所有Broker名称
**brokerLiveTable:**Broker状态信息,NameServer每次收到心跳包是会替换该信息
**filterServerTable:**Broker上的FilterServer列表,用于类模式消息过滤。
RocketMQ基于定于发布机制,一个Topic拥有多个消息队列,一个Broker为每一个主题创建4个读队列和4个写队列。多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构,brokerId为0代表Master,大于0为Slave。BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。


#####1)发送心跳包

RocketMQ路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳信息,每隔30s向集群中所有NameServer发送心跳包,NameServer收到心跳包时会更新brokerLiveTable缓存中BrokerLiveInfo的lastUpdataTimeStamp信息,然后NameServer每隔10s扫描brokerLiveTable,如果连续120S没有收到心跳包,NameServer将移除Broker的路由信息同时关闭Socket连接。
代码:BrokerController#start
1 | //注册Broker信息 |
代码:BrokerOuterAPI#registerBrokerAll
1 | //获得nameServer地址信息 |
代码:BrokerOutAPI#registerBroker
1 | if (oneway) { |

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor网路处理类解析请求类型,如果请求类型是为REGISTER_BROKER,则将请求转发到RouteInfoManager#regiesterBroker
代码:DefaultRequestProcessor#processRequest
1 | //判断是注册Broker信息 |
代码:DefaultRequestProcessor#registerBroker
1 | RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker( |
代码:RouteInfoManager#registerBroker
维护路由信息
1 | //加锁 |
1 | //维护brokerAddrTable |
1 | //维护topicQueueTable |
代码:RouteInfoManager#createAndUpdateQueueData
1 | private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) { |
1 | //维护brokerLiveTable |
1 | //维护filterServerList |
1 |
|
代码:RouteInfoManager#scanNotActiveBroker
1 | public void scanNotActiveBroker() { |
代码:RouteInfoManager#onChannelDestroy
1 | //申请写锁,根据brokerAddress从brokerLiveTable和filterServerTable移除 |
1 | //维护brokerAddrTable |
1 | //维护clusterAddrTable |
1 | //维护topicQueueTable队列 |
1 | //释放写锁 |
RocketMQ路由发现是非实时的,当Topic路由出现变化后,NameServer不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。
代码:DefaultRequestProcessor#getRouteInfoByTopic
1 | public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, |

消息生产者的代码都在client模块中,相对于RocketMQ来讲,消息生产者就是客户端,也是消息的提供者。

###2.3.1 方法和属性
####1)主要方法介绍

1 | //创建主题 |
1 | //根据时间戳从队列中查找消息偏移量 |
1 | //查找消息队列中最大的偏移量 |
1 | //查找消息队列中最小的偏移量 |
1 | //根据偏移量查找消息 |
1 | //根据条件查找消息 |
1 | //根据消息ID和主题查找消息 |

1 | //启动 |
1 | //关闭 |
1 | //查找该主题下所有消息 |
1 | //同步发送消息 |
1 | //同步超时发送消息 |
1 | //异步发送消息 |
1 | //异步超时发送消息 |
1 | //发送单向消息 |
1 | //选择指定队列同步发送消息 |
1 | //选择指定队列异步发送消息 |
1 | //选择指定队列单项发送消息 |
1 | //批量发送消息 |
####2)属性介绍

1 | producerGroup:生产者所属组 |

代码:DefaultMQProducerImpl#start
1 | //检查生产者组是否满足要求 |
整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
同一个clientId只会创建一个MQClientInstance。
MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
代码:MQClientManager#getAndCreateMQClientInstance
1 | public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, |
代码:DefaultMQProducerImpl#start
1 | //注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求 |

代码:DefaultMQProducerImpl#send(Message msg)
1 | //发送消息 |
代码:DefaultMQProducerImpl#send(Message msg,long timeout)
1 | //发送消息,默认超时时间为3s |
代码:DefaultMQProducerImpl#sendDefaultImpl
1 | //校验消息 |
####1)验证消息
代码:Validators#checkMessage
1 | public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) |
####2)查找路由
代码:DefaultMQProducerImpl#tryToFindTopicPublishInfo
1 | private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { |

代码:TopicPublishInfo
1 | public class TopicPublishInfo { |
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
1 | TopicRouteData topicRouteData; |
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
1 | //判断路由是否需要更改 |
代码:MQClientInstance#updateTopicRouteInfoFromNameServer
1 | if (changed) { |
代码:MQClientInstance#topicRouteData2TopicPublishInfo
1 | public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) { |
代码:TopicPublishInfo#selectOneMessageQueue(lastBrokerName)
1 | public MessageQueue selectOneMessageQueue(final String lastBrokerName) { |
代码:TopicPublishInfo#selectOneMessageQueue()
1 | //第一次选择队列 |
1 | public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { |

1 | public interface LatencyFaultTolerance<T> { |
1 | class FaultItem implements Comparable<FaultItem> { |
1 | public class MQFaultStrategy { |
原理分析
代码:DefaultMQProducerImpl#sendDefaultImpl
1 | sendResult = this.sendKernelImpl(msg, |
如果上述发送过程出现异常,则调用DefaultMQProducerImpl#updateFaultItem
1 | public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { |
代码:MQFaultStrategy#updateFaultItem
1 | public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { |
代码:MQFaultStrategy#computeNotAvailableDuration
1 | private long computeNotAvailableDuration(final long currentLatency) { |
代码:LatencyFaultToleranceImpl#updateFaultItem
1 | public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { |
####4)发送消息
消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl
1 | private SendResult sendKernelImpl( |
代码:DefaultMQProducerImpl#sendKernelImpl
1 | //获得broker网络地址信息 |
1 | //为消息分类唯一ID |
1 | //如果注册了消息发送钩子函数,在执行消息发送前的增强逻辑 |
代码:SendMessageHook
1 | public interface SendMessageHook { |
代码:DefaultMQProducerImpl#sendKernelImpl
1 | //构建消息发送请求包 |
1 | case ASYNC: //异步发送 |
1 | //如果注册了钩子函数,则发送完毕后执行钩子函数 |

批量消息发送是将同一个主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息总长度不能超过DefaultMQProducer#maxMessageSize。
批量消息发送要解决的问题是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
代码:DefaultMQProducer#send
1 | public SendResult send(Collection<Message> msgs) |
代码:DefaultMQProducer#batch
1 | private MessageBatch batch(Collection<Message> msgs) throws MQClientException { |
###2.4.1 消息存储核心类

1 | private final MessageStoreConfig messageStoreConfig; //消息配置属性 |

消息存储入口:DefaultMessageStore#putMessage
1 | //判断Broker角色如果是从节点,则无需写入 |
代码:CommitLog#putMessage
1 | //记录消息存储时间 |
代码:MappedFile#appendMessagesInner
1 | //获得文件的写入指针 |
代码:CommitLog#doAppend
1 | //文件写入位置 |
代码:CommitLog#calMsgLength
1 | protected static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) { |
代码:CommitLog#doAppend
1 | //消息长度不能超过4M |
代码:CommitLog#putMessage
1 | //释放锁 |

RocketMQ通过使用内存映射文件提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

1 | String storePath; //存储目录 |
1 | public MappedFile getMappedFileByTime(final long timestamp) { |
1 | public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { |
1 | public long getMinOffset() { |
1 | public long getMaxOffset() { |
1 | public long getMaxWrotePosition() { |

1 | int OS_PAGE_SIZE = 1024 * 4; //操作系统每页大小,默认4K |
MappedFile初始化
transientStorePoolEnable。transientStorePoolEnable=true为true表示数据先存储到堆外内存,然后通过Commit线程将数据提交到内存映射Buffer中,再通过Flush线程将内存映射Buffer中数据持久化磁盘。1 | private void init(final String fileName, final int fileSize) throws IOException { |
开启transientStorePoolEnable
1 | public void init(final String fileName, final int fileSize, |
MappedFile提交
提交数据到FileChannel,commitLeastPages为本次提交最小的页数,如果待提交数据不满commitLeastPages,则不执行本次提交操作。如果writeBuffer如果为空,直接返回writePosition指针,无需执行commit操作,表名commit操作主体是writeBuffer。
1 | public int commit(final int commitLeastPages) { |
MappedFile#isAbleToCommit
判断是否执行commit操作,如果文件已满返回true;如果commitLeastpages大于0,则比较writePosition与上一次提交的指针commitPosition的差值,除以OS_PAGE_SIZE得到当前脏页的数量,如果大于commitLeastPages则返回true,如果commitLeastpages小于0表示只要存在脏页就提交。
1 | protected boolean isAbleToCommit(final int commitLeastPages) { |
MappedFile#commit0
具体提交的实现,首先创建WriteBuffer区共享缓存区,然后将新创建的position回退到上一次提交的位置(commitPosition),设置limit为wrotePosition(当前最大有效数据指针),然后把commitPosition到wrotePosition的数据写入到FileChannel中,然后更新committedPosition指针为wrotePosition。commit的作用就是将MappedFile的writeBuffer中数据提交到文件通道FileChannel中。
1 | protected void commit0(final int commitLeastPages) { |
MappedFile#flush
刷写磁盘,直接调用MappedByteBuffer或fileChannel的force方法将内存中的数据持久化到磁盘,那么flushedPosition应该等于MappedByteBuffer中的写指针;如果writeBuffer不为空,则flushPosition应该等于上一次的commit指针;因为上一次提交的数据就是进入到MappedByteBuffer中的数据;如果writeBuffer为空,数据时直接进入到MappedByteBuffer,wrotePosition代表的是MappedByteBuffer中的指针,故设置flushPosition为wrotePosition。

1 | public int flush(final int flushLeastPages) { |
MappedFile#getReadPosition
获取当前文件最大可读指针。如果writeBuffer为空,则直接返回当前的写指针;如果writeBuffer不为空,则返回上一次提交的指针。在MappedFile设置中,只有提交了的数据(写入到MappedByteBuffer或FileChannel中的数据)才是安全的数据
1 | public int getReadPosition() { |
MappedFile#selectMappedBuffer
查找pos到当前最大可读之间的数据,由于在整个写入期间都未曾改MappedByteBuffer的指针,如果mappedByteBuffer.slice()方法返回的共享缓存区空间为整个MappedFile,然后通过设置ByteBuffer的position为待查找的值,读取字节长度当前可读最大长度,最终返回的ByteBuffer的limit为size。整个共享缓存区的容量为(MappedFile#fileSize-pos)。故在操作SelectMappedBufferResult不能对包含在里面的ByteBuffer调用filp方法。
1 | public SelectMappedBufferResult selectMappedBuffer(int pos) { |
MappedFile#shutdown
MappedFile文件销毁的实现方法为public boolean destory(long intervalForcibly),intervalForcibly表示拒绝被销毁的最大存活时间。
1 | public void shutdown(final long intervalForcibly) { |
短暂的存储池。RocketMQ单独创建一个MappedByteBuffer内存缓存池,用来临时存储数据,数据先写入该内存映射中,然后由commit线程定时将数据从该内存复制到与目标物理文件对应的内存映射中。RocketMQ引入该机制主要的原因是提供一种内存锁定,将当前堆外内存一直锁定在内存中,避免被进程将内存交换到磁盘。

1 | private final int poolSize; //availableBuffers个数 |
初始化
1 | public void init() { |
消息消费队文件、消息属性索引文件都是基于CommitLog文件构建的,当消息生产者提交的消息存储在CommitLog文件中,ConsumerQueue、IndexFile需要及时更新,否则消息无法及时被消费,根据消息属性查找消息也会出现较大延迟。RocketMQ通过开启一个线程ReputMessageService来准实时转发CommitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumerQueue、IndexFile文件。


代码:DefaultMessageStore:start
1 | //设置CommitLog内存中最大偏移量 |
代码:DefaultMessageStore:run
1 | public void run() { |
代码:DefaultMessageStore:deReput
1 | //从result中循环遍历消息,一次读一条,创建DispatherRequest对象。 |
DispatchRequest

1 | String topic; //消息主题名称 |

1 | class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher { |
代码:DefaultMessageStore#putMessagePositionInfo
1 | public void putMessagePositionInfo(DispatchRequest dispatchRequest) { |
代码:DefaultMessageStore#putMessagePositionInfo
1 | //依次将消息偏移量、消息长度、tag写入到ByteBuffer中 |

1 | class CommitLogDispatcherBuildIndex implements CommitLogDispatcher { |
代码:DefaultMessageStore#buildIndex
1 | public void buildIndex(DispatchRequest req) { |
由于RocketMQ存储首先将消息全量存储在CommitLog文件中,然后异步生成转发任务更新ConsumerQueue和Index文件。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个愿意宕机,导致CommitLog、ConsumerQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即便在CommitLog中文件中存在,但由于没有转发到ConsumerQueue,这部分消息将永远复发被消费者消费。

####1)存储文件加载
代码:DefaultMessageStore#load
判断上一次是否异常退出。实现机制是Broker在启动时创建abort文件,在退出时通过JVM钩子函数删除abort文件。如果下次启动时存在abort文件。说明Broker时异常退出的,CommitLog与ConsumerQueue数据有可能不一致,需要进行修复。
1 | //判断临时文件是否存在 |
代码:DefaultMessageStore#load
1 | //加载延时队列 |
代码:MappedFileQueue#load
加载CommitLog到映射文件
1 | //指向CommitLog文件目录 |
代码:DefaultMessageStore#loadConsumeQueue
加载消息消费队列
1 | //执行消费队列目录 |
代码:IndexService#load
加载索引文件
1 | public boolean load(final boolean lastExitOK) { |
代码:DefaultMessageStore#recover
文件恢复,根据Broker是否正常退出执行不同的恢复策略
1 | private void recover(final boolean lastExitOK) { |
代码:DefaultMessageStore#recoverTopicQueueTable
恢复ConsumerQueue后,将在CommitLog实例中保存每隔消息队列当前的存储逻辑偏移量,这也是消息中不仅存储主题、消息队列ID、还存储了消息队列的关键所在。
1 | public void recoverTopicQueueTable() { |
####2)正常恢复
代码:CommitLog#recoverNormally
1 | public void recoverNormally(long maxPhyOffsetOfConsumeQueue) { |
代码:MappedFileQueue#truncateDirtyFiles
1 | public void truncateDirtyFiles(long offset) { |
####3)异常恢复
Broker异常停止文件恢复的实现为CommitLog#recoverAbnormally。异常文件恢复步骤与正常停止文件恢复流程基本相同,其主要差别有两个。首先,正常停止默认从倒数第三个文件开始进行恢复,而异常停止则需要从最后一个文件往前走,找到第一个消息存储正常的文件。其次,如果CommitLog目录没有消息文件,如果消息消费队列目录下存在文件,则需要销毁。
代码:CommitLog#recoverAbnormally
1 | if (!mappedFiles.isEmpty()) { |
RocketMQ的存储是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷盘策略在不同时间进行刷写磁盘。
消息追加到内存后,立即将数据刷写到磁盘文件

代码:CommitLog#handleDiskFlush
1 | //刷盘服务 |
GroupCommitRequest

1 | long nextOffset; //刷盘点偏移量 |
代码:GroupCommitService#run
1 | public void run() { |
代码:GroupCommitService#doCommit
1 | private void doCommit() { |
在消息追加到内存后,立即返回给消息发送端。如果开启transientStorePoolEnable,RocketMQ会单独申请一个与目标物理文件(commitLog)同样大小的堆外内存,该堆外内存将使用内存锁定,确保不会被置换到虚拟内存中去,消息首先追加到堆外内存,然后提交到物理文件的内存映射中,然后刷写到磁盘。如果未开启transientStorePoolEnable,消息直接追加到物理文件直接映射文件中,然后刷写到磁盘中。

开启transientStorePoolEnable后异步刷盘步骤:
代码:CommitLog$CommitRealTimeService#run
提交线程工作机制
1 | //间隔时间,默认200ms |
代码:CommitLog$FlushRealTimeService#run
刷盘线程工作机制
1 | //表示await方法等待,默认false |
由于RocketMQ操作CommitLog、ConsumerQueue文件是基于内存映射机制并在启动的时候回加载CommitLog、ConsumerQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以要引入一种机制来删除已过期的文件。RocketMQ顺序写CommitLog、ConsumerQueue文件,所有写操作全部落在最后一个CommitLog或者ConsumerQueue文件上,之前的文件在下一个文件创建后将不会再被更新。RocketMQ清除过期文件的方法时:如果当前文件在在一定时间间隔内没有再次被消费,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时,通过在Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。
代码:DefaultMessageStore#addScheduleTask
1 | private void addScheduleTask() { |
代码:DefaultMessageStore#cleanFilesPeriodically
1 | private void cleanFilesPeriodically() { |
代码:DefaultMessageStore#deleteExpiredFiles
1 | private void deleteExpiredFiles() { |
删除文件操作的条件
代码:CleanCommitLogService#isSpaceToDelete
当磁盘空间不足时执行删除过期文件
1 | private boolean isSpaceToDelete() { |
代码:MappedFileQueue#deleteExpiredFileByTime
执行文件销毁和删除
1 | for (int i = 0; i < mfsLength; i++) { |
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件(ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件(checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷盘线程定时将内存中的文件数据刷写到磁盘。
CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件,记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时并不会判断该消息文件上的消息是否被消费。
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费者组可订阅多个主题,消费组之间有集群模式和广播模式两种消费模式。集群模式,主题下的同一条消息只允许被其中一个消费者消费。广播模式,主题下的同一条消息,将被集群内的所有消费者消费一次。消息服务器与消费者之间的消息传递也有两种模式:推模式、拉模式。所谓的拉模式,是消费端主动拉起拉消息请求,而推模式是消息达到消息服务器后,推送给消息消费者。RocketMQ消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用思想:一个消息队列同一个时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一个主题的全局顺序消费,可以将该主题的队列数设置为1,牺牲高可用性。
###2.5.2 消息消费初探
消息推送模式

消息消费重要方法
1 | void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName):发送消息确认 |
DefaultMQPushConsumer

1 | //消费者组 |

代码:DefaultMQPushConsumerImpl#start
1 | public synchronized void start() throws MQClientException { |
消息消费模式有两种模式:广播模式与集群模式。广播模式比较简单,每一个消费者需要拉取订阅主题下所有队列的消息。本文重点讲解集群模式。在集群模式下,同一个消费者组内有多个消息消费者,同一个主题存在多个消费队列,消费者通过负载均衡的方式消费消息。
消息队列负载均衡,通常的作法是一个消息队列在同一个时间只允许被一个消费消费者消费,一个消息消费者可以同时消费多个消息队列。
从MQClientInstance的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。

代码:PullMessageService#run
1 | public void run() { |
PullRequest

1 | private String consumerGroup; //消费者组 |
代码:PullMessageService#pullMessage
1 | private void pullMessage(final PullRequest pullRequest) { |
####2)ProcessQueue实现机制
ProcessQueue是MessageQueue在消费端的重现、快照。PullMessageService从消息服务器默认每次拉取32条消息,按照消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。

属性
1 | //消息容器 |
方法
1 | //移除消费超时消息 |
#####1.客户端发起拉取请求

代码:DefaultMQPushConsumerImpl#pullMessage
1 | public void pullMessage(final PullRequest pullRequest) { |
#####2.消息服务端Broker组装消息

代码:PullMessageProcessor#processRequest
1 | //构建消息过滤器 |
代码:DefaultMessageStore#getMessage
1 | GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; |
代码:PullMessageProcessor#processRequest
1 | //根据拉取结果填充responseHeader |
#####3.消息拉取客户端处理消息

代码:MQClientAPIImpl#processPullResponse
1 | private PullResult processPullResponse( |
PullResult类
1 | private final PullStatus pullStatus; //拉取结果 |

代码:DefaultMQPushConsumerImpl$PullCallback#OnSuccess
1 | //将拉取到的消息存入processQueue |

RocketMQ未真正实现消息推模式,而是消费者主动向消息服务器拉取消息,RocketMQ推模式是循环向消息服务端发起消息拉取请求,如果消息消费者向RocketMQ拉取消息时,消息未到达消费队列时,如果不启用长轮询机制,则会在服务端等待shortPollingTimeMills时间后(挂起)再去判断消息是否已经到达指定消息队列,如果消息仍未到达则提示拉取消息客户端PULL—NOT—FOUND(消息不存在);如果开启长轮询模式,RocketMQ一方面会每隔5s轮询检查一次消息是否可达,同时一有消息达到后立马通知挂起线程再次验证消息是否是自己感兴趣的消息,如果是则从CommitLog文件中提取消息返回给消息拉取客户端,否则直到挂起超时,超时时间由消息拉取方在消息拉取是封装在请求参数中,PUSH模式为15s,PULL模式通过DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis设置。RocketMQ通过在Broker客户端配置longPollingEnable为true来开启长轮询模式。
代码:PullMessageProcessor#processRequest
1 | //当没有拉取到消息时,通过长轮询方式继续拉取消息 |
PullRequestHoldService方式实现长轮询
代码:PullRequestHoldService#suspendPullRequest
1 | //将拉取消息请求,放置在ManyPullRequest集合中 |
代码:PullRequestHoldService#run
1 | public void run() { |
代码:PullRequestHoldService#checkHoldRequest
1 | //遍历拉取任务 |
代码:PullRequestHoldService#notifyMessageArriving
1 | //如果拉取消息偏移大于请求偏移量,如果消息匹配调用executeRequestWhenWakeup处理消息 |
如果开启了长轮询机制,PullRequestHoldService会每隔5s被唤醒去尝试检测是否有新的消息的到来才给客户端响应,或者直到超时才给客户端进行响应,消息实时性比较差,为了避免这种情况,RocketMQ引入另外一种机制:当消息到达时唤醒挂起线程触发一次检查。
DefaultMessageStore$ReputMessageService机制
代码:DefaultMessageStore#start
1 | //长轮询入口 |
代码:DefaultMessageStore$ReputMessageService#run
1 | public void run() { |
代码:DefaultMessageStore$ReputMessageService#deReput
1 | //当新消息达到是,进行通知监听器进行处理 |
代码:NotifyMessageArrivingListener#arriving
1 | public void arriving(String topic, int queueId, long logicOffset, long tagsCode, |
RocketMQ消息队列重新分配是由RebalanceService线程来实现。一个MQClientInstance持有一个RebalanceService实现,并随着MQClientInstance的启动而启动。
代码:RebalanceService#run
1 | public void run() { |
代码:MQClientInstance#doRebalance
1 | public void doRebalance() { |
代码:RebalanceImpl#doRebalance
1 | //遍历订阅消息对每个主题的订阅的队列进行重新负载 |
代码:RebalanceImpl#rebalanceByTopic
1 | //从主题订阅消息缓存表中获取主题的队列信息 |
RocketMQ默认提供5中负载均衡分配算法
1 | AllocateMessageQueueAveragely:平均分配 |
注意:消息队列的分配遵循一个消费者可以分配到多个队列,但同一个消息队列只会分配给一个消费者,故如果出现消费者个数大于消息队列数量,则有些消费者无法消费消息。
PullMessageService负责对消息队列进行消息拉取,从远端服务器拉取消息后将消息存储ProcessQueue消息队列处理队列中,然后调用ConsumeMessageService#submitConsumeRequest方法进行消息消费,使用线程池来消费消息,确保了消息拉取与消息消费的解耦。ConsumeMessageService支持顺序消息和并发消息,核心类图如下:

并发消息消费
代码:ConsumeMessageConcurrentlyService#submitConsumeRequest
1 | //消息批次单次 |
代码:ConsumeMessageConcurrentlyService$ConsumeRequest#run
1 | //检查processQueue的dropped,如果为true,则停止该队列消费。 |
定时消息是消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费,RocketMQ并不支持任意的时间精度,如果要支持任意时间精度定时调度,不可避免地需要在Broker层做消息排序,再加上持久化方面的考量,将不可避免的带来巨大的性能消耗,所以RocketMQ只支持特定级别的延迟消息。消息延迟级别在Broker端通过messageDelayLevel配置,默认为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,delayLevel=1表示延迟消息1s,delayLevel=2表示延迟5s,依次类推。
RocketMQ定时消息实现类为ScheduleMessageService,该类在DefaultMessageStore中创建。通过在DefaultMessageStore中调用load方法加载该类并调用start方法启动。
代码:ScheduleMessageService#load
1 | //加载延迟消息消费进度的加载与delayLevelTable的构造。延迟消息的进度默认存储路径为/store/config/delayOffset.json |
代码:ScheduleMessageService#start
1 | //遍历延迟队列创建定时任务,遍历延迟级别,根据延迟级别level从offsetTable中获取消费队列的消费进度。如果不存在,则使用0 |
调度机制
ScheduleMessageService的start方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。定时调度任务的实现类为DeliverDelayedMessageTimerTask,核心实现方法为executeOnTimeup
代码:ScheduleMessageService$DeliverDelayedMessageTimerTask#executeOnTimeup
1 | //根据队列ID与延迟主题查找消息消费队列 |
顺序消息实现类是org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService
代码:ConsumeMessageOrderlyService#start
1 | public void start() { |
代码:ConsumeMessageOrderlyService#submitConsumeRequest
1 | //构建消息任务,并提交消费线程池中 |
代码:ConsumeMessageOrderlyService$ConsumeRequest#run
1 | //如果消息队列为丢弃,则停止本次消费任务 |
RocketMQ消息消费方式分别为集群模式、广播模式。
消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时间只会分配给一个消费者。
消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。如果消息消费过慢产生消息堆积会触发消息消费拉取流控。
并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费,消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器),广播模式消息消费进度存储在消费者端。
RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。
顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队列只能串行消费。并并发消息消费最本质的区别是消息消费时必须成功锁定消息消费队列,在Broker端会存储消息消费队列的锁占用情况。
上一篇中,我们在k8s中部署了一个单节点的redis。这一节就继续升级难度,部署一个redis集群。
redis集群和redis单节点差别很大,而且相比于在服务器上部署redis集群,在k8s环境下的部署又麻烦了一些。
原因在于服务器的ip的固定不变的,但是容器环境下pod的ip每次重启都会改变。
redis cluster维持集群又需要确人ip。所以需要我们处理pod重启的情况。
configmap文件
1 | apiVersion: v1 |
注意这个fix-ip.sh脚本,他的作用是在pod重启时,将自己之前生成的nodes.conf文件中myself的ip改成新的。
其他活着的pod能够自动更新新的ip,只有自己需要处理下。
statefulset文件
1 | apiVersion: v1 |
我们使用statefulset的volumeClaimTemplates,为每一个pod生成其对应的pvc和pv
service
1 | apiVersion: v1 |
好了。
分别执行。全部启动
1 | [root@paas-m-k8s-master-1 ~]# kc -n redis-test get pod,sts,svc |
我们在服务器上组件集群会用以下命令
1 | redis-cli -a 123456 --cluster create ip:port ip2:port ip3:port ip4:port ip5:port ip6:port --cluster-replicas 1 |
我们又可以用以下命令获取所有pod的ip
1 | kubectl -n redis-test get pods -l app=redis-cluster -o jsonpath='{range.items[*]}{.status.podIP}:6379 ' |
所以连起来就是
1 | kubectl -n redis-test exec -it redis-cluster-0 -- redis-cli -a 123456 --cluster create $(kubectl -n redis-test get pods -l app=redis-cluster -o jsonpath='{range.items[*]}{.status.podIP}:6379 ') --cluster-replicas 1 |

好了,搭建完成。
去pod里连接一下看看

用service连接一下看看
1 | [root@paas-m-k8s-master-1 redis]# kc -n yys exec -it myredis-sample-0 -- redis-cli -c -h redis-cluster.redis-test -a 123456 |
注意,现在我们的redis在集群外是不能连接的,就算把service设置为nodeport模式也不行。因为现在redis配置的ip是k8s集群内的ip。
在外面不能被识别。
目前我们的策略是生产环境就应该是生产环境集群内使用,本来也不应该被外界访问。
这样确实在需要排查问题时会比较麻烦,我们单独开发了一个web页面用来供用户被授权后查询使用。
另外有个取巧的方法是,部署一个predixy进行redis的代理。
先写一个数据aaa,看到这个数据存到了100.105.152.12这个pod
1 | [root@paas-m-k8s-master-1 ~]# kc -n yys exec -it myredis-sample-0 -- redis-cli -c -h redis-cluster.redis-test -a 123456 |
看到这个pod是redis-cluster-1
1 | [root@paas-m-k8s-master-1 ~]# kc -n redis-test get pod -owide |
删除redis-cluster-1 。新启动的pod的ip变为100.105.152.25
1 | [root@paas-m-k8s-master-1 ~]# kc -n redis-test delete pod redis-cluster-1 |
查看集群的数据ok
1 | [root@paas-m-k8s-master-1 ~]# kc -n myhome exec -it myredis-sample-0 -- redis-cli -c -h redis-cluster.redis-test -a 123456 |
说明持久化和fix-ip是ok的
其实一开始的时候,测试使用podName.svcName.nameSpace这种statefulset特有的访问方式,因为statefulset这种headless service方式来说,其podName.svcName.nameSpace命名是固定不变的,所以以podName.svcName.nameSpace次方式来构建redis cluster简直太合适了。
1 | kubectl -n redis-test exec -it redis-cluster-0 -- redis-cli -a 123456 --cluster create redis-cluster-0.redis-cluster.redis-test:6379 redis-cluster-1.redis-cluster.redis-test:6379 redis-cluster-2.redis-cluster.redis-test:6379 redis-cluster-3.redis-cluster.redis-test:6379 redis-cluster-4.redis-cluster.redis-test:6379 redis-cluster-5.redis-cluster.redis-test:6379 --cluster-replicas 1 |
不过非常可惜,redis有检验,这种方式不是合法的ip

另外ix-ip这种方案有一个风险,那就是如果多个redis实例同时宕机重启。也就是cluster集群已经fail,那么整个集群无法自我修复。需要人工干预。
ansible命令适合执行简单的操作。如果要完成一个复杂的部署,需要很多ansible操作,写起来会很乱。
所以有了ansible-playbook
把一件事切分成很多任务,有序的组织起来
官方给的playbook工程的最佳实践
1 | production # 生产环境的服务器清单 |
第一眼看起来很乱,不要慌,我们一个一个看就清晰了。看完这几个文件,也就初步学会playbook了。
其中重点要看的是roles目录!!!
比如这里的production和stage文件。在ansible中常称为inventory file(清单文件),即服务器清单
可以自己根据需要创建。
注意:自己创建的这个清单文件在后面ansible-playbook使用时要加上 -i 指定使用
否则使用的是默认的/etc/ansible/hosts这个文件
我这里创建一个名为redis_hosts的清单文件,内容如下
1 | [prod] |
[prod]是组名,用中括号括起,组名里不要出现特殊字符,_可以用
下面是在组内的服务器地址。这是最简单的例子。
这种就是上面例子中的方式。这种方式要求提前将ssh-key发送到被操作主机,保证ansible主机对被操作主机的ssh权限
1 | 10.66.77.88 ansible_ssh_user=“redis” ansible_ssh_pass=“redispass” |
把登录用户和密码信息配上
1 | redisserver ansible_ssh_host=10.66.77.88 ansible_ssh_user=“redis” ansible_ssh_pass=“redispass” |
可以给服务器起一个便于识别的别名
使用ansible_ssh_pass密码这种方式不安全,通常使用的是
1 | redisserver ansible_ssh_host=10.66.77.88 |
其他内置的登录参数
1 | ansible_ssh_host |
使用方式如下,比如改了ssh的端口
1 | [prod] |
playbook中可以使用的变量,从来源上来分,可以分为3大类
ansible中有一些内置变量可供我们使用,当然,这些内置变量的变量名是被ansible保留的,我们定义变量时不能使用这些变量名
| ansible_version | ansible的版本号 | |
|---|---|---|
| hostvars | hostvars可以在操作当前主机时获取到其他主机中的信息。 | ““ |
| inventory_hostname | 被操作的当前主机的主机名称 | 这里所说的主机名称并不是linux系统的主机名,而是对应主机在清单中配置的名称 |
| inventory_hostname_short | 被操作的当前主机的主机的简短名称 | |
| play_hosts | 当前play所操作的所有主机的主机名列表 | |
| groups | 清单中”所有分组”的”分组信息 | |
| group_names | 当前主机所在分组的组名 | |
| inventory_dir | 清单文件的存放路径 | 默认的清单文件/etc/ansible/hosts |
setup模块获取的fact信息,其中就包含了大量的变量,可以直接调用
Playbook在执行时默认就会收集目标主机的facts信息并存为变量,可以指定变量来直接调用。当在Playbook中使用facts变量时,就不能将gather_facts设为no。只有被收集过的facts信息才能被后面的play引用到
可以执行下setup看看,会返回超级多的信息
1 | ansible hostname -m setup |
按优先级排序
ansible-playbook命令的命令行中的-e VARS, --extra-vars=VARS,这样就可以直接把自定义的变量传入
这里的VARS是 key=value的形式
举例
1 | - hosts: websrvs |
vars.yml是自己创建的一个变量文件
在playbook文件中可以通过vars关键字定义变量
举例
1 | - hosts: websrvs |
用于存放host相关的变量
和组名同理。主机变量名和这里创建的文件名必须一致。
在清单文件中可以给机条目上可以加变量,以便后面的playbook执行时使用,比如加一个paasword的变量
1 | [prod] |
用于存放group相关的变量
在上面的inventory 文件中,我们定义过组变量
如果组变量可以抽取出来多个组公用,就可以放在group_vars下。
比如上面我给名叫prod的组定义过变量。现在要抽取出来,就要在group_vars目录下创建一个名为prod的文件
把变量放进去。
所以,文件名和组名是对应的。
文件内容格式如下:
1 | --- |
另外group_vars目录下创建的名为all的文件有特殊地位。对主机清单中的所有主机有效
在清单文件中也可以给主机组加变量
1 | [prod] |
在具体的roles/XXRole/ 目录下的vars目录里也可以定义变量。
roles/XXRole/ 目录下的default目录里也可以定义变量。
这两个目录的变量都是属于这个role的,只能传给这个role
比如这里的site.yml
主playbook文件,我们开始执行一个playbook就是从这个主文件开始
1 | ansible-playbook site.yml |
这个主文件不是一定要叫site.yml。随便起名。根据你自己的业务来起名即可。比如后面的webservers.yml和dbservers.yml。
我们这里写一个redis.yml,用来部署redis
1 | - hosts: prod |
主文件的内容就是指定哪些主机进行什么操作。用什么用户等等。
也可以直接指定tasks
1 | - hosts: prod |
1 | ansible-playbook -i redis_hosts redis.yml |
roles是整个playbook的重点。role可以理解为做一件事的一个角色。
roles 用于层次性、结构化地组织playbook。
roles目录下面你根据自己的业务,定义多个子目录,对应完成某件工作。
比如这里的common,webserver,monitoring,fooapp等
每个role目录下能够根据层次型结构自动装载变量文件vars、task以及handlers等
tasks下的文件就是完成工作的一个个具体动作,至少应该有一个名为main.yml的文件
playbook执行时默认就找这个main.yml。可以定义其他的yml文件,在main.yml引入
比如
1 | --- |
通过-name来说明这个动作的作用
可以看到里面使用了变量。用包围的。这些变量放在vars文件夹下。
其中的动作Action,group,user,file,yum等就是我们上一篇中说的ansible的内置模块
有时我们也会看到这样的写法
1 | - name: 安装supervisor |
其实这里的ansible.builtin.yum和yum是一样的
ansible.builtin.yum是内置yum模块的完整名称。yum是一种简写形式而已。在大多数情况下,可以互换使用,但为了确保兼容性和避免混淆,建议在编写playbook时使用完整的模块名
playbook对task的执行时从上到下按顺序一个一个执行的。执行的结果是幂等的。这个特性非常使用。
对一台客户机多次执行playbook是安全的。
为什么是幂等的呢?
仔细观察task的动作,会发现其对动作的描述都是声明式的。
比如 yum: name= state=present
有一个state值是present,表明我们期望达到的效果是安装了这个包。所以ansible会采取的操作是先检查有没有安装,没有安装才进行安装。
1 | - name: install conf file to centos7 |
对迭代项的引用,固定变量名为”item”
1 | - name: unstall web packages |
1 | - name: add some users |
通过指定分隔符和切分后的对象序号(序号从0开始),来取得变量值中需要的部分。
举例
1 | - name: split |
在这个例子中,还用到了template模块。它会去找templates目录下的模板文件,进行变量替换后,放到客户机指定的目录下。
还有一个notify,这个特性和下面要讲到的handler有关
作用就是某任务的状态在运行后为changed时,可通过“notify”通知给相应的handlers;
比如我们这里的
1 | - name: Template Set {{ pkgname }} Config Files |
当运行这个task的结果是changed时,即redis.conf内容变了的情况下,就会触发restart redis service这个handler。
这个handler可以定义在handlers目录里
任务可以通过“tags“打标签,而后可在ansible-playbook命令上使用-t指定进行调用;
比如上面例子中的
1 | - name: yum install {{ pkgname }} |
我们可以这样使用
1 | ansible-playbook redis.yml -t install_app |
就会直接只执行install_app这一步
应当包含一个main.yml文件,用于定义此角色用到的各handlers,
在handler中可以使用inclnude引入其它的handlers文件;
handler是干嘛的?
handler是用来描述当关注的资源的状态发生变化时要采取的操作。
直白的讲就是由特定条件触发的任务
比如main.yml中这样写
1 | handlers: #注意,前面没有-,是两个空格 |
这里的name要注意,它的值必须和上面task文件中norify的值一致!!,不是随便写的。
也就是说,当task中的redis.conf文件修改了之后,会触发notify这个名叫restart redis service的handler
这个handler执行的操作就是调用service模块,对某个service进行restart
存放模板文件。
playbook使用jinja2模板文件。Jinja2是python的一种模板语言,以Django的模板语言为原本。
比如我们讲redis.conf作为一个模板配置文件redis.conf.j2
1 | bind {{ bindip }} |
模板文件中的变量从vars目录下取
用来存放会用到的文件。
比如script模块用到的sh脚本文件。需要copy到目标主机的安装包等。
存在自定义的变量。
应当包含一个main.yml文件
注意,变量不能有下划线
应当包含一个main.yml文件,也是用于为当前role定义变量。只不过设定的是默认值。优先级低于vars。
应当包含一个main.yml文件,用于定义此角色的特殊设定及其依赖关系;ansible1.3及其以后的版本才支持;
ansible支持自定义扩展功能,新手先不用管这个
执行就简单了,通过ansible-playbook redis.yaml 命令运行即可
1 | # ansible-playbook -h |
部署ingress-nginx-controller
基于 Nginx 的 Ingress Controller 有两种,
一种是 k8s 社区提供的 ingress-nginx,就是本文的部署方案
另一种是 Nginx 社区提供的nginx-ingress
下载部署文件
ingress-nginx的官方地址是
https://github.com/kubernetes/ingress-nginx

我的k8s是1.25版本的,决定部署1.9.3的ingress-nginx
下载deploy文件
1 | wget https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.9.3/deploy/static/provider/cloud/deploy.yaml |
查看该yaml文件,找到需要的镜像是
1 | registry.k8s.io/ingress-nginx/controller:v1.9.3 |
我已经转发传到了dockerhub
分别修改镜像地址
1 | anjia0532/google-containers.ingress-nginx.controller:v1.9.3 |
修改deployment为hostnetwork
1 | hostNetwork: true |
监听主机的80端口
然后apply该文件即可
1 | kubectl apply -f deploy.yaml |
分析该yaml文件,创建的资源如下
部署一个tomcat的serveice
1 | apiVersion: v1 |
部署一个ingress用于转发
1 | apiVersion: networking.k8s.io/v1 |
本地电脑上改一下host
1 | 要ingress-controller启动的那台node的ip test.xxx.com |

现在所有的流量都走ingress-controller的pod所在的那台服务器,存在单点风险。
应该每个node节点部署一个ingress-controller。然后流量进行负载均衡。
修改deploy文件
我的测试集群只有两个node节点
1 | replicas: 2 |
进行pod反亲和,使两个pod分别部署到两个node上
1 | affinity: |
这样在前面可以加一个kong或者其他的负载均衡器来代理一下。
贴一下最后完成的deploy.yaml