精品国产人成在线_亚洲高清无码在线观看_国产在线视频国产永久2021_国产AV综合第一页一个的一区免费影院黑人_最近中文字幕MV高清在线视频

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

共享內存的原理和廣告埋點數據采集實戰分析

454398 ? 來源:Chinaunix ? 作者:cluo ? 2020-09-30 14:28 ? 次閱讀

一、前言

共享內存廣泛用于Redis,Kafka,RabbitMQ 等高性能組件中,本文主要提供一個共享內存在廣告埋點數據采集的實戰場景。

二、共享內存原理

1、原理

Linux中,每個進程都有屬于自己的進程控制塊(PCB)和地址空間(Addr Space),并且都有一個與之對應的頁表,負責將進程的虛擬地址與物理地址進行映射,通過內存管理單元(MMU)進行管理。兩個不同的虛擬地址通過頁表映射到物理空間的同一區域,它們所指向的這塊區域即共享內存。

當兩個進程通過頁表將虛擬地址映射到物理地址時,在物理地址中有一塊共同的內存區,即共享內存,這塊內存可以被兩個進程同時看到。這樣當一個進程進行寫操作,另一個進程讀操作就可以實現進程間通信。但是,我們要確保一個進程在寫的時候不能被讀,因此我們使用信號量來實現同步與互斥。

對于一個共享內存,實現采用的是引用計數的原理,當進程脫離共享存儲區后,計數器減一,掛架成功時,計數器加一,只有當計數器變為零時,才能被刪除。當進程終止時,它所附加的共享存儲區都會自動脫離。

2、與傳統文件對比

共享內存可以說是最有用的進程間通信方式,也是最快的IPC形式, 因為進程可以直接讀寫內存,而不需要任何 數據的拷貝。對于像管道和消息隊列等通信方式,則需要在內核和用戶空間進行四次的數據拷貝 共享內存則只拷貝兩次數據: 一次從輸入文件到共享內存區,另一次從共享內存區到輸出文件。

實際上,進程之間在共享內 存時,并不總是讀寫少量數據后就解除映射,有新的通信時,再重新建立共享內存區域。而是保持共享區域,直 到通信完畢為止,這樣,數據內容一直保存在共享內存中,并沒有寫回文件。共享內存中的內容往往是在解除映 射時才寫回文件的。因此,采用共享內存的通信方式效率是非常高的。

傳統文件

UNIX 訪問文件的傳統方法是用 open 打開它們,如果有多個進程訪問同一個文件,則每一個進程在自己的地址空間都包含有該文件的副本,這不必要地浪費了存儲空間。

下圖說明了兩個進程同時讀一個文件的同一頁的情形。系統要將該頁從磁盤讀到高速緩沖區中,每個進程再執行一個存儲器內的復制操作將數據從高速緩沖區讀到自己的地址空間。

共享存儲映射

現在考慮另一種處理方法:進程 A 和進程 B 都將該頁映射到自己的地址空間,當進程 A 第一次訪問該頁中的數據時, 它生成一個缺頁中斷。內核此時讀入這一頁到內存并更新頁表使之指向它。以后,當進程B訪問同一頁面而出現缺頁中斷時,該頁已經在內存,內核只需要將進程 B 的頁表登記項指向次頁即可。

3、mmap()

(1)mmap()系統調用

mmap()系統調用使得進程之間通過映射同一個普通文件實現共享內存。普通文件被映射到進程地址空間后,進程可以向訪問普通內存一樣對文件進行訪問,不必再調用read(),write()等操作。

mmap()系統調用形式如下:

1void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )

mmap的作用是映射文件描述符fd指定文件的 [off,off + len]區域至調用進程的[addr, addr + len]的內存區域:

數fd為即將映射到進程空間的文件描述字,一般由open()返回,同時,fd可以指定為-1,此時須指定flags參數中的,MAP_ANON,表明進行的是匿名映射(不涉及具體的文件名,避免了文件的創建及打開,很顯然只能用于具有親緣關系的進程間通信)。

len是映射到調用進程地址空間的字節數,它從被映射文件開頭offset個字節開始算起。

prot 參數指定共享內存的訪問權限。可取如下幾個值的或:PROT_READ(可讀) , PROT_WRITE (可寫), PROT_EXEC (可執行), PROT_NONE(不可訪問)。

flags由以下幾個常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必選其一,而MAP_FIXED則不推薦使用。

offset參數一般設為0,表示從文件頭開始映射。

參數addr指定文件應被映射到進程空間的起始地址,一般被指定一個空指針,此時選擇起始地址的任務留給內核來完成。函數的返回值為最后文件映射到進程空間的地址,進程可直接操作起始地址為該值的有效地址。

(2)mmap()返回地址的訪問

對mmap()返回地址的訪問,linux采用的是頁式管理機制。

對于用mmap()映射普通文件來說,進程會在自己的地址空間新增一塊空間,空間大小由mmap()的len參數指定,注意,進程并不一定能夠對全部新增空間都能進行有效訪問。

進程能夠訪問的有效地址大小取決于文件被映射部分的大小。

簡單的說,能夠容納文件被映射部分大小的最少頁面個數決定了進程從mmap()返回的地址開始,能夠有效訪問的地址空間大小。

超過這個空間大小,內核會根據超過的嚴重程度返回發送不同的信號給進程。可用如下圖示說明:

三、VCS 共享內存采集實戰

VCS(vivo control system): 負責全網所有類型的監控指標采集,為上游運維平臺提供底層命令通道能力和全網插件升級管控能力。

1、數據結構

2、分區讀寫

為了要確保一個進程在寫的時候不能被讀,我們使用idx來標記可讀塊。

3、規則,指標和值

下圖描述的是從連續內存空間轉化成【規則,維度,值】語義的過程:

4、源碼分析

5、general.proto

通用監控上報協議:

general.proto

syntax = “proto2”;

package general;

message Data {

map kv = 1;

}

message GeneralData {

optional string rule_id = 1;

repeated Data data = 2;

optional int64 count = 3;

optional int64 left_size = 4;

optional int32 version = 5;

}

6、constant.go 配置參數

| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預留長度 | magincNum2(4byte) | 4k protect |

package moni_shm

const (

OssShmId uint32 = 0x3eeff00

MagicNum1 uint32 = 0x650a218

MagicNum2 uint32 = 0x138a4f2

CreateShmLock = “/var/run/.oss_shm_lock”

OssMapOneAttrCnt = 1024 * 128 //1024 個規則

OssOneAttrEntryCnt = 128 //每個規則有128個指標

EntrySz = 4

OssMapCnt = 2

OneAttrSz = OssOneAttrEntryCnt * EntrySz

OssMapSz = OssMapOneAttrCnt * OneAttrSz

OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4

defaultIntervalSec = 60

defaultTopic = “moni_general_shared_memory”

7、util.go 工具類

內存清零工具和“整頁”分配:

cd package moni_shm

import (

“unsafe”

//取整分配

func align(actual, to uint64) uint64 {

return (actual + to - 1) / to * to

}

//連續空間清0

func zero(ptr uintptr, bts uint64) {

if 0 == bts {

return

}

const sz = 4096

var next uint64

cnt := 0

for ; next+sz 《= bts; { //按頁清零

arr := (*[sz]byte)(unsafe.Pointer(ptr))

for i := range *arr {

(*arr)[i] = 0

}

next += sz

ptr += uintptr(sz)

cnt++

}

if next == bts {

return

}

var i uintptr

for i = 0; i 《 uintptr(bts-next); i++ { //剩余空間清零

*(*byte)(unsafe.Pointer(ptr + i)) = 0

}

}

8、mgr.go 采集邏輯

共享內存采集邏輯對應 “規則指標和值”:

var (

_basePtr uintptr = 0

_shmUtil = NewShmUtil(OssShmId, OssAttrSz)

_intervalSec = defaultIntervalSec

_topic = defaultTopic

_on bool = false

func Stat(on bool) {

_on = on

}

func Start() {

go collect() //開始采集

}

func tryInitBaseptr() error {

var err error

if _basePtr == 0 {

_basePtr, err = _shmUtil.GetData() //獲取當前共享內存數據塊首地址

if nil != err {

logrus.Warnf(“init base ptr failed, retrying: %v”, err)

}

}

return err

}

func collect() {

var (

cost time.Duration

start time.Time

first = true

for {

if !first {

time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期對齊

}

first = false

start = time.Now()

if !_on {

cost = time.Since(start)

continue

}

if _basePtr == 0 {

if err := tryInitBaseptr(); nil != err {

cost = time.Since(start)

continue

}

}

d := collectOnce()

for _, v := range d {

moni_report.ProductReportData(*v)

}

cost = time.Since(start)

}

}

func collectOnce() []*moni_report.ReportData {

now := time.Now()

var ret []*moni_report.ReportData

data := make(map[uint32]*general.GeneralData)

d := SwitchAndFetch(_basePtr)

logrus.Infof(“sending %d data from shm”, len(d))

for _, v := range d {

ruleId := strconv.FormatUint(uint64(v[0]), 10)

dim := strconv.FormatUint(uint64(v[1]), 10)

value := strconv.FormatUint(uint64(v[2]), 10)

if _, ok := data[v[0]]; !ok {

data[v[0]] = &general.GeneralData{

RuleId: proto.String(ruleId),

Data: []*general.Data{},

}

}

data[v[0]].Data = append(data[v[0]].Data, &general.Data{

Kv: map[string]string{

dim: value,

“timestamp”: strconv.FormatInt(now.Unix()*1000, 10),

“ip”: viper.GetString(“host.inner_ip”),

},

})

}

logrus.Infof(“collect format shm data:%v”, data)

for _, v := range data {

bts, err := proto.Marshal(v)

if nil != err {

logrus.Errorf(“marshal shm data failed: %v”, err)

continue

}

ret = append(ret, &moni_report.ReportData{

DataBytes: bts,

Topic: _topic,

})

}

return ret

}

9、shmutil.go 共享內存操作

每60秒根據idx值切換可讀區,采集后上報后,清零,切換到下一區。

package moni_shm

import (

“fmt”

“log”

“os”

“syscall”

“unsafe”

“github.com/sirupsen/logrus”

const (

IpcCreate = 00001000

var (

ErrNotCreated = fmt.Errorf(“shm not created”)

ErrCreateFailed = fmt.Errorf(“shm create failed”)

type shmOpt func(*ShmUtil)

func WithCreate(b bool) shmOpt {

return func(u *ShmUtil) {

u.create = b

}

}

/*共享內存數據結構

|1page mprotect|page align data|1page mprotect|

| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預留長度 | magincNum2(4byte) | 4k protect |

*/

type ShmUtil struct {

pageSz int

dataSz uint64

total uint64

shmKey uint32

create bool

base uintptr

data uintptr

}

func NewShmUtil(key uint32, sz uint64, cfgs 。。.shmOpt) *ShmUtil {

if key == 0 {

panic(“invalid shm key: 0”)

}

ret := &ShmUtil{

dataSz: sz,

shmKey: key,

}

ret.pageSz = os.Getpagesize() //獲取頁大小

ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按頁分配“包體”大小

ret.total = ret.dataSz + uint64(ret.pageSz)*2 // 總空間大小=包體大小 + 頭尾各2頁保護地址

for _, c := range cfgs {

c(ret)

}

return ret

}

func (s *ShmUtil) attachShm(flag int) error {

created := false

shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag)) //使用已存在的共享內存,返回共享內存標識符

if 0 != errno {

return errno

}

if shmid 《 0 {

if !s.create { //不允創建,直接返回

return ErrNotCreated

}

shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate)) //新創建共享內存

if 0 != errno {

return fmt.Errorf(“shm create: %v”, errno)

}

if shmid 《 0 {

return ErrCreateFailed

}

created = true

}

addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0) //掛接共享內存到當前進程

if 0 != errno {

return fmt.Errorf(“shmat: %v”, errno)

}

if created {

zero(addr, s.total)//新創建的共享內存,初始化共享內存數據

}

s.base = addr //記錄共享內存首地址 用于之后的釋放

s.data = s.base + uintptr(s.pageSz) //寫數據的起始地址

_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)

if 0 != errno { //鎖定共享內存頭,鎖指定的內存區間必須包含整個內存頁(4K)

s.detach()

return fmt.Errorf(“mprotect head: %v”, errno)

}

_, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //鎖指定共享內存尾,區間開始的地址start必須是一個內存頁的起始地址,并且區間長度len必須是頁大小的整數倍。

if 0 != errno {

s.detach()

return fmt.Errorf(“mprotect tail: %v”, errno)

}

return nil

}

func (s *ShmUtil) detach() { //進程去關聯共享內存

if 0 != s.base {

syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)

s.base = 0

s.data = 0

}

}

/*

獲取內存并且返回數據段起始位置

s.create 決定是否新申請共享內存

*/

func (s *ShmUtil) GetData() (uintptr, error) {

if s.data != 0 {

return s.data, nil

}

if err := s.attachShm(0666); nil != err { //初始化共享內存,并關聯到進程

return 0, err

}

return s.data, nil

}

func SwitchAndFetch(ptr uintptr) [][3]uint32 { //從共享內存讀取 [][3]uint32{ossid,key,value}

if ptr == 0 {

return nil

}

m1 := (*uint32)(unsafe.Pointer(ptr))

m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))

if MagicNum1 != *m1 || MagicNum2 != *m2 {

logrus.Errorf(“magic 1 in header: wrote:%v\tread:%v\n”, MagicNum1, *m1)

logrus.Errorf(“magic 2 in tail: wrote:%v\tread:%v\n”, MagicNum2, *m2)

return nil

}

idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切換塊標志

old := *idx

*idx = 1 - *idx

ret := PartialRead(ptr, old) //讀取當前idx塊數據

zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //讀完清0

return ret

}

//根據idx輪流讀數據區域

func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根據idx獲取塊起始地址

startPtr := ptr + 8 + uintptr(idx)*OssMapSz

ret := ReadOssMap(startPtr)

log.Printf(“result: %v\n”, ret)

return ret

}

func ReadOssMap(ptr uintptr) [][3]uint32 { //1個周期內的指標總容量為 128*1024 = 128k = 13W

var ret [][3]uint32

var i uint32 = 0

for i = 0; i 《 OssMapOneAttrCnt; i++ { //1個周期最多支持1024個業務

for _, v := range ReadOneAttr(ptr) {

ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]

}

ptr += OneAttrSz // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4

}

return ret

}

func ReadOneAttr(ptr uintptr) [][2]uint32 {

var ret [][2]uint32

var i uint32 = 0

for i = 0; i 《 OssOneAttrEntryCnt; i++ { //目前默認一個業務下最多有128單維度指標, OssOneAttrEntryCnt = 128

v := *(*uint32)(unsafe.Pointer(ptr))

if v != 0 {

ret = append(ret, [2]uint32{i, v}) // [keyID, value]

}

ptr += EntrySz // 4yte 讀取一個指標

}

return ret

}

四、總結

本文通過共享內存的原理和詳細分析了一個共享內存在生產上的應用場景,希望能為大家拋磚引玉。
編輯:hfy

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • Linux
    +關注

    關注

    87

    文章

    11232

    瀏覽量

    208950
  • 內存管理
    +關注

    關注

    0

    文章

    168

    瀏覽量

    14128
  • 共享內存
    +關注

    關注

    0

    文章

    16

    瀏覽量

    8309
收藏 人收藏

    評論

    相關推薦

    PLC數據采集模塊選型指南

    數據類型 :模擬信號(如溫度、壓力)或數字信號(如開關狀態、計數器)。 數據量 :需要采集數據點數量。 采樣率 :數據更新的頻率。 精
    的頭像 發表于 11-26 11:46 ?222次閱讀

    振弦式土壓力計的數據采集方法

    空隙或松動。可以采用預、鉆孔等方式進行安裝,并使用合適的固定材料將土壓力計固定在安裝位置上。   連接數據采集設備   將振弦式土壓力計與數據采集設備連接起來。常見的數據采集設備有振
    發表于 10-25 14:26

    IOT數據采集平臺的功能特點

    的深遠影響。 IOT數據采集平臺的定義 IOT數據采集平臺是一種專門用于物聯網數據采集、處理和分析的平臺。它通過連接各種工業設備、傳感器、儀器儀表、工業機器人等,實現對設備
    的頭像 發表于 09-25 13:28 ?413次閱讀

    工控數據采集物聯網平臺是什么

    工控數據采集物聯網平臺是一種集成化的軟件系統,它主要用于在工業環境中收集、處理、分析和管理來自各種設備和傳感器的數據。這種平臺結合了物聯網(IoT)技術,能夠實現對工業設備的遠程監控、管理和控制
    的頭像 發表于 07-23 15:29 ?335次閱讀

    多通道數據采集儀怎么用的

    連接、軟件設置、數據采集數據分析等方面的內容。 一、多通道數據采集儀概述 1.1 多通道數據采集儀的定義 多通道數據采集儀是一種能夠同時
    的頭像 發表于 07-02 09:08 ?528次閱讀

    plc物聯網數據采集平臺是什么

    PLC物聯網數據采集平臺是基于物聯網技術,將多個PLC設備連接到云端的數據采集與管理系統。通過采集分析PLC產生的數據,實現對生產過程的實
    的頭像 發表于 06-24 15:18 ?716次閱讀

    工業數據采集網關功能優勢

    隨著工業4.0和物聯網(IoT)技術的快速發展,工業數據采集網關已成為現代工業生產中不可或缺的一部分。這些網關設備充當了連接現場設備與上層管理系統的橋梁,它們實時收集、傳輸和分析來自生產線的數據,為
    的頭像 發表于 05-11 17:51 ?814次閱讀
    工業<b class='flag-5'>數據采集</b>網關功能優勢

    數據采集邊緣網關解決企業數據采集痛點的關鍵

    網關 應運而生,成為解決企業數據采集痛點的關鍵所在。 一、企業背景與痛點分析 在當前信息化、智能化的時代背景下,許多企業面臨著海量數據采集和處理的難題。這些企業通常擁有多個分散的業務場景,如工廠生產線、物流倉庫
    的頭像 發表于 04-07 13:56 ?358次閱讀

    數據采集器是什么設備 數據采集器屬于什么設備類型

    數據采集器是一種用于采集和記錄數據的設備。它可以連接到各種傳感器、儀器或其他數據源,收集數據并將其存儲在內部存儲器或外部設備中,以供后續
    的頭像 發表于 02-04 10:27 ?3900次閱讀

    plc數據采集模塊的缺點 plc數據采集模塊與數據采集卡的區別

    PLC(可編程邏輯控制器)數據采集模塊是用于連接傳感器、執行器和機器設備,收集實時數據的設備。雖然PLC數據采集模塊在工業自動化領域得到了廣泛應用,但它仍然存在一些缺點,而與之相比,數據采集
    的頭像 發表于 01-19 14:20 ?1673次閱讀

    工業物聯網數據采集遠程監控系統如何實現

    工業4.0時代對于數據采集實時監控的需求越來越高,可以實現多個設備、多種數據、多個功能的數據共享數據分析,從而幫助對生產、能耗、工藝等方面
    的頭像 發表于 12-25 15:28 ?1136次閱讀
    工業物聯網<b class='flag-5'>數據采集</b>遠程監控系統如何實現

    數據采集卡怎么用 數據采集卡怎么讀取數據

    將物理量(如溫度、壓力、光強等)轉換為數字信號,供電腦進行處理和分析。 以下是使用和讀取數據采集卡的詳細步驟: 1. 硬件連接: 首先,確保數據采集卡與電腦通過適當的接口連接,如USB、PCI等。根據
    的頭像 發表于 12-15 09:50 ?1719次閱讀

    labview用隊列實現DAQ高速數據采集分析,程序卡頓報錯

    電壓數據采集與實時分析,程序用消費者生產者模式,數據采集生產者,數據分析用消費者,采樣率50k,運行程序就開始采集,但是速率明顯不是設置的5
    發表于 12-15 09:04

    數據采集網關:工業數據采集上云

    實現數據的整合、轉換和分析數據采集網關功能數據采集網關具備了強大的數據采集能力。它可以從各種數據
    的頭像 發表于 12-12 16:46 ?759次閱讀
    <b class='flag-5'>數據采集</b>網關:工業<b class='flag-5'>數據采集</b>上云

    專注數據采集分析系統研發 做設備與MES系統中轉站

    數據采集是實現MES系統與設備對接的核心環節。通過采集設備產生的實時數據,將其傳輸給MES系統進行處理和分析數據采集可以通過直接連接設備的
    發表于 12-01 17:09