Overview

  1. Idioms for frustration-free Go channel pipelines
    1. including context, error handling, timeouts, OpenTelemetry, etc.

Idioms for frustration-free pipelines

  1. Debugging
    1. See the debugging guide
  2. High-level Architecture
    1. One Source (first step/stage, streams data out to a channel)
    2. One Sink (last step/stage, consumes final results from a channel)
    3. Multiple intermediate processors connected via channels
    4. All stages run concurrently :-)
  3. Tools
    1. Use errgroup (golang.org/x/sync/errgroup, maintained by the Go team)
      1. ... because it gives you functionality like Kotlin's coroutineScope
        1. Otherwise, you must manage your own WaitGroups
      2. ... because it propagates the first error to g.Wait()
        1. Otherwise, you must add error channels plus if or select blocks everywhere
      3. ... because it cancels the shared context for the whole group of tasks (when created with errgroup.WithContext)
      4. ... because it can limit how many tasks run at once (g.SetLimit)
      5. ... because the source code is simple, correct, tested, well documented, recommended in several books
        1. like this and this
  4. Control
    1. Write one function that manages control flow
    2. Construct and set up the errGroup there
    3. Use g.Go(...) to start subtasks and g.Wait() to wait for them
    4. See example below
  5. Subtasks
    1. See example below
    2. Spawn tasks using g.Go(...), not the bare go keyword
    3. The only exception is the goroutine that calls g.Wait(), like this:
      ```go
      go func() { // <-- the only place you must use the 'go' keyword directly
          // Either handle the error here, or call g.Wait() again outside
          // this goroutine (it returns the same error).
          // The sink must not be started with g.Go(...), or this Wait() never returns.
          _ = g.Wait()
          close(finalChannelWhichSinkReads)
      }()
      ```
    4. Most of your functions should be "regular" Go functions
      1. meaning they neither accept nor return a channel
      2. Exceptions:
        1. functions that produce values slowly (e.g., slow I/O)
        2. functions that produce too many values to hold in memory
        3. These functions should accept outCh and errCh as parameters (and return nothing)
  6. Context
    1. Propagate the errGroup to subtasks using context.Context
    2. Pass a context parameter into subtasks (not an errGroup parameter)
    3. Subtasks get the current errGroup from the context argument
      1. e.g., g := ErrGroupFromContext(ctx)
    4. See example below
  7. Channels
    1. Ensure every channel has a closing strategy
    2. Only the sender closes a channel (never the receiver/consumer)
    3. Close only when completely done writing; use defer
    4. Use bounded buffers so stages block less often, while the fixed capacity keeps memory from growing without limit
  8. Waiting
    1. Let errgroup manage waiting for you
    2. Do NOT use your own WaitGroup
  9. Errors
    1. Let the errGroup manage errors; just check err := g.Wait()
    2. You can call g.Wait() multiple times; each call returns the same first error
      1. Useful if you need to Wait() and handle errors in different goroutines
  10. Cancellation
    1. Let the errGroup manage cancellation
    2. You can bind the errGroup to a deadline by passing a context from context.WithTimeout (or context.WithDeadline) to errgroup.WithContext
  11. Tracing
    1. Start spans at the beginning of (some) subtasks
    2. Use context.Context to propagate the SpanContext
    3. See tracing doc
    4. See example below

Example: Subtask

```go
g.Go(func() error {

    // -- start a span (for tracing)
    ctx, span := otel.Tracer("").Start(ctx, "doSomethingInteresting")
    defer span.End()

    // -- sender (me) responsible for closing outCh channel
    defer close(outCh)

    // -- do a subtask, subtask might start sub-subtasks using g.Go(...)
    result, err := doASubtaskHere(ctx, arg1, arg2, ...)
    if err != nil {

        // -- add helpful messages for tracing errors to their source
        otzap.AddErrorEvent(span, "failed to doSomething because Foo", err)

        // -- propagate error thru errGroup (errGroup handles cancellation)
        return err
    }

    // -- send the subtask result, unless the pipeline was cancelled
    select {
    case outCh <- result:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
})
```

Example: Propagate errGroup thru context.Context

```go
import (
    "context"

    "golang.org/x/sync/errgroup"
)

type errGroupContextKeyType int

const currentErrGroupKey errGroupContextKeyType = iota

// ErrGroupFromContext returns the errGroup associated with the context, or nil.
func ErrGroupFromContext(ctx context.Context) *errgroup.Group {
    if g, ok := ctx.Value(currentErrGroupKey).(*errgroup.Group); ok {
        return g
    }

    return nil
}

// WithErrGroup returns a new context containing the given errgroup.Group.
func WithErrGroup(parent context.Context, g *errgroup.Group) context.Context {
    return context.WithValue(parent, currentErrGroupKey, g)
}

// NewErrGroup simplifies linking the errGroup and context.
func NewErrGroup(parent context.Context) (*errgroup.Group, context.Context) {
    g, ctx := errgroup.WithContext(parent)
    return g, WithErrGroup(ctx, g)
}
```

Example: Merge channels

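```go
// MergeChannels consumes all input channels and
// merges their messages into a single output channel,
// closing outCh once every input is drained (or the pipeline is cancelled).
//
// Use ctx for timeout, cancellation, and pipeline termination;
// ctx must carry an errGroup (see NewErrGroup).
func MergeChannels[T any](
    ctx context.Context,
    chs []<-chan T,
    outCh chan<- T,
) {
    g := ErrGroupFromContext(ctx)

    taskCount := len(chs)
    done := make(chan struct{}, taskCount) // buffered, so signaling never blocks

    for _, current := range chs {
        ch := current // exclusive ref (required before Go 1.22)
        g.Go(func() error {
            // -- always signal, even when returning early on cancellation
            defer func() { done <- struct{}{} }()

            // -- consume channel
            for req := range ch {
                select {
                case outCh <- req:
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return nil
        })
    }

    // -- Cleanup: close outCh only after every consumer has stopped writing
    g.Go(func() error {
        defer close(outCh)

        for i := 0; i < taskCount; i++ {
            <-done // wait for signal on each subtask
        }

        // -- Invariant: no consumer will write to outCh again
        return nil
    })
}
```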

Other Resources

  1. https://go.dev/blog/pipelines
  2. See concurrency.debug.md