当前位置: 首页 > news >正文

手机平板购物网站的设计背景郑州市域名服务公司

手机平板购物网站的设计背景,郑州市域名服务公司,网站快速优化排名app,电影网站怎么制作梳理一些比较完整#xff0c;比较复杂的业务线 消息持久化设计 RocketMQ的持久化文件结构 消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能#xff0c;很慢的操作#xff0c;所以#xff0c;对消息持久化机制的设计#xff0c;是…梳理一些比较完整比较复杂的业务线 消息持久化设计 RocketMQ的持久化文件结构 消息持久化也就是将内存中的消息写入到本地磁盘的过程。而磁盘IO操作通常是一个很耗性能很慢的操作所以对消息持久化机制的设计是一个MQ产品提升性能的关键甚至可以说是最为重要的核心也不为过。接下来梳理RocketMQ是如何在本地磁盘中保存消息的 RocketMQ消息直接采用磁盘文件保存消息默认路径在${user_home}/store目录。这些存储目录可以在broker.conf中自行指定。 存储文件主要分为三个部分 CommitLog存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成每个文件固定大小1G。以第一条消息的偏移量为文件名。 ConsumerQueue存储消息在CommitLog的索引。一个MessageQueue一个文件记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。 IndexFile为消息查询提供了一种通过key或时间区间来查询消息的方法这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程 另外还有几个辅助的存储文件主要记录一些描述消息的元数据 checkpoint数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。 config/*.json这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。 abort这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下会在启动时创建而关闭服务时删除。但是如果遇到一些服务器宕机或者kill -9这样一些非正常关闭服务的情况这个abort文件就不会删除因此RocketMQ就可以判断上一次服务是非正常关闭的后续就会做一些数据恢复的操作。 整体的消息存储结构官方做了个图进行描述 Producer发过来的所有消息不管是属于哪个TopicBroker都统一存在CommitLog文件当中然后分别构建ConsumeQueue文件和IndexFile两个索引文件用来辅助消费者进行消息检索。这种设计最直接的好处是可以较少查找目标文件的时间让消息以最快的速度落盘。对比Kafka存文件时需要寻找消息所属的Partition文件再完成写入。当Topic比较多时这样的Partition寻址就会浪费非常多的时间。所以Kafka不太适合多Topic的场景。而RocketMQ的这种快速落盘的方式在多Topic的场景下优势就比较明显了。 在文件形式上CommitLog文件的大小是固定的。文件名就是当前CommitLog文件当中存储的第一条消息的Offset。 ConsumeQueue文件主要是加速消费者进行消息索引。每个文件夹对应RocketMQ中的一个MessageQueue文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样消费者通过ConsumeQueue文件就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件中的消费进度会保存在config/consumerOffset.json文件当中。 IndexFile文件主要是辅助消费者进行消息索引。消费者进行消息消费时通过ConsumeQueue文件就足够完成消息检索了但是如果消费者指定时间戳进行消费或者要按照MessageId或者MessageKey来检索文件比如RocketMQ管理控制台的消息轨迹功能ConsumeQueue文件就不够用了。IndexFile文件就是用来辅助这类消息检索的。他的文件名比较特殊不是以消息偏移量命名而是用的时间命名。但是其实他也是一个固定大小的文件。 这是对RocketMQ存盘文件最基础的了解但是只有这样的设计是不足以支撑RocketMQ的三高性能的。RocketMQ如何保证ConsumeQueue、IndexFile两个索引文件与CommitLog中的消息对齐如何保证消息断电不丢失如何保证文件高效的写入磁盘等等。如果你想要去抓住RocketMQ这些三高问题的核心设计那么还是需要到源码当中去深究。 commitLog写入 消息存储的入口在 DefaultMessageStore.asyncPutMessage方法 CommitLog的asyncPutMessage方法中会给写入线程加锁保证一次只会允许一个线程写入。写入消息的过程是串行的一次只会允许一个线程写入。 最终进入CommitLog中的DefaultAppendMessageCallback#doAppend方法这里就是Broker写入消息的实际入口。这个方法最终会把消息追加到MappedFile映射的一块内存里并没有直接写入磁盘。而是在随后调用ComitLog#submitFlushRequest方法提交刷盘申请。刷盘完成之后内存中的文件才真正写入到磁盘当中。 在提交刷盘申请之后就会立即调用CommitLog#submitReplicaRequest方法发起主从同步申请。 文件同步刷盘与异步刷盘 入口CommitLog.submitFlushRequest 这里涉及到了对于同步刷盘与异步刷盘的不同处理机制。这里有很多极致提高性能的设计对于我们理解和设计高并发应用场景有非常大的借鉴意义。 同步刷盘和异步刷盘是通过不同的FlushCommitLogService的子服务实现的。 //org.apache.rocketmq.store.CommitLog的构造方法 if (FlushDiskType.SYNC_FLUSH defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService new GroupCommitService(); } else {this.flushCommitLogService new FlushRealTimeService(); } ​ this.commitLogService new CommitRealTimeService(); 同步刷盘采用的是GroupCommitService子线程。虽然是叫做同步刷盘但是从源码中能看到他实际上并不是来一条消息就刷一次盘。而是这个子线程每10毫秒执行一次doCommit方法扫描文件的缓存。只要缓存当中有消息就执行一次Flush操作。 而异步刷盘采用的是FlushRealTimeService子线程。这个子线程最终也是执行Flush操作只不过他的执行时机会根据配置进行灵活调整。所以可以看到这里异步刷盘和同步刷盘的最本质区别实际上是进行Flush操作的频率不同。 我们经常说使用RocketMQ的同步刷盘可以保证Broker断电时消息不会丢失。但是可以看到RocketMQ并不可能真正来一条消息就进行一次刷盘这样在海量数据下操作系统是承受不了的。而只要不是来一次消息刷一次盘那么在Broker直接断电的情况接下就总是会有内存中的消息没有刷入磁盘的情况这就会造成消息丢失。所以对于消息安全性的设计其实是重在取舍无法做到绝对。 同步刷盘和异步刷盘最终落地到FileChannel的force方法。这个force方法就会最终调用一次操作系统的fsync系统调用完成文件写入。 //org.apache.rocketmq.store.MappedFile#flush public int flush(final int flushLeastPages) {if (this.isAbleToFlush(flushLeastPages)) {if (this.hold()) {int value getReadPosition(); ​try {//We only append data to fileChannel or mappedByteBuffer, never both.if (writeBuffer ! null || this.fileChannel.position() ! 0) {this.fileChannel.force(false);} else {this.mappedByteBuffer.force();}} catch (Throwable e) {log.error(Error occurred when force data to disk., e);} ​this.flushedPosition.set(value);this.release();} else {log.warn(in flush, hold failed, flush offset this.flushedPosition.get());this.flushedPosition.set(getReadPosition());}}return this.getFlushedPosition(); } 另外一个CommitRealTimeService这个子线程则是用来写入堆外内存的。应用可以通过配置TransientStorePoolEnable参数开启堆外内存如果开启了堆外内存会在启动时申请一个跟CommitLog文件大小一致的堆外内存这部分内存就可以确保不会被交换到虚拟内存中。而CommitRealTimeService处理消息的方式则只是调用mappedFileQueue的commit方法。这个方法只是往操作系统的PagedCache里写入消息并不主动进行刷盘操作。会由操作系统通过Dirty Page机制在某一个时刻进行统一刷盘。例如我们在正常关闭操作系统时经常会等待很长时间。这里面大部分的时间其实就是在做PageCache的刷盘。 //org.apache.rocketmq.store.MappedFileQueue public boolean commit(final int commitLeastPages) {boolean result true;MappedFile mappedFile this.findMappedFileByOffset(this.committedWhere, this.committedWhere 0);if (mappedFile ! null) {int offset mappedFile.commit(commitLeastPages);long where mappedFile.getFileFromOffset() offset;result where this.committedWhere;this.committedWhere where;} ​return result; } 在梳理同步刷盘与异步刷盘的具体实现时可以看到一个小点RocketMQ是如何让两个刷盘服务间隔执行的RocketMQ提供了一个自己实现的CountDownLatch2工具类来提供线程阻塞功能使用CAS驱动CountDownLatch2的countDown操作。每来一个消息就启动一次CAS成功后调用一次countDown。而这个CountDonwLatch2在Java.util.concurrent.CountDownLatch的基础上实现了reset功能这样可以进行对象重用。 CommigLog主从复制 入口CommitLog.submitReplicaRequest 主从同步时也体现到了RocketMQ对于性能的极致追求。最为明显的RocketMQ整体是基于Netty实现的网络请求而在主从复制这一块却放弃了Netty框架转而使用更轻量级的Java的NIO来构建。 在主要的HAService中会在启动过程中启动三个守护进程。 //HAService#start public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start(); } 这其中与Master相关的是acceptSocketService和groupTransferService。其中acceptSocketService主要负责维护Master与Slave之间的TCP连接。groupTransferService主要与主从同步复制有关。而slave相关的则是haClient。 至于其中关于主从的同步复制与异步复制的实现流程还是比较复杂的有兴趣的同学可以深入去研究一下。 推荐一篇可供参考的博客 RocketMQ源码分析之主从数据复制-CSDN博客 分发ConsumeQueue和IndexFile 当CommitLog写入一条消息后在DefaultMessageStore的start方法中会启动一个后台线程reputMessageService。源码就定义在DefaultMessageStore中。这个后台线程每隔1毫秒就会去拉取CommitLog中最新更新的一批消息。如果发现CommitLog中有新的消息写入就会触发一次doDispatch。 //org.apache.rocketmq.store.DefaultMessageStore中的ReputMessageService线程类 public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {dispatcher.dispatch(req);} } dispatchList中包含两个关键的实现类CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex。源码就定义在DefaultMessageStore中。他们分别用来构建ConsumeQueue索引和IndexFile索引。 并且如果服务异常宕机会造成CommitLog和ConsumeQueue、IndexFile文件不一致有消息写入CommitLog后没有分发到索引文件这样消息就丢失了。DefaultMappedStore的load方法提供了恢复索引文件的方法入口在load方法。 过期文件删除机制 入口 DefaultMessageStore.addScheduleTask - DefaultMessageStore.this.cleanFilesPeriodically() 在这个方法中会启动两个线程cleanCommitLogService用来删除过期的CommitLog文件cleanConsumeQueueService用来删除过期的ConsumeQueue和IndexFile文件。 在删除CommitLog文件时Broker会启动后台线程每60秒检查CommitLog、ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说默认情况下 RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配置。 触发过期文件删除时有两个检查的纬度一个是是否到了触发删除的时间也就是broker.conf里配置的deleteWhen属性。另外还会检查磁盘利用率达到阈值也会触发过期文件删除。这个阈值默认是72%可以在broker.conf文件当中定制。但是最大值为95最小值为10。 然后在删除ConsumeQueue和IndexFile文件时会去检查CommitLog当前的最小Offset然后在删除时进行对齐。 需要注意的是RocketMQ在删除过期CommitLog文件时并不检查消息是否被消费过。 所以如果有消息长期没有被消费是有可能直接被删除掉造成消息丢失的。 RocketMQ整个文件管理的核心入口在DefaultMessageStore的start方法中整体流程总结如下 文件索引结构 了解了大部分的文件写入机制之后最后我们来理解一下RocketMQ的索引构建方式。 1、CommitLog文件的大小是固定的但是其中存储的每个消息单元长度是不固定的具体格式可以参考org.apache.rocketmq.store.CommitLog中计算消息长度的方法 protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength) {int bornhostLength (sysFlag MessageSysFlag.BORNHOST_V6_FLAG) 0 ? 8 : 20;int storehostAddressLength (sysFlag MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) 0 ? 8 : 20;final int msgLen 4 //TOTALSIZE 4 //MAGICCODE 4 //BODYCRC 4 //QUEUEID 4 //FLAG 8 //QUEUEOFFSET 8 //PHYSICALOFFSET 4 //SYSFLAG 8 //BORNTIMESTAMP bornhostLength //BORNHOST 8 //STORETIMESTAMP storehostAddressLength //STOREHOSTADDRESS 4 //RECONSUMETIMES 8 //Prepared Transaction Offset 4 (bodyLength 0 ? bodyLength : 0) //BODY 1 topicLength //TOPIC 2 (propertiesLength 0 ? propertiesLength : 0) //propertiesLength 0;return msgLen; } 因为消息的记录大小不固定所以RocketMQ在每次存CommitLog文件时都会去检查当前CommitLog文件空间是否足够如果不够的话就重新创建一个CommitLog文件。文件名为当前消息的偏移量。 2、ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样消费者通过ComsumeQueue文件就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度会保存在config/consumerOffset.json文件当中。 文件结构 每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成数据块的内容包括msgPhyOffset(8byte消息在文件中的起始位置)msgSize(4byte消息在文件中占用的长度)msgTagCode(8byte消息的tag的Hash值)。 msgTag是和消息索引放在一起的所以消费者根据Tag过滤消息的性能是非常高的。 在ConsumeQueue.java当中有一个常量CQ_STORE_UNIT_SIZE20这个常量就表示一个数据块的大小。 private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) { ​if (offset size this.maxPhysicOffset) {log.warn(Maybe try to build consume queue repeatedly maxPhysicOffset{} phyOffset{}, maxPhysicOffset, offset);return true;} ​this.byteBufferIndex.flip();//在ConsumeQueue.java当中构建一条ConsumeQueue索引的方法中记录一个单元块的数据this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode); ​final long expectLogicOffset cqOffset * CQ_STORE_UNIT_SIZE;... ... } 3、IndexFile文件主要是辅助消息检索。他的作用主要是用来支持根据key和timestamp检索消息。他的文件名比较特殊不是以消息偏移量命名而是用的时间命名。但是其实他也是一个固定大小的文件。 文件结构 他的文件结构由 indexHeader(固定40byte) slot(固定500W个每个固定20byte) index(最多500W*4个每个固定20byte) 三个部分组成。 indexFile的详细结构有大厂之前面试过参考博文 RocketMQ之底层IndexFile存储协议_rocketmq index_roykingw的博客-CSDN博客 延迟消息机制 关注重点 延迟消息是RocketMQ非常有特色的一个功能其他MQ产品中往往需要开发者使用一些特殊方法来变相实现延迟消息功能。而RocketMQ直接在产品中实现了这个功能开发者只需要设定一个属性就可以快速实现。 延迟消息的核心使用方法就是在Message中设定一个MessageDelayLevel参数对应18个延迟级别。然后Broker中会创建一个默认的Schedule_Topic主题这个主题下有18个队列对应18个延迟级别。消息发过来之后会先把消息存入Schedule_Topic主题中对应的队列。然后等延迟时间到了再转发到目标队列推送给消费者进行消费。 源码重点 延迟消息的处理入口在scheduleMessageService这个组件中。 会在broker启动时也一起加载。 1、消息写入到系统内置的Topic中 代码见CommitLog.putMessage方法。 在CommitLog写入消息时会判断消息的延迟级别然后修改Message的Topic和Queue将消息转储到系统内部的Topic中这样消息就对消费者不可见了。而原始的目标信息会作为消息的属性保存到消息当中。 //should be consistent with the old version if (tranType MessageSysFlag.TRANSACTION_NOT_TYPE|| tranType MessageSysFlag.TRANSACTION_COMMIT_TYPE) {// Delay Delivery// 延迟消息转到系统Topicif (msg.getDelayTimeLevel() 0) {if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());} ​ ​String topic TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); ​// Backup real topic, queueIdMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));// 修改消息的Topic和Queue转储到系统的Topic中msg.setTopic(topic);msg.setQueueId(queueId);} } 2、消息转储到目标Topic 接下来就是需要过一点时间再将消息转回到Producer提交的Topic和Queue中这样就可以正常往消费者推送了。 这个转储的核心服务是scheduleMessageService他也是Broker启动过程中的一个功能组件。随DefaultMessageStore组件一起构建。这个服务只在master节点上启动而在slave节点上会主动关闭这个服务。 //org.apache.rocketmq.store.DefaultMessageStore Override public void handleScheduleMessageService(final BrokerRole brokerRole) {if (this.scheduleMessageService ! null) {if (brokerRole BrokerRole.SLAVE) {this.scheduleMessageService.shutdown();} else {this.scheduleMessageService.start();}} } 由于RocketMQ的主从节点支持切换所以就需要考虑这个服务的幂等性。在节点切换为slave时就要关闭服务切换为master时就要启动服务。并且即便节点多次切换为master服务也只启动一次。所以在ScheduleMessageService的start方法中就通过一个CAS操作来保证服务的启动状态。 if (started.compareAndSet(false, true)) { 这个CAS操作还保证了在后面同一时间只有一个DeliverDelayedMessageTimerTask执行。这种方式给整个延迟消息服务提供了一个基础保证。 ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的DeliverDelayedMessageTimerTask.executeOnTimeup方法。 在executeOnTimeup方法中就会去扫描SCHEDULE_TOPIC_XXXX这个Topic下的所有messageQueue然后扫描这些MessageQueue对应的ConsumeQueue文件找到没有处理过的消息计算他们的延迟时间。如果延迟时间没有到就等下一秒再重新扫描。如果延迟时间到了就进行消息转储。将消息转回到原来的目标Topic下。 整个延迟消息的实现方式 ScheduleMessageService中扫描延迟消息的主要逻辑 //ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup public void executeOnTimeup() {//找到延迟队列对应的ConsumeQueue文件ConsumeQueue cq ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel)); ​if (cq null) {this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);return;}//通过计算找到这一次扫描需要处理的的ConsumeQueue文件SelectMappedBufferResult bufferCQ cq.getIndexBuffer(this.offset);... ...long nextOffset this.offset;try {int i 0;ConsumeQueueExt.CqExtUnit cqExtUnit new ConsumeQueueExt.CqExtUnit();//循环过滤ConsumeQueue文件当中的每一条消息索引for (; i bufferCQ.getSize() isStarted(); i ConsumeQueue.CQ_STORE_UNIT_SIZE) {//解析每一条ConsumeQueue记录long offsetPy bufferCQ.getByteBuffer().getLong();int sizePy bufferCQ.getByteBuffer().getInt();long tagsCode bufferCQ.getByteBuffer().getLong(); ​... ...//计算延迟时间long now System.currentTimeMillis();long deliverTimestamp this.correctDeliverTimestamp(now, tagsCode);nextOffset offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);//延迟时间没到就等下一次扫描long countdown deliverTimestamp - now;if (countdown 0) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;} ​... ...//时间到了就进行转储boolean deliverSuc;if (ScheduleMessageService.this.enableAsyncDeliver) {deliverSuc this.asyncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);} else {deliverSuc this.syncDeliver(msgInner, msgExt.getMsgId(), nextOffset, offsetPy, sizePy);} ​if (!deliverSuc) {this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);return;}}//计算下一次扫描时的Offset起点nextOffset this.offset (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);} catch (Exception e) {log.error(ScheduleMessageService, messageTimeup execute error, offset {}, nextOffset, e);} finally {bufferCQ.release();}//部署下一次扫描任务this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE); } 如果清楚了ConsumeQueue文件的结构就可以很清晰的感受到RocketMQ其实就是在Broker端像一个普通消费者一样去进行消费然后扩展出了延迟消息的整个扩展功能。而这其实也是很多互联网大厂对RocketMQ进行自定义功能扩展的很好的参考。 长轮询机制 功能回顾 RocketMQ对消息消费者提供了Push推模式和Pull拉模式两种消费模式。但是这两种消费模式的本质其实都是Pull拉模式Push模式可以认为是一种定时的Pull机制。但是这时有一个问题当使用Push模式时如果RocketMQ中没有对应的数据那难道一直进行空轮询吗如果是这样的话那显然会极大的浪费网络带宽以及服务器的性能并且当有新的消息进来时RocketMQ也没有办法尽快通知客户端而只能等客户端下一次来拉取消息了。针对这个问题RocketMQ实现了一种长轮询机制 long polling。 长轮询机制简单来说就是当Broker接收到Consumer的Pull请求时判断如果没有对应的消息不用直接给Consumer响应(给响应也是个空的没意义)而是就将这个Pull请求给缓存起来。当Producer发送消息过来时增加一个步骤去检查是否有对应的已缓存的Pull请求如果有就及时将请求从缓存中拉取出来并将消息通知给Consumer。 源码重点 Consumer请求缓存代码入口PullMessageProcessor#processRequest方法 PullRequestHoldService服务会随着BrokerController一起启动。 生产者线从DefaultMessageStore.doReput进入 整个流程以及源码重点 关于零拷贝与顺序写 刷盘机制保证消息不丢失 在操作系统层面当应用程序写入一个文件时文件内容并不会直接写入到硬件当中而是会先写入到操作系统中的一个缓存PageCache中。PageCache缓存以4K大小为单位缓存文件的具体内容。这些写入到PageCache中的文件在应用程序看来是已经完全落盘保存好了的可以正常修改、复制等等。但是本质上PageCache依然是内存状态所以一断电就会丢失。因此需要将内存状态的数据写入到磁盘当中这样数据才能真正完成持久化断电也不会丢失。这个过程就称为刷盘。 Java当中使用FileOutputStream类或者BufferedWriter类进行write操作就是写入的Pagecache。 RocketMQ中通过fileChannel.commit方法写入消息也是写入到Pagecache。 PageCache是源源不断产生的而Linux操作系统显然不可能时时刻刻往硬盘写文件。所以操作系统只会在某些特定的时刻将PageCache写入到磁盘。例如当我们正常关机时就会完成PageCache刷盘。另外在Linux中对于有数据修改的PageCache会标记为Dirty(脏页)状态。当Dirty Page的比例达到一定的阈值时就会触发一次刷盘操作。例如在Linux操作系统中可以通过/proc/meminfo文件查看到Page Cache的状态。 [root192-168-65-174 ~]# cat /proc/meminfo MemTotal:       16266172 kB ..... Cached:           923724 kB ..... Dirty:                32 kB Writeback:             0 kB ..... Mapped:           133032 kB ..... 但是只要操作系统的刷盘操作不是时时刻刻执行的那么对于用户态的应用程序来说那就避免不了非正常宕机时的数据丢失问题。因此操作系统也提供了一个系统调用应用程序可以自行调用这个系统调用完成PageCache的强制刷盘。在Linux中是fsync同样我们可以用man 2 fsync 指令查看。 RocketMQ对于何时进行刷盘也设计了两种刷盘机制同步刷盘和异步刷盘。只需要在broker.conf中进行配置就行。 零拷贝加速文件读写 零拷贝(zero-copy)是操作系统层面提供的一种加速文件读写的操作机制非常多的开源软件都在大量使用零拷贝来提升IO操作的性能。对于Java应用层对应着mmap和sendFile两种方式。 理解CPU拷贝和DMA拷贝 操作系统对于内存空间是分为用户态和内核态的。用户态的应用程序无法直接操作硬件需要通过内核空间进行操作转换才能真正操作硬件。这其实是为了保护操作系统的安全。正因为如此应用程序需要与网卡、磁盘等硬件进行数据交互时就需要在用户态和内核态之间来回的复制数据。而这些操作原本都是需要由CPU来进行任务的分配、调度等管理步骤的早先这些IO接口都是由CPU独立负责所以当发生大规模的数据读写操作时CPU的占用率会非常高。 之后操作系统为了避免CPU完全被各种IO调用给占用引入了DMA(直接存储器存储)。由DMA来负责这些频繁的IO操作。DMA是一套独立的指令集不会占用CPU的计算资源。这样CPU就不需要参与具体的数据复制的工作只需要管理DMA的权限即可。 DMA拷贝极大的释放了CPU的性能因此他的拷贝速度会比CPU拷贝要快很多。但是其实DMA拷贝本身也在不断优化。 引入DMA拷贝之后在读写请求的过程中CPU不再需要参与具体的工作DMA可以独立完成数据在系统内部的复制。但是数据复制过程中依然需要借助数据总进线。当系统内的IO操作过多时还是会占用过多的数据总线造成总线冲突最终还是会影响数据读写性能。 为了避免DMA总线冲突对性能的影响后来又引入了Channel通道的方式。Channel是一个完全独立的处理器专门负责IO操作。既然是处理器Channel就有自己的IO指令与CPU无关他也更适合大型的IO操作性能更高。 这也解释了为什么Java应用层与零拷贝相关的操作都是通过Channel的子类实现的。这其实是借鉴了操作系统中的概念。 而所谓的零拷贝技术其实并不是不拷贝而是要尽量减少CPU拷贝。 再来理解下mmap文件映射机制是怎么回事 mmap机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的map方法完成映射。 以一次文件的读写操作为例应用程序对磁盘文件的读与写都需要经过内核态与用户态之间的状态切换每次状态切换的过程中就需要有大量的数据复制。 在这个过程中总共需要进行四次数据拷贝。而磁盘与内核态之间的数据拷贝在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以在这个场景下零拷贝技术优化的重点就是内核态与用户态之间的这两次拷贝。 而mmap文件映射的方式就是在用户态不再保存文件的内容而只保存文件的映射包括文件的内存起始地址文件大小等。真实的数据也不需要在用户态留存可以直接通过操作映射在内核态完成数据复制。 这个拷贝过程都是在操作系统的系统调用层面完成的在Java应用层其实是无法直接观测到的但是我们可以去JDK源码当中进行间接验证。在JDK的NIO包中java.nio.HeapByteBuffer映射的就是JVM的一块堆内内存在HeapByteBuffer中会由一个byte数组来缓存数据内容所有的读写操作也是先操作这个byte数组。这其实就是没有使用零拷贝的普通文件读写机制。 HeapByteBuffer(int cap, int lim) {            // package-privatesuper(-1, 0, lim, cap, new byte[cap], 0);/*hb new byte[cap];offset 0;*/ } 而NIO把包中的另一个实现类java.nio.DirectByteBuffer则映射的是一块堆外内存。在DirectByteBuffer中并没有一个数据结构来保存数据内容只保存了一个内存地址。所有对数据的读写操作都通过unsafe魔法类直接交由内核完成这其实就是mmap的读写机制。 最后这种mmap的映射机制由于还是需要用户态保存文件的映射信息数据复制的过程也需要用户态的参与这其中的变数还是非常多的。所以mmap机制适合操作小文件如果文件太大映射信息也会过大容易造成很多问题。通常mmap机制建议的映射文件大小不要超过2G 。而RocketMQ的CommitLog文件保持在1G固定大小也是为了方便文件映射。 梳理下sendFile机制是怎么运行的 sendFile机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的transferTo方法完成 sourceReadChannel.transferTo(0,sourceFile.length(),targetWriteChannel); 早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。但是在后期的不断改进过程中sendfile优化了实现机制在拷贝过程中并不直接拷贝文件的内容而是只拷贝一个带有文件位置和长度等信息的文件描述符FD这样就大大减少了需要传递的数据。而真实的数据内容会交由DMA控制器从页缓存中打包异步发送到socket中。 最后sendfile机制在内核态直接完成了数据的复制不需要用户态的参与所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。 顺序写加速文件写入磁盘 通常应用程序往磁盘写文件时由于磁盘空间不是连续的会有很多碎片。所以我们去写一个文件时也就无法把一个文件写在一块连续的磁盘空间中而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间每次写数据时就可以避免这些寻址操作直接在之前写入的地址后面接着写就行。 Kafka官方详细分析过顺序写的性能提升问题。Kafka官方曾说明顺序写的性能基本能够达到内存级别。而如果配备固态硬盘顺序写的性能甚至有可能超过写内存。而RocketMQ很大程度上借鉴了Kafka的这种思想。 例如可以看下org.apache.rocketmq.store.CommitLog#DefaultAppendMessageCallback中的doAppend方法。在这个方法中会以追加的方式将消息先写入到一个堆外内存byteBuffer中然后再通过fileChannel写入到磁盘。
http://www.tj-hxxt.cn/news/223473.html

相关文章:

  • 怎么做网站二级页面做义工的网站
  • 珠海市斗门建设局网站网络推广招聘
  • 中国数学外国人做视频网站房价必涨的十大城市
  • 上线了怎么建网站泰安新闻头条最新消息
  • 怎么创建卡密网站乐山市城乡规划建设局网站
  • 外卖网站怎么做北京企业网站案例
  • 高中信息技术课程做网站网站宣传图
  • 学网站维护怎么改一个网站的关键词密度
  • 个人做众筹网站合法吗wordpress更换域名后台登不进去
  • 建设银行儿童网站2345网址导航站
  • 网站设计制作厂家有哪些网站为什么要seo
  • 洛阳微信平台网站建设推荐一个免费网站
  • 小馋网站建设书专业的营销型网站建设价格
  • 苗木网站素材佛山网站建设业务员
  • 企业网站一般要素即墨区城乡建设局网站官网
  • 好的网站有哪些灰色关键词排名代发
  • wordpress付费建站北京城建建设工程有限公司网站
  • 哈尔滨网站建设吧网站页面太多怎么做网站地图
  • 网站图片移动怎么做的自己做视频的网站吗
  • 工程建设标准化期刊网站最近三天的新闻大事国内
  • 校园安全网站建设如何优化移动端网站
  • 企业网站制作 厦门如何把wordpress的文章页写成模板
  • 快速微信网站设计工信部信息备案网站查询系统
  • 网站根目录相对路径做网站赚大钱
  • 大一学生做的网站校园网站素材
  • 商城网站规划百度收录的网页数量
  • php律师网站源码免费推广软件平台
  • 做纸巾定制的网站龙口网站建设公司报价
  • 网站建设推广案例如何一个空间放两个网站
  • 网站建设工作部署会赣州建设培训网官网