These are all my learnings on concurrency in Go, gathered in one place.
I decided to properly start learning concurrency in Go. I had previously worked with it but didn't really understand it. So I bought Concurrency in Go by Katherine Cox-Buday and started taking notes on what I was learning. This is basically a very slimmed-down version of the contents of the book. I think it's something useful and decided to share it.
Everything in this article is based on that book by Katherine Cox-Buday. All the knowledge and nearly all examples come from her book. This article is purely my annotations of the book and what I learned from it. All credit goes to Katherine.
func main() {
	go sayHello()
	go func() {
		fmt.Println("hello from an anonymous function")
	}()
	// Continue doing other things. Note that main can exit before
	// the goroutines get a chance to run.
}

func sayHello() {
	fmt.Println("hello world")
}
The Go runtime scheduler multiplexes M green threads (goroutines) onto N OS threads.
The sync package

var wg sync.WaitGroup
wg.Add(1) // Increment the counter: one more goroutine running.
go func() {
	defer wg.Done() // Tell the group this goroutine is finished.
	fmt.Println("first goroutine sleeping")
	time.Sleep(1 * time.Second)
}()
wg.Add(1)
go func() {
	defer wg.Done()
	fmt.Println("second goroutine sleeping")
	time.Sleep(1 * time.Second)
}()
wg.Wait() // Block until all goroutines in the WaitGroup have finished.
fmt.Println("all goroutines are complete.")
type ConcurrentCounter struct {
mu sync.Mutex
value int
}
func (c *ConcurrentCounter) Increment() {
	c.mu.Lock() // Guards the critical section: only one goroutine can write to value at a time.
	c.value++
	c.mu.Unlock() // Calling this with defer is also a nice pattern.
}
This example builds a queue with a capacity of 5 and keeps adding to it. Every time the queue reaches a length of 2, we want to remove an item from it, concurrently.
c := sync.NewCond(&sync.Mutex{})
queue := make([]interface{}, 0, 5)
removeFromQueue := func(delay time.Duration) {
time.Sleep(delay)
c.L.Lock() // enter the critical section.
queue = queue[1:]
fmt.Println("removed from queue")
c.L.Unlock() // exit the critical section.
c.Signal() // signal that our operation is done.
}
for i := 0; i < 5; i++ {
c.L.Lock() // Enter a critical section. Prevent concurrent writes.
for len(queue) == 2 {
c.Wait() // Suspend the main goroutine here and wait for a signal to continue.
// c.Wait() calls c.L.Unlock() when it starts waiting, and c.L.Lock() before it returns.
}
fmt.Println("adding to queue")
queue = append(queue, struct{}{})
go removeFromQueue(time.Second*1)
c.L.Unlock() // Exit critical section.
}
Output:
adding to queue
adding to queue
removed from queue
adding to queue
removed from queue
adding to queue
removed from queue
adding to queue
removed from queue
adding to queue
We can also use Broadcast() instead of Signal(). Broadcast() tells all waiting goroutines that something happened; Signal() tells only the goroutine that has been waiting the longest.
var count int
increment := func() {
count++
}
var once sync.Once
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
once.Do(increment)
}()
}
wg.Wait()
fmt.Printf("count is %d\n", count)
Output:
count is 1
pool := &sync.Pool{
	New: func() interface{} {
		fmt.Println("creating new instance")
		return struct{}{}
	},
}
pool.Get() // Gets an available instance or calls pool.New()
instance := pool.Get()
pool.Put(instance) // Put previously retrieved instance back in pool. Use it to seed the pool or to reuse instances. Saves memory.
pool.Get()
Output:
creating new instance
creating new instance
var receiveStream <-chan string
var sendStream chan<- string
dataStream := make(chan string)
// valid statements
receiveStream = dataStream
sendStream = dataStream
go func() {
dataStream <- "hello world"
}()
fmt.Println(<-dataStream) // prints hello world. Waits until there is something in the channel.
close(dataStream)
We can receive the close message with data, ok := <-dataStream, where ok is false and data is "" (the zero value of the channel's type). We can also range over a channel with for data := range dataStream {}; the loop blocks while waiting for data and ends as soon as dataStream is closed. Buffered channels are created with a capacity, make(chan int, 3), whereas unbuffered channels are not: make(chan int). Clear channel ownership reduces the risk of deadlocks (by writing to a nil channel) and panics (by closing a nil channel, writing to a closed channel, or closing a channel more than once).
The owner of a channel should be the one that creates it, writes to it, and closes it; consumers should only read from it (possibly through a select statement). This way, the lifecycle of resultStream is encapsulated, easy to read, and maintainable.
chanOwner := func() <-chan int { // returns a read-only channel
resultStream := make(chan int, 5)
go func() {
defer close(resultStream)
for i := 0; i <= 5; i++ {
resultStream <- i
}
} ()
return resultStream
}
resultStream := chanOwner()
for result := range resultStream {
fmt.Printf("received: %d\n", result)
}
fmt.Println("done receiving")
output:
received: 0
received: 1
received: 2
received: 3
received: 4
received: 5
done receiving
The select statement

done := make(chan interface{})
go func() {
time.Sleep(5*time.Second)
close(done)
} ()
workCounter := 0
loop:
for {
	select {
	case <-done:
		break loop // a bare break would only exit the select, not the for loop
	case <-time.After(10 * time.Second): // timeout: if nothing arrives on done for 10 seconds
		break loop
	default:
	}
	// simulate work
	workCounter++
	time.Sleep(1 * time.Second)
}
fmt.Printf("achieved %d work cycles\n", workCounter)
output:
achieved 5 work cycles.
Confinement means encapsulating the data and having clear owners and consumers, so that the owner can close the channel and guarantee that operations on the data are safe.
Example:
chanOwner := func() <-chan int {
results := make(chan int, 5)
go func() {
defer close(results)
for i := 0; i < 5; i++ {
results <- i
}
}()
return results
}
consumer := func(results <-chan int) {
for result := range results {
fmt.Printf("received: %d\n", result)
}
fmt.Println("done receiving")
}
results := chanOwner()
consumer(results)
We can loop indefinitely and wait to be stopped, or loop over iteration variables, and use a select statement to decide when to return from the loop.
Example:
for {
select {
case <-done:
return
default:
}
// do non-preemptable work
}
Although goroutines are very lightweight, they are not cleaned up by the garbage collector, which means we need to prevent them from running indefinitely. We do that by passing a read-only done channel into the goroutine and exiting the goroutine once that channel is closed. The parent routine, which knows when the child must end, can stop it just by closing that channel.
Example:
newRandStream := func(done <-chan interface{}) <-chan int {
randStream := make(chan int)
go func() {
defer fmt.Println("newRandStream has exited.")
defer close(randStream)
for {
select {
case randStream <- rand.Int():
case <-done:
return
}
}
} ()
return randStream
}
done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
fmt.Printf("%d: %d\n", i, <-randStream)
}
close(done)
//simulate on going work
time.Sleep(1 * time.Second)
Output:
3 random ints:
1: 121324212
2: 423344243
3: 123123231
newRandStream has exited.
If we didn't close the done channel, we would never have gotten the "newRandStream has exited" message.
You can combine multiple done channels into a single done channel that closes if any of its component channels close. This is done with a recursive function, and it's a good pattern to have. The next example checks whether any of x channels (or goroutines) is closed, creating on the order of x/2 goroutines to do so.
Example:
// Declared as a var first so the closure can call itself recursively.
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(append(channels[3:], orDone)...):
			}
		}
	}()
	return orDone
}
sig := func(after time.Duration) <-chan interface{} {
c := make(chan interface{})
go func() {
defer close(c)
time.Sleep(after)
}()
return c
}
start := time.Now()
<-or(
	sig(time.Second*5),
	sig(time.Second*1),
	sig(time.Hour*2),
	sig(time.Minute*1),
)
fmt.Printf("time since start: %v", time.Since(start))
output:
time since start: 1.000004s
Parent goroutines should handle the errors of their children, or any other routine that has more access to the context or state of the whole application. We can do that by wrapping the possible error together with the result in the type we send back on the channel.
Example:
type Result struct {
	Error    error
	Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
resp, err := http.Get(url)
result := Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- result:
}
}
} ()
return results
}
done := make(chan interface{})
defer close(done)
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
errCount := 0
for result := range checkStatus(done, urls...) {
if result.Error != nil {
errCount++
fmt.Printf("error: %v\n", result.Error)
if errCount == 3 {
fmt.Println("too many errors, exiting process")
break
}
continue
}
fmt.Printf("Response: %s\n", result.Response.Status)
}
output:
error: a doesnt exist
Response: 200 OK
error: b doesnt exist
error: c doesnt exist
Pipelines are something that comes from functional programming: a pipeline is a series of functions where the output of one becomes the input of the next. They can be batch pipelines (taking and returning whole slices) or stream pipelines (taking and returning one element at a time). Here is an example of a simple batch pipeline:
add := func(nums []int, addition int) []int {
result := make([]int, len(nums))
for i, num := range nums {
result[i] = num + addition
}
return result
}
multiply := func(nums []int, multiplier int) []int {
result := make([]int, len(nums))
for i, num := range nums {
result[i] = num * multiplier
}
return result
}
initialArr := []int{1, 2, 3, 4}
result := add(multiply(initialArr, 2), 1)
fmt.Println(result)
Output:
[3 5 7 9]
Example:
generator := func(done <-chan interface{}, integers ...int) <-chan int {
intStream := make(chan int)
go func () {
defer close (intStream)
for _, i := range integers {
select {
case <-done:
return
case intStream <- i:
}
}
} ()
return intStream
}
multiply := func(
done <-chan interface{},
intStream <-chan int,
multiplier int,
) <-chan int {
multipliedStream := make(chan int)
go func () {
defer close(multipliedStream)
for i := range intStream {
select {
case <-done:
return
case multipliedStream <- i*multiplier:
}
}
} ()
return multipliedStream
}
add := func(
done <-chan interface{},
intStream <-chan int,
additive int,
) <-chan int {
additionStream := make(chan int)
go func () {
defer close(additionStream)
for i := range intStream {
select {
case <-done:
return
case additionStream <- i + additive:
}
}
} ()
return additionStream
}
done := make(chan interface{})
defer close(done)
pipeline := multiply(done, add(done, multiply(done, generator(done, 1, 2, 3, 4), 2), 1), 2)
for v := range pipeline {
fmt.Println(v)
}
Output:
6
10
14
18
This is a collection of useful functions/snippets that you might commonly use or see in concurrency projects.
This function will repeat the values you pass to it infinitely until you tell it to stop.
repeat := func(
done <-chan interface{},
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
This "takes" the first num items off an incoming stream, and then exits.
take := func(
done <-chan interface{},
valueStream <-chan interface{},
num int,
) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 5) {
fmt.Printf("%v ", num)
}
Output:
1 1 1 1 1
If we adapt repeat to take a function instead of values, we can call that function infinitely and stream its results on a channel of whatever type we want. Here's an example:
repeatFn := func(
	done <-chan interface{},
	fn func() interface{},
) <-chan interface{} {
	valueStream := make(chan interface{})
	go func() {
		defer close(valueStream)
		for {
			select {
			case <-done:
				return
			case valueStream <- fn():
			}
		}
	}()
	return valueStream
}
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 5) {
fmt.Println(num)
}
Output:
1234
54332
3467567
234
34456
Fan-out is the process of starting multiple goroutines to handle input from the pipeline, and fan-in is what we call the process of combining multiple results into one channel. When should we use this pattern? When a stage is computationally expensive and doesn't depend on the order in which values arrive.
Fan-out is easy: just launch multiple copies of a particular stage (here, a hypothetical primeFinder stage reading from a randIntStream channel):
numFinders := 4
finders := make([]<-chan int, numFinders)
for i := 0; i < numFinders; i++ {
finders[i] = primeFinder(done, randIntStream)
}
The fan-in joins together (the term is multiplexing) the multiple streams of data into a single stream. Here is an example:
fanIn := func(
done <-chan interface{},
channels ...<-chan interface{},
) <-chan interface{} {
var wg sync.WaitGroup
multiplexedStream := make(chan interface{})
multiplex := func(c <-chan interface{}) {
defer wg.Done()
for i := range c {
select {
case <-done:
return
case multiplexedStream <- i:
}
}
}
wg.Add(len(channels))
for _, c := range channels {
go multiplex(c)
}
go func() {
wg.Wait()
close(multiplexedStream)
}()
return multiplexedStream
}
We can now utilize this multiplexedStream, that combines all of the channels (that are running on multiple goroutines) into one.
It's a way to improve readability. When utilising this pattern, we don't need to check if the channel is closed when reading from it, because the function does it for us:
orDone := func(done, c <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
select {
case <-done:
return
case v, ok := <-c:
if ok == false {
return
}
select {
case valStream <-v:
case <-done:
}
}
}
}()
return valStream
}
for val := range orDone(done, myChan) {
...
}
This reads from one input stream and exposes it on two separate output streams. This way you can send the same data to two parts of your system.
tee := func(
done <-chan interface{},
in <-chan interface{},
) (<-chan interface{}, <-chan interface{}) {
out1 := make(chan interface{})
out2 := make(chan interface{})
go func() {
defer close(out1)
defer close(out2)
for val := range orDone(done, in) {
var out1, out2 = out1, out2 // local copies we can set to nil after sending
for i := 0; i < 2; i++ {
select{
case out1 <- val:
out1 = nil
case out2 <- val:
out2 = nil
}
}
}
}()
return out1, out2
}
Each iteration waits until both out1 and out2 have been written to; both channels always receive exactly the same values from the input channel.
This is a useful way of combining multiple streams of data into a single one. It lets our consumers deal with only one channel at a time, even when the input is a channel of channels.
bridge := func(
done <-chan interface{},
chanStream <-chan <-chan interface{},
) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
var stream <-chan interface{}
			select {
			case maybeStream, ok := <-chanStream:
				if ok == false {
					return
				}
				stream = maybeStream
			case <-done:
				return
			}
			for val := range orDone(done, stream) {
				select {
				case valStream <- val:
				case <-done:
				}
			}
		}
	}()
return valStream
}
We can use the context package to decide whether or not to close a channel and end a goroutine. This gives us some control over timeouts and cancellations.
var wg sync.WaitGroup
work := func(ctx context.Context) {
defer wg.Done()
for i := 0; i < 200; i++ {
select {
case <-time.After(5 * time.Second):
fmt.Println("starting...", i)
case <-ctx.Done():
	fmt.Println("context was canceled", i)
	return
}
}
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
wg.Add(1)
go work(ctx)
wg.Wait()
fmt.Println("finished")
Output:
context was canceled
finished
These techniques will allow us to make our systems more scalable.
Example of good error handling:
// Error handler
type MyError struct {
Inner error // wraps the original error
Message string
Stacktrace string
Misc map[string]interface{} // where we can put additional details such as a hash of the stack trace, an ID, any kind of contextual information
}
func wrapError(err error, messagef string, msgArgs ...interface{}) MyError {
	return MyError{
		Inner:      err,
		Message:    fmt.Sprintf(messagef, msgArgs...),
		Stacktrace: string(debug.Stack()),
		Misc:       make(map[string]interface{}),
	}
}
func (err MyError) Error() string {
return err.Message
}
// Let's call this next module "LowLevel" Module
type LowLevelErr struct {
error
}
func isGloballyExec(path string) (bool, error) {
info, err := os.Stat(path)
if err != nil {
return false, LowLevelErr{(wrapError(err, err.Error()))}
}
return info.Mode().Perm()&0100 == 0100, nil
}
// Let's call this next module "IntermediateLevel" Module
type IntermediateErr struct {
error
}
func runJob(id string) error {
const jobBinPath = "/path/to/binary"
isExecutable, err := isGloballyExec(jobBinPath)
if err != nil {
return IntermediateErr{wrapError(
err,
"cannot run job %q: binaries are not available",
id,
)}
} else if isExecutable == false {
return wrapError(
nil,
"cannot run job %q: binaries are not executable",
id,
)
}
return exec.Command(jobBinPath, "--id="+id).Run()
}
// main
func handleError(key int, err error, message string) {
log.SetPrefix(fmt.Sprintf("[logID: %v]:", key))
log.Printf("%#v", err)
fmt.Printf("[%v] %v\n", key, message)
}
func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime|log.LUTC)
err := runJob("1")
if err != nil {
msg := "Unexpected error, please contact someone"
if _, ok := err.(IntermediateErr); ok {
msg = err.Error()
}
handleError(1, err, msg)
}
}
Output:
[logID: 1]: 22:00:00 main.IntermediateErr{error: main.MyError{Inner: main.LowLevelErr{error: main.MyError(Inner: (*os.PathError)(0xc4200123f0), Message: "stat: \"/path/to/binary\" no such file or directory", Stacktrace: "stacktrace")}}}
We should always make sure our code is preemptable, so that a long-running operation can be canceled at any point instead of only between operations.
func someFunction() {
//... previous code
var value interface{}
select {
case <-done:
return
case value = <-valueStream:
}
result := reallyLongCalculation(done, value)
select {
case <-done:
return
case resultStream<-result:
}
}
func reallyLongCalculation(done <-chan interface{}, value interface{}) interface{} {
intermediateRes := longCalculation(done, value)
return longCalculation(done, intermediateRes)
}
Heartbeats are a way for concurrent processes to signal life to outside parties. We can add a heartbeat to our goroutine that fires at a time interval, or one that fires at the beginning of each unit of work. It's not always necessary to add heartbeats, but they are useful for checking whether a goroutine is healthy (for instance, if the channels aren't closed but we stop receiving heartbeats, something is wrong).
doWork := func(
done <-chan interface{},
pulseInterval time.Duration,
) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{})
results := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(results)
pulse := time.Tick(pulseInterval)
workGen := time.Tick(2*pulseInterval)
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default:
// we want to continue to run even if we are not heartbeating
}
}
sendResult := func(r time.Time) {
for {
select {
case <-done:
return
case <-pulse:
sendPulse()
case results <- r:
return
}
}
}
for {
select {
case <-done:
return
case <-pulse:
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
	}()
	return heartbeat, results
}
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) })
const timeout = 2 * time.Second
heartbeat, results := doWork(done, timeout/2)
for {
select {
case _, ok := <-heartbeat:
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results:
if ok == false {
return
}
fmt.Printf("results %v\n", r.Second())
case <-time.After(timeout):
return
}
}
Output:
pulse
pulse
results 54
pulse
pulse
results 56
pulse
pulse
results 58
pulse
These are useful for testing a goroutine. The following is an example of a goroutine with this kind of heartbeat and how to unit test it.
func DoWork(done <-chan interface{}, nums ...int) (<-chan interface{}, <-chan int) {
heartbeatStream := make(chan interface{}, 1)
workStream := make(chan int)
go func() {
defer close(heartbeatStream)
defer close(workStream)
for _, n := range nums {
select {
case heartbeatStream <- struct{}{}:
default:
	// Continue even if we can't send the heartbeat (the buffered
	// channel is full; we don't care if nobody receives it).
}
select {
case <-done:
return
case workStream <- n:
}
}
}()
return heartbeatStream, workStream
}
func TestDoWork_GeneratedAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{1, 2, 3, 4, 5}
heartbeat, results := DoWork(done, intSlice...)
<-heartbeat // this makes sure that DoWork has started its process.
for i, expected := range intSlice {
select {
case r := <-results:
if expected != r {
t.Errorf("index %v: expected %v, but got %v", i, expected, r)
}
case <-time.After(1 * time.Second):
t.Fatal("test timed out")
}
}
}
This is about setting up multiple workers to handle the same request, and taking whichever response comes back first.
doWork := func(done <-chan interface{}, id int, wg *sync.WaitGroup, result chan<- int) {
started := time.Now()
defer wg.Done()
// Simulate random load
simulatedLoadTime := time.Duration(1+rand.Intn(5)) * time.Second
select {
case <-done:
case <-time.After(simulatedLoadTime):
}
select {
case <-done:
case result <- id:
}
took := time.Since(started)
// Display how long handlers would have taken
if took < simulatedLoadTime {
took = simulatedLoadTime
}
fmt.Printf("%v took %v\n", id, took)
}
done := make(chan interface{})
result := make(chan int)
var wg sync.WaitGroup
wg.Add(10)
// Here we start 10 handlers to handle our requests.
for i := 0; i < 10; i++ {
go doWork(done, i, &wg, result)
}
// This line grabs the first returned value from the group of handlers.
firstReturned := <-result
// Here we cancel all the remaining handlers.
// This ensures they don’t continue to do unnecessary work.
close(done)
wg.Wait()
fmt.Printf("Received an answer from #%v\n", firstReturned)