Hippocampus's Garden

Under the sea, in the hippocampus's garden...

Ready Steady GO: Dispatcher-Worker with errgroup | Hippocampus's Garden

Ready Steady GO: Dispatcher-Worker with errgroup

February 14, 2021  |  2 min read  |  869 views

  • このエントリーをはてなブックマークに追加

Suppose you are asked to write a golang program that:

  1. processes multiple jobs concurrently
  2. can limit the number of goroutines
  3. immediately cancels the other jobs when an error occurs in any goroutine

To satisfy the requirement #1 and #2, you can adopt a dispatcher-worker pattern. The common pattern often utilizes sync.WaitGroup, but in this case, to satisfy the other requirement #3, you might want to use golang.org/x/sync/errgroup.

Here is my sample implementation. The job is to wait for a random milliseconds, and an error occurs when ordered to wait for more than 990 ms.

package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
)

const (
	numWorkers      = 10
	queueLength     = 100
	numJobs         = 100
	maxWaitMilliSec = 1000
	thres           = 990
)

type Job struct {
	id           int
	waitMilliSec int
}

type Dispatcher struct {
	queue chan *Job
	eg    *errgroup.Group
}

func NewDispatcher(eg *errgroup.Group) *Dispatcher {
	return &Dispatcher{
		queue: make(chan *Job, queueLength),
		eg:    eg,
	}
}

func (d *Dispatcher) Start(ctx context.Context) {
	for i := 0; i < numWorkers; i++ {
		d.eg.Go(func() error {
			for j := range d.queue {
				err := work(ctx, j)
				if err != nil {
					return err
				}
			}
			return nil
		})
	}
}

func (d *Dispatcher) Append(job *Job) {
	d.queue <- job
}

func work(ctx context.Context, job *Job) error {
	select {
	case <-ctx.Done():
		fmt.Printf("Canceled the job #%d\n", job.id)
		return nil
	default:
		fmt.Printf("Working on the job #%d. Wait for %d ms.\n", job.id, job.waitMilliSec)
		if job.waitMilliSec > thres {
			return fmt.Errorf("cannot wait for more than %d ms: job #%d; %d ms", thres, job.id, job.waitMilliSec)
		}
		time.Sleep(time.Duration(job.waitMilliSec) * time.Millisecond)
		return nil
	}
}

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	d := NewDispatcher(eg)

	d.Start(ctx)
	for i := 0; i < numJobs; i++ {
		milliSec := rand.Intn(maxWaitMilliSec)
		d.Append(&Job{
			id:           i,
			waitMilliSec: milliSec,
		})
	}

	close(d.queue)
	err := d.eg.Wait()
	if err != nil {
		fmt.Println(err)
	}
}

You can try the above code here. It results in:

$ go run main.go 
Working on the job #6. Wait for 425 ms.
Working on the job #7. Wait for 540 ms.
...
Working on the job #77. Wait for 463 ms.
Working on the job #78. Wait for 996 ms.
Canceled the job #79
...
Canceled the job #98
Canceled the job #99
cannot wait for more than 990 ms: job #78; 996 ms

References

[1] golang の channel を使って Dispatcher-Worker を作り goroutine 爆発させないようにする - at kaneshin
[2] Re: golang の channel を使って Dispatcher-Worker を作り goroutine 爆発させないようにする - okzkメモ
[3] https://gist.github.com/okzk/4e5afec27927668e52d5eb6c5eb1bb72
[4] https://gist.github.com/lestrrat/c9b78369cf9b9c5d9b0c909ed1e2452e
[5] Goメモ-62 (sync.WaitGroupとerrgroupパッケージ) - いろいろ備忘録日記
[6] 複数のGoroutineをWaitGroup(ErrGroup)で制御する - Hack Your Design!


  • このエントリーをはてなブックマークに追加
[object Object]

Written by Shion Honda. If you like this, please share!

Shion Honda

Hippocampus's Garden © 2025, Shion Honda. Built with Gatsby