lyyyuna 的小花园

动静中之动, by

RSS

Golang 并发模式 - Context

发表于 2020-05

前言

本文翻译+删选+理解自 Go Concurrency Patterns: Context

在使用 Go 编写的服务器程序中,每个请求都由一个 goroutine 来处理,通常这些请求又会启动额外的 goroutines 来访问后台数据库或者调用 RPC 服务。这些与同一个请求相关的 goroutines,常常需要访问同一个特定的资源,比如用户标识,认证 token 等等。当请求取消或者超时时,所有相关的 goroutines 都应该快速退出,这样系统才能回收不用的资源。

为此,Google 公司开发了context包。该库可以跨越 API 边界,给所有 goroutines 传递请求相关的值、取消信号和超时时间。这篇文章会介绍如何使用context库,并给出一个完整的例子。

Context

context包的核心是Context结构体:

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

Done方法返回一个 channel,它会给Context上所有的函数发送取消信号,当 channel 关闭时,这些函数应该终止剩余流程立即返回。Err方法返回的错误指出了Context为什么取消。Pipelines and Cancelation中讨论了Done的具体用法。

由于接收和发送信号的通常不是同一个函数,Context并没有提供Cancel方法,基于相同的理由,Donechannel 只负责接收。尤其当父操作开启 goroutines 执行子操作时,子操作肯定不能取消父操作。作为替代,WithCancel函数可以用来取消Context

Context对 goroutines 来说是并发安全的,你可以将单个Context传递给任意数量的 goroutines,然后取消该Context给这些 goroutines 同时发送信号。

Deadline方法用于判断函数究竟要不要运行,比如截止时间将近时,运行也就没必要了。代码可依此为 I/O 操作设置超时时间。

Value方法则为Context存储了请求所有的数据,访问这些数据必须是并发安全的。

Context 派生

使用Context包提供的方法可以从已有的Context值派生出新值。这些派生出的值逻辑上构成了一棵树:当根Context取消,其派生出的子Context也会跟着取消。

Background是所有Context树的根,它永远不会被取消:

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

WithCancelWithTimeout函数返回的派生Context值,可以先于父值取消。当请求的回调函数返回后,与请求相关的Context即可被取消。当有多个备份后台程序同时提供服务时,WithCancel可用于去除多余的请求。WithTimeout则可用于为请求设置超时时间。

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithValue则在Context上存储了请求相关的值:

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

例子:谷歌网页搜索

本例子提供一个 HTTP 服务器,接收类似/search?q=golang&timeout=1s的 GET 请求,并把查询参数值"golang"转推到谷歌网页搜索API。参数timeout告诉服务器如果谷歌 API 超时就取消请求。

代码分为三个包:

server 程序

server中,请求回调创建了一个名为ctxContext值,当回调退出时,延迟函数defer cancel()即执行取消操作。如果请求的 URL 带有 timeout 参数,那超时后Context会自动取消:

func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.

回调函数从请求中获取参数信息,并调用userip包获取客户端 IP 地址。由于后台服务中会使用到客户端 IP 地址,故需要将此存储于ctx中:

    // Check the search query.
    query := req.FormValue("q")
    if query == "" {
        http.Error(w, "no query", http.StatusBadRequest)
        return
    }

    // Store the user IP in ctx for use by code in other packages.
    userIP, err := userip.FromRequest(req)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    ctx = userip.NewContext(ctx, userIP)

传入ctxquery参数调用google.Search

    // Run the Google search and print the results.
    start := time.Now()
    results, err := google.Search(ctx, query)
    elapsed := time.Since(start)

如果搜索成功,会渲染出结果:

    if err := resultsTemplate.Execute(w, struct {
        Results          google.Results
        Timeout, Elapsed time.Duration
    }{
        Results: results,
        Timeout: timeout,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }

userip 包

userip包提供了解析用户 IP 地址函数的包,同时会将 IP 地址存储于一个Context值中。Context提供了键值对存储,键值都是interface{}类型,键必须可比较,值必须是并发安全。userip包屏蔽了实现上的细节,并以强类型方式访问Context值。

为了避免键冲突,userip包首先定义一个非导出类型key,然后用该类型定义的值作为Context的键:

// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int

// userIPkey is the context key for the user IP address.  Its value of zero is
// arbitrary.  If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0

FromRequesthttp.Request解析出客户端 IP 地址userIP

func FromRequest(req *http.Request) (net.IP, error) {
    ip, _, err := net.SplitHostPort(req.RemoteAddr)
    if err != nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
    }

NewContextuserIP存储于新建的Context中:

func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

FromContext则相反,从Context取出 IP 地址:

func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}

google 包

google.Search函数对谷歌 API 发起 HTTP 请求,并解析 JSON 结果。该函数同时接收一个Context参数ctx,如果请求处理时,ctx.Done关闭了,则立即退出。

谷歌 API 会将搜索内容和用户 IP 地址userIP作为请求参数:

func Search(ctx context.Context, query string) (Results, error) {
    // Prepare the Google Search API request.
    req, err := http.NewRequest("GET", "https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
    if err != nil {
        return nil, err
    }
    q := req.URL.Query()
    q.Set("q", query)

    // If ctx is carrying the user IP address, forward it to the server.
    // Google APIs use the user IP to distinguish server-initiated requests
    // from end-user requests.
    if userIP, ok := userip.FromContext(ctx); ok {
        q.Set("userip", userIP.String())
    }
    req.URL.RawQuery = q.Encode()

Search使用了辅助函数httpDo来发起和取消 HTTP 请求,辅助函数参数有一个是处理 HTTP 响应的闭包:

    var results Results
    err = httpDo(ctx, req, func(resp *http.Response, err error) error {
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // Parse the JSON search result.
        // https://developers.google.com/web-search/docs/#fonje
        var data struct {
            ResponseData struct {
                Results []struct {
                    TitleNoFormatting string
                    URL               string
                }
            }
        }
        if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
            return err
        }
        for _, res := range data.ResponseData.Results {
            results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
        }
        return nil
    })
    // httpDo waits for the closure we provided to return, so it's safe to
    // read results here.
    return results, err

httpDo函数会在新的 goroutine 中处理 HTTP 请求和响应,同时如果ctx.Done提前关闭,函数会直接退出。

为 Context 作代码适配

许多服务器框架已经存储了请求相关值,我们可以实现Context接口的所有方法,来为Context参数适配这些现有框架。而框架的使用者在调用代码时则需要多传入一个Context

参考实现:

  1. gorilla.go适配了 Gorilla 的github.com/gorilla/context
  2. tomb.go适配了Tomb