The X-Files: Avoiding Concurrency Boilerplate With golang.org/x/sync

Or abstracting common synchronization patterns in Go

This post is part of The X-Files, a blog series exploring the Go sub-repositories (golang.org/x/*).

Go makes concurrent programming dead simple with the go keyword. And, with its “share memory by communicating” mantra, channels serve as a thread-safe mechanism for communication between goroutines. These primitives more than meet the demands of most parallel tasks.

But sometimes workloads need extra coordination, particularly around error propagation or synchronization. For example, goroutines often need access to a shared yet thread-unsafe resource. Enter the standard library’s sync package with WaitGroup, Once, and Mutex. For the brave of heart, sync/atomic beckons. These utilities, along with channels, annihilate data races, but they can result in a lot of nuanced boilerplate. Wouldn’t it make sense to lift this code into portable abstractions?

Unfortunately, the sync package doesn’t provide for many higher-level patterns. But where the standard library is lacking, golang.org/x/sync picks up the slack. In this episode of The X-Files, I’ll cover the four tools provided by this subrepo and how they can fit into any project’s concurrent workflow.

The Action/Executor Pattern

The Action/Executor pattern from the Gang of Four, also known as the Command pattern, is pretty powerful. It abstracts a behavior (the action) from how it’s actually run (the executor). Here’s the basic interface for the two components:

// An Action performs a single arbitrary task.
type Action interface {
	// Execute performs the work of an Action. This method should make a best
	// effort to be cancelled if the provided ctx is cancelled.
	Execute(ctx context.Context) error
}

// An Executor performs a set of Actions. It is up to the implementing type
// to define the concurrency and open/closed failure behavior of the actions.
type Executor interface {
	// Execute performs all provided actions by calling their Execute method.
	// This method should make a best-effort to cancel outstanding actions if the
	// provided ctx is cancelled.
	Execute(ctx context.Context, actions []Action) error
}

// ActionFunc permits using a standalone function as an Action.
type ActionFunc func(context.Context) error

// Execute satisfies the Action interface, delegating the call to the
// underlying function.
func (fn ActionFunc) Execute(ctx context.Context) error { return fn(ctx) }

The Action interface performs some arbitrary task; an Executor handles running a set of Actions together. A sequential Executor takes only a nominal effort to write (see the sketch below). A concurrent implementation would be more useful, but requires more finesse: it’s imperative that such an Executor handles errors well and synchronizes the Actions’ lifecycles.
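For reference, a sequential, fail-closed Executor could be as small as this sketch (it stops at the first error or cancellation):

// Sequential is a minimal implementation of Executor that runs each Action
// one at a time, in order.
type Sequential struct{}

// Execute runs the actions serially, stopping at the first error or as soon
// as ctx is cancelled.
func (Sequential) Execute(ctx context.Context, actions []Action) error {
	for _, a := range actions {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := a.Execute(ctx); err != nil {
			return err
		}
	}
	return nil
}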

Error Propagation & Cancellation with errgroup

When I first started writing Go, I sought any excuse to use the concurrency features. It even led me towards writing a post about tee-ing an io.Reader for multiple goroutines to simultaneously consume. In my excitement, though, I failed to consider what happens if more than one goroutine errors out. Spoiler alert: the program would panic, as noticed two years after publication.

Capturing the first error and cancelling outstanding goroutines is an incredibly common pattern. A parallel Executor could be plumbed together with a mess of WaitGroups and channels to achieve this behavior. Or, it could use the errgroup subpackage, which abstracts this in an elegant way.

// Parallel is a concurrent implementation of Executor
type Parallel struct{}

// Execute performs all provided actions concurrently, failing closed on the
// first error or if ctx is cancelled.
func (p Parallel) Execute(ctx context.Context, actions []Action) error {
	grp, ctx := errgroup.WithContext(ctx)

	for _, a := range actions {
		grp.Go(p.execFn(ctx, a))
	}

	return grp.Wait()
}

// execFn binds the Context and Action to the proper function signature for the
// errgroup.Group.
func (p Parallel) execFn(ctx context.Context, a Action) func() error {
	return func() error { return a.Execute(ctx) }
}

When first calling Parallel.Execute, an instance of errgroup.Group is created from the provided context.Context.1 This permits the caller to stop the whole shebang at any time. Then, instead of starting vanilla goroutines, each Action runs via grp.Go. The Group spawns goroutines with the boilerplate to capture errors from each.

Finally, grp.Wait() blocks until all the scheduled Actions complete. The method returns either the first error it receives from the Actions or nil. If any of the Actions produce an error, the ctx is cancelled and propagated through to the others. This allows them to short-circuit and return early.

One limitation hinted at above, however, is that errgroup.Group fails closed: if a single Action returns an error, the Group cancels all the others, swallowing any additional errors. This may not be desirable, in which case a different pattern is necessary to fail open.
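For comparison, a fail-open Executor could be sketched with nothing more than the standard library’s sync.WaitGroup, letting every Action run to completion and reporting how many failed (the error formatting here is only illustrative):

// FailOpen runs every Action to completion, collecting errors rather than
// cancelling the remaining Actions after the first failure.
type FailOpen struct{}

func (FailOpen) Execute(ctx context.Context, actions []Action) error {
	var (
		wg   sync.WaitGroup
		mtx  sync.Mutex
		errs []error
	)

	for _, a := range actions {
		wg.Add(1)
		go func(a Action) {
			defer wg.Done()
			if err := a.Execute(ctx); err != nil {
				mtx.Lock()
				errs = append(errs, err)
				mtx.Unlock()
			}
		}(a)
	}

	wg.Wait()

	if len(errs) > 0 {
		return fmt.Errorf("%d of %d actions failed; first error: %v", len(errs), len(actions), errs[0])
	}
	return nil
}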

Looking at this code, it should be plain that errgroup cuts down on a lot of error-prone boilerplate. At the time of writing, there are just over 100 public packages importing Group to synchronize their goroutines. Notably, containerd leverages it for dispatching handlers and diff-ing directories.

Controlling Concurrent Access with semaphore

Parallel can create an unbounded number of goroutines for each call to Execute. This could have dangerous consequences if the Executor happens to be resource-constrained or if an Action is reentrant. To keep the system predictable, both in-flight calls and Actions should be limited.

In the previous X-Files post, I covered a use case for the rate package to control access to a resource. While rate.Limiter is good at managing temporal access to something, it’s not always appropriate for limiting concurrent use. In other words, a rate.Limiter permits some number of actions in a given time window, but cannot control whether they all happen at the same instant. A mutex or weighted semaphore, alternatively, ignores time and concerns itself only with simultaneous actions. And, as it were, the semaphore subpackage has this use case covered!

Before diving in, it’s worth mentioning that semaphore.Weighted and the standard library’s sync.RWMutex share some characteristics but solve different problems. The two are often confused, and sometimes treated (incorrectly) as interchangeable. The following points should help clarify their differences in Go:

  • Ownership
    • Mutex: Only one caller can take a lock at a time. Because of this, mutexes can control ownership of a resource.
    • Semaphore: Multiple callers up to a specified weight can take a lock. This behaves like RWMutex.RLock, but enforces an upper bound. Since many callers could access the resource(s), semaphores do not control ownership. Instead, they act as a signal between tasks or regulate flow.
  • Blocking
    • Mutex: Obtaining a lock blocks forever until an unlock occurs in another thread. This has the potential to cause hard-to-debug deadlocks or goroutine leaks.
    • Semaphore: Like rate.Limiter, semaphore.Weighted permits cancellation via context.Context. It also supports a TryAcquire method which immediately returns if the desired weight is unavailable. The sketch following this list shows both acquisition styles.
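To make the blocking difference concrete, here’s a minimal sketch of both acquisition styles (the sem variable and helper functions are illustrative only, not part of the executor code):

var sem = semaphore.NewWeighted(3) // at most 3 concurrent holders

// withResource blocks until a slot is free or ctx is cancelled.
func withResource(ctx context.Context) error {
	if err := sem.Acquire(ctx, 1); err != nil {
		return err // cancellation or deadline, never a silent deadlock
	}
	defer sem.Release(1)

	// ... use the shared resource ...
	return nil
}

// withResourceIfIdle never blocks; it reports whether a slot was free.
func withResourceIfIdle() bool {
	if !sem.TryAcquire(1) {
		return false
	}
	defer sem.Release(1)

	// ... use the shared resource ...
	return true
}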

Since the goal here is to control the flow of actions through an Executor, a semaphore is more appropriate. Below demonstrates using a semaphore.Weighted to decorate any Executor.2

type flow struct {
	maxActions int64
	actions    *semaphore.Weighted
	calls      *semaphore.Weighted
	ex         Executor
}

// ControlFlow decorates an Executor, limiting it to a maximum concurrent
// number of calls and actions.
func ControlFlow(e Executor, maxCalls, maxActions int64) Executor {
	return &flow{
		maxActions: maxActions,
		calls:      semaphore.NewWeighted(maxCalls),
		actions:    semaphore.NewWeighted(maxActions),
		ex:         e,
	}
}

// Execute attempts to acquire the semaphores for the concurrent calls and
// actions before delegating to the decorated Executor. If Execute is called
// with more actions than maxActions, an error is returned.
func (f *flow) Execute(ctx context.Context, actions []Action) error {
	qty := int64(len(actions))

	if qty > f.maxActions {
		return fmt.Errorf("maximum %d actions allowed", f.maxActions)
	}

	// limit concurrent calls to Executor.Execute
	if err := f.calls.Acquire(ctx, 1); err != nil {
		return err
	}
	defer f.calls.Release(1)

	// limit total in-flight Actions, independent of Execute calls
	if err := f.actions.Acquire(ctx, qty); err != nil {
		return err
	}
	defer f.actions.Release(qty)

	// delegate Actions to decorated Executor
	return f.ex.Execute(ctx, actions)
}

The ControlFlow function composes around an existing Executor. It constructs the unexported flow struct with two semaphores: one for calls to Execute, the other for total Actions. As calls come in, flow checks whether the number of Actions exceeds the max, erroring early if necessary. It then attempts to acquire the semaphores for 1 call and n Actions. This maintains an upper limit not only on concurrent calls to Execute but also on simultaneous Actions. Assuming both acquisitions succeed, the weights are released in defers. Finally, flow delegates the Actions to the underlying Executor.
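As a usage sketch, composing the decorator with the earlier Parallel Executor might look like this (the limits are arbitrary, and ctx and actions are assumed to be in scope):

// allow at most 4 concurrent calls to Execute and 64 in-flight Actions
ex := ControlFlow(Parallel{}, 4, 64)

if err := ex.Execute(ctx, actions); err != nil {
	log.Printf("executing actions: %v", err)
}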

It’s arguable whether the semaphore package is necessary or not. For instance, buffered channels can behave like Weighted; the Effective Go document even demonstrates utilizing one in this way. Yet, creating the max Actions semaphore would not be possible to do atomically with a channel. A different and more robust solution for limiting Actions would be a worker pool Executor. That said, there’s value in having the flow control independent of the Executor.

The semaphore package has no counterpart in the standard library and sees almost no use in open-source packages. Google Cloud’s PubSub SDK mirrors the same use of semaphores as in the above example, controlling the number of in-flight messages as well as their cumulative size. Besides use cases like that one, I’d suggest passing on this package in favor of the worker pool.

Deduping Actions with singleflight

While ControlFlow limits concurrency, it has no concept of duplicates. Executing multiple expensive yet idempotent actions is a waste of resources, and an avoidable one. An analogous problem occurs on the front end, where browsers emit scroll or resize events for every frame in which the action takes place. To prevent janky behavior, developers typically debounce their event handlers, limiting how often they trigger within a given period.

Having similar functionality in an Executor would be a nice addition. And, as might be expected, the singleflight package provides this facility. Like errgroup, a singleflight.Group encapsulates work occurring across goroutines. It doesn’t spawn the goroutines itself; instead, goroutines should share a singleflight.Group. If two goroutines execute the same action, the Group blocks one call while the other runs. Once finished, both receive the same result, error, and a flag indicating whether the result was shared. Mechanically, this is similar to sync.Once, but it permits subsequent operations with the same identifier to occur.

Right now, the Action interface has no notion of identity, and function values like ActionFunc aren’t comparable. Creating a NamedAction is the first step in supporting singleflight:

// A NamedAction describes an Action that also has a unique
// identifier. This interface is used by the Debounce Executor
// to prevent duplicate actions from running concurrently.
type NamedAction interface {
	Action

	// ID returns the name for this Action. Identical actions
	// should return the same ID value.
	ID() string
}

type namedAction struct {
	ActionFunc
	name string
}

func (a namedAction) ID() string { return a.name }

// Named creates a NamedAction from fn, with n as its name. This
// function is just a helper to simplify creating NamedActions.
func Named(n string, fn ActionFunc) NamedAction {
	return namedAction{
		ActionFunc: fn,
		name:       n,
	}
}

Here, the NamedAction interface composes around an Action and attaches the ID method. ID should return a string uniquely identifying the Action. The Named function provides a handy helper to construct a NamedAction from an ActionFunc. Now, a debounce Executor can detect when an Action has a name and execute it within a singleflight.Group:

// Debounce wraps e, preventing duplicate NamedActions from running
// concurrently, even from separate calls to Execute.
func Debounce(e Executor) Executor {
	return debouncer{
		ex: e,
		sf: new(singleflight.Group),
	}
}

type debouncer struct {
	ex Executor
	sf *singleflight.Group
}

// Execute attaches a singleflight.Group to any NamedActions, effectively debouncing
// identical Actions if run concurrently.
func (d debouncer) Execute(ctx context.Context, actions []Action) error {
	wrapped := make([]Action, len(actions))

	for i, a := range actions {
		if na, ok := a.(NamedAction); ok {
			// compose the NamedAction with the singleflight.Group
			wrapped[i] = debouncedAction{
				NamedAction: na,
				sf:          d.sf,
			}
		} else {
			// otherwise, pass it through untouched
			wrapped[i] = actions[i]
		}
	}

	// delegate wrapped Actions to decorated Executor
	return d.ex.Execute(ctx, wrapped)
}

type debouncedAction struct {
	NamedAction
	sf *singleflight.Group
}

func (da debouncedAction) Execute(ctx context.Context) error {
	// map the composed Action's Execute function with the expected signature
	// for singleflight.Group.Do.
	fn := func() (interface{}, error) {
		return nil, da.NamedAction.Execute(ctx)
	}

	_, err, _ := da.sf.Do(da.ID(), fn)

	return err
}

When Debounce receives a set of Actions, it checks for any NamedActions, wrapping them with a shared singleflight.Group. The new debouncedAction delegates execution through to the group’s Do method. Now, any concurrent identical Actions are only run once. Note that this only prevents the same Action from occurring at the same time; later executions with the same name will still run. For this to be effective, the underlying Executor must operate in parallel.
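Since each decorator returns an Executor, the pieces compose naturally; a full stack, reusing the arbitrary limits from earlier, might read:

ex := Debounce(ControlFlow(Parallel{}, 4, 64))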

Honestly, this is a pretty limited application of singleflight.Group, considering the Actions only return errors. A more common use case is to have a group front reads to a database. Without singleflight, this would result in one query per request. With singleflight, any concurrent calls for the same record could be shared, significantly reducing load on the underlying store. If the result is a pointer, however, users of that value must be careful not to mutate it without creating a copy.
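As a rough sketch of that pattern (the User type, getUser helper, and SQL query are hypothetical; returning a value rather than a pointer also sidesteps the mutation caveat):

type User struct {
	Name string
}

var group singleflight.Group

// getUser fronts the database with a singleflight.Group: concurrent calls
// with the same id share a single query and receive the same result.
func getUser(ctx context.Context, db *sql.DB, id string) (User, error) {
	v, err, _ := group.Do("user:"+id, func() (interface{}, error) {
		var u User
		err := db.QueryRowContext(ctx, "SELECT name FROM users WHERE id = ?", id).Scan(&u.Name)
		return u, err
	})
	if err != nil {
		return User{}, err
	}
	return v.(User), nil
}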

The singleflight package originally supported deduping of key fetches in groupcache. Today, it’s used for DNS lookups in the net stdlib package. The Docker builder also uses it in its FSCache implementation.

Avoiding the Mutex Dance with sync.Map (né syncmap)

While Action authors can attach logging and metrics ad hoc, doing it globally for all Actions would be a much better approach. Another decorator, one that emits metrics for each operation performed by an Executor, can achieve this, including name-scoped stats for each NamedAction. But first, the decorator needs a way to produce this data:

// StatSource creates metrics with the given name. The returned metrics should be
// concurrency-safe.
type StatSource interface {
	Timer(name string) Timer
	Counter(name string) Counter
}

// Timer emits the duration of a particular event. The duration value is
// typically used to measure latencies and create histograms thereof.
type Timer func(duration time.Duration)

// Counter emits any number of events happening at a given time. For example,
// Counters are often used to measure RPS.
type Counter func(delta int)

// A statSet holds the cached metrics for a single Action name.
type statSet struct {
	// Latency measures how long an Action takes
	Latency Timer
	// Success is incremented when an Action does not return an error
	Success Counter
	// Error is incremented when an Action results in an error
	Error Counter
}

// statCache describes a read-through cache of statSets, keyed by Action name.
type statCache interface {
	// get returns a shared statSet for the given name, either from the cache or
	// a provided StatSource.
	get(name string) *statSet
}

The stats library, abstracted behind the StatSource interface, returns Timers and Counters. These metric types are cacheable for reuse between Actions. For each Action name, the statCache holds a statSet containing the metrics of interest. If statCache.get encounters a name it hasn’t seen, it creates a new statSet from the StatSource.
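The cache implementations below call a newStatSet helper that isn’t shown in these snippets; a minimal sketch, assuming the StatSource interface above and made-up metric name suffixes, might be:

// newStatSet builds the cached metrics for a single Action name.
func newStatSet(src StatSource, name string) *statSet {
	return &statSet{
		Latency: src.Timer(name + ".latency"),
		Success: src.Counter(name + ".success"),
		Error:   src.Counter(name + ".error"),
	}
}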

Now with the boilerplate out of the way, the metrics Executor will have more-or-less the same structure as Debounce:

type metrics struct {
	ex    Executor
	stats statCache
}

// Execute emits latency, success, and error metrics for every action delegated to the
// decorated Executor. For NamedActions, additional name-scoped stats are also emitted.
func (m *metrics) Execute(ctx context.Context, actions []Action) error {
	wrapped := make([]Action, len(actions))
	global := m.stats.get("all_actions")

	for i, a := range actions {
		if na, ok := a.(NamedAction); ok {
			// compose the NamedAction with global and name-scoped stats
			wrapped[i] = namedStatAction{
				NamedAction: na,
				global:      global,
				stats:       m.stats.get(na.ID()),
			}
		} else {
			// otherwise, just compose with global stats
			wrapped[i] = statAction{
				Action: a,
				global: global,
			}
		}
	}

	// delegate wrapped Actions to decorated Executor
	return m.ex.Execute(ctx, wrapped)
}

type namedStatAction struct {
	NamedAction
	global *statSet
	stats  *statSet
}

func (a namedStatAction) Execute(ctx context.Context) error {
	return captureMetrics(ctx, a.NamedAction, a.global, a.stats)
}

type statAction struct {
	Action
	global *statSet
}

func (a statAction) Execute(ctx context.Context) error {
	return captureMetrics(ctx, a.Action, a.global, nil)
}

func captureMetrics(ctx context.Context, a Action, global, stats *statSet) error {
	// execute the action, timing its latency
	start := time.Now()
	err := a.Execute(ctx)
	lat := time.Since(start)

	// create our counter values for error/success
	var errored, succeeded int
	if err != nil {
		errored = 1
	} else {
		succeeded = 1
	}

	// emit the global stats
	global.Latency(lat)
	global.Success(succeeded)
	global.Error(errored)

	// if there are name-scoped stats, emit those, too
	if stats != nil {
		stats.Latency(lat)
		stats.Success(succeeded)
		stats.Error(errored)
	}

	return err
}

So, how should statCache be implemented? At face value, this seems like a perfect use case for a map. Easy enough! However, this runs into one of Go’s most common gotchas: maps are not safe for concurrent use. With just a map, concurrent calls to the Executor would result in data races reading and writing the cache. All’s not lost, though. Developers can synchronize access to a map with careful use of a sync.RWMutex. Below is a statCache implementation that leverages the two together with what I call the mutex dance.

// mutexCache implements statCache, backed by a map and sync.RWMutex
type mutexCache struct {
	src    StatSource
	mtx    sync.RWMutex
	lookup map[string]*statSet
}

func (mc *mutexCache) get(name string) *statSet {
	// take a read lock to see if the set already exists
	mc.mtx.RLock()
	set, ok := mc.lookup[name]
	mc.mtx.RUnlock()

	if ok { // the set exists, return it
		return set
	}

	// need to take a write lock to update the map
	mc.mtx.Lock()
	// While waiting for the write lock, another goroutine may have created the
	// set. Here, we check again after obtaining the lock before making a new one
	if set, ok = mc.lookup[name]; !ok {
		set = newStatSet(mc.src, name)
		mc.lookup[name] = set
	}
	mc.mtx.Unlock()

	return set
}

First, the cache takes a read lock on its mutex to see if the set already exists in the map. Multiple callers to get can share a read lock, so there is no steady-state contention when accessing the map. However, if the set does not exist, the caller must obtain a write lock to modify the underlying map. Only one write lock can be held at a time, so the caller blocks until all read locks (or another write lock) are released.

With the write lock in hand, it needs to check the map again for the set. This is because, while waiting for the lock, another goroutine may have added the set to the map. If it still does not exist, the set is added, the lock is released, and the value is returned from the cache.

This nuanced dance of locking and unlocking mutexes is error prone. One false step and incredibly hard to debug deadlocks will creep into the program. Without generics, code generation or some nasty reflection, generalizing this for any arbitrary map is also just not possible. Is there a better way?

Well, kind of: sync.Map. Originally a part of the sub-repository as syncmap.Map, this concurrency-safe map implementation was added to the standard library in Go 1.9. It was first proposed to address scaling issues with sync.RWMutex protected maps on machines with many (many) CPU cores. Since then, sync.Map has replaced map-based caches throughout the standard library. Check out its use in encoding/json, mime, and reflect.3

Below is a reimplementation of statCache using sync.Map:

// syncMapCache implements statCache, backed by a sync.Map
type syncMapCache struct {
	src    StatSource
	lookup sync.Map
}

func (smc *syncMapCache) get(name string) *statSet {
	val, _ := smc.lookup.Load(name)
	if set, ok := val.(*statSet); ok {
		return set
	}

	// create a new statSet, but don't store it if one was added since the last
	// load. This is not ideal since we can't atomically create the set and
	// write it.
	set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name))
	return set.(*statSet)
}

A few things should stick out from this new implementation. First, sync.Map abstracts away all the synchronization boilerplate. But while the code gains readability and reliability, it loses compile-time type safety: because sync.Map is not a builtin, it deals solely with interface{} values. Also, as the comment implies, this version cannot atomically create the set and commit it to the map. While LoadOrStore does not insert the new set if one already exists, it still pays the cost of calling newStatSet, which could be expensive.

So which is “better?” It depends on the goal. If readability and maintainability are the primary concerns, use sync.Map. If it’s type safety, I would recommend the mutex-based cache. If it’s performance, measure. And be sure to benchmark on hardware that is, or resembles, the production environment. Supposing this Mid-2015 MacBook Pro (4 cores, 8 threads) is close enough, here are some benchmarks comparing the two implementations:

BenchmarkMutexCache/10-8            10000000	       180 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutexCache/100-8           10000000	       187 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutexCache/1000-8          10000000	       214 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutexCache/10000-8         10000000	       231 ns/op	       0 B/op	       0 allocs/op
BenchmarkMutexCache/100000-8         5000000	       254 ns/op	       2 B/op	       0 allocs/op
BenchmarkMutexCache/1000000-8        1000000	      1159 ns/op	     102 B/op	       1 allocs/op
BenchmarkMutexCache/10000000-8       1000000	      1481 ns/op	     184 B/op	       2 allocs/op
BenchmarkMutexCache/100000000-8      1000000	      1655 ns/op	     187 B/op	       3 allocs/op

BenchmarkSyncMapCache/10-8           5000000	       221 ns/op	       0 B/op	       0 allocs/op
BenchmarkSyncMapCache/100-8         10000000	       235 ns/op	       0 B/op	       0 allocs/op
BenchmarkSyncMapCache/1000-8        10000000	       235 ns/op	       0 B/op	       0 allocs/op
BenchmarkSyncMapCache/10000-8       10000000	       246 ns/op	       0 B/op	       0 allocs/op
BenchmarkSyncMapCache/100000-8       5000000	       264 ns/op	       5 B/op	       0 allocs/op
BenchmarkSyncMapCache/1000000-8      1000000	      1378 ns/op	     146 B/op	       3 allocs/op
BenchmarkSyncMapCache/10000000-8     1000000	      1939 ns/op	     237 B/op	       5 allocs/op
BenchmarkSyncMapCache/100000000-8    1000000	      2090 ns/op	     241 B/op	       6 allocs/op

The benchmark takes a corpus of N names (ranging from 10 to 100 million), chooses one at random, fetches the set with that name from the cache, and emits a stat. It also runs in parallel, simulating the contention of multiple goroutines executing actions. As the population of names grows, the read:write ratio shrinks, going from ~100% reads (N = 10) to far more writes (N = 100 million).
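A sketch of that benchmark shape, shared by both implementations, might look like this (the newNames helper generating the corpus is hypothetical):

func benchmarkCache(b *testing.B, cache statCache, n int) {
	names := newNames(n) // hypothetical helper returning n distinct names

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// pick a random name, fetch its statSet, and emit a stat
			set := cache.get(names[rand.Intn(len(names))])
			set.Success(1)
		}
	})
}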

The results show the sync.Map implementation is consistently ~20% slower than the mutex solution. This should be expected, given the use of interface{} and the extra internal bookkeeping. I’d likely choose the mutex-based solution, as library code should add as little overhead as possible.

Wrapping Up

The sync subrepository has a ton of utility, but I’m surprised it shows up in so few open source projects. Hopefully these examples conjure up some ideas for where it could be useful. The code for this article is available both in the inlined gist snippets and as an importable executor package. Let me know how it goes!


  1. Note that Parallel.execFn adapts the Action.Execute method to the signature expected by Group.Go. ↩︎

  2. This Executor and the following two use the Decorator pattern. Two-for-one Gang of Four! ↩︎

  3. The internals of sync.Map are fascinating. It possesses separate fast and slow paths via an atomically accessed read-only map and a mutex protected dirty copy. Heuristics driven by fast path misses control when the dirty map is persisted onto the read-only map. I recommend giving the implementation a read. ↩︎

Thanks for reading The X-Files!

This post is part of The X-Files, a blog series exploring the Go sub-repositories (golang.org/x/*). For more sub-repo goodness, check out the series so far:

  1. Controlling Throughput with rate.Limiter
  2. Avoiding Concurrency Boilerplate With golang.org/x/sync (This Article!)
  3. With many more to come!