博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ入门学习(五*二)消息持久化存储源码解析
阅读量:3521 次
发布时间:2019-05-20

本文共 4461 字,大约阅读时间需要 14 分钟。

四、总结

面试被问:Broker收到消息后怎么持久化的?

回答者:有两种方式:同步和异步。一般选择异步,同步效率低,但是更可靠。消息存储大致原理是:

核心类MappedFile对应的是每个commitlog文件,MappedFileQueue相当于文件夹,管理所有的文件,还有一个管理者CommitLog对象,他负责提供一些操作。具体的是Broker端拿到消息后先将消息、topic、queue等内容存到ByteBuffer里,然后去持久化到commitlog文件中。commitlog文件大小为1G,超出大小会新创建commitlog文件来存储,采取的nio方式。

五、补充:同步/异步刷盘

1、关键类

类名 描述 刷盘性能
CommitRealTimeService 异步刷盘 &&开启字节缓冲区 最高
FlushRealTimeService 异步刷盘&&关闭内存字节缓冲区 较高
GroupCommitService 同步刷盘,刷完盘才会返回消息写入成功 最低

2、图解

3、同步刷盘

3.1、源码

// {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}// Synchronization flushif (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {    // 同步刷盘service -> GroupCommitService    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;    if (messageExt.isWaitStoreMsgOK()) {        // 数据准备        GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),                                                 this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());        // 将数据对象放到requestsWrite里        service.putRequest(request);        return request.future();    } else {        service.wakeup();        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);    }}

putRequest

public synchronized void putRequest(final GroupCommitRequest request) {    synchronized (this.requestsWrite) {        this.requestsWrite.add(request);    }    // 这里很关键!!!,给他设置成true。然后计数器-1。下面run方法的时候才会进行交换数据且return    if (hasNotified.compareAndSet(false, true)) {        waitPoint.countDown(); // notify    }}

run

public void run() {    while (!this.isStopped()) {        try {            // 是同步还是异步的关键方法,也就是说组不阻塞全看这里。            this.waitForRunning(10);            // 真正的刷盘逻辑            this.doCommit();        } catch (Exception e) {            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);        }    }}

waitForRunning

protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);// 其实就是CountDownLatchprotected final CountDownLatch2 waitPoint = new CountDownLatch2(1);protected void waitForRunning(long interval) {    // 如果是true,且给他改成false成功的话,则onWaitEnd()且return,但是默认是false,也就是默认情况下这个if不会进。    if (hasNotified.compareAndSet(true, false)) {        this.onWaitEnd();        return;    }    //entry to wait    waitPoint.reset();    try {        // 等待,默认值是1,也就是waitPoint.countDown()一次后就会激活这里。        waitPoint.await(interval, TimeUnit.MILLISECONDS);    } catch (InterruptedException e) {        log.error("Interrupted", e);    } finally {        // 给状态值设置成false        hasNotified.set(false);        this.onWaitEnd();    }}

3.2、总结

总结下同步刷盘的主要流程:

核心类是GroupCommitService,核心方法 是waitForRunning。

  • 先调用putRequest方法将hasNotified变为true,且进行notify,也就是waitPoint.countDown()

  • 其次是run方法里的waitForRunning()waitForRunning()判断hasNotified是不是true,是true则交换数据然后return掉,也就是不进行await阻塞,直接return。

  • 最后上一步return了,没有阻塞,那么顺理成章的调用doCommit进行真正意义的刷盘。

4、异步刷盘

4.1、源码

核心类是:FlushRealTimeService

// {@link org.apache.rocketmq.store.CommitLog#submitFlushRequest()}// Asynchronous flushif (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {    flushCommitLogService.wakeup();} else  {    commitLogService.wakeup();}return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);

run

// {@link org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run()}class FlushRealTimeService extends FlushCommitLogService {    @Override    public void run() {        while (!this.isStopped()) {            try {    // 每隔500ms刷一次盘                if (flushCommitLogTimed) {                    Thread.sleep(500);                } else {                    // 根上面同步刷盘调用的是同一个方法,区别在于这里没有将hasNotified变为true,也就是还是默认的false,那么waitForRunning方法内部的第一个判断就不会走,就不会return掉,就会进行下面的await方法阻塞,默认阻塞时间是500毫秒。也就是默认500ms刷一次盘。                    this.waitForRunning(500);                }                // 调用mappedFileQueue的flush方法                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);            } catch (Throwable e) {            }        }    }}

4.2、总结

核心类#方法:FlushRealTimeService#run()

  • 判断flushCommitLogTimed是不是true,默认false,是true则直接sleep(500ms)然后进行mappedFileQueue.flush()刷盘。

  • 若是false,则进入waitForRunning(500),这里是和同步刷盘的区别关键所在,同步刷盘之前将hasNotified变为true了,所以直接一套小连招:return+doCommit了 ,异步这里直接调用的waitForRunning(500),在这之前没任何对hasNotified的操作,所以不会return,而是会继续走下面的waitPoint.await(500, TimeUnit.MILLISECONDS);进行阻塞500毫秒,500毫秒后自动唤醒然后进行flush刷盘。也就是异步刷盘的话默认500ms刷盘一次。

转载地址:http://zyrqj.baihongyu.com/

你可能感兴趣的文章
[LeetCode javaScript] 107. 二叉树的层次遍历 II
查看>>
[LeetCode javaScript] 637. 二叉树的层平均值
查看>>
[LeetCode javaScript] 1. 两数之和
查看>>
[LeetCode javaScript] 14. 最长公共前缀
查看>>
[LeetCode javaScript] 26. 删除排序数组中的重复项
查看>>
[LeetCode javaScript] 8. 字符串转换整数 (atoi)
查看>>
[LeetCode javaScript] 28. 实现strStr()
查看>>
cv2.error: OpenCV(3.4.2) c:\projects\opencv-python\opencv\modules\imgproc\src\color.hpp:25
查看>>
前端网页学习7(css背景属性)
查看>>
前端网页学习8(css三大特性:层叠性,继承性,优先级)
查看>>
前端网页学习9(css盒子)
查看>>
python学习8(列表)
查看>>
JavaScript学习(new1)
查看>>
http GET 和 POST 请求的优缺点、区别以及误区
查看>>
JVM的4种垃圾回收算法、垃圾回收机制
查看>>
什么是分布式事务
查看>>
常用的分布式事务解决方案
查看>>
设计模式:单例模式 (关于饿汉式和懒汉式)
查看>>
一致性Hash算法
查看>>
更新Navicat Premium 后打开数据库出现1146 - Table 'performance_schema.session_variables' doesn't exist
查看>>