When and How to Use the Go Channel

  sonic0002        2022-09-17 23:06:36

Go's concise syntax and powerful standard library let us hit the ground running. Implementing the same functionality is often more efficient than in Java or Python, and its concurrency support, built on goroutines and channels, is especially handy and widely admired.

There is much to dig into with goroutines and channels. Let's start with the channel, which I used to think of narrowly as a message queue that transfers data between goroutines and supports synchronization, but which deserves a much bigger stage.

Channel == Semaphore + Buffer

A channel is a built-in data type, like a slice or a map, and the equation channel == Semaphore + Buffer reveals its essence. The semaphore is the core concept, and it comes with two major features: delivery guarantee and state.

The delivery guarantee clears the way for a goroutine: it does not need to worry about whether a signal can be passed through a channel or whether the channel it depends on will deliver one, and can focus on its own logic. In the example below, the goroutine works correctly only because of the delivery guarantees of ch1 and ch2.

go func() {
   for {
      i,ok := <-ch1 // blocked if ch1 is empty
      if ok {
         ch2 <- i*i
      } else {
         break
      }
   }
}()
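
The fragment above can be wrapped into a complete program. This is only a sketch: the helper name squares and the three input values are assumptions made for the example, and the input side closes ch1 so that the loop ends.

```go
package main

import "fmt"

// squares reads from in until it is closed, sends i*i on the returned
// channel, and closes that channel when it is done.
func squares(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := range in { // blocks while in is empty, ends when in is closed
			out <- i * i
		}
	}()
	return out
}

func main() {
	ch1 := make(chan int)
	go func() {
		defer close(ch1)
		for i := 1; i <= 3; i++ {
			ch1 <- i
		}
	}()
	for v := range squares(ch1) {
		fmt.Println(v) // prints 1, 4, 9 on separate lines
	}
}
```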

And whether a channel can send or receive data is determined by its state.

  • Nil: an uninitialized channel, var ch chan int. It neither sends nor receives; both operations block forever, and closing it panics.
  • Open: an initialized channel, ch = make(chan int). It can both send and receive.
  • Closed: a channel after close(ch). It can only be received from (buffered values first, then zero values); sending on it or closing it again panics.
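
The three states can be probed without hanging the program. The sketch below uses two hypothetical helpers, tryRecv and sendPanics, which are not part of any library: tryRecv uses select with default so that a receive that would block returns immediately, and sendPanics uses recover to observe the panic.

```go
package main

import "fmt"

// tryRecv attempts a non-blocking receive and reports what happened.
func tryRecv(ch chan int) string {
	select {
	case v, ok := <-ch:
		if !ok {
			return "closed"
		}
		return fmt.Sprintf("got %d", v)
	default:
		return "would block"
	}
}

// sendPanics reports whether sending on ch panics.
// Do not call it with a nil channel: that send would block forever.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	var nilCh chan int // nil: never ready
	openCh := make(chan int, 1)
	closedCh := make(chan int, 1)
	closedCh <- 7
	close(closedCh)

	fmt.Println(tryRecv(nilCh)) // would block
	openCh <- 42
	fmt.Println(tryRecv(openCh))      // got 42
	fmt.Println(tryRecv(closedCh))    // got 7
	fmt.Println(tryRecv(closedCh))    // closed
	fmt.Println(sendPanics(closedCh)) // true
}
```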

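The buffer changes the channel from a semaphore into a first-in-first-out queue, which explains the name "channel" well. However, the buffer also weakens the delivery guarantee and can cause bugs: a send into the buffer succeeds without any receiver being ready. Closing a channel does not discard buffered values, but if the receiver stops reading while items are still in the buffer, as in the example below where main exits right after closing, those signals are never delivered.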

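package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int, 3)
	go func() {
		for i := 0; i < 3; i++ {
			ch <- i // buffered: the send returns without waiting for a receiver
			time.Sleep(1 * time.Second)
		}
	}()

	time.Sleep(2 * time.Second)
	// receive only once; whatever else is in the buffer is never read
	if v, ok := <-ch; ok {
		fmt.Printf("time: %d \n", v)
	}
	close(ch) // main returns right away, so the remaining items are lost
}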
// output
time: 0
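
The values are lost in the example above because main stops receiving and exits. As a contrasting sketch, assume the sender closes the channel once it is done and the receiver keeps reading with range (drain is a hypothetical helper); then every buffered value still arrives:

```go
package main

import "fmt"

// drain receives until ch is closed and empty, returning everything it saw.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3)
	for i := 0; i < 3; i++ {
		ch <- i
	}
	close(ch) // the sender closes once it has nothing more to send

	fmt.Println(drain(ch)) // [0 1 2]
}
```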

Channel Simplifies Concurrency

Concurrent programming grows out of the multi-threaded mindset: threads run independently while data is shared and synchronized. Handling that data is always the hard part, but sensible use of channels can simplify it.

Define a channel

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType .

Channels can be categorized as send-and-receive (bidirectional), receive-only (read-only), and send-only. The direction is part of the channel type and is checked by the compiler; at runtime, the goroutines blocked on a channel wait in its internal recvq and sendq queues.

Generally, receive-only and send-only channels are not declared directly but appear as function parameters or return types, constraining the channel's role in the function. For example,

func send(ch chan<- int) {
   for i := 0; i < 5; i++ {
      ch <- i
   }
}

func recv(ch <-chan int) {
   for i := range ch {
      fmt.Println(i)
   }
}

func rc(n int) <-chan int {
   ch := make(chan int)
   go func() {
      // do some work
      v := n * n // placeholder result
      ch <- v
   }()
   return ch
}

If you change send(ch chan<- int) to send(ch <-chan int), the code won't compile, because ch <- i would then be a send on a receive-only channel.
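
To see the direction constraints at work in a complete program, here is a minimal sketch. The function names produce and sum are made up for the example; the bidirectional channel created in main converts implicitly to the directional type at each call.

```go
package main

import "fmt"

// produce can only send on ch. Closing is allowed on a send-only channel.
func produce(ch chan<- int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch)
}

// sum can only receive from ch.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // bidirectional
	go produce(ch, 5)
	fmt.Println(sum(ch)) // 10
}
```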

Use Channel

There are so many ways to use channels that memorizing every pattern is inefficient. The key is to keep in mind the core of using channels: signal transmission between goroutines.

  • At least one goroutine is involved.
  • There must be a sender and a receiver.
  • Both the sender and the receiver can be multiple goroutines.
  • Either side may block.
  • The buffer can hold signals that have not been received yet.

Based on how they send and receive, let's classify channel usage into three groups: synchronizing and blocking, non-blocking, and others.

Synchronizing and Blocking

Channels send and receive with the <- operator, one signal at a time. Both send and receive can block in some scenarios even if there is a buffer, though the buffer can improve concurrency to some extent. So we often use goroutines and for loops together.

package main

import (
  "fmt"
  "time"
)

func main() {
  ch1 := make(chan string)  // unbuffered
  ch2 := make(chan int, 10) // buffered

  go func() {
      // do some work
      ch1 <- "end" // work done signal
  }()

  go func() {
    // loop over the channel buffer until the channel is closed
    for v := range ch2 {
      _ = v // do some work
    }
  }()

  // multiple tasks in parallel, can also be in goroutines
  for i := 0; i < 10; i++ {
    // dispatch task
    ch2 <- i
  }
  close(ch2) // no more tasks, so the range loop can end

  // wait for the work-done signal, 1:1
  p := <-ch1
  fmt.Println(p)

  // wait for work to finish
  time.Sleep(time.Second)
}

Senders and receivers can be one-to-one, one-to-many, many-to-one, and many-to-many. Each side may be a single goroutine, several goroutines, or even one goroutine that sends or receives repeatedly in a for loop. Which pattern we choose depends on our concurrency requirements, including data volume and latency.

  • One-to-one with an unbuffered channel. Blocking is inevitable (though it may not be noticeable), but every signal is guaranteed to be delivered.
  • One-to-many. With a buffer, it resembles a worker pool, and the sender (producer) blocks only when the workload is heavy and the buffer is full. Without a buffer, the maximum concurrency is the number of workers (goroutines).
  • Many-to-one. The processing order can be guaranteed, but the senders are very likely to block.
  • Many-to-many. This is usually the most efficient design, with the fastest processing and the least blocking, but it also consumes the most memory. Estimating the number of senders and receivers is still required to achieve the best performance; otherwise either the senders block or the receivers "starve."
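
The one-to-many case can be sketched as a small worker pool. This is a minimal illustration rather than a tuned implementation; squareAll, the worker count, and the buffer sizes are all assumptions chosen for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll fans the inputs out to `workers` goroutines through a buffered
// task channel and returns the sum of the squared results.
func squareAll(inputs []int, workers int) int {
	tasks := make(chan int, len(inputs))   // buffered: the producer does not block
	results := make(chan int, len(inputs)) // buffered: the workers do not block

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range tasks { // each worker blocks until a task arrives
				results <- n * n
			}
		}()
	}

	for _, n := range inputs {
		tasks <- n
	}
	close(tasks) // lets the workers' range loops end
	wg.Wait()
	close(results)

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4}, 3)) // 30
}
```

With an unbuffered tasks channel the producer would instead hand over one task at a time, and at most `workers` tasks would be in flight.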

Non-blocking

Both the <- operator and for range work on a single channel. Go also provides the select statement to operate on several channels at once. It supports:

  • Receiving from multiple channels simultaneously
  • Sending and receiving simultaneously
  • Adding a timeout so that a goroutine does not wait on a channel for too long
  • Dropping the signal when the channel is full

package main

import (
    "fmt"
    "time"
)

// work is a placeholder for the real task triggered by team 2.
func work(v string) { fmt.Println("work for t2:", v) }

func main() {
    const capacity = 10         // max number of buffered requests at any given moment
    const timeout = time.Second // placeholder timeout
    const num = 200             // task count

    t1 := make(chan string, capacity) // buffered channel is used, team 1
    t2 := make(chan string)           // no buffer channel, team 2
    stopCh := make(chan string)       // stop all workers

    // can trigger work in one goroutine
    go func() {
        for p := range t1 {
            fmt.Println("do work:", p)
        }
    }()

    for w := 0; w < num; w++ {
        // select-case allows us to perform multiple channel operations
        // at the same time, on the same goroutine
        select {
        case v := <-t2:
            go work(v) // trigger work goroutine for t2
        case t1 <- "start":
            fmt.Println("send signal:", w)
        case <-stopCh:
            close(t1)
            close(t2)
            return // stop sending: a send on a closed channel panics
        case <-time.After(timeout):
            // with a default case present, select never waits, so this
            // timeout case is effectively never chosen; drop default to use it
            close(t1)
            close(t2)
            return
        default:
            // if channel buffer is full, drop the message
            fmt.Println("drop work", w)
        }
    }
    close(t1) // no more tasks

    time.Sleep(time.Second) // wait for everyone to finish
}
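
Because a default case makes select return immediately, dropping and timing out are usually done in separate select statements. The following sketch separates the two; trySend and recvWithTimeout are hypothetical helper names, and the 50 ms timeout is arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// trySend drops the value instead of blocking when ch is full.
func trySend(ch chan<- string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// recvWithTimeout waits up to d for a value.
func recvWithTimeout(ch <-chan string, d time.Duration) (string, bool) {
	select {
	case v := <-ch:
		return v, true
	case <-time.After(d):
		return "", false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a")) // true
	fmt.Println(trySend(ch, "b")) // false: the buffer is full, "b" is dropped

	v, ok := recvWithTimeout(ch, 50*time.Millisecond)
	fmt.Println(v, ok) // a true

	v, ok = recvWithTimeout(ch, 50*time.Millisecond)
	fmt.Printf("%q %v\n", v, ok) // "" false
}
```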

Others

Channels can be combined with other Go language features to better design concurrent code.

  • To verify the “end”

With if v, ok := <-ch; ok {}, you can tell whether the channel has been closed (ok becomes false once the channel is closed and drained), so that the receiver can end gracefully.
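
A receiver built on the two-value receive might look like the sketch below. The helper name collect is an assumption for the example; in practice for range does the same check implicitly.

```go
package main

import "fmt"

// collect reads until the channel is closed, using the two-value receive
// to tell a real value apart from the zero value of a closed channel.
func collect(ch <-chan int) []int {
	var out []int
	for {
		v, ok := <-ch
		if !ok {
			return out // closed and drained: end gracefully
		}
		out = append(out, v)
	}
}

func main() {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for i := 0; i < 3; i++ {
			ch <- i // 0 is a legitimate value here, not a "closed" marker
		}
	}()
	fmt.Println(collect(ch)) // [0 1 2]
}
```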

  • To guarantee the “end”

In the examples above, we used time.Sleep in main to wait for the goroutines to end, which obviously does not meet production standards. Combined with sync.WaitGroup, channels and goroutines in the classic "producer-consumer" pattern can end cleanly.

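package main

import (
    "fmt"
    "sync"
)

func main() {
    res := make(chan chan int, 10)
    done := make(chan struct{}) // closed when the consumer has printed everything

    var wg sync.WaitGroup

    wg.Add(10) // 10 jobs

    // producer
    go func() {
      defer close(res)
      for i := 0; i < 10; i++ {
        o := work(&wg, i) // do work
        res <- o  // pass result
      }
    }()

    // consumer
    go func() {
      defer close(done)
      for o := range res {
        fmt.Println(<-o)
      }
    }()

    wg.Wait() // all workers have delivered their results
    <-done    // the consumer has finished printing
}

func work(wg *sync.WaitGroup, a int) chan int {
    out := make(chan int)
    go func() {
        // do some work
        out <- a
        wg.Done()
    }()
    return out
}

  • To cancel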

Using time.After to cancel after a certain time, as we did in the select example, is uncommon in practice. In production, we do not limit a single channel operation but the execution time of the entire business flow. That is the role context plays: we use the context to link every step of the logic, set a timeout, and stop blocking on the channel when the context ends.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
)

func main() {
    ctx := context.Background()

    // WithTimeout starts the 100 ms clock ticking.
    ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
    defer cancel()

    ch := make(chan string, 1)

    // create worker goroutine
    go func() {
        time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
        ch <- "start"
    }()

    select {

    case d := <-ch:
        fmt.Println("work complete", d)

    // If 100 ms pass before the worker goroutine finishes,
    // ctx.Done() is closed and this println is executed
    case <-ctx.Done():
        fmt.Println("work cancelled")
    }
}

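We give the channel a buffer of 1 so that the worker goroutine can still complete its send after a timeout, when nobody is receiving anymore. With an unbuffered channel, the goroutine would block on the send forever and leak.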
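
The buffer of 1 is one way to keep the worker from blocking forever. Another option, sketched here under the assumption that the worker can see the same ctx, is to select on the send and on ctx.Done(). The helpers deliver and abandonedSend are hypothetical names introduced only for this illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// deliver sends v on out unless ctx ends first.
// It reports whether the send happened.
func deliver(ctx context.Context, out chan<- string, v string) bool {
	select {
	case out <- v:
		return true
	case <-ctx.Done():
		return false
	}
}

// abandonedSend runs deliver against an unbuffered channel that nobody
// reads and reports whether the value was delivered before the timeout.
func abandonedSend() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	out := make(chan string) // unbuffered, and nobody receives
	result := make(chan bool)
	go func() { result <- deliver(ctx, out, "start") }()
	return <-result // the goroutine exits once ctx times out
}

func main() {
	fmt.Println("delivered:", abandonedSend()) // delivered: false
}
```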

When to Use Channel

It is hard to give a rule for when to use a channel; it depends mostly on experience, especially experience with concurrent development.

You can find many use cases in the Kubernetes codebase, but here I can only list the most common scenarios.

  • Trigger signals, including start and end.
  • Transfer data asynchronously. An asynchronous worker processes non-urgent items one by one.
  • Block on purpose. For critical steps, you need to block and wait.
  • The worker pool. A worker is either woken up by a task or stays blocked until a task arrives.

Signal Channel

Use a channel as a signal to end certain tasks. {name} chan struct{} defines a channel that carries no data, since struct{} occupies no memory. Goroutines wait for the signal with <-{name}, and closing the channel notifies them to end.

It is used many times in the Kubernetes controller-runtime manager.

Send the signal.

In controllerManager, there are two more signal channels: elected chan struct{} and internalProceduresStop chan struct{}.
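
The general shape of such a signal channel can be sketched as follows. This is not the controller-runtime code; runWorkers and the counter are made up to show that a single close wakes every goroutine waiting on the channel.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines that block until stop is closed and
// returns how many of them observed the signal.
func runWorkers(n int) int {
	stop := make(chan struct{}) // carries no data, only the signal

	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stop // blocks until close(stop)
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	close(stop) // one close notifies every waiting receiver
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println(runWorkers(3)) // 3
}
```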

Asynchronous processing

As a data channel, a channel can also transmit data and facilitate asynchronous processing, such as the error handling in controllerManager. All errors are put into an errChan chan error.

Only at the end is this error channel drained and processed asynchronously.

Processing errors asynchronously is very common in Go code, and you can find similar processing in the kind codebase, combined with sync.WaitGroup.
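
A generic version of this error-channel-plus-WaitGroup pattern might look like the sketch below. It is not taken from controller-runtime or kind; runAll, succeed, and fail are hypothetical names for the illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

func succeed() error { return nil }
func fail() error    { return errors.New("boom") }

// runAll runs every task concurrently and returns the errors they reported.
func runAll(tasks []func() error) []error {
	errCh := make(chan error, len(tasks)) // buffered: reporting never blocks a task

	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(t func() error) {
			defer wg.Done()
			if err := t(); err != nil {
				errCh <- err
			}
		}(task)
	}

	wg.Wait()    // every task has finished
	close(errCh) // safe: no sender is left

	var errs []error
	for err := range errCh { // drain the collected errors
		errs = append(errs, err)
	}
	return errs
}

func main() {
	errs := runAll([]func() error{succeed, fail})
	fmt.Println(len(errs), errs[0]) // 1 boom
}
```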

The “starving” worker

In the concurrent worker pattern, channels trigger task execution, and workers block when there are no tasks, which avoids the overhead of repeatedly creating and destroying goroutines.

Let’s see how the Kubernetes watcher works.

  • Forward all events to the channel before the stop signal is received.
  • Start a for-select loop to wait for tasks.
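
The two steps above can be sketched as a for-select loop. This is a simplified illustration, not the Kubernetes watcher itself; forward, the string events, and the channel names are assumptions.

```go
package main

import "fmt"

// forward copies events from in to out until in is closed or stop is
// signalled, then closes out.
func forward(in <-chan string, out chan<- string, stop <-chan struct{}) {
	defer close(out)
	for {
		select {
		case <-stop:
			return
		case ev, ok := <-in:
			if !ok {
				return
			}
			// Also watch stop while sending, so a slow consumer
			// cannot keep this goroutine blocked after a stop.
			select {
			case out <- ev:
			case <-stop:
				return
			}
		}
	}
}

func main() {
	in := make(chan string, 2)
	out := make(chan string)
	stop := make(chan struct{})

	in <- "ADDED"
	in <- "MODIFIED"
	close(in)

	go forward(in, out, stop)
	for ev := range out {
		fmt.Println(ev) // ADDED, then MODIFIED
	}
}
```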

Tasks to be executed

Another common example is executing a task asynchronously and waiting for its completion. This pattern is often applied to critical steps in the code, such as the image download in kubelet.

The pullImage here is a blocking job executed asynchronously: it runs in a goroutine in puller.go and sends the result to a channel.
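
The shape of such a call can be sketched as below. None of this is kubelet code: pullResult, fakePull, pullAsync, and the returned image reference are invented stand-ins for the illustration.

```go
package main

import (
	"fmt"
	"time"
)

// pullResult is a hypothetical result type.
type pullResult struct {
	imageRef string
	err      error
}

// fakePull stands in for a slow, blocking download.
func fakePull(image string) pullResult {
	time.Sleep(10 * time.Millisecond)
	return pullResult{imageRef: image + "@sha256:example"}
}

// pullAsync starts the pull in a goroutine and returns a channel that
// will carry its result.
func pullAsync(image string) <-chan pullResult {
	ch := make(chan pullResult, 1) // buffer of 1: the goroutine never blocks on send
	go func() { ch <- fakePull(image) }()
	return ch
}

func main() {
	resultCh := pullAsync("busybox")
	res := <-resultCh // the caller blocks here until the pull finishes
	fmt.Println(res.imageRef, res.err)
}
```

The caller still waits for the result, but the waiting point is now a channel receive, which could later be combined with a timeout or with several pulls in flight.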

You may wonder: Why blocking? Why asynchronously? Why not integrate the code and execute it sequentially instead?

I think it is a typical example of splitting a synchronous task via channels and abstracting the code, which keeps it flexible for later needs to increase concurrency.

Conclusion

Channels simplify concurrency and offer great flexibility, but only when we know how and when to use them. So it is important to understand the core idea, channel == Semaphore + Buffer, and to figure out when a semaphore or a buffer is needed and in which scenarios sending and receiving block. To improve coding efficiency, we should make channels part of our concurrency mindset so that they serve our practice better.

Note: This post is republished on our site with the original author's authorization. The original author is Stefanie Lai, who is currently a Spotify engineer living in Stockholm. The original post is published here.

 
