怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

本篇文章給大家分享的是有關(guān)怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn),小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

成都創(chuàng)新互聯(lián)專注于治多企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),成都做商城網(wǎng)站。治多網(wǎng)站建設(shè)公司,為治多等地區(qū)提供建站服務(wù)。全流程按需網(wǎng)站開發(fā),專業(yè)設(shè)計,全程項目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)

一、前言

共享內(nèi)存廣泛用于redis,Kafka,RabbitMQ 等高性能組件中,本文主要提供一個共享內(nèi)存在廣告埋點數(shù)據(jù)采集的實戰(zhàn)場景。

二、共享內(nèi)存原理

1、原理

在Linux中,每個進程都有屬于自己的進程控制塊(PCB)和地址空間(Addr Space),并且都有一個與之對應(yīng)的頁表,負責將進程的虛擬地址與物理地址進行映射,通過內(nèi)存管理單元(MMU)進行管理。兩個不同的虛擬地址通過頁表映射到物理空間的同一區(qū)域,它們所指向的這塊區(qū)域即共享內(nèi)存。

當兩個進程通過頁表將虛擬地址映射到物理地址時,在物理地址中有一塊共同的內(nèi)存區(qū),即共享內(nèi)存,這塊內(nèi)存可以被兩個進程同時看到。這樣當一個進程進行寫操作,另一個進程讀操作就可以實現(xiàn)進程間通信。但是,我們要確保一個進程在寫的時候不能被讀,因此我們使用信號量來實現(xiàn)同步與互斥。

對于一個共享內(nèi)存,實現(xiàn)采用的是引用計數(shù)的原理,當進程脫離共享存儲區(qū)后,計數(shù)器減一,掛架成功時,計數(shù)器加一,只有當計數(shù)器變?yōu)榱銜r,才能被刪除。當進程終止時,它所附加的共享存儲區(qū)都會自動脫離。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

2、與傳統(tǒng)文件對比

共享內(nèi)存可以說是最有用的進程間通信方式,也是最快的IPC形式, 因為進程可以直接讀寫內(nèi)存,而不需要任何 數(shù)據(jù)的拷貝。對于像管道和消息隊列等通信方式,則需要在內(nèi)核和用戶空間進行四次的數(shù)據(jù)拷貝 共享內(nèi)存則只拷貝兩次數(shù)據(jù): 一次從輸入文件到共享內(nèi)存區(qū),另一次從共享內(nèi)存區(qū)到輸出文件。

實際上,進程之間在共享內(nèi) 存時,并不總是讀寫少量數(shù)據(jù)后就解除映射,有新的通信時,再重新建立共享內(nèi)存區(qū)域。而是保持共享區(qū)域,直 到通信完畢為止,這樣,數(shù)據(jù)內(nèi)容一直保存在共享內(nèi)存中,并沒有寫回文件。共享內(nèi)存中的內(nèi)容往往是在解除映 射時才寫回文件的。因此,采用共享內(nèi)存的通信方式效率是非常高的。

傳統(tǒng)文件

UNIX 訪問文件的傳統(tǒng)方法是用 open 打開它們,如果有多個進程訪問同一個文件,則每一個進程在自己的地址空間都包含有該文件的副本,這不必要地浪費了存儲空間。

下圖說明了兩個進程同時讀一個文件的同一頁的情形。系統(tǒng)要將該頁從磁盤讀到高速緩沖區(qū)中,每個進程再執(zhí)行一個存儲器內(nèi)的復制操作將數(shù)據(jù)從高速緩沖區(qū)讀到自己的地址空間。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

共享存儲映射

現(xiàn)在考慮另一種處理方法:進程 A 和進程 B 都將該頁映射到自己的地址空間,當進程 A 第一次訪問該頁中的數(shù)據(jù)時, 它生成一個缺頁中斷。內(nèi)核此時讀入這一頁到內(nèi)存并更新頁表使之指向它。以后,當進程B訪問同一頁面而出現(xiàn)缺頁中斷時,該頁已經(jīng)在內(nèi)存,內(nèi)核只需要將進程 B 的頁表登記項指向次頁即可。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

3、mmap()

(1)mmap()系統(tǒng)調(diào)用

mmap()系統(tǒng)調(diào)用使得進程之間通過映射同一個普通文件實現(xiàn)共享內(nèi)存。普通文件被映射到進程地址空間后,進程可以向訪問普通內(nèi)存一樣對文件進行訪問,不必再調(diào)用read(),write()等操作。

mmap()系統(tǒng)調(diào)用形式如下:

void* mmap ( void * addr , size_t len , int prot , int flags , int fd , off_t offset )

mmap的作用是映射文件描述符fd指定文件的 [off,off + len]區(qū)域至調(diào)用進程的[addr, addr + len]的內(nèi)存區(qū)域:

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

  • 數(shù)fd為即將映射到進程空間的文件描述字,一般由open()返回,同時,fd可以指定為-1,此時須指定flags參數(shù)中的,MAP_ANON,表明進行的是匿名映射(不涉及具體的文件名,避免了文件的創(chuàng)建及打開,很顯然只能用于具有親緣關(guān)系的進程間通信)。

  • len是映射到調(diào)用進程地址空間的字節(jié)數(shù),它從被映射文件開頭offset個字節(jié)開始算起。

  • prot 參數(shù)指定共享內(nèi)存的訪問權(quán)限。可取如下幾個值的或:PROT_READ(可讀) , PROT_WRITE (可寫), PROT_EXEC (可執(zhí)行), PROT_NONE(不可訪問)。

  • flags由以下幾個常值指定:MAP_SHARED , MAP_PRIVATE , MAP_FIXED,其中,MAP_SHARED , MAP_PRIVATE必選其一,而MAP_FIXED則不推薦使用。

  • offset參數(shù)一般設(shè)為0,表示從文件頭開始映射。

  • 參數(shù)addr指定文件應(yīng)被映射到進程空間的起始地址,一般被指定一個空指針,此時選擇起始地址的任務(wù)留給內(nèi)核來完成。函數(shù)的返回值為最后文件映射到進程空間的地址,進程可直接操作起始地址為該值的有效地址。

(2)mmap()返回地址的訪問

對mmap()返回地址的訪問,linux采用的是頁式管理機制。

對于用mmap()映射普通文件來說,進程會在自己的地址空間新增一塊空間,空間大小由mmap()的len參數(shù)指定,注意,進程并不一定能夠?qū)θ啃略隹臻g都能進行有效訪問。

進程能夠訪問的有效地址大小取決于文件被映射部分的大小。

簡單的說,能夠容納文件被映射部分大小的最少頁面?zhèn)€數(shù)決定了進程從mmap()返回的地址開始,能夠有效訪問的地址空間大小。

超過這個空間大小,內(nèi)核會根據(jù)超過的嚴重程度返回發(fā)送不同的信號給進程??捎萌缦聢D示說明:

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

2、分區(qū)讀寫

 為了要確保一個進程在寫的時候不能被讀,我們使用idx來標記可讀塊。

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

3、規(guī)則,指標和值

下圖描述的是從連續(xù)內(nèi)存空間轉(zhuǎn)化成【規(guī)則,維度,值】語義的過程:

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

4、源碼分析

怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)

5、general.proto 

通用監(jiān)控上報協(xié)議:

general.proto
 
syntax = "proto2";
package general;
message Data {
    map<string, string> kv = 1;
}
message GeneralData {
    optional string rule_id = 1;
    repeated Data data = 2;
    optional int64 count = 3;
    optional int64 left_size = 4;
    optional int32 version = 5;
}

6、constant.go 配置參數(shù)

| 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預留長度 | magincNum2(4byte) | 4k protect |

 

package moni_shm
 
const (
   OssShmId           uint32 = 0x3eeff00
   MagicNum1          uint32 = 0x650a218
   MagicNum2          uint32 = 0x138a4f2
   CreateShmLock             = "/var/run/.oss_shm_lock"
   OssMapOneAttrCnt          = 1024 * 128      //1024 個規(guī)則
   OssOneAttrEntryCnt        = 128             //每個規(guī)則有128個指標
   EntrySz                   = 4
   OssMapCnt                 = 2
 
   OneAttrSz = OssOneAttrEntryCnt * EntrySz
   OssMapSz  = OssMapOneAttrCnt * OneAttrSz
   OssAttrSz = OssMapSz*OssMapCnt + 4 + 4 + 64*4 + 4
 
   defaultIntervalSec = 60
   defaultTopic       = "moni_general_shared_memory"
)

7、util.go 工具類

內(nèi)存清零工具和"整頁"分配:

cd package moni_shm
import (
    "unsafe"
)
//取整分配
func align(actual, to uint64) uint64 {
    return (actual + to - 1) / to * to
}
//連續(xù)空間清0
func zero(ptr uintptr, bts uint64) {
    if 0 == bts {
        return
    }
    const sz = 4096
    var next uint64
    cnt := 0
    for ; next+sz <= bts; {  //按頁清零
        arr := (*[sz]byte)(unsafe.Pointer(ptr))
        for i := range *arr {
            (*arr)[i] = 0
        }
        next += sz
        ptr += uintptr(sz)
        cnt++
    }
    if next == bts {
        return
    }
    var i uintptr
    for i = 0; i < uintptr(bts-next); i++ { //剩余空間清零
        *(*byte)(unsafe.Pointer(ptr + i)) = 0
    }
}

8、mgr.go 采集邏輯

共享內(nèi)存采集邏輯對應(yīng) “規(guī)則指標和值”:

var (
    _basePtr     uintptr = 0
    _shmUtil             = NewShmUtil(OssShmId, OssAttrSz)
    _intervalSec         = defaultIntervalSec
    _topic               = defaultTopic
    _on          bool    = false
)
func Stat(on bool) {
    _on = on
}
func Start() {
    go collect() //開始采集
}
func tryInitBaseptr() error {
    var err error
    if _basePtr == 0 {
        _basePtr, err = _shmUtil.GetData() //獲取當前共享內(nèi)存數(shù)據(jù)塊首地址
        if nil != err {
            logrus.Warnf("init base ptr failed, retrying: %v", err)
        }
    }
    return err
}
func collect() {
    var (
        cost  time.Duration
        start time.Time
        first = true
    )
    for {
        if !first {
            time.Sleep(time.Second*(time.Duration(_intervalSec)) - cost) //周期對齊
        }
        first = false
        start = time.Now()
        if !_on {
            cost = time.Since(start)
            continue
        }
        if _basePtr == 0 {
            if err := tryInitBaseptr(); nil != err {
                cost = time.Since(start)
                continue
            }
        }
        d := collectOnce()
        for _, v := range d {
            moni_report.ProductReportData(*v)
        }
        cost = time.Since(start)
    }
}
 
func collectOnce() []*moni_report.ReportData {
   now := time.Now()
   var ret []*moni_report.ReportData
   data := make(map[uint32]*general.GeneralData)
 
   d := SwitchAndFetch(_basePtr)
   logrus.Infof("sending %d data from shm", len(d))
 
   for _, v := range d {
      ruleId := strconv.FormatUint(uint64(v[0]), 10)
      dim := strconv.FormatUint(uint64(v[1]), 10)
      value := strconv.FormatUint(uint64(v[2]), 10)
 
      if _, ok := data[v[0]]; !ok {
         data[v[0]] = &general.GeneralData{
            RuleId: proto.String(ruleId),
            Data:   []*general.Data{},
         }
      }
 
      data[v[0]].Data = append(data[v[0]].Data, &general.Data{
         Kv: map[string]string{
            dim:         value,
            "timestamp": strconv.FormatInt(now.Unix()*1000, 10),
            "ip":        viper.GetString("host.inner_ip"),
         },
      })
   }
   logrus.Infof("collect format shm data:%v", data)
   for _, v := range data {
      bts, err := proto.Marshal(v)
      if nil != err {
         logrus.Errorf("marshal shm data failed: %v", err)
         continue
      }
      ret = append(ret, &moni_report.ReportData{
         DataBytes: bts,
         Topic:     _topic,
      })
   }
 
   return ret
}

9、shmutil.go 共享內(nèi)存操作

每60秒根據(jù)idx值切換可讀區(qū),采集后上報后,清零,切換到下一區(qū)。

package moni_shm
import (
    "fmt"
    "log"
    "os"
    "syscall"
    "unsafe"
    "github.com/sirupsen/logrus"
)
const (
    IpcCreate = 00001000
)
var (
    ErrNotCreated   = fmt.Errorf("shm not created")
    ErrCreateFailed = fmt.Errorf("shm create failed")
)
type shmOpt func(*ShmUtil)
func WithCreate(b bool) shmOpt {
    return func(u *ShmUtil) {
        u.create = b
    }
}
/*共享內(nèi)存數(shù)據(jù)結(jié)構(gòu)
     |1page mprotect|page align data|1page mprotect|
     | 4k protect | magincNum1(4byte) | idx(4byte) | OssMapSz(1024*128byte)*2 | 4*64byte預留長度 | magincNum2(4byte) | 4k protect |
*/
type ShmUtil struct {
    pageSz int
    dataSz uint64
    total  uint64
    shmKey uint32
    create bool
    base uintptr
    data uintptr
}
func NewShmUtil(key uint32, sz uint64, cfgs ...shmOpt) *ShmUtil {
    if key == 0 {
        panic("invalid shm key: 0")
    }
    ret := &ShmUtil{
        dataSz: sz,
        shmKey: key,
    }
    ret.pageSz = os.Getpagesize() //獲取頁大小
    ret.dataSz = align(ret.dataSz, uint64(ret.pageSz)) //按頁分配“包體”大小
    ret.total = ret.dataSz + uint64(ret.pageSz)*2     // 總空間大小=包體大小 + 頭尾各2頁保護地址
    for _, c := range cfgs {
        c(ret)
    }
    return ret
}
func (s *ShmUtil) attachShm(flag int) error {
    created := false
    shmid, _, errno := syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag))     //使用已存在的共享內(nèi)存,返回共享內(nèi)存標識符
    if 0 != errno {
        return errno
    }
    if shmid < 0 {
        if !s.create {  //不允創(chuàng)建,直接返回
            return ErrNotCreated
        }
        shmid, _, errno = syscall.Syscall(syscall.SYS_SHMGET, uintptr(s.shmKey), uintptr(s.total), uintptr(flag|IpcCreate))  //新創(chuàng)建共享內(nèi)存
        if 0 != errno {
            return fmt.Errorf("shm create: %v", errno)
        }
        if shmid < 0 {
            return ErrCreateFailed
        }
        created = true
    }
    addr, _, errno := syscall.Syscall(syscall.SYS_SHMAT, shmid, 0, 0)  //掛接共享內(nèi)存到當前進程
    if 0 != errno {
        return fmt.Errorf("shmat: %v", errno)
    }
    if created {
        zero(addr, s.total)//新創(chuàng)建的共享內(nèi)存,初始化共享內(nèi)存數(shù)據(jù)
    }
    s.base = addr //記錄共享內(nèi)存首地址 用于之后的釋放
    s.data = s.base + uintptr(s.pageSz) //寫數(shù)據(jù)的起始地址
     
    _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.base, uintptr(s.pageSz), 0)
    if 0 != errno { //鎖定共享內(nèi)存頭,鎖指定的內(nèi)存區(qū)間必須包含整個內(nèi)存頁(4K)
        s.detach()
        return fmt.Errorf("mprotect head: %v", errno)
    }
    _, _, errno = syscall.Syscall(syscall.SYS_MPROTECT, s.data+uintptr(s.dataSz), uintptr(s.pageSz), 0) //鎖指定共享內(nèi)存尾,區(qū)間開始的地址start必須是一個內(nèi)存頁的起始地址,并且區(qū)間長度len必須是頁大小的整數(shù)倍。
    if 0 != errno {
        s.detach()
        return fmt.Errorf("mprotect tail: %v", errno)
    }
    return nil
}
func (s *ShmUtil) detach() { //進程去關(guān)聯(lián)共享內(nèi)存
    if 0 != s.base {
        syscall.Syscall(syscall.SYS_SHMDT, s.base, 0, 0)
        s.base = 0
        s.data = 0
    }
}
/*
  獲取內(nèi)存并且返回數(shù)據(jù)段起始位置
  s.create 決定是否新申請共享內(nèi)存
*/
func (s *ShmUtil) GetData() (uintptr, error) {
    if s.data != 0 {
        return s.data, nil
    }
    if err := s.attachShm(0666); nil != err { //初始化共享內(nèi)存,并關(guān)聯(lián)到進程
        return 0, err
    }
    return s.data, nil
}
func SwitchAndFetch(ptr uintptr) [][3]uint32 { //從共享內(nèi)存讀取 [][3]uint32{ossid,key,value}
    if ptr == 0 {
        return nil
    }
    m1 := (*uint32)(unsafe.Pointer(ptr))
    m2 := (*uint32)(unsafe.Pointer(ptr + 8 + OssMapSz*2 + 4*64))   
    if MagicNum1 != *m1 || MagicNum2 != *m2 {
        logrus.Errorf("magic 1 in header: wrote:%v\tread:%v\n", MagicNum1, *m1)
        logrus.Errorf("magic 2 in tail:   wrote:%v\tread:%v\n", MagicNum2, *m2)
        return nil
    }
    idx := (*uint32)(unsafe.Pointer(ptr + 4)) //切換塊標志
    old := *idx
    *idx = 1 - *idx
    ret := PartialRead(ptr, old)  //讀取當前idx塊數(shù)據(jù)
    zero(ptr+8+uintptr(old)*OssMapSz, OssMapSz) //讀完清0
    return ret
}
//根據(jù)idx輪流讀數(shù)據(jù)區(qū)域
func PartialRead(ptr uintptr, idx uint32) [][3]uint32 { //根據(jù)idx獲取塊起始地址
    startPtr := ptr + 8 + uintptr(idx)*OssMapSz
    ret := ReadOssMap(startPtr)
    log.Printf("result: %v\n", ret)
    return ret
}
func ReadOssMap(ptr uintptr) [][3]uint32 { //1個周期內(nèi)的指標總?cè)萘繛?nbsp;128*1024 = 128k  = 13W
    var ret [][3]uint32
    var i uint32 = 0
    for i = 0; i < OssMapOneAttrCnt; i++ {  //1個周期最多支持1024個業(yè)務(wù)
        for _, v := range ReadOneAttr(ptr) {
            ret = append(ret, [3]uint32{i, v[0], v[1]}) // [osID,keyID,value]
        }
        ptr += OneAttrSz  // OneAttrSz = OssOneAttrEntryCnt * EntrySz= 128*4
    }
    return ret
}
func ReadOneAttr(ptr uintptr) [][2]uint32 {
    var ret [][2]uint32
    var i uint32 = 0
    for i = 0; i < OssOneAttrEntryCnt; i++ { //目前默認一個業(yè)務(wù)下最多有128單維度指標, OssOneAttrEntryCnt = 128
        v := *(*uint32)(unsafe.Pointer(ptr))
        if v != 0 {
            ret = append(ret, [2]uint32{i, v}) // [keyID, value]
        }
        ptr += EntrySz  // 4yte 讀取一個指標
    }
    return ret
}

以上就是怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn),小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

新聞名稱:怎么解析共享內(nèi)存原理與VCS監(jiān)控采集實戰(zhàn)
網(wǎng)頁URL:http://bm7419.com/article18/pscsgp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機網(wǎng)站建設(shè)網(wǎng)頁設(shè)計公司、定制網(wǎng)站、關(guān)鍵詞優(yōu)化、營銷型網(wǎng)站建設(shè)、微信小程序

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管