hello云胜

技术与生活

0%

Rocketmq存储模块

源码就在store模块下,入口类DefaultMessageStore

该类定义了很多重要的属性,如commitlog文件,index文件,consumequeue文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 存储相关的配置,如存储位置,commitlog文件大小等
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;

// 队列,按消息主题分组了
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;

// 消息队列的刷盘线程服务
private final FlushConsumeQueueService flushConsumeQueueService;

// 删除commitlog的线程服务
private final CleanCommitLogService cleanCommitLogService;

// 清楚consumeQueue文件的线程
private final CleanConsumeQueueService cleanConsumeQueueService;

// 索引文件相关服务
private final IndexService indexService;

// 文件内存映射服务
private final AllocateMappedFileService allocateMappedFileService;

//CommitLog消息分发服务,根据CommitLog文件构建ConsumerQueue、IndexFile文件
private final ReputMessageService reputMessageService;

commitlog文件

我们先必须清楚commitlog文件在服务器上是怎么存储的。

代码中的commitlog 对应磁盘上的 commitlog文件夹

代码commitlog中有一个MappedFileQueue 对应磁盘上 commitlog下的文件列表

MappedFileQueue 里的MappedFile 映射 磁盘上具体的一个commitlog文件

入口方法

org.apache.rocketmq.store.DefaultMessageStore#putMessage

开始做了一些校验,校验了broker状态和msg的状态等

然后调用

1
this.commitLog.putMessage(msg);

putMessage方法

1,对消息体计算CRC32

2,获取当前正在写入的mappedFile

mappedFile非常重要。

我们知道RocketMq的消息是存储在commitlog中的,这是从大的方面理解。

继续往下追,commitlog又是基于mappedFileQueue

1
protected final MappedFileQueue mappedFileQueue;

从名字上就能看出,mappedFileQueue是一个队列,它的底层又是

1
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

MappedFile队列。

MappedFile对应着最终磁盘上的存储文件,

1
private MappedByteBuffer mappedByteBuffer;

是MappedByteBuffer的封装,消息存储跟磁盘、内存的交互都是通过它完成。

获取当前正在写入的mappedFile

1
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

如果是null,或者满了。新创建一个文件

1
2
3
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}

3,追加消息

1
result = mappedFile.appendMessage(msg, this.appendMessageCallback);

这里传一个回调接口appendMessageCallback。appendMessageCallback是一个内部类,这个commitlog类接近2000行了,个人认为这里不该用内部类,拆分出去更好。

MappedFile的几个指针

wrotePosition : 文件的写入指针。标记当前写到文件的位置。

committedPosition: 提交位置指针

flushedPosition : 刷盘位置指针

4,回调append

回调commitlog的内部类方法。代码 org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner)

4.1 计算msgid
4.2获取consume queue的偏移量
1
2
3
4
5
6
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
4.3 消息的size校验
4.4 消息写入bytebuffer
4.5更新队列的offset
1
2
// The next update ConsumeQueue information
CommitLog.this.topicQueueTable.put(key, ++queueOffset);

5,成功后,刷盘线程和同步线程

1
2
3
4
5
6
//释放锁
putMessageLock.unlock();
//刷盘
handleDiskFlush(result, putMessageResult, msg);
//执行HA主从同步
handleHA(result, putMessageResult, msg);

刷盘

消息都是先存到内存中,MappedByteBuffer中。然后根据是同步刷盘还是异步刷盘进行不同的刷盘策略。

文件内存映射

RocketMQ通过使用内存映射文件来提高IO访问性能,无论是CommitLog、ConsumerQueue还是IndexFile,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件,文件名就为该文件第一条消息对应的全局物理偏移量。

MappedFile类对应commitlog文件夹下的消息文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static final int OS_PAGE_SIZE = 1024 * 4;//操作系统的页大小,默认是4K
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前JVM实例中MappedFile虚拟内存
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);//当前JVM实例中MappedFile对象个数
protected final AtomicInteger wrotePosition = new AtomicInteger(0);//当前文件的写指针
protected final AtomicInteger committedPosition = new AtomicInteger(0);//当前文件的提交指针
private final AtomicInteger flushedPosition = new AtomicInteger(0);//刷写到磁盘指针
protected int fileSize;//文件大小
protected FileChannel fileChannel;//文件通道
/**
* 消息都是先放到这,然后再放进文件通道
*/
protected ByteBuffer writeBuffer = null;//堆外内存ByteBuffer
protected TransientStorePool transientStorePool = null;//堆外内存池
private String fileName;//文件名称
private long fileFromOffset;//该文件的初始偏移量
private File file;//物理文件
private MappedByteBuffer mappedByteBuffer;//物理文件对应的内存映射Buffer
private volatile long storeTimestamp = 0;//文件最后一次内容写入时间
private boolean firstCreateInQueue = false;//是否是MappedFileQueue队列中第一个文件

consumequeue文件

commitlog文件时顺序写,这样可以极大的提高写性能,但是如果随机读取就会效率很低。所以就有了consumequeue呵index文件

在messageStore初始化的时候,会开启一个线程reputMessageService,进行消息的分发。根据commitlog准实时的更新consumequeue中的偏移量

1
2
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
this.reputMessageService.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");

while (!this.isStopped()) {
try {
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}

DefaultMessageStore.log.info(this.getServiceName() + " service end");
}

可以看到其run方法,不断执行doReput。

看看doReput都在干嘛