一、前言
共享內存廣泛用于Redis,Kafka,RabbitMQ 等高性能組件中,本文主要提供一個共享內存在廣告埋點數據采集的實戰場景。
二、共享內存原理
1、原理
在Linux中,每個進程都有屬于自己的進程控制塊(PCB)和地址空間(Addr Space),并且都有一個與之對應的頁表,負責將進程的虛擬地址與物理地址進行映射,通過內存管理單元(MMU)進行管理。兩個不同的虛擬地址通過頁表映射到物理空間的同一區域,它們所指向的這塊區域即共享內存。
當兩個進程通過頁表將虛擬地址映射到物理地址時,在物理地址中有一塊共同的內存區,即共享內存,這塊內存可以被兩個進程同時看到。這樣當一個進程進行寫操作,另一個進程讀操作就可以實現進程間通信。但是,我們要確保一個進程在寫的時候不能被讀,因此我們使用信號量來實現同步與互斥。
對于一個共享內存,實現采用的是引用計數的原理,當進程脫離共享存儲區后,計數器減一,掛架成功時,計數器加一,只有當計數器變為零時,才能被刪除。當進程終止時,它所附加的共享存儲區都會自動脫離。
2、與傳統文件對比
共享內存可以說是最有用的進程間通信方式,也是最快的IPC形式, 因為進程可以直接讀寫內存,而不需要任何 數據的拷貝。對于像管道和消息隊列等通信方式,則需要在內核和用戶空間進行四次的數據拷貝 共享內存則只拷貝兩次數據: 一次從輸入文件到共享內存區,另一次從共享內存區到輸出文件。
實際上,進程之間在共享內 存時,并不總是讀寫少量數據后就解除映射,有新的通信時,再重新建立共享內存區域。而是保持共享區域,直 到通信完畢為止,這樣,數據內容一直保存在共享內存中,并沒有寫回文件。共享內存中的內容往往是在解除映 射時才寫回文件的。因此,采用共享內存的通信方式效率是非常高的。
傳統文件
UNIX 訪問文件的傳統方法是用 open 打開它們,如果有多個進程訪問同一個文件,則每一個進程在自己的地址空間都包含有該文件的副本,這不必要地浪費了存儲空間。
下圖說明了兩個進程同時讀一個文件的同一頁的情形。系統要將該頁從磁盤讀到高速緩沖區中,每個進程再執行一個存儲器內的復制操作將數據從高速緩沖區讀到自己的地址空間。
共享存儲映射
現在考慮另一種處理方法:進程 A 和進程 B 都將該頁映射到自己的地址空間,當進程 A 第一次訪問該頁中的數據時, 它生成一個缺頁中斷。內核此時讀入這一頁到內存并更新頁表使之指向它。以后,當進程B訪問同一頁面而出現缺頁中斷時,該頁已經在內存,內核只需要將進程 B 的頁表登記項指向次頁即可。
3、mmap()
(1)mmap()系統調用
mmap()系統調用使得進程之間通過映射同一個普通文件實現共享內存。普通文件被映射到進程地址空間后,進程可以向訪問普通內存一樣對文件進行訪問,不必再調用read(),write()等操作。
mmap()系統調用形式如下:
1void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )
mmap的作用是映射文件描述符fd指定文件的 [off,off + len]區域至調用進程的[addr, addr + len]的內存區域:
數fd為即將映射到進程空間的文件描述字,一般由open()返回,同時,fd可以指定為-1,此時須指定flags參數中的,MAP_ANON,表明進行的是匿名映射(不涉及具體的文件名,避免了文件的創建及打開,很顯然只能用于具有親緣關系的進程間通信)。
len是映射到調用進程地址空間的字節數,它從被映射文件開頭offset個字節開始算起。
prot 參數指定共享內存的訪問權限。可取如下幾個值的或:PROT_READ(可讀) , PROT_WRITE (可寫), PROT_EXEC (可執行), PROT_NONE(不可訪問)。
flags由以下幾個常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必選其一,而MAP_FIXED則不推薦使用。
offset參數一般設為0,表示從文件頭開始映射。
參數addr指定文件應被映射到進程空間的起始地址,一般被指定一個空指針,此時選擇起始地址的任務留給內核來完成。函數的返回值為最后文件映射到進程空間的地址,進程可直接操作起始地址為該值的有效地址。
(2)mmap()返回地址的訪問
對mmap()返回地址的訪問,linux采用的是頁式管理機制。
對于用mmap()映射普通文件來說,進程會在自己的地址空間新增一塊空間,空間大小由mmap()的len參數指定,注意,進程并不一定能夠對全部新增空間都能進行有效訪問。
進程能夠訪問的有效地址大小取決于文件被映射部分的大小。
簡單的說,能夠容納文件被映射部分大小的最少頁面個數決定了進程從mmap()返回的地址開始,能夠有效訪問的地址空間大小。
超過這個空間大小,內核會根據超過的嚴重程度返回發送不同的信號給進程。可用如下圖示說明:
三、VCS 共享內存采集實戰
VCS(vivo control system): 負責全網所有類型的監控指標采集,為上游運維平臺提供底層命令通道能力和全網插件升級管控能力。
1、數據結構
2、分區讀寫
為了要確保一個進程在寫的時候不能被讀,我們使用idx來標記可讀塊。
3、規則,指標和值
下圖描述的是從連續內存空間轉化成【規則,維度,值】語義的過程:
4、源碼分析
5、general.proto
通用監控上報協議:
general.proto
syntax = “proto2”;
package general;
message Data {
map kv = 1;
}
message GeneralData {
optional string rule_id = 1;
repeated Data data = 2;
optional int64 count = 3;
optional int64 left_size = 4;
optional int32 version = 5;
}
6、constant.go 配置參數
| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預留長度 | magincNum2(4byte) | 4k protect |
package moni_shm
const (
OssShmId uint32 = 0x3eeff00
MagicNum1 uint32 = 0x650a218
MagicNum2 uint32 = 0x138a4f2
CreateShmLock = “/var/run/.oss_shm_lock”
OssMapOneAttrCnt = 1024 * 128 //1024 個規則
OssOneAttrEntryCnt = 128 //每個規則有128個指標
EntrySz = 4
OssMapCnt = 2
OneAttrSz = OssOneAttrEntryCnt * EntrySz
OssMapSz = OssMapOneAttrCnt * OneAttrSz
OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4
defaultIntervalSec = 60
defaultTopic = “moni_general_shared_memory”
)
7、util.go 工具類
內存清零工具和“整頁”分配:
cd package moni_shm
import (
“unsafe”
)
//取整分配
func align(actual, to uint64) uint64 {
return (actual + to - 1) / to * to
}
//連續空間清0
func zero(ptr uintptr, bts uint64) {
if 0 == bts {
return
}
const sz = 4096
var next uint64
cnt := 0
for ; next+sz 《= bts; { //按頁清零
arr := (*[sz]byte)(unsafe.Pointer(ptr))
for i := range *arr {
(*arr)[i] = 0
}
next += sz
ptr += uintptr(sz)
cnt++
}
if next == bts {
return
}
var i uintptr
for i = 0; i 《 uintptr(bts-next); i++ { //剩余空間清零
*(*byte)(unsafe.Pointer(ptr + i)) = 0
}
}
8、mgr.go 采集邏輯
共享內存采集邏輯對應 “規則指標和值”:
var (
_basePtr uintptr = 0
_shmUtil = NewShmUtil(OssShmId, OssAttrSz)
_intervalSec = defaultIntervalSec
_topic = defaultTopic
_on bool = false
)
func Stat(on bool) {
_on = on
}
func Start() {
go collect() //開始采集
}
func tryInitBaseptr() error {
var err error
if _basePtr == 0 {
_basePtr, err = _shmUtil.GetData() //獲取當前共享內存數據塊首地址
if nil != err {
logrus.Warnf(“init base ptr failed, retrying: %v”, err)
}
}
return err
}
func collect() {
var (
cost time.Duration
start time.Time
first = true
)
for {
if !first {
time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期對齊
}
first = false
start = time.Now()
if !_on {
cost = time.Since(start)
continue
}
if _basePtr == 0 {
if err := tryInitBaseptr(); nil != err {
cost = time.Since(start)
continue
}
}
d := collectOnce()
for _, v := range d {
moni_report.ProductReportData(*v)
}
cost = time.Since(start)
}
}
func collectOnce() []*moni_report.ReportData {
now := time.Now()
var ret []*moni_report.ReportData
data := make(map[uint32]*general.GeneralData)
d := SwitchAndFetch(_basePtr)
logrus.Infof(“sending %d data from shm”, len(d))
for _, v := range d {
ruleId := strconv.FormatUint(uint64(v[0]), 10)
dim := strconv.FormatUint(uint64(v[1]), 10)
value := strconv.FormatUint(uint64(v[2]), 10)
if _, ok := data[v[0]]; !ok {
data[v[0]] = &general.GeneralData{
RuleId: proto.String(ruleId),
Data: []*general.Data{},
}
}
data[v[0]].Data = append(data[v[0]].Data, &general.Data{
Kv: map[string]string{
dim: value,
“timestamp”: strconv.FormatInt(now.Unix()*1000, 10),
“ip”: viper.GetString(“host.inner_ip”),
},
})
}
logrus.Infof(“collect format shm data:%v”, data)
for _, v := range data {
bts, err := proto.Marshal(v)
if nil != err {
logrus.Errorf(“marshal shm data failed: %v”, err)
continue
}
ret = append(ret, &moni_report.ReportData{
DataBytes: bts,
Topic: _topic,
})
}
return ret
}
9、shmutil.go 共享內存操作
每60秒根據idx值切換可讀區,采集后上報后,清零,切換到下一區。
package moni_shm
import (
“fmt”
“log”
“os”
“syscall”
“unsafe”
“github.com/sirupsen/logrus”
)
const (
IpcCreate = 00001000
)
var (
ErrNotCreated = fmt.Errorf(“shm not created”)
ErrCreateFailed = fmt.Errorf(“shm create failed”)
)
type shmOpt func(*ShmUtil)
func WithCreate(b bool) shmOpt {
return func(u *ShmUtil) {
u.create = b
}
}
/*共享內存數據結構
|1page mprotect|page align data|1page mprotect|
| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預留長度 | magincNum2(4byte) | 4k protect |
*/
type ShmUtil struct {
pageSz int
dataSz uint64
total uint64
shmKey uint32
create bool
base uintptr
data uintptr
}
func NewShmUtil(key uint32, sz uint64, cfgs 。。.shmOpt) *ShmUtil {
if key == 0 {
panic(“invalid shm key: 0”)
}
ret := &ShmUtil{
dataSz: sz,
shmKey: key,
}
ret.pageSz = os.Getpagesize() //獲取頁大小
ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按頁分配“包體”大小
ret.total = ret.dataSz + uint64(ret.pageSz)*2 // 總空間大小=包體大小 + 頭尾各2頁保護地址
for _, c := range cfgs {
c(ret)
}
return ret
}
func (s *ShmUtil) attachShm(flag int) error {
created := false
shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag)) //使用已存在的共享內存,返回共享內存標識符
if 0 != errno {
return errno
}
if shmid 《 0 {
if !s.create { //不允創建,直接返回
return ErrNotCreated
}
shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate)) //新創建共享內存
if 0 != errno {
return fmt.Errorf(“shm create: %v”, errno)
}
if shmid 《 0 {
return ErrCreateFailed
}
created = true
}
addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0) //掛接共享內存到當前進程
if 0 != errno {
return fmt.Errorf(“shmat: %v”, errno)
}
if created {
zero(addr, s.total)//新創建的共享內存,初始化共享內存數據
}
s.base = addr //記錄共享內存首地址 用于之后的釋放
s.data = s.base + uintptr(s.pageSz) //寫數據的起始地址
_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)
if 0 != errno { //鎖定共享內存頭,鎖指定的內存區間必須包含整個內存頁(4K)
s.detach()
return fmt.Errorf(“mprotect head: %v”, errno)
}
_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //鎖指定共享內存尾,區間開始的地址start必須是一個內存頁的起始地址,并且區間長度len必須是頁大小的整數倍。
if 0 != errno {
s.detach()
return fmt.Errorf(“mprotect tail: %v”, errno)
}
return nil
}
func (s *ShmUtil) detach() { //進程去關聯共享內存
if 0 != s.base {
syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)
s.base = 0
s.data = 0
}
}
/*
獲取內存并且返回數據段起始位置
s.create 決定是否新申請共享內存
*/
func (s *ShmUtil) GetData() (uintptr, error) {
if s.data != 0 {
return s.data, nil
}
if err := s.attachShm(0666); nil != err { //初始化共享內存,并關聯到進程
return 0, err
}
return s.data, nil
}
func SwitchAndFetch(ptr uintptr) [][3]uint32 { //從共享內存讀取 [][3]uint32{ossid,key,value}
if ptr == 0 {
return nil
}
m1 := (*uint32)(unsafe.Pointer(ptr))
m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))
if MagicNum1 != *m1 || MagicNum2 != *m2 {
logrus.Errorf(“magic 1 in header: wrote:%v\tread:%v\n”, MagicNum1, *m1)
logrus.Errorf(“magic 2 in tail: wrote:%v\tread:%v\n”, MagicNum2, *m2)
return nil
}
idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切換塊標志
old := *idx
*idx = 1 - *idx
ret := PartialRead(ptr, old) //讀取當前idx塊數據
zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //讀完清0
return ret
}
//根據idx輪流讀數據區域
func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根據idx獲取塊起始地址
startPtr := ptr + 8 + uintptr(idx)*OssMapSz
ret := ReadOssMap(startPtr)
log.Printf(“result: %v\n”, ret)
return ret
}
func ReadOssMap(ptr uintptr) [][3]uint32 { //1個周期內的指標總容量為 128*1024 = 128k = 13W
var ret [][3]uint32
var i uint32 = 0
for i = 0; i 《 OssMapOneAttrCnt; i++ { //1個周期最多支持1024個業務
for _, v := range ReadOneAttr(ptr) {
ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]
}
ptr += OneAttrSz // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4
}
return ret
}
func ReadOneAttr(ptr uintptr) [][2]uint32 {
var ret [][2]uint32
var i uint32 = 0
for i = 0; i 《 OssOneAttrEntryCnt; i++ { //目前默認一個業務下最多有128單維度指標, OssOneAttrEntryCnt = 128
v := *(*uint32)(unsafe.Pointer(ptr))
if v != 0 {
ret = append(ret, [2]uint32{i, v}) // [keyID, value]
}
ptr += EntrySz // 4yte 讀取一個指標
}
return ret
}
四、總結
本文通過共享內存的原理和詳細分析了一個共享內存在生產上的應用場景,希望能為大家拋磚引玉。
編輯:hfy
-
Linux
+關注
關注
87文章
11232瀏覽量
208950 -
內存管理
+關注
關注
0文章
168瀏覽量
14128 -
共享內存
+關注
關注
0文章
16瀏覽量
8309
發布評論請先 登錄
相關推薦
評論