GO跨协程异常处理

Posted by kzdgt on Thursday, July 4, 2024

Go语言的跨协程异常处理 (taoshu.in)

Go语言内置协程,这极大降低了并发编程的门槛。但这并不意味着并发编程的难度降低了。多协程的报错协同处理就是难点之一。为此官方提供了 errgroup 包。但这个包无法处理协程 panic 的问题。在 Go 语言中,我们无法在父协程里捕获子协程的异常(panic)。如果服务有异常没有处理,整个进程都会退出。这会产生很多不可预知的问题。今天就给大家分享一种跨协程的的异常处理方案。

先给出一个异常示例:

func main() {
  // 希望捕获所有所有 panic
  defer func() {
    r := recover()
    fmt.Println(r)
  }()

  // 启动新协程
  go func() {
    panic(123)
  }()
  // 等待一下,不然协程可能来不及执行
  time.Sleep(1 * time.Second)
  fmt.Println("这条消息打印不出来")
}

自己执行一下就会发现,最后的fmt.Println没有执行,也就是说开始的recover()没有捕获协程中的panic,整个进程都退出了。

一般的Go框架都是针对每一个请求启一个新协程,并统一捕获所有的panic。

如果程序员在写业务代码的时候开了新协程而且忘记了在协程中捕获panic的话,服务的进程就会因为某个未捕获的panic而退出。所以,我在Sniper一个轻量级Go业务框架的思考一文中建议少用或者不用协程

少用或不用协程对于大多数业务场景是没有问题。但有些少数场景还是要用协程的。

比如说我司有一个批量查询接口,一次最多支持查50条信息。但我们的业务要求一次要查100到1000条信息不等。如果不使用协程,需要依次执行2到10次查询,可能就超时了。

不用协程会超时,用协程可能会panic,怎么办呢?最好的办法就是引入框架方法,把Go的协程包装一下。

我们首先定义一个结构体

// Panic 子协程 panic 会被重新包装,添加调用栈信息
type Panic struct {
  R     interface{} // recover() 返回值
  Stack []byte      // 当时的调用栈
}

func (p Panic) String() string {
  return fmt.Sprintf("%v\n%s", p.R, p.Stack)
}

type PanicGroup struct {
  panics chan Panic // 协程 panic 通知信道
  dones  chan int   // 协程完成通知信道
  jobN   int32      // 协程并发数量
}

因为Go的channel无法直接初始化,所以需要定义一个工厂方法

func NewPanicGroup() *PanicGroup {
  return &PanicGroup{
    panics: make(chan Panic, 8),
    dones:  make(chan int, 8),
  }
}

针对PanicGroup我们定义GoWait两个方法。

Go方法用来启动新的协程,定义如下:

func (g *PanicGroup) Go(f func()) *PanicGroup {
  atomic.AddInt32(&g.jobN, 1)
  go func() {
    defer func() {
      if r := recover(); r != nil {
        g.panics <- Panic{R: r, Stack: debug.Stack()}
        return
      }
      g.dones <- 1
    }()
    f()
  }()

  return g // 方便链式调用
}

这里的核心思想就是启动一个带有recover()的协程,捕获f()执行过程产生的panic并写到g.dones。每个协程启动的时候将计数器加一,方便Wait方法等待。

Wait方法用来等待所有协程结束或者panic,定义如下

func (g *PanicGroup) Wait(ctx context.Context) error {
  for {
    select {
    case <-g.dones:
      if atomic.AddInt32(&g.jobN, -1) == 0 {
        return nil
      }
    case p := <-g.panics:
      panic(p)
    case <-ctx.Done():
      return ctx.Err()
    }
  }
}

如果所有协程都结束,g.jobN就会等于零,Wait就会返回。如果有协程发生panic,Wait就会通过p := <-g.panics捕获并重新拋出。这个时候,框架就能处理这个panic了。

使用起来也是非常的方便

g := NewPanicGroup()
g.Go(f1)
g.Go(f2)
g.Wait(ctx)
// 也可以链式调用
err := NewPanicGroup().Go(f1).Go(f2).Wait(ctx)

PanicGroup的本质就是各个子协程各自捕获自己的panic,并通过channel传给父协程(传递的时候带上调用栈信息)。父协程拿到panic信息后重新panic,看起来就像是在父协程拋出来一样。如此就绕过了Go不能在父协程捕获子协程panic的问题,也就避免了部分业务逻辑panic导致整个服务进程退出的问题

「真诚赞赏,手留余香」

kzdgt Blog

真诚赞赏,手留余香

使用微信扫描二维码完成支付