1什么是內存池
1.1池化技術
所謂“池化技術”,就是程序先向系統申請過量的資源,然后自己管理,以備不時之需。之所以要申請過 量的資源,是因為每次申請該資源都有較大的開銷,不如提前申請好了,這樣使用時就會變得非常快 捷,大大提高程序運行效率。 在計算機中,有很多使用“池”這種技術的地方,除了內存池,還有連接池、線程池、對象池等。以服務 器上的線程池為例,它的主要思想是:先啟動若干數量的線程,讓它們處于睡眠狀態,當接收到客戶端 的請求時,喚醒池中某個睡眠的線程,讓它來處理客戶端的請求,當處理完這個請求,線程又進入睡眠 狀態。
1.2內存池
內存池是指程序預先從操作系統申請一塊足夠大內存,此后,當程序中需要申請內存的時候,不是直接 向操作系統申請,而是直接從內存池中獲取;同理,當程序釋放內存的時候,并不真正將內存返回給操 作系統,而是返回內存池。當程序退出(或者特定時間)時,內存池才將之前申請的內存真正釋放。減少用戶層向系統層的內存申請調用,實現高效;
2.開胃菜-設計一個定長內存池
此次設計的定長內存池ObjectPool結構如下:(其單位小內存塊的大小為T類型的大小)
- 需要申請內存時,一次性申請一個大塊空間,記錄起始位置_memory;
- 需要使用size大小時,將大塊內存抽象切割成小塊,將起始位置_memory向后移動size大小;
- 釋放內存時,將需要釋放的空間掛入自由鏈表_freeList,可供下次申請使用;注意:這個鏈表不是通過內部指針連接下一個,而是通過前一塊空間的前4or8個字節記錄后一個小空間的起始地址抽象連接的;
這樣做的目的是循環利用預先開辟的一大塊空間,減少用戶層申請內存時與系統層的交互,提高效率;
代碼實現:
ObjectPool.h
#define _CRT_SECURE_NO_WARNINGS 1
#include
using namespace std;
#ifdef _WIN32
#include
#else
// 包Linux相關頭文件,增加代碼的可移植性;
#endif
// 直接去堆上按頁申請空間 擺脫malloc;
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
template
class ObjectPool
{
char* _memory = nullptr; // 指向大塊內存的指針
size_t _remainBytes = 0; // 大塊內存在切分過程中剩余字節數
void* _freeList = nullptr; // 還回來過程中鏈接的自由鏈表的頭指針
public:
T* New()
{
T* obj = nullptr;
//如果之前有申請的空間被釋放,那就先從free了的空間拿;(內存池技術)
if (_freeList != nullptr)
{
//頭刪
//注意!*((void**)_freeList)相當訪取_freeList前4or8個字節操作(由32位系統->指針4byte and 64位系統->指針8byte決定);
void* next = *((void**)_freeList);
obj = (T*)_freeList;
_freeList = next;
}
else
{
if (_remainBytes < sizeof(T))
{
// 剩余內存不夠一個對象大小時,則重新開大塊空間
_remainBytes = 128 * 1024;
//_memory = (char*)malloc(_remainBytes);
//使用SystemAlloc直接向堆申請內存,脫離malloc,方便體現malloc和該ObjectPool的差別;
_memory = (char*)SystemAlloc(_remainBytes >> 13);//char*類型內存可以使其+1or-1的單位操作為1字節,方便內存管理;
if (_memory == nullptr)
{
throw std::bad_alloc();
}
}
//防止某個T類還沒當前系統下一個指針大小大 那么就裝不下后一個的地址了,這里做特殊處理;至少保證一個對象內足夠裝的下一個指針大小;
int Objsize = sizeof(T) < sizeof(void*) ? sizeof(void*): sizeof(T);
obj = (T*)_memory;
_memory += Objsize;
_remainBytes -= Objsize;
}
// 定位new,內置類型不處理,自定義類型調用其構造函數;
new(obj)T;
return obj;
}
void Delete(T* obj)
{
//內置類型不處理,自定義類型調用其構析構函數;
obj->~T();
//頭插(此處不是真正的刪除,而是標志為未使用,掛入自由鏈表)
*(void**)obj = _freeList;
_freeList = obj;
}
};
ObjectPool.cpp測試
#include
#include "ConcurrentAlloc.h"
//用于測試的類型樹節點;
struct TreeNode
{
int _val;
TreeNode* _left;
TreeNode* _right;
TreeNode()
:_val(0)
, _left(nullptr)
, _right(nullptr)
{}
};
void TestObjectPool()
{
// 申請釋放的輪次
const size_t Rounds = 5;
// 每輪申請釋放多少次
const size_t N = 100000;
std::vector v1;
v1.reserve(N);
size_t begin1 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v1.push_back(new TreeNode);
}
for (int i = 0; i < N; ++i)
{
delete v1[i];
}
v1.clear();
}
size_t end1 = clock();
std::vector v2;
v2.reserve(N);
ObjectPool TNPool;
size_t begin2 = clock();
for (size_t j = 0; j < Rounds; ++j)
{
for (int i = 0; i < N; ++i)
{
v2.push_back(TNPool.New());
}
for (int i = 0; i < N; ++i)
{
TNPool.Delete(v2[i]);
}
v2.clear();
}
size_t end2 = clock();
cout << "new cost time:" << end1 - begin1 << endl;
cout << "object pool cost time:" << end2 - begin2 << endl;
}
int mian()
{
TestObjectPool();
return 0;
}*>*>
運行結果:
顯然,定長內存池的New空間比new(底層封裝的是malloc申請空間)更高效
3.TCmalloc(高并發內存池)整體框架介紹:
谷歌開源項目:TCMalloc (google-perftools) 是用于優化C++寫的多線程應用,比glibc 2.3的malloc快。這個模塊可以用來讓MySQL在高并發下內存占用更加穩定。
ThreadCache
thread cache:線程緩存是每個線程獨有的,用于小于256KB的內存的分配(設計規定),線程從這里申請內存不需要加鎖,每個線程獨享一個cache,這也就是這個并發線程池高效的地方。核心結構是FreeList _freeLists[NFREELIST];eg:_freeLists[2]代表該size對應哈希桶中有n個未使用的obj size大小的小內存塊,當需要申請時優先從_freeList中拿無人使用的被掛起的obj空間;
CentralCache
central cache:中心緩存是所有線程所共享,thread cache是按需從central cache中獲取的對 象。central cache合適的時機回收thread cache中的對象,避免一個線程占用了太多的內存,而 其他線程的內存吃緊,達到內存分配在多個線程中更均衡的按需調度的目的。central cache是存在競爭的,所以從這里取內存對象是需要加鎖,首先這里用的是桶鎖,其次只有thread cache的 沒有內存對象時才會找central cache,所以這里競爭不會很激烈。其核心結構是SpanList _spanLists[NFREELIST];eg:_spanLists[2]代表該size對應鏈桶中的n個span,每個span下面掛了n個大小為size的obj小內存塊;(size大小是指通過測試和計算,將所需內存大小x字節依據某種對齊規則對其為size字節,使span的鏈式桶結構數量合適,大小統一)
page cache
page cache:頁緩存是在central cache緩存上面的一層緩存,存儲的內存是以頁為單位存儲及分 配的,central cache沒有內存對象時,從page cache分配出一定數量的page,并切割成定長大小 的小塊內存,分配給central cache。當一個span的幾個跨度頁的對象都回收以后,page cache 會回收central cache滿足條件的span對象,并且合并相鄰的頁,組成更大的頁,緩解內存碎片 的問題。Page中的核心結構是SpanList _spanLists[NPAGES];eg:_spanLists[2]代表該鏈桶含有n個大小為2頁(2*8字節)的span;如果從page[1]到page[128]都沒有可用span,那么只能從系統堆直接申請一塊大空間放進去,再給central thread層用了;
本文TCmalloc整體項目圍繞上述三層結構實現,抽取原項目的小部分核心內容學習;這個項目會用到C/C++、數據結構(鏈表、哈希桶)、操作系統內存管理、單例模式、多線程、互斥鎖 等等方面的知識。
4.代碼實現:
Common.h
這是一個通用頭文件,包含各種所需要的庫文件和三層結構需要的對其數和size-申請數量轉換的函數;
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using std::cout;
using std::endl;
#ifdef _WIN32
#include
#else
// ...
#endif
static const size_t MAX_BYTES = 256 * 1024;//ThreadCache中允許申請的最大字節;
static const size_t NFREELIST = 208;//通過對齊計算的thread和central中最大哈希桶下表;
static const size_t NPAGES = 129;//最多128頁,為了方便映射哈希,從第1頁開始計算;
static const size_t PAGE_SHIFT = 13;//一頁==8kb==2^13字節大小 所以 p地址>>PAGE_SHIFT ==PAGE_ID 可相當于將某連續地址以2^13字節對齊,并標上頁號方便管理;eg:0x00000000~0x2^13這連續的2^13個地址 經過>> 為PAGE_ID==1,后連續的2^13個地址 經過>> 為PAGE_ID==2;
//多系統編程;
#ifdef _WIN64
typedef unsigned long long PAGE_ID;
#elif _WIN32
typedef size_t PAGE_ID;
#else
// linux
#endif
// 直接去堆上按頁申請空間
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
// linux下brk mmap等
#endif
if (ptr == nullptr)
throw std::bad_alloc();
return ptr;
}
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
VirtualFree(ptr, 0, MEM_RELEASE);
#else
// sbrk unmmap等
#endif
}
//NextObj(obj) 等價于 *(void**)obj,取obj前4or8個字節(存放下一個小空間地址的位置)進行操作,增加語義;
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
// 管理切分好的小對象的自由鏈表
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
// 頭插
//*(void**)obj = _freeList;
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
//從central中申請一批obj大小的內存塊,range插入;
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
///threadcache還一段list給central cache
void PopRange(void*& start, void*& end, size_t n)//n==list.size
{
assert(n <= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
// 頭刪
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;//慢開始算法--從1啟動;
size_t _size = 0;//當前_freeList桶里的obj小空間個數
};
// 計算對象大小的對齊映射規則 方便控制哈希桶用的數量(不要太多) 便于管理obj小內存空間
class SizeClass
{
public:
// 整體控制在最多10%左右的內碎片浪費的對齊規則;
// [1,128] 8byte對齊 freelist[0,16) eg:1~128字節的obj 按照8byte對齊(eg:1~7字節的對象都放入_freeList[0]) 則1~128字節的obj 需要的桶index為0~16即可;
// [128+1,1024] 16byte對齊 freelist[16,72)
// [1024+1,8*1024] 128byte對齊 freelist[72,128)
// [8*1024+1,64*1024] 1024byte對齊 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte對齊 freelist[184,208)
/*size_t _RoundUp(size_t size, size_t alignNum)
{
size_t alignSize;
if (size % alignNum != 0)
{
alignSize = (size / alignNum + 1)*alignNum;
}
else
{
alignSize = size;
}
return alignSize;
}*/
// 1-8
//位運算提高效率
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
//通過size大小計算對其數函數;
static inline size_t RoundUp(size_t size)
{
if (size <= 128)
{
return _RoundUp(size, 8);
}
else if (size <= 1024)
{
return _RoundUp(size, 16);
}
else if (size <= 8*1024)
{
return _RoundUp(size, 128);
}
else if (size <= 64*1024)
{
return _RoundUp(size, 1024);
}
else if (size <= 256 * 1024)
{
return _RoundUp(size, 8*1024);
}
else
{
return _RoundUp(size, 1< }
}
/*size_t _Index(size_t bytes, size_t alignNum)
{
if (bytes % alignNum == 0)
{
return bytes / alignNum - 1;
}
else
{
return bytes / alignNum;
}
}*/
//位運算提高效率
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 通過size計算index位置即映射到哪一個自由鏈表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAX_BYTES);
// 每個區間有多少個鏈
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128){
return _Index(bytes, 3);//3表示按8byte對其
}
else if (bytes <= 1024){
return _Index(bytes - 128, 4) + group_array[0];//4表示按16byte對其, - 128因為前128個字節按照別的對齊規則的,剩下的這些按照自己的對其數對其最后+前面總共的桶數量即可計算自己的index;
}
else if (bytes <= 8 * 1024){
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
}
else if (bytes <= 64 * 1024){
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
}
else if (bytes <= 256 * 1024){
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
}
else{
assert(false);
}
return -1;
}
// 一次thread cache從中心緩存獲取多少個,池化技術:當thread cache沒有可用obj空間時,會向中心緩存申請一批而不是一個;
static size_t NumMoveSize(size_t size)
{
assert(size > 0);
// [2, 512],一次批量移動多少個對象的(慢啟動)上限值
// 小對象一次批量上限高
// 小對象一次批量上限低
int num = MAX_BYTES / size;
if (num < 2)
num = 2;
if (num > 512)
num = 512;
return num;
}
// 計算一次向系統獲取幾個頁;當centralCache對應index無可用span時,向pagecache按頁大小申請,之后再把申請的span切割成n個index大小的obj內存塊;
// 單個對象 8byte
// ...
// 單個對象 256KB
static size_t NumMovePage(size_t size)
{
size_t num = NumMoveSize(size);
size_t npage = num*size;
npage >>= PAGE_SHIFT;
if (npage == 0)
npage = 1;
return npage;
}
};
// 管理多個連續頁大塊內存跨度結構
struct Span
{
PAGE_ID _pageId = 0; // 大塊內存起始頁的頁號;將申請的viod*通過>>PAGE_SHIFT(8K),映射成數字方便管理和掛桶;
size_t _n = 0; // 頁的數量
Span* _next = nullptr; // 雙向鏈表的結構
Span* _prev = nullptr;
size_t _objSize = 0; // 切好的小對象的大小
size_t _useCount = 0; // 切好小塊內存,被分配給thread cache的計數,方便回收span,_useCount==0;
void* _freeList = nullptr; // 切好的小塊內存的自由鏈表
bool _isUse = false; // 是否在被使用,方便合并span,減少內存碎片;
};
// 帶頭雙向循環鏈表 封裝Span節點
class SpanList
{
public:
SpanList()
{
_head = new Span;
_head->_next = _head;
_head->_prev = _head;
}
//Begin()和End()模擬了帶頭雙向循環鏈表;
Span* Begin()
{
return _head->_next;
}
Span* End()
{
return _head;
}
bool Empty()
{
return _head->_next == _head;
}
void PushFront(Span* span)
{
Insert(Begin(), span);
}
Span* PopFront()
{
Span* front = _head->_next;
Erase(front);
return front;
}
//“雙向鏈表實現一個Insert即可高效復用”
void Insert(Span* pos, Span* newSpan)
{
assert(pos);
assert(newSpan);
Span* prev = pos->_prev;
// prev newspan pos
prev->_next = newSpan;
newSpan->_prev = prev;
newSpan->_next = pos;
pos->_prev = newSpan;
}
void Erase(Span* pos)
{
assert(pos);
assert(pos != _head);
Span* prev = pos->_prev;
Span* next = pos->_next;
prev->_next = next;
next->_prev = prev;
}
private:
Span* _head;
public:
std::mutex _mtx; // 桶鎖 可能多個threadcache同時訪問central中的同一個index桶,加鎖-線程安全
};);
ThreadCache.h
thread cache是哈希桶結構,每個桶是一個按桶位置映射大小的內存塊對象的自由鏈表。每個線程都會 有一個thread cache對象,這樣每個線程在這里獲取對象和釋放對象時是無鎖的。
申請內存:
- 當內存申請size<=256KB,先獲取到線程本地存儲的thread cache對象,計算size映射的哈希桶自 由鏈表下標i。
- 如果自由鏈表_freeLists[i]中有對象,則直接Pop一個內存對象返回。
- 如果_freeLists[i]中沒有對象時,則批量從central cache中獲取一定數量的對象,插入到自由鏈表 并返回一個對象。
釋放內存:
2. 當鏈表的長度過長,則回收一部分內存對象到central cache。
class ThreadCache
{
public:
// 申請和釋放內存對象
void* Allocate(size_t size);
void Deallocate(void* ptr, size_t size);
// 從中心緩存獲取對象
void* FetchFromCentralCache(size_t index, size_t size);
// 釋放對象時,鏈表過長時,回收內存回到中心緩存
void ListTooLong(FreeList& list, size_t size);
private:
FreeList _freeLists[NFREELIST];
};
// TLS thread local storage TLS技術,使每個線程里的ThreadCache*獨享,互不影響,實現高并發;
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
// 管理切分好的小對象的自由鏈表
class FreeList
{
public:
void Push(void* obj)
{
assert(obj);
// 頭插
//*(void**)obj = _freeList;
NextObj(obj) = _freeList;
_freeList = obj;
++_size;
}
void PushRange(void* start, void* end, size_t n)
{
NextObj(end) = _freeList;
_freeList = start;
_size += n;
}
void PopRange(void*& start, void*& end, size_t n)
{
assert(n >= _size);
start = _freeList;
end = start;
for (size_t i = 0; i < n - 1; ++i)
{
end = NextObj(end);
}
_freeList = NextObj(end);
NextObj(end) = nullptr;
_size -= n;
}
void* Pop()
{
assert(_freeList);
// 頭刪
void* obj = _freeList;
_freeList = NextObj(obj);
--_size;
return obj;
}
bool Empty()
{
return _freeList == nullptr;
}
size_t& MaxSize()
{
return _maxSize;
}
size_t Size()
{
return _size;
}
private:
void* _freeList = nullptr;
size_t _maxSize = 1;
size_t _size = 0;
};
ThreadCache.cpp
#include "CentralCache.h"
//從thradcache 從 central中要內存
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
// 慢開始反饋調節算法
// 1、最開始不會一次向central cache一次批量要太多,因為要太多了可能用不完
// 2、如果你不要這個size大小內存需求,那么batchNum就會不斷增長,直到上限
// 3、size越大,一次向central cache要的batchNum就越小
// 4、size越小,一次向central cache要的batchNum就越大
size_t batchNum = min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
if (_freeLists[index].MaxSize() == batchNum)
{
_freeLists[index].MaxSize() += 1;
}
//輸出型參數 將batchNum個obj小空間從centralcache中帶出來;
void* start = nullptr;
void* end = nullptr;
size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
assert(actualNum > 0);
if (actualNum == 1)//就申請到一個obj空間 直接返回給申請的人用
{
assert(start == end);
return start;
}
else//申請了多個obj空間 只返回第一個 則 剩下的插入_freeList留著后面備用;
{
_freeLists[index].PushRange(NextObj(start), end, actualNum-1);
return start;
}
}
//多線程申請
void* ThreadCache::Allocate(size_t size)
{
assert(size <= MAX_BYTES);
size_t alignSize = SizeClass::RoundUp(size);//計算對齊數 相當于需要obj的size
size_t index = SizeClass::Index(size);//計算哈希桶index
if (!_freeLists[index].Empty())//還有剩余直接pop拿走一個
{
return _freeLists[index].Pop();
}
else
{
return FetchFromCentralCache(index, alignSize);//沒了 從central中要
}
}
//多線程釋放
void ThreadCache::Deallocate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAX_BYTES);
// 找對映射的自由鏈表桶,對象插入進入
size_t index = SizeClass::Index(size);
_freeLists[index].Push(ptr);
//釋放完了檢查下freelist里的obj,看看需不需要回收到centralcache
// 設定:當鏈表長度大于一次批量申請的內存時就開始還一段list給central cache
if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
{
ListTooLong(_freeLists[index], size);
}
}
//一段list給central cache
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
//輸出型參數;
void* start = nullptr;
void* end = nullptr;
//先從_freeList中彈出
list.PopRange(start, end, list.MaxSize());
//再收回到central中對應index的Span鏈表中
CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}
CentralCache.h
central cache也是一個哈希桶結構,他的哈希桶的映射關系跟thread cache是一樣的。不同的是他的每 個哈希桶位置掛是SpanList鏈表結構,不過每個映射桶下面的span中的大內存塊被按映射關系切成了一 個個小內存塊對象掛在span的自由鏈表中。
申請內存:
1.當thread cache中沒有內存時,就會批量向central cache申請一些內存對象,這里的批量獲取對 象的數量使用了類似網絡tcp協議擁塞控制的慢開始算法;central cache也有一個哈希映射的 spanlist,spanlist中掛著span,從span中取出對象給thread cache,這個過程是需要加鎖的,不 過這里使用的是一個桶鎖,盡可能提高效率。
2.central cache映射的spanlist中所有span的都沒有內存以后,則需要向page cache申請一個新的 span對象,拿到span以后將span管理的內存按大小切好作為自由鏈表鏈接到一起。然后從span 中取對象給thread cache。
3.central cache的中掛的span中use_count記錄分配了多少個對象出去,分配一個對象給thread cache,就++use_count。
釋放內存:
1.當thread_cache過長或者線程銷毀,則會將內存釋放回central cache中的,釋放回來時-- use_count。當use_count減到0時則表示所有對象都回到了span,則將span釋放回page cache, page cache中會對前后相鄰的空閑頁進行合并。
#include "Common.h"
// 單例模式(餓漢模式)全局就一個CentralCache;
class CentralCache
{
public:
static CentralCache* GetInstance()
{
return &_sInst;
}
// 獲取一個非空的span
Span* GetOneSpan(SpanList& list, size_t byte_size);
// 從中心緩存獲取一定數量的對象給thread cache
size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);
// 將一定數量的obj釋放到span跨度
void ReleaseListToSpans(void* start, size_t byte_size);
private:
SpanList _spanLists[NFREELIST];
private:
CentralCache()
{}
CentralCache(const CentralCache&) = delete;
static CentralCache _sInst;
};
CentralCache.cpp
//#include "PageCache.h"
CentralCache CentralCache::_sInst;
// 獲取一個非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)//參數list是某一個確定的index桶
{
// 查看當前的spanlist中是否有還有未分配對象的span
Span* it = list.Begin();
while (it != list.End())
{
if (it->_freeList != nullptr)
{
return it;//這里不需要改it的span中的屬性,因為等最后分給threadcache了obj以后 才算其中的內存被分出去了 里面的usecount等才需要改;
}
else
{
it = it->_next;
}
}
// 走到這里說沒有空閑span了,只能找page cache要
// 先把central cache的桶鎖解掉,這樣如果其他線程釋放內存對象回來,不會阻塞(你要從page申請了 不能別的線程在這個桶釋放)
list._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();//整個pagecache結構可能會從index1~index129挨個操作每個桶 因此需要上大鎖;
Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_isUse = true;//分跟central的span標記已使用因為馬上就要切分給obj用了
span->_objSize = size;//每個span掛的固定小內存塊obj大小size
PageCache::GetInstance()->_pageMtx.unlock();
// 對獲取span進行切分,不需要加鎖,因為這會其他線程訪問不到這個span
// 計算span的大塊內存的起始地址和大塊內存的大小(字節數)
char* start = (char*)(span->_pageId << PAGE_SHIFT);//這里的_pageId是從底層按頁申請內存的時候地址轉換來的,現在需要用地址就轉換回去;
size_t bytes = span->_n << PAGE_SHIFT;
char* end = start + bytes;
// 把大塊內存切成自由鏈表鏈接起來
// 1、先切一塊下來去做頭,方便尾插(尾插原因,切出來一般是連續的,那么尾插給到span上掛小obj也是連續,提高內存命中率)
span->_freeList = start;
start += size;
void* tail = span->_freeList;
int i = 1;
while (start < end)
{
++i;
NextObj(tail) = start;
tail = NextObj(tail); // tail = start;
start += size;
}
NextObj(tail) = nullptr;
// 切好span以后,需要把span掛到桶里面去的時候,再加鎖
list._mtx.lock();
list.PushFront(span);
return span;
}
// 從中心緩存獲取一定數量的對象給thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();//上桶鎖
Span* span = GetOneSpan(_spanLists[index],size);
assert(span);
assert(span->_freeList);
// 從span中獲取batchNum個對象
// 如果不夠batchNum個,有多少拿多少
start = span->_freeList;
end = start;
int n = 1;//n為實際拿到的數量,start直接給了所以起始值為1;
for (int i = 0; i < batchNum - 1; i++)
{
if (NextObj(end) == nullptr) break;
end = NextObj(end);
++n;
}
//span被切出去給obj使用了 span的一些屬性得改變了;
span->_useCount += n;
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
_spanLists[index]._mtx.unlock();
return n;
}
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
size_t index = SizeClass::Index(size);
_spanLists[index]._mtx.lock();
while (start)
{
void* next = NextObj(start);
Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
//小obj頭插入span中的_freeList
NextObj(start) = span->_freeList;
span->_freeList = start;
span->_useCount--;
// 說明span的切分出去的所有小塊內存都回來了
// 這個span就可以再回收給page cache,pagecache可以再嘗試去做前后頁的合并
if (span->_useCount == 0)
{
_spanLists[index].Erase(span);
span->_freeList = nullptr;
span->_next = nullptr;
span->_prev = nullptr;
// 釋放span給page cache時,使用page cache的鎖就可以了
// 這時把桶鎖解掉 不影響其他線程對該index的central操作;
_spanLists[index]._mtx.unlock();
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);//還給page和page是否需要合并其中的span減少內存碎片都在這函數里
PageCache::GetInstance()->_pageMtx.unlock();
_spanLists[index]._mtx.lock();
}
start = next;
}
_spanLists[index]._mtx.unlock();
}
PageMap.h
使用自己定義的PageMap哈希直接映射,避免stl的線程安全,提高效率;
#include"Common.h"
// Single-level array
template
class TCMalloc_PageMap1 {
private:
static const int LENGTH = 1 << BITS;
void** array_;
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap1() {
//array_ = reinterpret_cast((*allocator)(sizeof(void*) << BITS));
size_t size = sizeof(void*) << BITS;
size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
memset(array_, 0, sizeof(void*) << BITS);
}
// Return the current value for KEY. Returns NULL if not yet set,
// or if k is out of range.
void* get(Number k) const {
if ((k >> BITS) > 0) {
return NULL;
}
return array_[k];
}
// REQUIRES "k" is in range "[0,2^BITS-1]".
// REQUIRES "k" has been ensured before.
//
// Sets the value 'v' for key 'k'.
void set(Number k, void* v) {
array_[k] = v;
}
};
// Two-level radix tree
template
class TCMalloc_PageMap2 {
private:
// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
static const int ROOT_BITS = 5;
static const int ROOT_LENGTH = 1 << ROOT_BITS;
static const int LEAF_BITS = BITS - ROOT_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodes
void* (*allocator_)(size_t); // Memory allocator
public:
typedef uintptr_t Number;
//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
explicit TCMalloc_PageMap2() {
//allocator_ = allocator;
memset(root_, 0, sizeof(root_));
PreallocateMoreMemory();
}
void* get(Number k) const {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 || root_[i1] == NULL) {
return NULL;
}
return root_[i1]->values[i2];
}
void set(Number k, void* v) {
const Number i1 = k >> LEAF_BITS;
const Number i2 = k & (LEAF_LENGTH - 1);
ASSERT(i1 < ROOT_LENGTH);
root_[i1]->values[i2] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> LEAF_BITS;
// Check for overflow
if (i1 >= ROOT_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_[i1] == NULL) {
//Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
//if (leaf == NULL) return false;
static ObjectPool leafPool;
Leaf* leaf = (Leaf*)leafPool.New();
memset(leaf, 0, sizeof(*leaf));
root_[i1] = leaf;
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
// Allocate enough to keep track of all possible pages
Ensure(0, 1 << BITS);
}
};
// Three-level radix tree
template
class TCMalloc_PageMap3 {
private:
// How many bits should we consume at each interior level
static const int INTERIOR_BITS = (BITS + 2) / 3; // Round-up
static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;
// How many bits should we consume at leaf level
static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;
static const int LEAF_LENGTH = 1 << LEAF_BITS;
// Interior node
struct Node {
Node* ptrs[INTERIOR_LENGTH];
};
// Leaf node
struct Leaf {
void* values[LEAF_LENGTH];
};
Node* root_; // Root of radix tree
void* (*allocator_)(size_t); // Memory allocator
Node* NewNode() {
Node* result = reinterpret_cast((*allocator_)(sizeof(Node)));
if (result != NULL) {
memset(result, 0, sizeof(*result));
}
return result;
}
public:
typedef uintptr_t Number;
explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {
allocator_ = allocator;
root_ = NewNode();
}
void* get(Number k) const {
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
if ((k >> BITS) > 0 ||
root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {
return NULL;
}
return reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3];
}
void set(Number k, void* v) {
ASSERT(k >> BITS == 0);
const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
const Number i3 = k & (LEAF_LENGTH - 1);
reinterpret_cast(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;
}
bool Ensure(Number start, size_t n) {
for (Number key = start; key <= start + n - 1;) {
const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);
const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);
// Check for overflow
if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)
return false;
// Make 2nd level node if necessary
if (root_->ptrs[i1] == NULL) {
Node* n = NewNode();
if (n == NULL) return false;
root_->ptrs[i1] = n;
}
// Make leaf node if necessary
if (root_->ptrs[i1]->ptrs[i2] == NULL) {
Leaf* leaf = reinterpret_cast((*allocator_)(sizeof(Leaf)));
if (leaf == NULL) return false;
memset(leaf, 0, sizeof(*leaf));
root_->ptrs[i1]->ptrs[i2] = reinterpret_cast(leaf);
}
// Advance key past whatever is covered by this leaf node
key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
}
return true;
}
void PreallocateMoreMemory() {
}
};*>*>*>*>*>*>**>
PageCache.h
申請內存:
- 當central cache向page cache申請內存時,page cache先檢查對應位置有沒有span,如果沒有 則向更大頁尋找一個span,如果找到則分裂成兩個。比如:申請的是4頁page,4頁page后面沒 有掛span,則向后面尋找更大的span,假設在10頁page位置找到一個span,則將10頁page span分裂為一個4頁page span和一個6頁page span。
- 如果找到_spanList[128]都沒有合適的span,則向系統使用mmap、brk或者是VirtualAlloc等方式 申請128頁page span掛在自由鏈表中,再重復1中的過程。 3
- 需要注意的是central cache和page cache 的核心結構都是spanlist的哈希桶,但是他們是有本質 區別的,central cache中哈希桶,是按跟thread cache一樣的大小對齊關系映射的,他的spanlist 中掛的span中的內存都被按映射關系切好鏈接成小塊內存的自由鏈表。而page cache 中的 spanlist則是按下標桶號映射的,也就是說第i號桶中掛的span都是i頁內存。
釋放內存:
如果central cache釋放回一個span,則依次尋找span的前后page id的沒有在使用的空閑span ,看是否可以合并,如果合并繼續向前尋找。這樣就可以將切小的內存合并收縮成大的span,減少內存碎片。
#include "Common.h"
#include "ObjectPool.h"
#include "PageMap.h"
class PageCache
{
public:
static PageCache* GetInstance()
{
return &_sInst;
}
// 獲取從對象到span的映射
Span* MapObjectToSpan(void* obj);
// 釋放空閑span回到Pagecache,并合并相鄰的span
void ReleaseSpanToPageCache(Span* span);
// 獲取一個K頁的span
Span* NewSpan(size_t k);
std::mutex _pageMtx;
private:
SpanList _spanLists[NPAGES];
ObjectPool _spanPool;//這里用上了之前編寫的ObjectPool定長內存池 用來New(span)脫離malloc
//std::unordered_map _idSpanMap;
//std::map _idSpanMap;
TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
PageCache()
{}
PageCache(const PageCache&) = delete;
static PageCache _sInst;
};,>,>
PageCache.cpp
PageCache PageCache::_sInst;
// 獲取一個K頁的span
Span* PageCache::NewSpan(size_t k)
{
assert(k > 0);
// 大于128 page的直接向堆申請
if (k > NPAGES - 1)
{
void* ptr = SystemAlloc(k);
//Span* span = new Span;
Span* span = _spanPool.New();
span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
span->_n = k;
//_idSpanMap[span->_pageId] = span;
_idSpanMap.set(span->_pageId, span);
return span;
}
// 先檢查第k個桶里面有沒有span
if (!_spanLists[k].Empty())
{
Span* kSpan = _spanLists[k].PopFront();
// 建立id和span的映射,方便central cache回收小塊內存時,查找對應的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
// 檢查一下后面的桶里面有沒有span,如果有可以把他它進行切分
for (size_t i = k + 1; i < NPAGES; ++i)
{
if (!_spanLists[i].Empty())
{
Span* nSpan = _spanLists[i].PopFront();//nSpan代表切割以后剩下的span,他還是未使用的,還在pagecache中是連續的,所以映射首尾即可
//Span* kSpan = new Span;
Span* kSpan = _spanPool.New();//kSpan代表從某個大內存n+k Span中切出來的kSpan,他要給到central之后進行obj切割,進而往thread給;
// 在nSpan的頭部切一個k頁下來
// k頁span返回
// nSpan再掛到對應映射的位置
kSpan->_pageId = nSpan->_pageId;
kSpan->_n = k;
nSpan->_pageId += k;
nSpan->_n -= k;
_spanLists[nSpan->_n].PushFront(nSpan);
// 存儲nSpan的首尾頁號跟nSpan映射,方便page cache回收內存時進行的合并查找
//_idSpanMap[nSpan->_pageId] = nSpan;
//_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;
_idSpanMap.set(nSpan->_pageId, nSpan);
_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);
// 建立id和kSpan的映射,方便central cache回收小塊內存時,查找對應的span
for (PAGE_ID i = 0; i < kSpan->_n; ++i)
{
//_idSpanMap[kSpan->_pageId + i] = kSpan;
_idSpanMap.set(kSpan->_pageId + i, kSpan);
}
return kSpan;
}
}
// 走到這個位置就說明后面沒有大頁的span了
// 這時就去找堆要一個128頁的span
//Span* bigSpan = new Span;
Span* bigSpan = _spanPool.New();
void* ptr = SystemAlloc(NPAGES - 1);
bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
bigSpan->_n = NPAGES - 1;
_spanLists[bigSpan->_n].PushFront(bigSpan);
return NewSpan(k);
}
//將一個obj小內存塊它對應的Span*找到(通過轉換成page_id再map出Span*)
Span* PageCache::MapObjectToSpan(void* obj)
{
PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);
/*std::unique_lock lock(_pageMtx);
auto ret = _idSpanMap.find(id);
if (ret != _idSpanMap.end())
{
return ret->second;
}
else
{
assert(false);
return nullptr;
}*/
auto ret = (Span*)_idSpanMap.get(id);
assert(ret != nullptr);
return ret;
}
//從central向page歸還span大塊內存(這個Span的 usecount==0了即分出去的小obj都還回來了),歸還Span并嘗試合并!
void PageCache::ReleaseSpanToPageCache(Span* span)
{
// 大于128 page的直接還給堆
if (span->_n > NPAGES - 1)
{
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
SystemFree(ptr);
//delete span;
_spanPool.Delete(span);
return;
}
// 對span前后的頁,嘗試進行合并,緩解內存碎片問題
while (1)
{
PAGE_ID prevId = span->_pageId - 1;
// 前面的頁號沒有,不合并了
auto ret = (Span*)_idSpanMap.get(prevId);
if (ret == nullptr)
{
break;
}
// 前面相鄰頁的span在使用,不合并了
Span* prevSpan = ret;
if (prevSpan->_isUse == true)
{
break;
}
// 合并出超過128頁的span沒辦法管理,不合并了
if (prevSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_pageId = prevSpan->_pageId;
span->_n += prevSpan->_n;
_spanLists[prevSpan->_n].Erase(prevSpan);
//delete prevSpan;
_spanPool.Delete(prevSpan);
}
// 向后合并
while (1)
{
PAGE_ID nextId = span->_pageId + span->_n;
/*auto ret = _idSpanMap.find(nextId);
if (ret == _idSpanMap.end())
{
break;
}*/
auto ret = (Span*)_idSpanMap.get(nextId);
if (ret == nullptr)
{
break;
}
Span* nextSpan = ret;
if (nextSpan->_isUse == true)
{
break;
}
if (nextSpan->_n + span->_n > NPAGES - 1)
{
break;
}
span->_n += nextSpan->_n;
_spanLists[nextSpan->_n].Erase(nextSpan);
//delete nextSpan;
_spanPool.Delete(nextSpan);
}
//合并的大內存塊 插回Pagecache中 設置為未使用(未分到central中)狀態;
_spanLists[span->_n].PushFront(span);
span->_isUse = false;
//_idSpanMap[span->_pageId] = span;
//_idSpanMap[span->_pageId+span->_n-1] = span;
//記錄前后頁號方便下次合并;
_idSpanMap.set(span->_pageId, span);
_idSpanMap.set(span->_pageId + span->_n - 1, span);
}
ConcurrentAlloc.h
#include "Common.h"
#include "ThreadCache.h"
#include "PageCache.h"
#include "ObjectPool.h"
//tcmallo
static void* ConcurrentAlloc(size_t size)
{
//一次申請內存大于thread最大的256kb,則直接向page層按頁直接申請,不需要經過Thread層;
if (size > MAX_BYTES)
{
size_t alignSize = SizeClass::RoundUp(size);
size_t kpage = alignSize >> PAGE_SHIFT;
PageCache::GetInstance()->_pageMtx.lock();
Span* span = PageCache::GetInstance()->NewSpan(kpage);
span->_objSize = size;//這個span就是為了_objsize單位內存而申請的
PageCache::GetInstance()->_pageMtx.unlock();
void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
return ptr;
}
else
{
// 通過TLS 每個線程無鎖的獲取自己的專屬的ThreadCache對象
if (pTLSThreadCache == nullptr)
{
static ObjectPool tcPool;//只需要一個tcpool,用于調用New,所以設置成靜態;
//pTLSThreadCache = new ThreadCache;
pTLSThreadCache = tcPool.New();
}
return pTLSThreadCache->Allocate(size);
}
}
//tcfree
static void ConcurrentFree(void* ptr)
{
Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
size_t size = span->_objSize;
if (size > MAX_BYTES)//證明這個Span直接是按頁從page申請的_objsize>MAX_SIZE,沒經過thread,那么直接ReleaseSpanToPageCache
{
PageCache::GetInstance()->_pageMtx.lock();
PageCache::GetInstance()->ReleaseSpanToPageCache(span);
PageCache::GetInstance()->_pageMtx.unlock();
}
else
{
assert(pTLSThreadCache);
pTLSThreadCache->Deallocate(ptr, size);
}
}
5.測試文件:
Test.cpp
// ntimes 一輪申請和釋放內存的次數
// rounds 輪次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector vthread(nworks);
std::atomic malloc_costtime = 0;
std::atomic free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&, k]() {
std::vector v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(malloc(16));
v.push_back(malloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
free(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u個線程并發執行%u輪次,每輪次malloc %u次: 花費:%u msn",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u個線程并發執行%u輪次,每輪次free %u次: 花費:%u msn",
nworks, rounds, ntimes, free_costtime.load());
printf("%u個線程并發malloc&free %u次,總計花費:%u msn",
nworks, nworks*rounds*ntimes, malloc_costtime.load() + free_costtime.load());
}
// 單輪次申請釋放次數 線程數 輪次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
std::vector vthread(nworks);
std::atomic malloc_costtime = 0;
std::atomic free_costtime = 0;
for (size_t k = 0; k < nworks; ++k)
{
vthread[k] = std::thread([&]() {
std::vector v;
v.reserve(ntimes);
for (size_t j = 0; j < rounds; ++j)
{
size_t begin1 = clock();
for (size_t i = 0; i < ntimes; i++)
{
//v.push_back(ConcurrentAlloc(16));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
}
size_t end1 = clock();
size_t begin2 = clock();
for (size_t i = 0; i < ntimes; i++)
{
ConcurrentFree(v[i]);
}
size_t end2 = clock();
v.clear();
malloc_costtime += (end1 - begin1);
free_costtime += (end2 - begin2);
}
});
}
for (auto& t : vthread)
{
t.join();
}
printf("%u個線程并發執行%u輪次,每輪次concurrent alloc %u次: 花費:%u msn",
nworks, rounds, ntimes, malloc_costtime.load());
printf("%u個線程并發執行%u輪次,每輪次concurrent dealloc %u次: 花費:%u msn",
nworks, rounds, ntimes, free_costtime.load());
printf("%u個線程并發concurrent alloc&dealloc %u次,總計花費:%u msn",
nworks, nworks*rounds*ntimes.load(), malloc_costtime + free_costtime.load());
}
int main()
{
size_t n = 10000;
cout << "==========================================================" << endl;
BenchmarkConcurrentMalloc(n, 4, 10);
cout << endl << endl;
BenchmarkMalloc(n, 4, 10);
cout << "==========================================================" << endl;
return 0;
}*>*>
malloc與tcmallo測試對比結果
總結
在整體框架編寫完畢后,進行測試對比并不能體現tcmalloc比malloc效率高,如下圖;但是在tcmalloc進行 map(Span*,PAGE_ID),這個映射采用基數樹的優化后效率大大提高,超過了malloc。
不高效存在性能瓶頸的原因
根據測試,map的find越慢,會導致其lock()越慢,因此map中的find占了整體的15%,加鎖解鎖的過程占據了程序運行的20%times,這就是未優化的性能瓶頸;優化前Span*和page_id的映射使用stl標準庫中的unorder_map,是非線程安全的,需要加鎖保證線程安全; 優化了后使用PAGE_MAP 設計原理是直接開滿page_id所有范圍的空間,采取直接映射;這樣即便是多線程也不需要加鎖;
不加鎖的原因
1.兩個線程對map中不同的位置讀寫:因為兩個線程在對基數樹map一個讀,一個寫的時候,相應空間早都開好了,不會改變結構,因此讀寫互不影響,一個在自己的地方讀,一個在自己的地方寫,而STL容器中map紅黑樹寫入會旋轉,unorder_map哈希寫入也涉及增容,結構有可能會改變,因此兩個位置的讀寫有可能互相影響導致線程不安全;
2.兩個線程對map中相同的位置寫:因為寫只會在NewSpan和ReleaseSpanToPageCache中,一個是central沒有span了從pagecache申請,一個是central 把沒使用的usecount==0的span大塊內存還給paghecache,因為page只有有span和無span兩種狀態這兩個函數不可能在兩個線程中同時執行
3.兩個線程對map中相同的位置讀:因為讀只會在ConcurrentFree和ReleaseListToSpans中,對于同一個位置index,肯定是先ConcurrentFree(void*obj),之后objfree到一定的量(_freeLists[index].Size() >= _freeLists[index].MaxSize(慢開始的maxsize))時,再調用ReleaseListToSpans將freelist[index]從threadcache還給centralcache的,也不會兩個線程同時發生;
- stl的map中的find會依次遍歷O(n),而直接映射的時間效率是O(1),相當于以部分空間換取時間的一種舉措;
優化了stl中map.find()慢的問題同時不需要加鎖了,兩個主要問題在犧牲點空間的情況下完美解決,因此性能瓶頸得到優化,tcmalloc高效與malloc。
-
內存
+關注
關注
8文章
3002瀏覽量
73886 -
操作系統
+關注
關注
37文章
6738瀏覽量
123190 -
程序
+關注
關注
116文章
3777瀏覽量
80853 -
線程
+關注
關注
0文章
504瀏覽量
19651
發布評論請先 登錄
相關推薦
評論