敦煌网站建设,手机怎么制作网站教程步骤,青岛网站推广优化公司,网站首页流程图目录
1、项目介绍
2、高并发内存池整体框架设计
3、thread cache
1thread cache 哈希桶对齐规则
2Thread Cache类设计
4、Central Cache
1Central Cache类设计
5、page cache
1Page Cache类设计 6、性能分析
1定长内存池实现…目录
1、项目介绍
2、高并发内存池整体框架设计
3、thread cache
1thread cache 哈希桶对齐规则
2Thread Cache类设计
4、Central Cache
1Central Cache类设计
5、page cache
1Page Cache类设计 6、性能分析
1定长内存池实现
2基数树 7、项目源码及项目总结 1、项目介绍 当前项目是实现一个高并发的内存池他的原型是google的一个开源项目tcmalloctcmalloc全称Thread-Caching Malloc即线程缓存的malloc实现了高效的多线程内存管理用于替代系统的内存 分配相关的函数malloc、free。 另一方面tcmalloc是全球大厂google开源的可以认为当时顶尖的C高手写出来的他的知名度也是非常高的不少公司都在用它Go语言直接用它做了自己内存分配器。 本文是把tcmalloc最核心的框架简化后拿出来模拟实现出一个自己的高并发内存池目的是学习tcamlloc的精华。应用技术 数据结构链表、哈希桶、操作系统内存管理、单例模式、多线程、互斥锁等。什么是内存池 想必大家看到这几个字也应该自己能想出个大概简单来说内存池是指程序预先从操作系统申请一块足够大内存然后自己管理。此后当程序中需要申请内存的时候不是直接向操作系统申请而是直接从内存池中获取同理当程序释放内存的时候并不真正将内存返回给操作系统而是返回内存池。当程序退出(或者特定时间)时内存池才将之前申请的内存真正释放。
内存池主要解决了效率问题避免频繁找操作系统申请内存。 其次如果作为系统的内存分配器的角度还需要解决一下内存碎片的问题。 那么什么是内存碎片呢 内存碎片分为外碎片和内碎片内碎片我们在下文项目中具体解释这里我们简单概述一下这里我们主要看比较容易理解的外碎片如图 现有768Byte的空间但是如果我们要申请超过512Byete的空间却申请不出来,因为这两块空间碎片化不连续了。
内碎片内部碎片是由于一些对齐的需求导致分配出去的空间中一些内存无法被利用。(具体见下文哈希桶对齐规则) 注我们下面实现的内存池主要是是尝试解决的是外部碎片的问题同时也尽可能的减少内部碎片的产生。 malloc C/C中我们要动态申请内存都是通过malloc去申请内存但是我们要知道实际我们不是直接去堆获取内存的 而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间然后“零售”给程 序用。当全部“售完”或程序有大量的内存需求时再根据实际需求向操作系统“进货”。2、高并发内存池整体框架设计 现代很多的开发环境都是多核多线程在申请内存的场景下必然存在激烈的锁竞争问题。malloc本身其实已经很优秀那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹所以这次我们实现的内存池需要考虑以下几方面的问题。 性能问题。 多线程环境下锁竞争问题。 内存碎片问题。 concurrent memory pool主要由3个部分构成 1. thread cache线程缓存是每个线程独有的用于小于256KB的内存的分配线程从这里申请内存不需要加锁每个线程独享一个thread cache这也就是这个并发线程池高效的地方。 2. central cache中心缓存是所有线程所共享thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象避免一个线程占用了太多的内存而其他线程的内存吃紧达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的所以从这里取内存对象是需要加锁首先这里用的是桶锁其次只有thread cache的对象没有内存时才会找central cache所以这里竞争不会很激烈。 3. page cache页缓存是在central cache缓存上面的一层缓存存储的内存是以页为单位存储及分配的central cache没有内存对象时从page cache分配出一定数量的page并切割成定长大小的小块内存分配给central cache。当一个span的几个跨度页的对象都回收以后page cache 会回收central cache满足条件的span对象并且合并相邻的页组成更大的页缓解内存碎片的问题。3、thread cache thread cache是哈希桶结构每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象这样每个线程在这里获取对象和释放对象时是无锁的。 thread cache支持小于等于256KB内存的申请如果我们将每种字节数的内存块都用一个自由链表进行管理的话那么此时我们就需要20多万个自由链表光是存储这些自由链表的头指针就需要消耗大量内存这显然是得不偿失的。 这时我们可以选择做一些平衡的牺牲让这些字节数按照某种规则进行对齐(见下文对齐规则)例如我们让这些字节数都按照8字节进行向上对齐(考虑到32位和64位下指针大小)那么thread cache的结构就是下面这样的此时当线程申请1~8字节的内存时会直接给出8字节而当线程申请9~16字节的内存时会直接给出16字节以此类推。 申请内存 1. 当内存申请size256KB先获取到线程本地存储的thread cache对象计算size映射的哈希桶自由链表下标i。 2. 如果自由链表_freeLists[i]中有对象则直接Pop一个内存对象返回。 3. 如果_freeLists[i]中没有对象时则批量从central cache中获取一定数量的对象插入到自由链表并返回一个对象。 释放内存 1. 当释放内存小于256k时将内存释放回thread cache计算size映射自由链表桶位置i将对象Push到_freeLists[i]。 2. 当链表的长度过长则回收一部分内存对象到central cache。 通过上图分析我们需要一个自由链表来管理内存块下面我们来对自由链表进行封装仅写出当前一些很容易想到的接口后序需要我们在进行添加。 class FreeList {
private:void* _freelist nullptr;size_t _size 0; //记录链表长度size_t _MaxSize 1; //控制慢增长
public:void push(void* obj) {assert(obj);//头插CurNext(obj) _freelist;_freelist obj;_size;}void* popFront() {assert(_freelist);void* cur _freelist;_freelist CurNext(_freelist);--_size;return cur;}bool Empty() {return _freelist nullptr;}size_t Size() {return _size;}size_t MaxSize() {return _MaxSize;}
}; 1thread cache 哈希桶对齐规则 字节数对齐数哈希桶下标区间桶数量[1,128]8byte对齐freelist[0,16)16 [1281,1024] 16byte对齐freelist[16,72)56[10241,8*1024]128byte对齐freelist[72,128)56[8*10241,64*1024]1024byte对齐freelist[128,184)56[64*10241,256*1024]8*1024byte对齐freelist[184,208)24 上文中我们已经提到过内碎片这个概念我们应该尽可能减少内碎片的产生。按照上面对齐规则的话整体控制在10%左右的内碎片浪费第一个区间我们不做考虑因为1字节就算对齐到2字节也会产生50%的空间浪费我们从第二个区间开始计算比如我们申请130个字节实际给到的是145字节实际多给了15字节。15/145 ≈ 0.103浪费了大概在10左右下面几个区间大家可以自己计算下浪费率也是大概在10%左右。 有了对齐规则我们还需要计算出相应的内存对应在哪一个桶中如上表中每个区间都有一定桶的数量如[1,128]有16个桶那么我们申请1~8字节都对应在下标0号桶中9~16字节都对应在下标1号桶中于是我们需要一个函数来处理计算。 对齐规则和下标映射规则编写 为了后序使用方便我们将其封装在一个类当中。 static const size_t PAGE_SHIFT 13;
static const size_t MAX_BYTES 256 * 1024; //最大字节数//计算对象大小对齐规则
class SizeClass {
public://20 8 -- 24//110 8 -- 112//容易想到的//size_t _AlignSize(size_t bytes, size_t alignNum) {// size_t alignSize;// if (bytes % alignNum 0) {// alignSize bytes;// }// else {// alignSize (bytes / alignNum 1) * alignNum;// }// return alignSize;//}static inline size_t _AlignSize(size_t bytes, size_t alignNum){return ((bytes alignNum - 1) ~(alignNum - 1));}//对齐大小计算static inline size_t AlignSize(size_t bytes) {//assert(bytes MAX_BYTES);if (bytes 128) {return _AlignSize(bytes, 8);}else if (bytes 1024) {return _AlignSize(bytes, 16);}else if (bytes 8 * 1024) {return _AlignSize(bytes, 128);}else if (bytes 64 * 1024) {return _AlignSize(bytes, 1024);}else if (bytes 256 * 1024) {return _AlignSize(bytes, 8 * 1024);}else {return _AlignSize(bytes, 1 PAGE_SHIFT); //页对齐}}//容易想到的//size_t _Index(size_t bytes, size_t alignNum) {// if (bytes % alignNum 0) {// return bytes / alignNum - 1; //下标从0开始// }// else {// return bytes / alignNum;// }//}//20 3 -- 1//130 4 -- 8//好的实现方法static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes (1 align_shift) - 1) align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes MAX_BYTES);static int group_array[4] { 16, 56, 56, 56 }; // 每个区间有多少个链if (bytes 128) {return _Index(bytes, 3); //传2的次方}else if (bytes 1024) {return _Index(bytes - 128, 4) group_array[0];}else if (bytes 8 * 1024) {return _Index(bytes - 1024, 7) group_array[1] group_array[0];}else if (bytes 64 * 1024) {return _Index(bytes - 8 * 1024, 10) group_array[2] group_array[1] group_array[0];}else if (bytes 256 * 1024) {return _Index(bytes - 64 * 1024, 13) group_array[3] group_array[2] group_array[1] group_array[0];}else {assert(false);}return -1;}
}; 2Thread Cache类设计 通过上述内存申请分析可以想到我们需要申请内存函数和释放内存函数释放内存函数分两种情况链表长度较短直接将对象挂在链表中链表长度过长释放链表以及从中心缓存central cache获取对象的一个函数。如下定义 class ThreadCache {
private:FreeList _freelists[NFREELIST];
public:void* Allocate(size_t size); //申请内存void Deallocate(void* ptr, size_t size); //释放内存void ListTooLong(FreeList list, size_t size); //链表太长释放链表//从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
};//TLS thread local storage -- 线程局部存储
static _declspec(thread) ThreadCache* pTLSThreadCache nullptr; 接口实现
//申请内存
void* ThreadCache::Allocate(size_t size) {assert(size MAX_BYTES);//传过来申请字节数计算出对齐大小(实际给到的内存大小)size_t alignSize SizeClass::AlignSize(size);size_t index SizeClass::Index(size);if (!_freelists[index].Empty()) {return _freelists[index].popFront();}else {//从中心缓存获取对象//...}
}
//释放内存
void ThreadCache::Deallocate(void* ptr, size_t size) {assert(ptr);assert(size MAX_BYTES);//找到对应桶位置进行插入size_t index SizeClass::Index(size);_freelists[index].push(ptr);//当链表长度大于一次申请的最大内存值将其还给central cacheif (_freelists[index].Size() _freelists[index].MaxSize()) {ListTooLong(_freelists[index], size);}
}
//链表太长释放链表
void ThreadCache::ListTooLong(FreeList list, size_t size) {//首先从原链表中将这段链表删除,接着还给中心缓存//在freelist中增加区间删除函数void* start nullptr;void* end nullptr;list.PopRange(start, end, list.Size());//将链表还给中心缓存//...
}//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) {//慢反馈调节算法//1、最开始不会一次向central cache批量要太多因为要太多有可能会用不完//2、如果不断size大小内存需求那么batchNum就会不断增长直到上限//3、size越小一次向central cache要的batchNum越小//4、size越大一次向central cache要的batchNum越大size_t batchNum std::min(_freelists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freelists[index].MaxSize() batchNum) {_freelists[index].MaxSize() 1;}//调用cnetral cache中获取对象接口//...
} 线程局部存储TLS(Thread Local Storage) 要实现每个线程无锁的访问属于自己的thread cache我们需要用到线程局部存储TLS(Thread Local Storage)这是一种变量的存储方法使用该存储方法的变量在它所在的线程是全局可访问的但是不能被其他线程访问到这样就保持了数据的线程独立性。但不是每个线程被创建时就立马有了属于自己的thread cache而是当该线程调用相关申请内存的接口时才会创建自己的thread cache。 在FreeList类中添加PopRange函数 void PopRange(void* start, void* end, size_t n) {assert(n _size);start _freelist;end start;for (int i 0; i n - 1; i) {end CurNext(end);}_freelist CurNext(end);CurNext(end) nullptr;_size - n;} 在SizeClass类中添加NumMoveSize函数 // 一次thread cache从中心缓存获取多少个//也就是计算可以给到你几个对象//其中上限512,下限2也可以理解为限制桶中链表的长度static size_t NumMoveSize(size_t size){assert(size 0);// [2, 512]一次批量移动多少个对象的(慢启动)上限值//对象越小计算出的上限越高//对象越大计算出的上限越低int num MAX_BYTES / size;if (num 2)num 2;if (num 512)num 512;return num;} 4、Central Cache
central cache也是一个哈希桶结构他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。申请内存 1. 当thread cache中没有内存时就会批量向central cache申请一些内存对象这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法central cache也有一个哈希映射的spanlistspanlist中挂着span从span中取出对象给thread cache这个过程是需要加锁的不过这里使用的是一个桶锁尽可能提高效率。 2. central cache映射的spanlist中所有span的都没有内存以后则需要向page cache申请一个新的span对象拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。 3. central cache的中挂的span中use_count记录分配了多少个对象出去分配一个对象给thread cache就use_count。释放内存1. 当thread_cache过长或者线程销毁则会将内存释放回central cache中的释放回来时 --use_count。当use_count减到0时则表示所有对象都回到了span则将span释放回page cachepage cache中会对前后相邻的空闲页进行合并。1Central Cache类设计 根据上图我们可以知道central cache也是一个哈希桶结构通中挂的是一个个的span并且是双链表而sapn中又包含了freellist如下定义 首先定义一个SpanNode的节点来表示一个个的Span对象
struct SpanNode {PAGE_ID _pageId 0; //大块内存起始页号size_t _n 0; //页的数量SpanNode* _next nullptr;SpanNode* _prev nullptr;void* _freeList nullptr; //自由链表size_t _size 0; //切好小对象大小size_t _useCount 0; //分配给thread cache小块内存数量bool _isUse false; //是否正在被使用}; 接着实现一个双链表结构用来将Span挂接起来 class SpanList {
private:SpanNode* _head nullptr; //头节点std::mutex _mtx; //桶锁
public:SpanList() {_head new SpanNode;_head-_next _head;_head-_prev _head;}~SpanList() {delete _head;}std::mutex getMutex() {return _mtx;}SpanNode* Begin() {return _head-_next;}SpanNode* end() {return _head;}bool Empty() {return _head-_next _head;}void Insert(SpanNode* pos, SpanNode* newSpan) {assert(pos newSpan);SpanNode* prev pos-_prev;prev-_next newSpan;newSpan-_prev prev;pos-_prev newSpan;newSpan-_next pos; }void Erase(SpanNode* pos) {assert(pos);assert(pos ! _head);SpanNode* prev pos-_prev;SpanNode* next pos-_next;prev-_next next;next-_prev prev;}SpanNode* PopFront() {SpanNode* ret _head-_next;Erase(ret);return ret;}void PushFront(SpanNode* spanNode) {Insert(Begin(), spanNode);}
}; Central Cache类定义 和Thread Cache不同的是每个线程中都会有一个Thread Cache对象以防止各个线程互相抢占内存的问题。而Central Cache为所有线程所共享且全局只有一个Central Cache对象故我们应该将其实现为单例模式(其中懒汉模式涉及线程安全问题故我们将其实现为懒汉模式)。其主要作用是向下为Thread Cache提供内存向上向Page Cache申请内存。回收内存也是同理。故我们需要定义一下几个接口从Page Cache获取非空spanNode、为Thread Cache提供内存、将内存释放回Page Cahce三个函数。如下定义//单例模式懒汉
class CentralCache {
private:SpanList _SpanLists[NFREELIST];
private:CentralCache() {}CentralCache(const CentralCache) delete;static CentralCache _sInst; //类外初始化
public:static CentralCache* GetInStance() {return _sInst;}// 获取一个非空的spanNodeSpanNode* GetOneSpan(SpanList list, size_t size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void* start, void* end, size_t batchNum, size_t size);// 将一定数量的对象释放到spanvoid ReleaseListToSpans(void* start, size_t byte_size);
}; 接口实现 // 获取一个非空的spanNode
SpanNode* CentralCache::GetOneSpan(SpanList list, size_t size) {//查看当前桶中的每个SpanNode节点是否有未分配的对象SpanNode* it list.Begin();while (it ! list.end()) {if (it-_freeList ! nullptr) {return it;}else {it it-_next;}}//走到这里说明当前桶中每个节点没有未分配的对象了,找pageCache要//...}// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void* start, void* end, size_t batchNum, size_t size) {size_t index SizeClass::Index(size);//这里会涉及多个线程同时从中心缓存获取内存的情况故应该加锁_SpanLists[index].getMutex().lock();SpanNode* spanNode GetOneSpan(_SpanLists[index], size);assert(spanNode);assert(spanNode-_freeList);//从spanNode中获取batchNum个对象,如果不够有多少拿多少start spanNode-_freeList;end start;int i 0;int ActualNum 1;while (i batchNum - 1 CurNext(end) ! nullptr) {end CurNext(end);i;ActualNum;}//从spanNode将这段链表删除,并统计出该spanNode中自由链表已经使用的数量spanNode-_freeList CurNext(end);CurNext(end) nullptr;spanNode-_useCount ActualNum;_SpanLists[index].getMutex().unlock();return ActualNum;
}关于当链表太长时要将内存还给SpanNode时情况稍微有点复杂需要好好思考一下。因为还回来的链表中的每个节点的地址我们没办法确定是否连续的。这就导致有可能换回来一个链表但其中的节点分布于_SpanList的多个SpanNode节点中这就需要对其筛选让其进入不同的桶中这部分代码我们放到Page Cache中来实现因为Central Cache中的SpanNode节点都是Page Cache分配给他的这里我们先把大概逻辑顺一下如下图 测试代码如下 void TestAddressPage() {PAGE_ID id1 3000;PAGE_ID id2 3001;char* p1 (char*)(id1 PAGE_SHIFT);char* p2 (char*)(id2 PAGE_SHIFT);while (p1 p2) {cout (void*)p1 : ((PAGE_ID)p1 PAGE_SHIFT) endl;p1 8;}
} 执行结果 由上图我们可以看出事实和我们的推论是一样的。释放内存代码如下图 //将一定数量的对象释放到span
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size) {size_t index SizeClass::Index(byte_size);_SpanLists[index].getMutex().lock();//将传过来的链表一个个头插进对应的spanNode中while (start) {void* next CurNext(start);//通过链表每个节点的地址来获取对应的spanNode地址SpanNode* SpanNode PageCache::GetInstance()-MapObjectToSpan(start);CurNext(start) SpanNode-_freeList;SpanNode-_freeList start;SpanNode-_useCount--; //每头插回来一个已使用数量减一//当SpanNode已使用数量为0时说明该节点中的自由链表节点都还回来了,继续归还给PageCache处理if (SpanNode-_useCount 0) {//归还给PageCache//...}start next;}_SpanLists[index].getMutex().unlock();
} 5、page cache 首先central cache的映射规则与thread cache保持一致而page cache的映射规则与它们都不相同。page cache的哈希桶映射规则采用的是直接定址法比如1号桶挂的都是1页的span2号桶挂的都是2页的span以此类推。 其次central cache每个桶中的span被切成了一个个对应大小的对象以供thread cache申请。而page cache当中的span是没有被进一步切小的因为page cache服务的是central cache当central cache没有span时向page cache申请的是某一固定页数的span而如何切分申请到的这个span就应该由central cache自己来决定。 注上图中1page和3page桶下面挂的链在最开始的时候是没有的他们都是由128page我们每次向系统申请的是固定大小128page切分后挂到上面的切分逻辑见下文代码。
申请内存 1. 当central cache向page cache申请内存时page cache先检查对应位置有没有span如果没有则向更大页寻找一个span如果找到则分裂成两个。比如申请的是4页page4页page后面没有挂span则向后面寻找更大的span假设在10页page位置找到一个span则将10页page span分裂为一个4页page span和一个6页page span。 2. 如果找到_spanList[128]都没有合适的span则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中再重复1中的过程。 3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶但是他们是有本质区别的central cache中哈希桶是按跟thread cache一样的大小对齐关系映射的他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的也就是说第i号桶中挂的span都是i页内存。 释放内存 1. 如果central cache释放回一个span则依次寻找span的前后page id的没有在使用的空闲span看是否可以合并如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span减少内存碎片。1Page Cache类设计 通过上面的分析我们可以知道Page Cache和Central Cache一样是被多个线程共享的故也应该将其设计为单例模式上面在讲Central Cache释放逻辑是提到我们要根据自由链的节点地址找到其对应SpanNode节点故我们还应该添加其映射关系这里我们采用unordered_map进行映射。此外我们还需要对Central Cache提供对象的接口、回收SpanNode的接口、获取映射关系的接口如下定义 static const size_t NPAGES 129; //下标是从0开始的class PageCache {
private:SpanList _pageLists[NPAGES];std::unordered_mapPAGE_ID, SpanNode* _idSpanNodeMap;std::mutex _PageMtx;
private:PageCache() {};PageCache(const PageCache) delete;static PageCache _sInst;
public:static PageCache* GetInstance() {return _sInst;}std::mutex GetMutex() {return _PageMtx;}//获取n页SpanNodeSpanNode* NewSpan(size_t n);//获取从对象到SpanNode的映射SpanNode* MapObjectToSpanNode(void* obj);//释放空闲SpanNode回到PageCache并合并相邻的SpanNodevoid ReleaseSpanNodeToPageCache(SpanNode* SpanNode);
}; 代码实现 当Page Cache中无内存时我们需要向系统取申请内存故我们应该提供一个向系统申请内存的接口如下 windows和Linux下如何直接向堆申请页为单位的大块内存// 去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr VirtualAlloc(0, kpage 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr nullptr)throw std::bad_alloc();return ptr;
}inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// linux下sbrk unmmap等
#endif
} static const size_t NPAGES 129; //下标是从0开始的PageCache PageCache::_sInst;//获取n页SpanNode
SpanNode* PageCache::NewSpan(size_t n) {assert(n 0);//大于128页直接向堆申请内存if (n NPAGES - 1) {void* ptr SystemAlloc(n);SpanNode* spanNode new SpanNode;spanNode-_pageId (PAGE_ID)ptr PAGE_SHIFT; //右移13位相当于除8kspanNode-_n n;_idSpanNodeMap[spanNode-_pageId] spanNode; //建立映射方便释放内存return spanNode;}//检查对应桶中是否有sapnNodeif (!_pageLists[n].Empty()) {SpanNode* nSpanNode _pageLists[n].PopFront();//将其中节点建立映射关系映射for (PAGE_ID i 0; i nSpanNode-_n; i) {_idSpanNodeMap[nSpanNode-_pageId i] nSpanNode;}return nSpanNode;}//检查后面桶中是否有SpanNode节点有则进行切分for (size_t i n 1; i NPAGES; i) {if (!_pageLists[i].Empty()) {//找到第i个桶不为空时先将其取出切分出n个后在放回相应桶中SpanNode* ISpanNode _pageLists[i].PopFront();//开辟出一个节点进行切分,然后返回SpanNode* NSpanNode new SpanNode;NSpanNode-_n n;NSpanNode-_pageId ISpanNode-_pageId;ISpanNode-_pageId n;ISpanNode-_n - n;_pageLists[ISpanNode-_n].PushFront(ISpanNode);//存储ISpanNode起始页号映射方便回收_idSpanNodeMap[ISpanNode-_pageId] ISpanNode;_idSpanNodeMap[ISpanNode-_pageId ISpanNode-_n - 1] ISpanNode;for (PAGE_ID i 0; i NSpanNode-_n; i) {_idSpanNodeMap[NSpanNode-_pageId i] NSpanNode;}return NSpanNode;}}//走到这里说明后面的桶中没有剩余的SpanNode节点,找系统申请一页SpanNode* bigSpanNode new SpanNode;void* ptr SystemAlloc(NPAGES - 1);bigSpanNode-_pageId (PAGE_ID)ptr PAGE_SHIFT; bigSpanNode-_n NPAGES - 1;_pageLists[bigSpanNode-_n].PushFront(bigSpanNode); //将申请的大页span放入哈希桶中return NewSpan(n); //重新调用进行切分}
//获取从对象到SpanNode的映射
SpanNode* PageCache::MapObjectToSpanNode(void* obj) {PAGE_ID id (PAGE_ID)obj PAGE_SHIFT;std::unique_lockstd::mutex lock(PageCache::GetInstance()-GetMutex()); //出作用域自动解锁auto ret _idSpanNodeMap.find(id);if (ret ! _idSpanNodeMap.end()) {//找到了进行返回return ret-second;}else {//找不到说明出问题了直接报错assert(false); return nullptr;}
} //释放空闲SpanNode回到PageCache并合并相邻的SpanNode
void PageCache::ReleaseSpanNodeToPageCache(SpanNode* spanNode) {//大于128页直接还给堆if (spanNode-_n NPAGES - 1) {//根据页号算出响应的地址,然后进行释放void* ptr (void*)(spanNode-_pageId PAGE_SHIFT);SystemFree(ptr);return;}//向前合并while (1) {PAGE_ID prev spanNode-_pageId - 1;//找不到跳出循环auto ret _idSpanNodeMap.find(prev);if (ret _idSpanNodeMap.end()) {break;}//prevSpanNode正在被使用跳出循环SpanNode* prevSpanNode ret-second;if (prevSpanNode-_isUse true) {break;}//合并出超出128kb的spanNode不进行管理if (prevSpanNode-_n spanNode-_n NPAGES - 1) {break;}//进行合并spanNode-_pageId prevSpanNode-_pageId;spanNode-_n prevSpanNode-_n;_pageLists[prevSpanNode-_n].Erase(prevSpanNode);delete prevSpanNode;}//向后合并while (1) {PAGE_ID next spanNode-_pageId - 1;//找不到跳出循环auto ret _idSpanNodeMap.find(next);if (ret _idSpanNodeMap.end()) {break;}//prevSpanNode正在被使用跳出循环SpanNode* nextSpanNode ret-second;if (nextSpanNode-_isUse true) {break;}//合并出超出128kb的spanNode不进行管理if (nextSpanNode-_n spanNode-_n NPAGES - 1) {break;}//进行合并spanNode-_n nextSpanNode-_n;_pageLists[nextSpanNode-_n].Erase(nextSpanNode);delete nextSpanNode;}//放回哈希桶中_pageLists[spanNode-_n].PushFront(spanNode);spanNode-_isUse false;存储ISpanNode起始页号映射方便回收_idSpanNodeMap[spanNode-_pageId] spanNode;_idSpanNodeMap[spanNode-_pageId spanNode-_n - 1] spanNode;
} 6、性能分析 参考了一段别人的测试代码进行测试 测试代码链接 运行结果由下图可见我们当前实现的内存池效率和malloc比起来差了许多。这个时候不要去盲目改代码我们可以在VS中找到性能分析来分析我们的程序看看是什么原因导致的如下图 由此我们可以看出我们大部分时间都浪费在锁上面了而且是map映射的时候那么有什么办法呢
其实我们除了上面的锁浪费时间外还有程序中的new我们可以替换成一个定长内存池因为其效率比new要高一些大家可以自行测试一下。
1定长内存池实现
malloc其实就是一个通用的内存池在什么场景下都可以使用但这也意味着malloc在什么场景下都不会有很高的性能因为malloc并不是针对某种场景专门设计的。定长内存池就是针对固定大小内存块的申请和释放的内存池由于定长内存池只需要支持固定大小内存块的申请和释放因此我们性能方面要比malloc高一些并且在实现定长内存池时不需要考虑内存碎片等问题因为我们申请/释放的都是固定大小的内存块。
代码如下
templateclass T
class FiedMemoryPool {
private:char* _memory nullptr; //指向大块内存的指针void* _freeList nullptr;//还内存链接自由链表头指针size_t _residueBytes 0; //剩余字节数
public:T* New() {T* obj nullptr;//优先把还回来的对象重复利用if (_freeList) {void* next *((void**)_freeList);obj (T*)_freeList;_freeList next;}else {//剩余内存不够一个对象大小时重新开辟内存if (_residueBytes sizeof(T)) {_residueBytes 128 * 1024;_memory (char*)SystemAlloc(_residueBytes PAGE_SHIFT);if (_memory nullptr) {throw std::bad_alloc();}}//给目标分配内存obj (T*)_memory;size_t objSize sizeof(T) sizeof(void*) ? sizeof(void*) : sizeof(T); //至少开辟一个指针大小_memory objSize;_residueBytes - objSize;}//用定位new显示调用其构造函数new (obj)T;return obj;}void Delete(T* obj) {//显示调用其析构函数obj-~T();//头插进自由链表*(void**)obj _freeList;_freeList obj;}
};
有了定长内存池我们只需要将代码中new出来的对象替换成用定长内存池来申请就可以了new 底层依然是调用malloc由于这部分替换起来比较简单就不在详细讲述了具体代码参考文章末尾项目源代码。 2基数树
基数树之所以能够提升效率是因为基数树在使用时是不用加锁的基数树的空间一旦开辟好了就不会发生变化因此无论什么时候去读取某个页的映射都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作因为我们只有在释放对象时才需要读取映射。 基数树代码链接
有了基数树我们只需要将PageCache中unorder_map定义的对象用基数树来定义就可以了具体代码实现参考文章末尾源代码实现。
使用上面两种方法对项目进行优化后在进行测试如图 由上图很明显可以看出此时我们实现的内存池在并发环境下效率更胜一筹。 7、项目源码
项目源码
项目到这里就算完成了感觉有用的话期待大家点赞关注项目中有哪些地方有疑问的话欢迎大家评论留言。