Background
Before getting into the main content, here is a brief introduction to WaitGroup and the background knowledge around it: the basic usage of WaitGroup and the fundamentals of system semaphores. If you are already familiar with both, feel free to skip this section.
WaitGroup
WaitGroup is one of the most common concurrency control techniques in Golang, and its role can be roughly compared to join() in the multithreading facilities of other languages. The sample code is as follows:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
fmt.Println("Main starts...")
var wg sync.WaitGroup
// 2 means there are two goroutines to wait for
wg.Add(2)
go waitFunc(&wg, 3)
go waitFunc(&wg, 1)
// wait and block
wg.Wait()
fmt.Println("Main ends...")
}
func waitFunc(wg *sync.WaitGroup, num int) {
// notify WaitGroup the work is done
defer wg.Done()
time.Sleep(time.Duration(num) * time.Second)
fmt.Printf("Hello World from %v\n", num)
}
// output:
Main starts...
Hello World from 1
Hello World from 3
Main ends...
Without the WaitGroup, the main goroutine (the main function) would run straight to the final "Main ends..." before the two other goroutines had a chance to print anything. With the WaitGroup in place, the main goroutine blocks at wg.Wait() and only continues once both goroutines have finished.
The three methods we used above, Wait(), Add(int), and Done(), are also the only three methods the WaitGroup struct has.
Semaphore
A semaphore is a mechanism for synchronization and mutual exclusion between multiple processes or threads, and it is also what WaitGroup uses under the hood; WaitGroup's own synchronization scheme closely resembles a semaphore.
A semaphore is essentially an integer with two operations: P (Proberen, test) and V (Verhogen, increase). P attempts to obtain the semaphore: if its value is greater than 0, the value is decremented by 1 and execution continues; otherwise the current process or thread blocks until some other process or thread releases the semaphore. V releases the semaphore, incrementing its value by 1.
You can think of a semaphore as something like a lock: P roughly corresponds to acquiring the lock and V to releasing it. Because the semaphore is an operating-system-level mechanism, it is normally backed by the kernel, so we do not have to worry about the semaphore operations themselves racing; we trust the kernel to get that right.
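To make P and V concrete, here is a minimal, toy sketch of a counting semaphore built on a buffered Go channel. The sem type, newSem, and the P/V methods are invented names for illustration only; real semaphores are provided by the kernel (or, in Go's case, by the runtime).
package main

import (
	"fmt"
	"time"
)

// sem is a toy counting semaphore: the number of tokens buffered
// in the channel plays the role of the semaphore's value.
type sem chan struct{}

// newSem creates a semaphore whose value starts at n.
func newSem(n, capacity int) sem {
	s := make(sem, capacity)
	for i := 0; i < n; i++ {
		s <- struct{}{}
	}
	return s
}

// P (Proberen): if the value is greater than 0, decrement it and
// continue; otherwise block until someone performs a V.
func (s sem) P() { <-s }

// V (Verhogen): increment the value, waking one blocked P if any.
func (s sem) V() { s <- struct{}{} }

func main() {
	s := newSem(0, 1) // value starts at 0, so P will block until V runs

	go func() {
		time.Sleep(time.Second)
		fmt.Println("worker: V (release)")
		s.V()
	}()

	s.P() // blocks for about a second, then proceeds
	fmt.Println("main: P succeeded (acquire)")
}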
Semaphores themselves are not the focus of this article, so we will not go deeper into their technical details; interested readers can consult the relevant materials.
Finally, a non-technical aside: Proberen and Verhogen probably look unfamiliar because they are Dutch, not English. Why Dutch? Because the semaphore was invented by a computing pioneer from the Netherlands, Edsger W. Dijkstra. Yes, that Dijkstra.
WaitGroup underlying logic
Disclaimer: The source code used in this article is based on Go version 1.20.3. The WaitGroup source code for different versions of Go may differ slightly, but the design principles are generally consistent.
The relevant source code for WaitGroup is very short, with only about 120 lines including comments and blank lines. They are all in src/sync/waitgroup.go.
Definition
Let's first look at the definition of WaitGroup.
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}
The WaitGroup type is a struct with three unexported fields. Let's look at them one by one.
noCopy
First is noCopy, which marks a WaitGroup value as something that must not be copied, i.e., wg2 := wg is considered illegal. Copying is forbidden because it can easily lead to deadlocks. In practice, though, copying a WaitGroup does not stop the build: at least as of Go 1.20, the compiler itself says nothing, and it is go vet (the copylocks check, which many editors run automatically) that flags the copy as a warning. The program still compiles successfully. The warning looks like this:
assignment copies lock value to wg2: sync.WaitGroup contains sync.noCopy
Why doesn't this fail the build outright? My guess is that the Go team wants to keep the toolchain's intervention in programs to a minimum and leave more responsibility to the programmer (Rust is laughing somewhere). In any case, never copy a WaitGroup that is in use; doing so makes deadlocks very easy to hit (the struct's own comment says a WaitGroup must not be copied after first use). For example, here is a slightly modified version of the main function from the beginning of the article:
func main() {
fmt.Println("Main starts...")
var wg sync.WaitGroup
// each WaitGroup will wait for one goroutine
wg.Add(1)
wg2 := wg
wg2.Add(1)
go waitFunc(&wg, 3)
go waitFunc(&wg2, 1)
// wait and block
wg.Wait()
wg2.Wait()
fmt.Println("Main ends...")
}
// output
Main starts...
Hello World from 1
Hello World from 3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000042060?)
C:/Program Files/Go/src/runtime/sema.go:62 +0x27
sync.(*WaitGroup).Wait(0xe76b28?)
C:/Program Files/Go/src/sync/waitgroup.go:116 +0x4b
main.main()
D:/Codes/Golang/waitgroup/main.go:23 +0x139
exit status 2
Why does this happen? Because wg has already had Add(1) called on it, and at that point we copy wg into wg2. It is a shallow copy, so wg2 starts out already in the post-Add(1) state (the value stored in its state field). When we then call wg2.Add(1), wg2 has effectively been through Add(1) twice. Later, waitFunc() releases wg2 only once via Done(), so when main() calls wg2.Wait() it waits forever, hence "all goroutines are asleep". Once we have gone through the internals of Add() and Done() below, this deadlock will be even easier to see.
So is there a way to keep the copy without causing a deadlock? Yes: simply move wg2 := wg in front of wg.Add(1).
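For completeness, here is roughly what that reordered main would look like (it reuses the waitFunc from the opening example). Both wg and wg2 are copied while still zero-valued, each gets its own Add(1)/Done() pair, and both Wait() calls return. It is shown only to illustrate the point; go vet will still complain about the copy, and copying a WaitGroup remains best avoided.
func main() {
	fmt.Println("Main starts...")
	var wg sync.WaitGroup
	// copy before any Add: wg and wg2 are both still zero-valued
	wg2 := wg
	wg.Add(1)
	wg2.Add(1)
	go waitFunc(&wg, 3)
	go waitFunc(&wg2, 1)
	wg.Wait()
	wg2.Wait()
	fmt.Println("Main ends...")
}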
state atomic.Uint64
state is the core of WaitGroup. It is an unsigned 64-bit integer, and it uses Uint64 from the atomic package, so operations on state are themselves thread-safe. atomic.Uint64 gets this safety from atomic instructions provided by the CPU (such as compare-and-swap, CAS), so the atomicity is guaranteed at the CPU level.
The high 32 bits of state hold the counter, and the low 32 bits hold the number of waiters. The counter is the running sum of all Add(int) deltas: after Add(1) and Add(2), for example, the counter is 1 + 2 = 3. The number of waiters is the number of goroutines currently blocked in Wait(), waiting for the WaitGroup to be released.
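As a quick illustration of that layout, here is a standalone sketch (not code from the sync package; the variable names are mine) that packs a counter and a waiter count into one atomic.Uint64 the same way and decodes them with the shifts we will see again in Add:
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var state atomic.Uint64

	// Simulate Add(3): add the delta to the high 32 bits.
	state.Add(uint64(3) << 32)
	// Simulate one waiter registering in Wait(): add 1 to the low 32 bits.
	state.Add(1)

	s := state.Load()
	counter := int32(s >> 32) // high 32 bits
	waiters := uint32(s)      // low 32 bits
	fmt.Printf("state=%#x counter=%d waiters=%d\n", s, counter, waiters)
	// prints: state=0x300000001 counter=3 waiters=1
}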
sema uint32
It is a semaphore, and we will talk about its usage later in the article with the help of some code examples.
Add(delta int)
None of WaitGroup's three methods has a return value, and only Add takes a parameter. The entire design is extremely concise.
The Add method begins with the following block:
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
race.Enabled reports whether race detection has been enabled for this build, which has to be requested explicitly at compile time with
go build -race main.go
It is off by default, so race.Enabled is false unless you ask for it. If race detection is enabled, this code temporarily disables it and re-enables it afterwards. We will not go into the other race-detection details in this article: they contribute little to understanding WaitGroup's core mechanism and would only add noise. All race-related code is therefore stripped from the listings that follow.
The cleaned-up code of the Add method is as follows:
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
// Convert delta to uint64, shift it left by 32 bits, and add it to state.
// This is equivalent to adding delta to the counter stored in the high 32 bits of state.
state := wg.state.Add(uint64(delta) << 32)
// The high 32 bits are the counter.
v := int32(state >> 32)
// The low 32 bits are the number of waiters.
w := uint32(state)
// Panic if the counter goes negative.
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// When Wait and Add are executed concurrently, there is a chance of triggering the following panic.
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// If the counter is greater than 0, or there are no waiters, that is, no goroutine is in Wait(), then it returns directly.
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// reset state to 0
wg.state.Store(0)
// wake up all waiters
for ; w != 0; w-- {
// use the semaphore to wake up a waiter
runtime_Semrelease(&wg.sema, false, 0)
}
}
At the beginning, the method converts the delta parameter to a uint64, shifts it left by 32 bits, and adds it to state. Since the high 32 bits of state are this WaitGroup's counter, this is simply an addition to the counter:
state := wg.state.Add(uint64(delta) << 32)
Next, the program extracts the accumulated counter v and the current number of waiters w:
v := int32(state >> 32)
w := uint32(state)
Then there are several checks:
// panic if the counter is negative
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// panic if Wait and Add are called concurrently
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// return directly if the counter is greater than 0 or there are no waiters,
// i.e., no goroutine is waiting in Wait()
if v > 0 || w == 0 {
return
}
The comments are already fairly clear, but the second if statement, if w != 0 && delta > 0 && v == int32(delta), deserves a closer look:
- w != 0 means there are goroutines waiting in Wait();
- delta > 0 means Add() was called with a positive integer, i.e., a normal call;
- v == int32(delta) means the counter after the addition equals the delta that was just passed in. The most straightforward way for that to happen is for the counter to have been 0 beforehand, i.e., wg is being used for the first time, or all previous Wait() calls have already returned.
At first glance the three conditions seem to contradict each other: w != 0 says somebody is in Wait(), while v == int32(delta) suggests nobody should be, since the counter was 0 just before this Add. How can one snapshot of state say both? It can when Add() races with Wait(): a waiter registered by a concurrent Wait() (or by a Wait() left over from a previous round of use) is still sitting in the low 32 bits while this Add() raises the counter from 0, and that is exactly the misuse this check is meant to catch.
We can use the following code to reproduce this panic:
func main() {
var wg sync.WaitGroup
// It's not easy to reproduce concurrency issues, so we loop multiple times
for i := 0; i < 100000; i++ {
go addDoneFunc(&wg)
go waitFunc(&wg)
}
wg.Wait()
}
func addDoneFunc(wg *sync.WaitGroup) {
wg.Add(1)
wg.Done()
}
func waitFunc(wg *sync.WaitGroup) {
wg.Wait()
}
// Output:
// panic: sync: WaitGroup misuse: Add called concurrently with Wait
//
// goroutine 71350 [running]:
// sync.(*WaitGroup).Add(0x0?, 0xbf8aa5?)
// C:/Program Files/Go/src/sync/waitgroup.go:65 +0xce
// main.addDoneFunc(0xc1cf66?, 0x0?)
// D:/Codes/Golang/waitgroup/main.go:19 +0x1e
// created by main.main
// D:/Codes/Golang/waitgroup/main.go:11 +0x8f
// exit status 2
You may need to run this code several times to see this particular effect, because the racy usage can trigger several different panics over the WaitGroup's lifecycle, including one inside Wait().
Therefore, when using a WaitGroup, call Add outside the goroutine being waited for, not inside it:
// Correct
wg.Add(1)
go func(wg *sync.WaitGroup) {
defer wg.Done()
}(&wg)
wg.Wait()
// Incorrect
go func(wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
}(&wg)
wg.Wait()
This avoids the concurrency-related panics described above.
After the three if statements, state is checked once more for consistency, again to catch concurrent misuse:
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
Here, state.Load(), like the Store() that appears a bit later, is an atomic operation on atomic.Uint64.
According to the logic of the code above, when execution reaches this point the counter must be 0 and the number of waiters must be greater than 0. The code therefore stores 0 into state with wg.state.Store(0) and then notifies the waiters, one by one, to stop waiting:
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
Okay, this is another confusing part. When I first saw this code, I had several questions:
- Why does the Add method have branch logic for a counter of 0? Isn't the counter just cumulative?
- Why are the waiters woken up in Add instead of in Done?
- Why does runtime_Semrelease(&wg.sema, false, 0) need to loop w times?
Let's take a closer look at each one.
Why does the Add method have branch logic for a counter of 0?
First, as the earlier checks show, those final statements (the Store(0) and the wake-up loop) are reached only when the counter v is 0. The counter can get there because delta in Add(delta int) is an int, which means it can be negative! When is a negative value passed in? When Done is called. The code for Done() is as simple as it gets:
// Done decrements the WaitGroup counter by 1.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
So whenever Done() is called, or a negative number is passed to Add manually, we may end up in those final lines of Add. Done also marks the point where the current goroutine's work is finished, which is exactly what has to be synchronized with the outer Wait so that Wait can unblock.
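As a small aside, because delta can be negative, the counter can also be pushed below zero by calling Done() more often than Add(); that trips the v < 0 check we saw earlier. A minimal sketch:
package main

import "sync"

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	wg.Done()
	wg.Done() // counter would become -1
	// panics: sync: negative WaitGroup counter
}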
Why are the waiters woken up in Add instead of in Done?
This was really answered together with the previous question: Done() simply calls Add(-1).
Why does runtime_Semrelease(&wg.sema, false, 0) need to loop w times?
As its name suggests, this function releases a semaphore. It lives in src/sync/runtime.go, and its declaration is as follows:
// Semrelease atomically increments *s and signals any waiters,
// if any. It is intended as a simple wakeup primitive for use with
// synchronization loops, not for general use.
// If handoff is true, count is handed off directly to the first waiter.
// skipframes specifies the number of stack frames to skip over
// when tracing over this function.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
The first parameter points at the semaphore value itself, which is incremented by 1 on release.
As for the second parameter, handoff, my understanding after consulting some materials is: when handoff is false, waiting goroutines are simply woken up but not necessarily scheduled right away; when handoff is true, the count is handed directly to the first waiter and it gets scheduled immediately.
The third parameter, skipframes, relates to tracing rather than scheduling: as the doc comment above says, it is the number of stack frames to skip over when tracing through this function.
By the semaphore's own rules, releasing it increments its value by 1. Its counterpart, runtime_Semacquire(s *uint32), decrements the semaphore when its value is greater than 0 and otherwise blocks; it is the function Wait() calls. That is precisely why runtime_Semrelease needs to loop w times: there are w Wait() calls, each of which calls runtime_Semacquire and consumes one unit of the semaphore, so every acquire has to be matched by a release.
The semaphore's mechanism is similar to WaitGroup's, just with the counter's meaning reversed, so a few extra words as a recap:
Acquiring the semaphore (runtime_Semacquire) is the blocking side, a P (Proberen, test) operation: if the semaphore is greater than 0, the acquisition succeeds and the value is decremented by 1; otherwise the caller keeps waiting.
Releasing the semaphore (runtime_Semrelease) increments its value by 1, a V (Verhogen, increase) operation.
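To see that pairing with the real WaitGroup, here is a small sketch (the variable names and the time.Sleep are mine, the latter only to make the demo behave predictably): three goroutines block in Wait(), so w becomes 3, and the single Done() that drops the counter to 0 ends up releasing the semaphore three times, once per waiter.
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(1) // counter = 1

	// A second WaitGroup is used only so main can wait for the printouts.
	var done sync.WaitGroup
	done.Add(3)

	// Three goroutines block in Wait(), so the waiter count w becomes 3.
	for i := 1; i <= 3; i++ {
		go func(id int) {
			defer done.Done()
			wg.Wait() // each waiter ends up parked in runtime_Semacquire
			fmt.Printf("waiter %d woken\n", id)
		}(i)
	}

	time.Sleep(100 * time.Millisecond) // give the waiters time to park
	wg.Done()                          // counter hits 0: Add releases the semaphore w (= 3) times
	done.Wait()
}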
Done()
We already covered Done() above; here it is again for completeness:
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait()
Below is the cleaned-up code for Wait(), with the race-detection code removed.
// Wait blocks until the counter reaches 0.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32) // counter
w := uint32(state) // waiter number
if v == 0 {
// return if counter is 0
return
}
// increase waiter number
if wg.state.CompareAndSwap(state, state+1) {
// get semaphore
runtime_Semacquire(&wg.sema)
// avoid concurrency issue
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Compared with Add, Wait is much simpler, and with the lengthy explanation of Add behind us, its code is almost self-explanatory.
When the counter is 0, i.e., nothing has been Add-ed yet (or everything added has already been Done-d), there is nothing to wait for, so Wait returns immediately without touching the semaphore.
Finally, Wait also ends with a check against misuse; that panic too can be reproduced with the racy code we used for Add, which you are welcome to try.
One thing that stands out in Wait is the infinite for {} loop. Why is it there? Because the atomic wg.state.CompareAndSwap(state, state+1) can fail when another goroutine changes state in the meantime, in which case Wait has to reload state and run through the whole sequence again. Once the CAS succeeds, Wait blocks in runtime_Semacquire(&wg.sema) until Done brings the counter down to 0 and Add releases the semaphore.
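The same load / compare-and-swap / retry pattern appears whenever multiple goroutines race to update a single atomic word. Here is a generic, self-contained sketch of it (the function and variable names are mine); it is essentially what Wait() does when it registers a waiter in the low 32 bits:
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// incrementLowBits adds 1 to the low 32 bits of state, retrying until
// the CompareAndSwap succeeds, just like Wait() registering a waiter.
func incrementLowBits(state *atomic.Uint64) {
	for {
		old := state.Load()
		if state.CompareAndSwap(old, old+1) {
			return
		}
		// Another goroutine changed state between Load and CAS: retry.
	}
}

func main() {
	var state atomic.Uint64
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			incrementLowBits(&state)
		}()
	}
	wg.Wait()
	fmt.Println(uint32(state.Load())) // 1000
}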
Conclusion
That completes the analysis of the WaitGroup source code. For one of the most important concurrency components in Golang, it is surprisingly small, barely over a hundred lines, which makes understanding its internals that much easier.
Reference: Golang WaitGroup 底层原理及源码详解 - 程序员小屋 - SegmentFault 思否