# Go Concurrency Reference
## Goroutines
The Go runtime multiplexes a large number of goroutines onto a modest number of OS threads and schedules them itself. When a goroutine blocks, the runtime switches to another.
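As a rough illustration of how cheap goroutines are, the sketch below starts one goroutine per number and collects the results. The name `sumConcurrently` is hypothetical, and the example relies on channels, which the next section describes.

```go
package main

import "fmt"

// sumConcurrently (a hypothetical helper) starts n goroutines, each of
// which sends one number, and adds up everything it receives.
func sumConcurrently(n int) int {
	results := make(chan int)
	for i := 1; i <= n; i++ {
		go func(i int) { results <- i }(i) // one goroutine per value
	}
	total := 0
	for i := 0; i < n; i++ {
		total += <-results // arrival order is unspecified; the sum is not
	}
	return total
}

func main() {
	fmt.Println(sumConcurrently(10000)) // prints 50005000
}
```

Ten thousand goroutines is a routine workload here; the same count of OS threads would be far more expensive.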
## Channels
Since goroutines often need to share resources, Go offers “channels”. Channels connect goroutines. A goroutine uses a channel to send a value to another goroutine.
Each channel has an element type and carries only values of that type. Like a map, a channel is a reference to a data structure created by make.
Channels offer two operations: send and receive. (A third operation, close, signals that no more values will be sent on the channel.) Send and receive use the <- operator:
```go
ch <- x // send
x = <-ch // receive and assign
<-ch // receive and discard
close(ch)
```
A channel can be unbuffered (the default) or buffered:
```go
ch = make(chan int) // unbuffered
ch = make(chan int, 3) // buffered with capacity 3
```
A send on an unbuffered channel blocks the sending goroutine until another goroutine receives on the channel. Conversely, if a goroutine tries to receive before a value is available, it blocks until another goroutine sends. The receive completes before the sending goroutine is reawakened. In effect, unbuffered channels “synchronize” sending and receiving goroutines.
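A minimal sketch of that synchronization, assuming a hypothetical helper named `runAndWait`: the caller cannot get past its receive until the goroutine has finished its task and reached the send.

```go
package main

import "fmt"

// runAndWait (hypothetical) runs task in a new goroutine and returns only
// after the task has finished.
func runAndWait(task func()) {
	done := make(chan struct{}) // unbuffered
	go func() {
		task()
		done <- struct{}{} // blocks until the caller receives
	}()
	<-done // blocks until the goroutine sends
}

func main() {
	result := 0
	runAndWait(func() { result = 6 * 7 })
	// Reading result is safe: the write happened before the send,
	// and the send was matched by the receive inside runAndWait.
	fmt.Println(result) // prints 42
}
```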
## Pipelines
A “pipeline” connects goroutines with channels so that the output of one stage is the input to the next. Like a shell pipeline, a Go pipeline can have any number of stages connected by channels.
In the example below, closing the naturals channel signals the “squarer” goroutine to end its for loop:
```go
// Pipeline1 demonstrates channels used as pipelines.
// Taken from TGPL section 8.4.2.
package main

import "fmt"

func main() {
	naturals := make(chan int)
	squares := make(chan int)

	// Counter:
	go func() {
		for x := 0; x < 10; x++ {
			naturals <- x
		}
		close(naturals)
	}()

	// Squarer:
	go func() {
		for x := range naturals { // A syntactic convenience; see TGPL p. 229.
			squares <- x * x
		}
		close(squares)
	}()

	// Printer:
	for x := range squares {
		fmt.Println(x)
	}
}
```
The above construct of “for-ranging” over a channel is shorthand for:
```go
// Squarer:
go func() {
	for {
		x, ok := <-naturals
		if !ok {
			break // Channel closed and drained
		}
		squares <- x * x
	}
	close(squares)
}()
```
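“Drained” means every value sent before the close has been received; from then on, receives on the closed channel don’t block and yield the zero value of the element type, with ok set to false.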
It’s not vital to close every channel; the garbage collector eventually reclaims any open but unreachable channels. Only worry about closing a channel to signal receiving goroutine(s) that sending has ended. Trying to close a previously closed channel panics.
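The behavior of a closed channel can be checked with a short sketch. The function names `recvAfterClose` and `closeTwicePanics` are hypothetical, and the example uses a buffered channel so that a single goroutine can send without blocking.

```go
package main

import "fmt"

// recvAfterClose sends two values, closes the channel, then receives
// three times. The third receive finds the channel closed and drained.
func recvAfterClose() (values []int, last int, ok bool) {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	for i := 0; i < 2; i++ {
		values = append(values, <-ch) // values sent before close still arrive
	}
	last, ok = <-ch // does not block; yields zero value and false
	return values, last, ok
}

// closeTwicePanics reports whether closing a channel twice panics.
func closeTwicePanics() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int)
	close(ch)
	close(ch) // panics: close of closed channel
	return false
}

func main() {
	fmt.Println(recvAfterClose())   // prints [1 2] 0 false
	fmt.Println(closeTwicePanics()) // prints true
}
```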
### Unidirectional Channels
Channels can be passed into or returned from functions like any other value. A channel passed as a parameter is almost always intended only for sending or only for receiving.
```go
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
```
A unidirectional channel exposes only send or only receive. The channel declaration indicates its directionality:
```go
out chan<- int // A send-only channel. Data only flows into it (`chan<-`).
in <-chan int  // A receive-only channel. Data only flows out of it (`<-chan`).
```
The compiler detects errant sends to a receive-only channel and vice versa. The compiler also errors on closes of receive-only channels (since close means no more data will be sent).
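As a sketch of how the directional types constrain each side, the hypothetical functions `produce` and `sum` below each accept only the direction they need. The bidirectional channel created in main converts implicitly at each call.

```go
package main

import "fmt"

// produce may only send on out; a receive from out would not compile.
func produce(out chan<- int, n int) {
	for i := 0; i < n; i++ {
		out <- i
	}
	close(out) // allowed: closing belongs to the sending side
}

// sum may only receive from in; a send on in or close(in) would not compile.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional
	go produce(ch, 5)
	fmt.Println(sum(ch)) // prints 10 (0+1+2+3+4)
}
```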
## Buffered Channels
```go
ch = make(chan string, 3)
```
The above creates a buffered channel with a first-in-first-out queue of three elements. A send operation adds a value to the back of the queue, and a receive removes a value from the front.
```go
ch <- "X"
ch <- "Y"
ch <- "Z"
```
A receive on this channel would get “X”.
The key property of a buffered channel is that a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty. Buffered channels therefore don’t force sending and receiving goroutines to synchronize on every operation.
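A small sketch of this, using a hypothetical function `fillAndInspect`: a single goroutine can fill the buffer without any receiver, and the built-in len and cap functions report how many values are queued and how many the buffer can hold.

```go
package main

import "fmt"

// fillAndInspect fills a three-element buffer from one goroutine, then
// reports the queue length, the capacity, and the first value received.
func fillAndInspect() (length, capacity int, first string) {
	ch := make(chan string, 3)
	ch <- "X"
	ch <- "Y"
	ch <- "Z" // none of these sends block; a fourth would
	length, capacity = len(ch), cap(ch)
	first = <-ch // FIFO: the oldest value comes out first
	return length, capacity, first
}

func main() {
	fmt.Println(fillAndInspect()) // prints 3 3 X
}
```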
### Fan in, fan out
The “fan out” pattern is when multiple functions read from the same channel until it closes.
The “fan in” pattern is when a function multiplexes several channels into one channel, and reads until all the inputs close. Because sends on a closed channel panic, make sure to finish all sends before closing the channel. sync.WaitGroup helps.
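Both patterns can be sketched together. The names `square`, `merge`, and `sumSquares` are illustrative: two `square` workers read from the same source channel (fan out), and `merge` forwards their outputs into one channel (fan in), closing it only after a sync.WaitGroup confirms that every forwarding goroutine has finished sending.

```go
package main

import (
	"fmt"
	"sync"
)

// square reads from in until it closes and sends each square on its output.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// merge fans in: it forwards every value from the inputs to one channel.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(in)
	}
	go func() {
		wg.Wait() // all sends are finished...
		close(out) // ...so closing cannot cause a send on a closed channel
	}()
	return out
}

// sumSquares feeds 1..n to two workers and adds up the merged results.
func sumSquares(n int) int {
	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= n; i++ {
			src <- i
		}
	}()
	total := 0
	for v := range merge(square(src), square(src)) { // fan out, then fan in
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(4)) // prints 30 (1+4+9+16)
}
```

Which worker handles which value is unspecified, so the merged order varies between runs; only the total is deterministic.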
### Select
The select construct lets us multiplex between channels. It waits until one of its cases, each a send or a receive, is ready to proceed, then runs that case. If multiple cases are ready, select chooses one at random.
```go
package main

import (
	"fmt"
	"os"
	"time"
)

func main() {
	abort := make(chan bool)
	go func() {
		os.Stdin.Read(make([]byte, 1))
		abort <- true
	}()

	tick := time.Tick(1 * time.Second)
	fmt.Println("Begin countdown. Press RETURN to abort.")
	for countdown := 10; countdown > 0; countdown-- {
		fmt.Println(countdown)
		// Either count down or abort.
		// We can't just receive from each channel, because the first one
		// will block the other. We need to multiplex with `select`.
		select {
		case <-tick:
			// Pass
		case <-abort:
			fmt.Println("Launch aborted.")
			return
		}
	}
	fmt.Println("Lift off!")
}
```
In the next example, select multiplexes messages from three counter goroutines with their completion signals; the loop ends once all three have signaled done:
```go
package main

import (
	"fmt"
)

func counter(s string, ch chan string, done chan int) {
	for i := 0; i <= 2; i++ {
		ch <- fmt.Sprintf("%v: %v", s, i)
	}
	done <- 1
}

func main() {
	ch := make(chan string)
	done := make(chan int)
	go counter("A", ch, done)
	go counter("B", ch, done)
	go counter("C", ch, done)
	for i := 0; i < 3; {
		select {
		case x := <-ch:
			fmt.Println(x)
		case <-done:
			i++
		}
	}
}
```
### sync.WaitGroup
If we know in advance how many goroutines we’ll start, we can coordinate them through simple channel signaling, as in parallel() below. If we don’t know that in advance, use sync.WaitGroup as a counter of running goroutines, as in wtgrp() below.
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var Msgs = []string{"Hello.", "Good day.", "Buenas noches."}

// Worker sleeps for d, then prints s.
func Worker(s string, d time.Duration) {
	time.Sleep(d)
	fmt.Println(s)
}

// nonparallel runs the workers one after another.
func nonparallel() {
	for _, m := range Msgs {
		Worker(m, 2*time.Second)
	}
}

// broken starts the workers but never waits for them, so the program
// can exit before any of them prints.
func broken() {
	for _, m := range Msgs {
		go Worker(m, 2*time.Second)
	}
}

func parallel() {
	ch := make(chan bool)
	for _, m := range Msgs {
		go func(m string) {
			Worker(m, 2*time.Second)
			ch <- true
		}(m)
	}
	// Wait for signal on channel that each goroutine completed.
	// This solution only works for a known quantity of goroutines.
	for range Msgs {
		<-ch
	}
}

// wtgrp is like parallel(), but works when we don't know the number of iterations.
// sync.WaitGroup essentially keeps a count of starting goroutines and matches to ending goroutines.
func wtgrp() {
	var wg sync.WaitGroup
	for _, m := range Msgs {
		wg.Add(1) // Be certain to Add before launching the goroutine!
		go func(m string) {
			defer wg.Done()
			Worker(m, 2*time.Second)
		}(m)
	}
	wg.Wait()
}

func main() {
	// nonparallel()
	// broken()
	// parallel()
	wtgrp()
}
```
## Concurrency with Shared Variables
### Race Conditions
See The Go Programming Language, chapter nine.
Race conditions defy intuition, because the order of interleaved operations is unpredictable, even from one program run to another. Whenever two goroutines concurrently access the same variable, and at least one access is a write, a data race can happen. There are several ways to avoid a data race:
- Write the variable only during program initialization, and only read it thereafter.
- Confine the variable to one goroutine; other goroutines request reads and changes over a channel. The goroutine that brokers access to a confined variable using channel requests is called a “monitor goroutine” for that variable. This is an example of Go’s mantra: “Do not communicate by sharing memory; instead, share memory by communicating.”
- Use “serial confinement”, a variation of the above: a pointer to a shared variable may be passed along a pipeline of stages, so long as each stage knows not to access the variable after passing it on.
- Use mutual exclusion, described in the next section.
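The monitor-goroutine approach can be sketched as follows. The `bank` type and its methods are hypothetical names chosen for illustration; the balance variable is local to the `run` goroutine, so no other goroutine can touch it directly.

```go
package main

import "fmt"

// bank exposes a balance that only its monitor goroutine can access.
type bank struct {
	deposits chan int // send an amount to deposit
	balances chan int // receive the current balance
}

// newBank starts the monitor goroutine and returns a handle to it.
func newBank() *bank {
	b := &bank{deposits: make(chan int), balances: make(chan int)}
	go b.run()
	return b
}

// run is the monitor goroutine: balance is confined to it.
func (b *bank) run() {
	balance := 0
	for {
		select {
		case amount := <-b.deposits:
			balance += amount
		case b.balances <- balance:
			// A caller read the balance; nothing else to do.
		}
	}
}

func (b *bank) Deposit(amount int) { b.deposits <- amount }
func (b *bank) Balance() int       { return <-b.balances }

func main() {
	b := newBank()
	b.Deposit(100)
	b.Deposit(50)
	fmt.Println(b.Balance()) // prints 150
}
```

This sketch never stops the monitor goroutine; a longer-lived program would likely add a way to shut it down.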
### Mutual Exclusion with sync.Mutex
With sync.Mutex, a goroutine must acquire an exclusive lock by calling Lock before reading or writing a guarded variable, and release it with Unlock when finished. If another goroutine already holds the lock, Lock blocks until that goroutine calls Unlock. By convention, declare the variables guarded by a mutex immediately after the declaration of the mutex itself.
```go
// Bankmutex demonstrates using a mutex for avoiding races for shared variables.
package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex // guards balance
var balance int
var wg sync.WaitGroup

func main() {
	TestTrans(99)
	TestTrans(100)
	wg.Wait()
}

func TestTrans(d int) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		fmt.Println("Before:", Balance())
		Deposit(d)
		fmt.Println("After:", Balance())
	}()
}

func Deposit(amount int) {
	mu.Lock()
	balance = balance + amount
	mu.Unlock()
}

func Balance() int {
	mu.Lock()
	b := balance
	mu.Unlock()
	return b
}
```
The code between Lock and Unlock is called the “critical section”. Be sure to unlock on all paths through the function, even when it branches to an error. Although the above example explicitly delineates the critical section, it might be safer to use defer, like:
```go
func Deposit(amount int) {
	mu.Lock()
	defer mu.Unlock()
	balance = balance + amount
}
```
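The benefit of defer shows most clearly when a function has an error path. The sketch below assumes a hypothetical `account` type with a `Withdraw` method that returns early on insufficient funds; the deferred Unlock runs on both the error return and the normal return.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errInsufficient = errors.New("insufficient funds")

// account is a hypothetical type for illustration.
type account struct {
	mu      sync.Mutex // guards balance
	balance int
}

func (a *account) Deposit(amount int) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.balance += amount
}

// Withdraw fails without changing the balance if funds are short.
func (a *account) Withdraw(amount int) error {
	a.mu.Lock()
	defer a.mu.Unlock() // runs on the error return below as well
	if amount > a.balance {
		return errInsufficient
	}
	a.balance -= amount
	return nil
}

func (a *account) Balance() int {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.balance
}

func main() {
	var a account
	a.Deposit(100)
	fmt.Println(a.Withdraw(150)) // prints insufficient funds
	fmt.Println(a.Withdraw(60))  // prints <nil>
	fmt.Println(a.Balance())     // prints 40
}
```

Had the error path returned while still holding the lock, the next call to Withdraw would have blocked forever.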