golang協(xié)程池設(shè)計詳解

Why Pool

橋西網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)公司于2013年開始到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。

go自從出生就身帶“高并發(fā)”的標(biāo)簽,其并發(fā)編程就是由groutine實現(xiàn)的,因其消耗資源低,性能高效,開發(fā)成本低的特性而被廣泛應(yīng)用到各種場景,例如服務(wù)端開發(fā)中使用的HTTP服務(wù),在golang net/http包中,每一個被監(jiān)聽到的tcp鏈接都是由一個groutine去完成處理其上下文的,由此使得其擁有極其優(yōu)秀的并發(fā)量吞吐量

for {
    // 監(jiān)聽tcp
    rw, e := l.Accept()
    if e != nil {
      .......
    }
    tempDelay = 0
    c := srv.newConn(rw)
    c.setState(c.rwc, StateNew) // before Serve can return
    // 啟動協(xié)程處理上下文
    go c.serve(ctx)
}

雖然創(chuàng)建一個groutine占用的內(nèi)存極小(大約2KB左右,線程通常2M左右),但是在實際生產(chǎn)環(huán)境無限制的開啟協(xié)程顯然是不科學(xué)的,比如上圖的邏輯,如果來幾千萬個請求就會開啟幾千萬個groutine,當(dāng)沒有更多內(nèi)存可用時,go的調(diào)度器就會阻塞groutine最終導(dǎo)致內(nèi)存溢出乃至嚴(yán)重的崩潰,所以本文將通過實現(xiàn)一個簡單的協(xié)程池,以及剖析幾個開源的協(xié)程池源碼來探討一下對groutine的并發(fā)控制以及多路復(fù)用的設(shè)計和實現(xiàn)。

一個簡單的協(xié)程池

過年前做過一波小需求,是將主播管理系統(tǒng)中信息不完整的主播找出來然后再到其相對應(yīng)的直播平臺爬取完整信息并補全,當(dāng)時考慮到每一個主播的數(shù)據(jù)都要訪問一次直播平臺所以就用應(yīng)對每一個主播開啟一個groutine去抓取數(shù)據(jù),雖然這個業(yè)務(wù)量還遠遠遠遠達不到能造成groutine性能瓶頸的地步,但是心里總是不舒服,于是放假回來后將其優(yōu)化成從協(xié)程池中控制groutine數(shù)量再開啟爬蟲進行數(shù)據(jù)抓取。思路其實非常簡單,用一個channel當(dāng)做任務(wù)隊列,初始化groutine池時確定好并發(fā)量,然后以設(shè)置好的并發(fā)量開啟groutine同時讀取channel中的任務(wù)并執(zhí)行, 模型如下圖

golang協(xié)程池設(shè)計詳解

實現(xiàn)

type SimplePool struct {
  wg  sync.WaitGroup
  work chan func() //任務(wù)隊列
}

func NewSimplePoll(workers int) *SimplePool {
  p := &SimplePool{
    wg:  sync.WaitGroup{},
    work: make(chan func()),
  }
  p.wg.Add(workers)
  //根據(jù)指定的并發(fā)量去讀取管道并執(zhí)行
  for i := 0; i < workers; i++ {
    go func() {
      defer func() {
        // 捕獲異常 防止waitGroup阻塞
        if err := recover(); err != nil {
          fmt.Println(err)
          p.wg.Done()
        }
      }()
      // 從workChannel中取出任務(wù)執(zhí)行
      for fn := range p.work {
        fn()
      }
      p.wg.Done()
    }()
  }
  return p
}
// 添加任務(wù)
func (p *SimplePool) Add(fn func()) {
  p.work <- fn
}

// 執(zhí)行
func (p *SimplePool) Run() {
  close(p.work)
  p.wg.Wait()
}

測試

測試設(shè)定為在并發(fā)數(shù)量為20的協(xié)程池中并發(fā)抓取一百個人的信息, 因為代碼包含較多業(yè)務(wù)邏輯所以sleep 1秒模擬爬蟲過程,理論上執(zhí)行時間為5秒

func TestSimplePool(t *testing.T) {
  p := NewSimplePoll(20)
  for i := 0; i < 100; i++ {
    p.Add(parseTask(i))
  }
  p.Run()
}

func parseTask(i int) func() {
  return func() {
    // 模擬抓取數(shù)據(jù)的過程
    time.Sleep(time.Second * 1)
    fmt.Println("finish parse ", i)
  }
}

golang協(xié)程池設(shè)計詳解

這樣一來最簡單的一個groutine池就完成了

go-playground/pool

上面的groutine池雖然簡單,但是對于每一個并發(fā)任務(wù)的狀態(tài),pool的狀態(tài)缺少控制,所以又去看了一下go-playground/pool的源碼實現(xiàn),先從每一個需要執(zhí)行的任務(wù)入手,該庫中對并發(fā)單元做了如下的結(jié)構(gòu)體,可以看到除工作單元的值,錯誤,執(zhí)行函數(shù)等,還用了三個分別表示,取消,取消中,寫 的三個并發(fā)安全的原子操作值來標(biāo)識其運行狀態(tài)。

// 需要加入pool 中執(zhí)行的任務(wù)
type WorkFunc func(wu WorkUnit) (interface{}, error)

// 工作單元
type workUnit struct {
  value   interface{}  // 任務(wù)結(jié)果 
  err    error     // 任務(wù)的報錯
  done    chan struct{} // 通知任務(wù)完成
  fn     WorkFunc  
  cancelled atomic.Value  // 任務(wù)是否被取消
  cancelling atomic.Value  // 是否正在取消任務(wù)
  writing  atomic.Value  // 任務(wù)是否正在執(zhí)行
}

接下來看Pool的結(jié)構(gòu)

type limitedPool struct {
  workers uint      // 并發(fā)量 
  work  chan *workUnit // 任務(wù)channel
  cancel chan struct{}  // 用于通知結(jié)束的channel
  closed bool      // 是否關(guān)閉
  m    sync.RWMutex  // 讀寫鎖,主要用來保證 closed值的并發(fā)安全
}

初始化groutine池, 以及啟動設(shè)定好數(shù)量的groutine

// 初始化pool,設(shè)定并發(fā)量
func NewLimited(workers uint) Pool {
  if workers == 0 {
    panic("invalid workers '0'")
  }
  p := &limitedPool{
    workers: workers,
  }
  p.initialize()
  return p
}

func (p *limitedPool) initialize() {
  p.work = make(chan *workUnit, p.workers*2)
  p.cancel = make(chan struct{})
  p.closed = false
  for i := 0; i < int(p.workers); i++ {
    // 初始化并發(fā)單元
    p.newWorker(p.work, p.cancel)
  }
}

// passing work and cancel channels to newWorker() to avoid any potential race condition
// betweeen p.work read & write
func (p *limitedPool) newWorker(work chan *workUnit, cancel chan struct{}) {
  go func(p *limitedPool) {

    var wu *workUnit

    defer func(p *limitedPool) {
      // 捕獲異常,結(jié)束掉異常的工作單元,并將其再次作為新的任務(wù)啟動
      if err := recover(); err != nil {

        trace := make([]byte, 1<<16)
        n := runtime.Stack(trace, true)

        s := fmt.Sprintf(errRecovery, err, string(trace[:int(math.Min(float64(n), float64(7000)))]))

        iwu := wu
        iwu.err = &ErrRecovery{s: s}
        close(iwu.done)

        // need to fire up new worker to replace this one as this one is exiting
        p.newWorker(p.work, p.cancel)
      }
    }(p)

    var value interface{}
    var err error

    for {
      select {
      // workChannel中讀取任務(wù)
      case wu = <-work:

        // 防止channel 被關(guān)閉后讀取到零值
        if wu == nil {
          continue
        }

        // 先判斷任務(wù)是否被取消
        if wu.cancelled.Load() == nil {
          // 執(zhí)行任務(wù)
          value, err = wu.fn(wu)
          wu.writing.Store(struct{}{})
          
          // 任務(wù)執(zhí)行完在寫入結(jié)果時需要再次檢查工作單元是否被取消,防止產(chǎn)生競爭條件
          if wu.cancelled.Load() == nil && wu.cancelling.Load() == nil {
            wu.value, wu.err = value, err
            close(wu.done)
          }
        }
      // pool是否被停止
      case <-cancel:
        return
      }
    }

  }(p)
}

往POOL中添加任務(wù),并檢查pool是否關(guān)閉

func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {
  w := &workUnit{
    done: make(chan struct{}),
    fn:  fn,
  }

  go func() {
    p.m.RLock()
    if p.closed {
      w.err = &ErrPoolClosed{s: errClosed}
      if w.cancelled.Load() == nil {
        close(w.done)
      }
      p.m.RUnlock()
      return
    }
    // 將工作單元寫入workChannel, pool啟動后將由上面newWorker函數(shù)中讀取執(zhí)行
    p.work <- w
    p.m.RUnlock()
  }()

  return w
}

在go-playground/pool包中, limitedPool的批量并發(fā)執(zhí)行還需要借助batch.go來完成

// batch contains all information for a batch run of WorkUnits
type batch struct {
  pool  Pool     // 上面的limitedPool實現(xiàn)了Pool interface
  m    sync.Mutex  // 互斥鎖,用來判斷closed
  units  []WorkUnit  // 工作單元的slice, 這個主要用在不設(shè)并發(fā)限制的場景,這里忽略
  results chan WorkUnit // 結(jié)果集,執(zhí)行完后的workUnit會更新其value,error,可以從結(jié)果集channel中讀取
  done  chan struct{} // 通知batch是否完成
  closed bool
  wg   *sync.WaitGroup
}
// go-playground/pool 中有設(shè)置并發(fā)量和不設(shè)并發(fā)量的批量任務(wù),都實現(xiàn)Pool interface,初始化batch批量任務(wù)時會將之前創(chuàng)建好的Pool傳入newBatch
func newBatch(p Pool) Batch {
  return &batch{
    pool:  p,
    units:  make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.
    results: make(chan WorkUnit),
    done:  make(chan struct{}),
    wg:   new(sync.WaitGroup),
  }
}

// 往批量任務(wù)中添加workFunc任務(wù)
func (b *batch) Queue(fn WorkFunc) {

  b.m.Lock()
  if b.closed {
    b.m.Unlock()
    return
  }
  //往上述的limitPool中添加workFunc
  wu := b.pool.Queue(fn)

  b.units = append(b.units, wu) // keeping a reference for cancellation purposes
  b.wg.Add(1)
  b.m.Unlock()
  
  // 執(zhí)行完后將workUnit寫入結(jié)果集channel
  go func(b *batch, wu WorkUnit) {
    wu.Wait()
    b.results <- wu
    b.wg.Done()
  }(b, wu)
}

// 通知批量任務(wù)不再接受新的workFunc, 如果添加完workFunc不執(zhí)行改方法的話將導(dǎo)致取結(jié)果集時done channel一直阻塞
func (b *batch) QueueComplete() {
  b.m.Lock()
  b.closed = true
  close(b.done)
  b.m.Unlock()
}

// 獲取批量任務(wù)結(jié)果集
func (b *batch) Results() <-chan WorkUnit {
  go func(b *batch) {
    <-b.done
    b.m.Lock()
    b.wg.Wait()
    b.m.Unlock()
    close(b.results)
  }(b)
  return b.results
}

測試

func SendMail(int int) pool.WorkFunc {
  fn := func(wu pool.WorkUnit) (interface{}, error) {
    // sleep 1s 模擬發(fā)郵件過程
    time.Sleep(time.Second * 1)
    // 模擬異常任務(wù)需要取消
    if int == 17 {
      wu.Cancel()
    }
    if wu.IsCancelled() {
      return false, nil
    }
    fmt.Println("send to", int)
    return true, nil
  }
  return fn
}

func TestBatchWork(t *testing.T) {
  // 初始化groutine數(shù)量為20的pool
  p := pool.NewLimited(20)
  defer p.Close()
  batch := p.Batch()
  // 設(shè)置一個批量任務(wù)的過期超時時間
  t := time.After(10 * time.Second)
  go func() {
    for i := 0; i < 100; i++ {
      batch.Queue(SendMail(i))
    }
    batch.QueueComplete()
  }()
  // 因為 batch.Results 中要close results channel 所以不能將其放在LOOP中執(zhí)行
  r := batch.Results()
LOOP:
  for {
    select {
    case <-t:
    // 登臺超時通知
      fmt.Println("recived timeout")
      break LOOP
   
    case email, ok := <-r:
    // 讀取結(jié)果集
      if ok {
        if err := email.Error(); err != nil {
          fmt.Println("err", err.Error())
        }
        fmt.Println(email.Value())
      } else {
        fmt.Println("finish")
        break LOOP
      }
    }
  }
}

  

golang協(xié)程池設(shè)計詳解

golang協(xié)程池設(shè)計詳解

接近理論值5s, 通知模擬被取消的work也正常取消

go-playground/pool在比起之前簡單的協(xié)程池的基礎(chǔ)上, 對pool, worker的狀態(tài)有了很好的管理。但是,但是問題來了,在第一個實現(xiàn)的簡單groutine池和go-playground/pool中,都是先啟動預(yù)定好的groutine來完成任務(wù)執(zhí)行,在并發(fā)量遠小于任務(wù)量的情況下確實能夠做到groutine的復(fù)用,如果任務(wù)量不多則會導(dǎo)致任務(wù)分配到每個groutine不均勻,甚至可能出現(xiàn)啟動的groutine根本不會執(zhí)行任務(wù)從而導(dǎo)致浪費,而且對于協(xié)程池也沒有動態(tài)的擴容和縮小。所以我又去看了一下ants的設(shè)計和實現(xiàn)。

ants

ants是一個受fasthttp啟發(fā)的高性能協(xié)程池, fasthttp號稱是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各種池化技術(shù)(這個日后再開新坑去讀源碼), ants相比之前兩種協(xié)程池,其模型更像是之前接觸到的數(shù)據(jù)庫連接池,需要從空余的worker中取出一個來執(zhí)行任務(wù), 當(dāng)無可用空余worker的時候再去創(chuàng)建,而當(dāng)pool的容量達到上線之后,剩余的任務(wù)阻塞等待當(dāng)前進行中的worker執(zhí)行完畢將worker放回pool, 直至pool中有空閑worker。 ants在內(nèi)存的管理上做得很好,除了定期清除過期worker(一定時間內(nèi)沒有分配到任務(wù)的worker),ants還實現(xiàn)了一種適用于大批量相同任務(wù)的pool, 這種pool與一個需要大批量重復(fù)執(zhí)行的函數(shù)鎖綁定,避免了調(diào)用方不停的創(chuàng)建,更加節(jié)省內(nèi)存。

先看一下ants的pool 結(jié)構(gòu)體 (pool.go)

type Pool struct {
  // 協(xié)程池的容量 (groutine數(shù)量的上限)
  capacity int32
  // 正在執(zhí)行中的groutine
  running int32
  // 過期清理間隔時間
  expiryDuration time.Duration
  // 當(dāng)前可用空閑的groutine
  workers []*Worker
  // 表示pool是否關(guān)閉
  release int32
  // lock for synchronous operation.
  lock sync.Mutex
  // 用于控制pool等待獲取可用的groutine
  cond *sync.Cond
  // 確保pool只被關(guān)閉一次
  once sync.Once
  // worker臨時對象池,在復(fù)用worker時減少新對象的創(chuàng)建并加速worker從pool中的獲取速度
  workerCache sync.Pool
  // pool引發(fā)panic時的執(zhí)行函數(shù)
  PanicHandler func(interface{})
}

接下來看pool的工作單元 worker (worker.go)

type Worker struct {
  // worker 所屬的poo;
  pool *Pool
  // 任務(wù)隊列
  task chan func()
  // 回收時間,即該worker的最后一次結(jié)束運行的時間
  recycleTime time.Time
}

執(zhí)行worker的代碼 (worker.go)

func (w *Worker) run() {
  // pool中正在執(zhí)行的worker數(shù)+1
  w.pool.incRunning()
  go func() {
    defer func() {
      if p := recover(); p != nil {
        //若worker因各種問題引發(fā)panic, 
        //pool中正在執(zhí)行的worker數(shù) -1,     
        //如果設(shè)置了Pool中的PanicHandler,此時會被調(diào)用
        w.pool.decRunning()
        if w.pool.PanicHandler != nil {
          w.pool.PanicHandler(p)
        } else {
          log.Printf("worker exits from a panic: %v", p)
        }
      }
    }()
    
    // worker 執(zhí)行任務(wù)隊列
    for f := range w.task {
      //任務(wù)隊列中的函數(shù)全部被執(zhí)行完后,
      //pool中正在執(zhí)行的worker數(shù) -1, 
      //將worker 放回對象池
      if f == nil {
        w.pool.decRunning()
        w.pool.workerCache.Put(w)
        return
      }
      f()
      //worker 執(zhí)行完任務(wù)后放回Pool 
      //使得其余正在阻塞的任務(wù)可以獲取worker
      w.pool.revertWorker(w)
    }
  }()
}

了解了工作單元worker如何執(zhí)行任務(wù)以及與pool交互后,回到pool中查看其實現(xiàn), pool的核心就是取出可用worker提供給任務(wù)執(zhí)行 (pool.go)

// 向pool提交任務(wù)
func (p *Pool) Submit(task func()) error {
  if 1 == atomic.LoadInt32(&p.release) {
    return ErrPoolClosed
  }
  // 獲取pool中的可用worker并向其任務(wù)隊列中寫入任務(wù)
  p.retrieveWorker().task <- task
  return nil
}


// **核心代碼** 獲取可用worker
func (p *Pool) retrieveWorker() *Worker {
  var w *Worker

  p.lock.Lock()
  idleWorkers := p.workers
  n := len(idleWorkers) - 1
 // 當(dāng)前pool中有可用worker, 取出(隊尾)worker并執(zhí)行
  if n >= 0 {
    w = idleWorkers[n]
    idleWorkers[n] = nil
    p.workers = idleWorkers[:n]
    p.lock.Unlock()
  } else if p.Running() < p.Cap() {
    p.lock.Unlock()
    // 當(dāng)前pool中無空閑worker,且pool數(shù)量未達到上線
    // pool會先從臨時對象池中尋找是否有已完成任務(wù)的worker,
    // 若臨時對象池中不存在,則重新創(chuàng)建一個worker并將其啟動
    if cacheWorker := p.workerCache.Get(); cacheWorker != nil {
      w = cacheWorker.(*Worker)
    } else {
      w = &Worker{
        pool: p,
        task: make(chan func(), workerChanCap),
      }
    }
    w.run()
  } else {
    // pool中沒有空余worker且達到并發(fā)上限
    // 任務(wù)會阻塞等待當(dāng)前運行的worker完成任務(wù)釋放會pool
    for {
      p.cond.Wait() // 等待通知, 暫時阻塞
      l := len(p.workers) - 1
      if l < 0 {
        continue
      }
      // 當(dāng)有可用worker釋放回pool之后, 取出
      w = p.workers[l]
      p.workers[l] = nil
      p.workers = p.workers[:l]
      break
    }
    p.lock.Unlock()
  }
  return w
}

// 釋放worker回pool
func (p *Pool) revertWorker(worker *Worker) {
  worker.recycleTime = time.Now()
  p.lock.Lock()
  p.workers = append(p.workers, worker)
  // 通知pool中已經(jīng)獲取鎖的groutine, 有一個worker已完成任務(wù)
  p.cond.Signal()
  p.lock.Unlock()
}

在批量并發(fā)任務(wù)的執(zhí)行過程中, 如果有超過5納秒(ants中默認(rèn)worker過期時間為5ns)的worker未被分配新的任務(wù),則將其作為過期worker清理掉,從而保證pool中可用的worker都能發(fā)揮出最大的作用以及將任務(wù)分配得更均勻
(pool.go)

// 該函數(shù)會在pool初始化后在協(xié)程中啟動
func (p *Pool) periodicallyPurge() {
  // 創(chuàng)建一個5ns定時的心跳
  heartbeat := time.NewTicker(p.expiryDuration)
  defer heartbeat.Stop()

  for range heartbeat.C {
    currentTime := time.Now()
    p.lock.Lock()
    idleWorkers := p.workers
    if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
      p.lock.Unlock()
      return
    }
    n := -1
    for i, w := range idleWorkers {
      // 因為pool 的worker隊列是先進后出的,所以正序遍歷可用worker時前面的往往里當(dāng)前時間越久
      if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
        break
      }  
      // 如果worker最后一次運行時間距現(xiàn)在超過5納秒,視為過期,worker收到nil, 執(zhí)行上述worker.go中 if n == nil 的操作
      n = i
      w.task <- nil
      idleWorkers[i] = nil
    }
    if n > -1 {
      // 全部過期
      if n >= len(idleWorkers)-1 {
        p.workers = idleWorkers[:0]
      } else {
      // 部分過期
        p.workers = idleWorkers[n+1:]
      }
    }
    p.lock.Unlock()
  }
}

測試

func TestAnts(t *testing.T) {
  wg := sync.WaitGroup{}
  pool, _ := ants.NewPool(20)
  defer pool.Release()
  for i := 0; i < 100; i++ {
    wg.Add(1)
    pool.Submit(sendMail(i, &wg))
  }
  wg.Wait()
}

func sendMail(i int, wg *sync.WaitGroup) func() {
  return func() {
    time.Sleep(time.Second * 1)
    fmt.Println("send mail to ", i)
    wg.Done()
  }
}

golang協(xié)程池設(shè)計詳解

這里雖只簡單的測試批量并發(fā)任務(wù)的場景, 如果大家有興趣可以去看看ants的壓力測試, ants的吞吐量能夠比原生groutine高出N倍,內(nèi)存節(jié)省10到20倍, 可謂是協(xié)程池中的神器。

借用ants作者的原話來說:

然而又有多少場景是單臺機器需要扛100w甚至1000w同步任務(wù)的?基本沒有??!結(jié)果就是造出了屠龍刀,可是世界上沒有龍?。∫彩菬o情…

Over

一口氣從簡單到復(fù)雜總結(jié)了三個協(xié)程池的實現(xiàn),受益匪淺, 感謝各開源庫的作者, 雖然世界上沒有龍,但是屠龍技是必須練的,因為它就像存款,不一定要全部都用了,但是一定不能沒有!

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。

網(wǎng)站名稱:golang協(xié)程池設(shè)計詳解
標(biāo)題URL:http://bm7419.com/article16/gejegg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站營銷、網(wǎng)站導(dǎo)航、網(wǎng)站排名、標(biāo)簽優(yōu)化、域名注冊、

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)