Go’s concise structure and powerful native library enable us to hit the ground running easily. It is more efficient than Java or Python when implementing the same functions, especially its concurrent programming, which is very handy and widely admired due to its goroutine
and channel
.
goroutine and channel has much to dig into, and let’s start with channel, which I used to consider narrowly as a message queue to transfer data between goroutines and support data synchronization, but definitely owns a bigger stage.
Channel == Semaphore + Buffer
Channel is a data type like slice and map, and the equation channel == Semaphore + Buffer reveals its essence. Semaphore is the core concept followed by the two major features, delivery guarantee, and state.
Delivery guarantee clears the way for goroutine, saving it from focusing on whether the signal can be transferred through the channel or whether the dependent channel can receive the signal but only on the logic implementation. In the example below, the delivery guarantee of ch1 and ch2 determines the success of the goroutine.
go func() {
for {
i,ok := <-ch1 // blocked if ch1 is empty
if ok {
ch2 <- i*i
} else {
break
}
}
}()
And whether a channel can send or receive data is determined by its state.
- Nil, channel that is uninitialized, var ch chan int. Not receiving or sending any signals. The operation is blocked, and panic when closed.
- Open, initialized channel, ch = make(chan int). It enables receiving and sending any signals.
- Closed, closed channel, close(ch), receiving signals only and panic when sending.
Buffer changes the channel from a semaphore into a first-in-first-out queue, which well explains the name “channel.” However, the buffer also jeopardizes the delivery guarantee and causes bugs. If the buffer still has more than one item when the channel is closed, these signals will fail to be delivered.
func main() {
ch := make(chan int, 3)
go func() {
for i := 0; i < 3; i++ {
ch <- i
time.Sleep(1 * time.Second)
}
}()
time.Sleep(2 * time.Second)
if v, ok := <-ch; ok {
fmt.Printf("time: %d \n", v)
}
close(ch)
}
// output
time: 0
Channel Simplifies Concurrency
Concurrent programming is based on the multi-thread mindset: The threads run independently, while the data is shared and synchronized. Data processing is always a difficult point, but using channels reasonably can simplify concurrency.
Define a channel
ChannelType = ( “chan” | “chan” “<-” | “<-” “chan” ) ElementType .
Channels can be categorized as send-and-receive, read-only, and send-only according to whether the internal recvq
and sendq
queues are permitted to store the corresponding waiters, the goroutines.
Generally, the read-only and send-only channels are not used directly but as function parameters or returns, constraining the channel’s role in the function. For example,
func send(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i
}
}
func recv(ch <-chan int) {
for i := range ch {
fmt.Println(i)
}
}
func rc(int n) <- chan int {
ch := make(chan int)
go func(){
// do some work
ch <- v
}
return ch
}
If you change send(ch chan<- int) to send(ch <- chan int), the code won’t compile.
Use Channel
There are so many ways to use channels that it is of low-efficiency if you want to memorize all the patterns. But the key is to keep in mind the core of using channels, the signal transmission between goroutines.
- At least one goroutine is involved.
- There must be a sender and a receiver.
- Both the sender and receiver can be multi-goroutines.
- A block may occur on both parties.
- The buffer can store excess semaphores.
Based on sending and receiving modes, let’s classify channels into 3 groups, synchronizing and blocking, non-blocking and others.
Synchronizing and Blocking
The channels send and receive with the <- operator, one signal after one. Both send and receive can be blocked in some scenarios even if there’s a buffer, though the buffer can enhance the concurrency to some extent. So we often use the goroutines andfor loop together.
func main() {
ch1 := make(chan string)
ch2 := make(chan string, 10) //buffered
go func() {
// do some work
ch <- "end" // work done signal
}()
go func(p int) {
// loop channel buffer
for v := range ch2 {
// do some work
}
}(p)
// multip tasks in parrallel, can also be in goroutines
for i := 0; i < 10; i++ {
// dispath task
ch2 <- i
}
// trigger work, 1:1
p := <-ch
// wait work to finish
time.Sleep(time.Second)
}
Sender and receiver can be one-to-one, one-to-multi, multi-to-one, and multi-to-multi, while the “one” here can be either one or multi goroutines, and even the multiple sending and receiving contained in the for loop. And we choose from patterns depending on our requirements for concurrency, including data volume and concurrency latency.
- One-to-one and unbuffered channel. The block is inevitable(can be invisible), but the signal can be delivered 100%.
- One-to-multi. When the buffer is used, it is similar to a worker pool, and the sender(producer) can only be blocked when the workload is heavy and the buffer is fully occupied. When there is no buffer, the maximum concurrency is the number of workers (goroutines).
- Multi-to-one. The execution order can be guaranteed though, the sender will be blocked with a high possibility.
- Multi-to-multi. It is the most efficient design with the fastest processing and the lowest blocking but also consumes the most memory. But still, estimating the number of senders and receivers is required to achieve the highest performance, otherwise either the sending is blocked, or the receiving is “starved.”
Non-blocking
Both the <- operator and the for range are for one channel. And Go provides the select operator to facilitate channel operation.
- Supportive of receiving multiple channels simultaneously
- Supportive of sending and receiving simultaneously
- Supportive of adding a timeout to prevent the channel from waiting long
- Supportive of Drop the signal when the channel is full
func main() {
cap := 10 // max number of active requests at any given moment
t1 := make(chan string, cap) // buffered channel is used, team 1
t2 := make(chan string) // no buffer channel, team 2
stopCh := make(chan string) // stop all workers
// can trigger work in one goroutine
go func() {
for p := range t1 {
fmt.Println("do work:", p)
}
}()
const num = 200 // task count
for w := 0; w < num; w++ {
// select-case allow us to perform multiple channel operations
// at the same time, on the same goroutine
select {
case v := <- t2:
go work(v) // trigger work goroutine for t2
case t1 <- "start":
fmt.Println("send signal:", w)
// if channel buffer is full, drop the message
case <- stopCh:
close(t1)
close(t2)
case <-time.After(timeout): // stop all workers after sometime
close(t1)
close(t2)
default:
fmt.Println("drop work", w)
}
}
time.Sleep(time.Second) // wait for everyone to finish
}
Others
Channels can be combined with other Go language features to better design concurrent code.
- To verify the “end”
Through if v, ok := <- ch; ok{}, you can determine whether the channel has closed, so that the receiver can end gracefully.
- To guarantee the “end”
In the above example, we used time.Sleep and waited for the goroutine to end in the main method, which obviously does not meet the production standard. While with waitGroup combined, the channel and goroutine, a classic “producer-consumer” mode, can end perfectly.
func main() {
res := make(chan chan int, 10)
var wg sync.WaitGroup
wg.Add(10) // 10 jobs
go func() {
defer close(outs)
for i := 0; i < 10; i++ {
o := work(&wg, i) // do work
res <- o // pass result
}
}()
go func() {
for o := range res {
fmt.Println(<-o)
}
}
wg.Wait()
}
func work(wg *sync.WaitGroup, a int) chan int {
out := make(chan int)
go func() {
// do some work
out <- a
wg.Done()
}()
return out
}
- To cancel
Using After to cancel the channel after a certain time as we did in the select example is uncommon in practice. In production, we do not limit the channel but the execution time of the entire business. That is the role context plays: We use the context to link every aspect of the entire logic, set a timeout, and determine to end the channel block when the context ends.
func main() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(emptyCtx, 100 * time.Millisecond)
defer cancel()
ch := make(chan string, 1)
// create worker goroutine
go func() {
time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
ch <- "start"
}()
select {
case d := <-ch:
fmt.Println("work complete", d)
// ctx.Done() call starts the 150ms duration clock ticking.
// If 150 ms passes before the worker goroutine finishes, this println will be executed
case <-ctx.Done():
fmt.Println("work cancelled")
}
}
We set the buffer as 1 to avoid the memory leak when the goroutine sent is timeout and main ends.
When to Use Channel
It's hard to tell when to use channel, but depending mostly on experience, especially your experience with concurrency development.
You can find many usage cases in the Kubernetes code bases, but here I can only list the most common scenarios.
- Trigger signal, including end and start
- Transfer data in async. Asynchronous worker processes one by one for the non-urgent
- Block for purpose. For critical steps, you need to block and wait
- The worker pool. The worker is either woken up by the task or long blocked until the task comes
Signal Channel
Use channel as a signal to end certain tasks. {name} chan struct{} is used to define a channel without any data, since struct{} occupies the lowest memory. Notify to end and close channel with <- {name}.
It is used many times in the kuberentes/controller-runtime manager.
In controllerManager, there are another two signal channels of elected chan struct{} and internalProceduresStop chan struct{}.
Asynchronous processing
As a data channel, channel can also transmit data and facilitate asynchronous processing, such as the error handling in controllerManager. All errors will be put into a errChan chan error.
Only when the channel end, this err channel is drained and asynchronously processed.
Processing error asynchronously is very common in Go code, and you can also find similar processing in the kind code, combined with waitGroup.
The “starving” worker
In the concurrent worker pattern, the channels trigger the task execution, and workers are blocked in the absence of tasks, avoiding the overhead increase of the goroutines’ creation and destruction.
Let’s see how the Kubernetes watcher works.
- Forward all events to the channel before the stop signal is received.
- Start the for loop select to wait for tasks
Tasks to be executed
Another common example is to execute tasks asynchronously and wait for completion. This pattern is often applied to the critical step of the code, such as the image download in kubelet.
The pullImage here is a blocking job executed asynchronously, being executed by goroutine in puller.go, and sending the result to the channel.
You may wonder: Why blocking? Why asynchronously? Why not integrate the code and execute it in sequent instead?
I think it is a typical example of splitting synchronized tasks via channels and abstracting the code to improve code flexibility for subsequent needs to improve concurrency.
Conclusion
Channel simplifies concurrency and offers big flexibility, but only when we know how and when to use it. So it is of great importance that we understand the core of channel = Semaphore + Buffer, and figure out when a semaphore or a buffer is needed, in which scenarios the sending and receiving are blocked. And to improve coding efficiency, we should integrate it into the concurrency mindset to make it better serve our practice.
Note: The post is authorized by original author to republish on our site. Original author is Stefanie Lai who is currently a Spotify engineer and lives in Stockholm, original post is published here.