跳转至

06 并发等待:如何降低实时系统的响应延时?

你好,我是徐逸。

在上节课中,我们学习了如何借助资源复用来降低Golang运行时的资源损耗,从而提升单机吞吐。在资源复用的技巧里,我们重点强调了协程池这一关键技巧。

有了协程并发的运用,自然少不了并发相关的技术。所以在今天的课程里,我们就来聊一聊协程并发等待技术——WaitGroup类型和errgroup包。只有熟练地掌握这些并发技术,我们才能够在面对各种并发场景时,更快、更好地解决各种并发问题。

首先,我们来想一个问题:倘若我们需要向多个下游发起并发请求,并且必须得等待所有请求返回时,你会用Golang的哪个并发类型来实现呢?

WaitGroup类型

当面对这种场景时,常规解决方法是用Golang的基础并发类型WaitGroup。WaitGroup的作用是阻塞等待多个并发任务执行完成。WaitGroup类型主要包含下面几个方法。

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
  • 第一个是 Add方法,在任务运行之前,需要调用Add方法,用于设置需要等待完成的任务数,Add方法传进去的数值之和,需要和任务数相等。
  • 第二个是 Done方法,每个任务完成时,需要调用Done方法,用于告知WaitGroup对象已经有一个任务运行完成。
  • 第三个是 Wait方法,当需要等待所有并发任务完成时,调用Wait方法,用于阻塞主协程。

知道了WaitGroup类型的API,那我们该如何用WaitGroup来实现并发调用时的阻塞等待功能呢?

下面我为你准备了一段代码,你可以参考这段代码实现并发等待功能。

import (
    "sync"
)

var urls = []string{
    "http://www.golang.org/",
    "http://www.google.com/",
    "http://www.somestupidname.com/",
}

func TestWaitGroup(t *testing.T) {
    // 创建WaitGroup对象
    wg := sync.WaitGroup{}
    results := make([]string, len(urls))
    for index, url := range urls {
        url := url
        index := index
        // 在创建协程执行任务之前,调用Add方法
        wg.Add(1)
        go func() {
            // 任务完成后,调用Done方法
            defer wg.Done()
            // Fetch the URL.
            resp, err := http.Get(url)
            if err != nil {
                return
            }

            defer resp.Body.Close()
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return
            }
            results[index] = string(body)

        }()
    }
    // 主协程阻塞,等待所有的任务执行完成
    wg.Wait()
}

这段代码的核心是下面几步。

  1. 首先,我们需要创建新的WaitGroup类型对象。
  2. 接着,在每次创建协程之前,需要调用Add方法设置需要执行的任务数。
  3. 然后,在每个任务运行完成后调用Done方法,告知WaitGroup对象有一个任务已经运行完成。
  4. 最后,在主协程中,我们需要调用Wait方法阻塞等待所有任务完成。

errgroup包

虽然使用WaitGroup类型可以实现并发等待功能,但是 WaitGroup 类型在错误处理方面存在一定的功能局限性。比如,当某个并发任务产生错误时,我们难以便捷地将该错误信息传递至主协程进行处理,而且无法有效地中止其他待运行或运行中的任务。

为了方便我们对并发任务的错误进行处理,Golang为我们提供了极为便捷的并发扩展库 ——errgroup 包。errgroup包的核心是 Group 类型,Group 类型是对WaitGroup的封装,在并发等待的基础功能之上,它额外提供了一系列实用的扩展功能。

错误处理:如何在主协程中获取并发任务错误信息?

首先咱们来看看错误处理功能,也就是当并发任务执行出错的时候,主协程能获取到出错信息并进行处理。

这就涉及到Group 类型提供的两个重要方法。

  • 首先是 Go 方法,该方法的参数为具有错误返回值的函数类型,在Go方法内部会在一个独立的协程中去运行所传入的这个函数,这样就能实现并发执行的效果。
  • 其次便是 Wait 方法,它与 WaitGroup 类型的 Wait 方法有点像,Group类型的 Wait 方法同样会阻塞等待所有传入到 Go 方法中的函数全部运行完毕。然而,二者的不同之处也十分明显,Group 类型的 Wait 方法具备一个 error 类型的返回值。如果传入Go 方法的函数运行返回了错误,那么此 Wait 方法将会返回该错误信息。当然,在多个传入的函数中都出现了错误时,它只会返回所遇到的第一个错误。
func (g *Group) Go(f func() error)
func (g *Group) Wait() error

在了解了 Group 类型的 API 之后,我们再看看如何使用errgroup包。

用errgroup包实现并发等待和错误处理功能的示例代码如下。

import (
    "golang.org/x/sync/errgroup"
)

func TestErrHandle(t *testing.T) {
    results := make([]string, len(urls))
    // 创建Group类型
    g := new(errgroup.Group)
    for index, url := range urls {
        // Launch a goroutine to fetch the URL.
        url := url
        index := index
        // 调用Go方法
        g.Go(func() error {
            // Fetch the URL.
            resp, err := http.Get(url)
            if err != nil {
                return err // 返回错误
            }
            defer resp.Body.Close()
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return err // 返回错误
            }
            results[index] = string(body)
            return nil
        })
    }
    // Wait for all HTTP fetches to complete.
    // 等待所有任务执行完成,并对错误进行处理
    if err := g.Wait(); err != nil {
        fmt.Println("Failured fetched all URLs.")
    }
}

这段代码也很好理解,核心是下面几步。

  1. 第一步,我们要创建 Group 类型的对象。
  2. 第二步,在 Group 的 Go 方法中传入那些需要并发运行的函数。特别需要注意的是,这些传入的函数必须将错误返回
  3. 第三步,也是最后一步,在主协程中,我们需要调用 Group对象的 Wait 方法。通过这一调用,主协程将会阻塞等待,直至所有通过 Go 方法传入的任务都执行完毕。并且,在任务完成后,我们还能够对 Wait 方法所返回的错误进行处理。

任务取消:如何中止并发任务的运行?

除了错误处理功能,errgroup包提供的任务取消功能也相当实用。所谓任务取消功能,是指一旦在多个并发任务中有一个任务执行失败,我们能够中止那些尚未开始执行或者是正在执行过程中的其他并发任务。这样一来,就可以避免因继续执行这些不必要的任务而导致资源浪费。

而要实现任务取消功能,除了我们之前学到的Group 类型的 Go 方法和 Wait 方法,还有一个极为关键的核心方法,那就是 WithContext 函数。借助这个函数,我们能够基于context来创建 Group 对象,这样在任务执行报错时,我们就可以利用context来停止所有相关任务。

func WithContext(ctx context.Context) (*Group, context.Context)

在了解了WithContext 函数之后,现在让我们errgroup包来实现任务取消功能。

如同下面的代码,和前面实现错误处理功能不同,用errgroup包来实现任务取消功能,有两个核心要点。

第一点,需要用WithContext函数创建Group对象。

第二点,在传入Go方法的函数中,需要实现 select-done模式,也就是当函数运行时,发现context被取消,则直接返回,从而避免执行业务逻辑。

func TestCancel(t *testing.T) {
    results := make([]string, len(urls))
    // 用WithContext函数创建Group对象
    eg, ctx := errgroup.WithContext(context.Background())
    for index, url := range urls {
        url := url
        index := index
        // 调用Go方法
        eg.Go(func() error {
            select {
            case <-ctx.Done(): // select-done模式取消运行
                return errors.New("task is cancelled")
            default:
                // Fetch the URL.
                resp, err := http.Get(url)
                if err != nil {
                    return err // 返回错误
                }
                defer resp.Body.Close()
                body, err := io.ReadAll(resp.Body)
                if err != nil {
                    return err // 返回错误
                }
                results[index] = string(body)
                return nil
            }
        })
    }
    // Wait for all HTTP fetches to complete.
    // 等待所有任务执行完成,并对错误进行处理
    if err := eg.Wait(); err != nil {
        fmt.Println("Failured fetched all URLs.")
    }
}

协程数限制:如何控制并发运行的最大协程数?

除了错误处理和任务取消功能,errgroup包还可以限制同时并发运行的最大协程数。它的核心是 SetLimit方法

如果我们用SetLimit方法设置了可同时运行的最大协程数,当调用Go方法时,一旦达到了最大协程数,就会阻塞创建新协程运行任务,直到有协程运行完,才可以创建新协程。

func (g *Group) SetLimit(n int)

现在让我们基于SetLimit方法,来实现限制并发运行的最大协程数功能。如同下面的代码一样,限制并发运行的最大协程数,核心是调用SetLimit方法。

func TestLimitGNum(t *testing.T) {
    results := make([]string, len(urls))
    // 用WithContext函数创建Group对象
    eg, ctx := errgroup.WithContext(context.Background())
    // 调用SetLimit方法,设置可同时运行的最大协程数
    eg.SetLimit(2)
    for index, url := range urls {
        url := url
        index := index
        // 调用Go方法
        eg.Go(func() error {
            select {
            case <-ctx.Done(): // select-done模式取消运行
                return errors.New("task is cancelled")
            default:
                // Fetch the URL.
                resp, err := http.Get(url)
                if err != nil {
                    return err // 返回错误
                }
                defer resp.Body.Close()
                body, err := io.ReadAll(resp.Body)
                if err != nil {
                    return err // 返回错误
                }
                results[index] = string(body)
                return nil
            }
        })
    }
    // Wait for all HTTP fetches to complete.
    // 等待所有任务执行完成,并对错误进行处理
    if err := eg.Wait(); err != nil {
        fmt.Println("Failured fetched all URLs.")
    }
}

errgroup是如何实现的?

前面我介绍了errgroup在不同功能场景的使用,不过咱们还可以继续往深了挖挖,看看 errgroup 底层到底是咋实现的。要是把 errgroup 包的实现搞清楚了,那我们自己设计并发程序的本事也能往上提一大截。

首先,咱们来看看 errgroup 包 Group 类型的数据结构,它有几个重要的成员变量。

type token struct{}

type Group struct {
    cancel func(error) // 这个作用是為了前面說的 WithContext 而來的

    wg sync.WaitGroup // errGroup底层的阻塞等待功能,就是通过WaitGroup实现的

    sem chan token // 用于控制最大运行的协程数

    err     error // 最后在Wait方法中返回的error
    errOnce sync.Once // 用于安全的设置err
}
  • 先说 cancel 变量,它就是为了前面讲的 WithContext 功能设计的,专门用来控制取消任务运行。
  • sync.WaitGroup 类型的变量 wg ,Group 类型在底层是靠封装它来实现阻塞等待功能的。
  • 通道类型变量 sem ,用来控制同时能跑的最大协程数量。
  • error 类型的变量 err ,在 Group 类型的 Wait 方法里,它会被返回给调用者。
  • 最后就是sync.Once 类型变量,它的用处就是能安全地设置 err 变量。在好多协程一起并发跑的情况下,它能保证 err 变量只设置一次,而且是并发安全的。

接着,咱们来看看 WithContext函数SetLimit方法,它们分别是任务取消功能和协程数限制的核心方法。

就像下面的代码一样,WithContext函数内部会生成可以被取消的context类型对象ctx,并且会生成cancel函数变量,封装在Group对象中。SetLimit方法用于设置通道容量

func WithContext(ctx context.Context) (*Group, context.Context) {
    ctx, cancel := withCancelCause(ctx)
    return &Group{cancel: cancel}, ctx // 生成有取消功能的context
}

func (g *Group) SetLimit(n int) {
    g.sem = make(chan token, n) // 设置通道容量
}

然后,咱们来看看Go方法,这个方法是Group类型的核心方法。

func (g *Group) Go(f func() error) {
    if g.sem != nil {
        g.sem <- token{} // 通道满则阻塞,用来控制最大并发数
    }

    g.wg.Add(1)
    go func() {
        defer g.done() // 底层调用g.wg.Done()

        if err := f(); err != nil {
            g.errOnce.Do(func() { // 安全的设置err变量
                g.err = err
                if g.cancel != nil {
                    g.cancel(g.err) // 任务运行出错,调用g.cancel方法,用context控制其它任务是否中止运行
                }
            })
        }
    }()
}

func (g *Group) done() {
    if g.sem != nil {
        <-g.sem // 协程运行完,通道读
    }
    g.wg.Done()
}

咱们一起来分析下这个方法的实现。

  1. 假如我们调用SetLimit方法设置了通道容量,当需要创建协程之前,在Go方法的第3行会往通道写,当通道已经被写满时则阻塞不创建协程。当协程运行完时,在Go方法第8行的done方法内,会读通道。
  2. 第6行和第8行,实际上就是调用WaitGroup类型的Add和Done方法来实现并发等待功能。
  3. 第10-12行,当传入Go方法的函数调用出错时,通过errOnce安全地设置err变量,这个错误便是Wait方法收到的错误。
  4. 第14行,调用g.cancel函数来让ctx.Done()能读到值,达到中止其它任务运行的目的。

最后,让我们来看看Wait方法。

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
    g.wg.Wait()
    if g.cancel != nil {
            g.cancel(g.err)
    }
    return g.err
}

Wait方法比较简单,核心就是调用g.wg来阻塞等待,并将Go方法中设置的g.err错误返回,从而实现错误处理功能。

小结

在今天的课程里,我们学习了在并发等待场景里大显身手的关键技术,也就是 WaitGroup 类型以及 errgroup 包。通过并发等待,咱们可以将多个下游的串行请求变成并发请求,从而降低响应延时。

现在,让我们再来回顾一下这两个并发等待技巧知识。

  • WaitGroup 类型。当我们手头有一个规模较大的任务时,为了提高执行效率,我们可以巧妙地将其拆分成多个子任务,然后让这些子任务并发运行。而此时,如果我们还需要等待所有子任务都顺利执行完毕,那么 WaitGroup 类型就该闪亮登场啦,它能够精准地满足这一需求。
  • errgroup 包。实际上,errgroup 包可以看作是对 WaitGroup 类型的升级与封装。当我们在实际开发中,不仅需要并发运行任务,还得周全地考虑对可能出现的错误进行妥善处理、能够灵活地取消任务以及精准地控制最大协程数等复杂需求,这时errgroup 包无疑就是我们的最佳选择。

希望你能够用心去体会 WaitGroup 类型和 errgroup 包在实际场景中的应用。在今后遇到并发等待场景时,不妨试试用errgroup包,相信它会给你带来意想不到的便利与高效。

思考题

当多任务并发运行时,会遇到这样一种特殊的场景:只有当所有并发的任务都出现错误时,整个请求才会返回错误信息;而只有部分任务出错时,请求也不会返回错误。那么我们究竟该怎么在主协程里获取每个并发任务的执行出错信息呢?

欢迎你把你的答案分享在评论区,也欢迎你把这节课的内容分享给需要的朋友,我们下节课再见!

精选留言(2)
  • lJ 👍(2) 💬(1)

    可以基于 WaitGroup 和一个错误通道(error channel,容量不小于控制的协程并发数,避免阻塞)来收集并发协程中的错误信息,任务协程发生的错误写入err channel中,开启一个协程读取err channel。也可以初始化一个全局的错误切片errs,在协程出错时将err赋值给errs。 开源实现有 1、github.com/facebookgo/errgroup,扩展了标准库的sync.WaitGroup,errors收集协程执行时发生的错误 type Group struct { wg sync.WaitGroup mu sync.Mutex errors MultiError } 2、github.com/vardius/gollback,gollback.All方法返回所有的错误 本质上都是通过err切片保存任务协程中的错误信息

    2024-12-20

  • 👍(1) 💬(1)

    func TestCancel(t *testing.T) { results := make([]string, len(urls)) // 用WithContext函数创建Group对象 eg, ctx := errgroup.WithContext(context.Background()) for index, url := range urls { url := url index := index // 调用Go方法 eg.Go(func() error { select { case <-ctx.Done(): // select-done模式取消运行 return errors.New("task is cancelled") default: // Fetch the URL. resp, err := http.Get(url) if err != nil { return err // 返回错误 } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return err // 返回错误 } results[index] = string(body) return nil } }) } // Wait for all HTTP fetches to complete. // 等待所有任务执行完成,并对错误进行处理 if err := eg.Wait(); err != nil { fmt.Println("Failured fetched all URLs.") } } 这个方法感觉直接用可以cancle的context会不会比较好,如果检测到错误直接cancel掉。现在感觉这个例子虽然支持cancel,虽然在讲cancel,但是例子没有支持cancel,看起来怪怪的。

    2024-12-20