Demo on creating a worker pool in GoLang

  sonic0002        2021-01-24 05:04:00

A worker pool is a pool where a specified number of workers (usually goroutines) are created and run to pick up tasks. This allows multiple tasks to run at the same time while keeping the number of workers fixed, so that the program does not overuse resources.

There are usually two approaches to creating a worker pool.

  • One is to pre-create a fixed number of workers
  • One is to create workers on demand, up to a maximum number of workers

In this post, we will demonstrate the approach of pre-creating a fixed number of workers. This approach is usually used when one knows there will be lots of tasks running at the same time and the chance of reaching the maximum allowed number of workers is high.
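
Before walking through the full implementation, here is a minimal sketch of the pattern using only goroutines and a buffered channel; the names and the integer job type here are purely illustrative.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 3 // fixed number of workers, created up front

	jobs := make(chan int, 10) // buffered channel acting as the job queue
	var wg sync.WaitGroup

	// start all workers before any job is pushed
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// the loop ends once the channel is closed and drained
			for job := range jobs {
				fmt.Printf("worker %d got job %d\n", id, job)
			}
		}(i)
	}

	for j := 1; j <= 10; j++ {
		jobs <- j
	}
	close(jobs) // signal that no more jobs will come
	wg.Wait()   // wait for every worker to finish
}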

To demonstrate, we first need to create a Worker struct which will take a task and run it.

import (
	"fmt"
)

// Worker ...
type Worker struct {
	ID       int
	Name     string
	StopChan chan bool
}

// Start ...
func (w *Worker) Start(jobQueue chan Job) {
	w.StopChan = make(chan bool)
	successChan := make(chan bool)

	go func() {
		successChan <- true
		for {
			// take a job from the queue; a closed and drained queue yields nil
			job := <-jobQueue
			if job != nil {
				// run the job (the returned error is ignored in this demo)
				job.Start(w)
			} else {
				fmt.Printf("worker %s to be stopped\n", w.Name))
				w.StopChan <- true
				break
			}
		}
	}()

	// wait for the worker to start
	<-successChan
}

// Stop ...
func (w *Worker) Stop() {
	// wait for the worker to stop, blocking
	<-w.StopChan
	fmt.Printf("worker %s stopped\n", w.Name)
}

The Worker has some properties to carry the worker state, and two functions which will be called to start and stop the worker.

Inside Start(), it creates channels to stop the worker and to indicate that the worker has started. Most important is the for loop, where the worker keeps waiting for an available job, takes it and runs it, until a signal tells it that the queue has been drained.
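
As a side note, the same receive loop can also be written with range over the job channel, which exits on its own once the queue is closed and drained. The sketch below is only an alternative (it also logs the error that the demo otherwise ignores), not the version used in the rest of this post.

// Start (alternative sketch using range; not used in the rest of the post) ...
func (w *Worker) Start(jobQueue chan Job) {
	w.StopChan = make(chan bool)
	successChan := make(chan bool)

	go func() {
		successChan <- true
		// range exits once jobQueue is closed and drained,
		// so no explicit nil check is needed
		for job := range jobQueue {
			if err := job.Start(w); err != nil {
				fmt.Printf("worker %s: job failed: %v\n", w.Name, err)
			}
		}
		fmt.Printf("worker %s to be stopped\n", w.Name)
		w.StopChan <- true
	}()

	// wait for the worker to start
	<-successChan
}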

Job is an interface with a single function Start(), so there can be many different types of jobs as long as they implement the Start() function.

// Job ...
type Job interface {
	Start(worker *Worker) error
}
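
For example, any type that implements Start() can be queued alongside the PrintJob defined later in this post. The SleepJob below is a hypothetical illustration and not part of the original demo; it additionally needs the time package.

import "time"

// SleepJob is a hypothetical Job implementation that simply
// pauses the worker for the given duration.
type SleepJob struct {
	Duration time.Duration
}

// Start ...
func (sj *SleepJob) Start(worker *Worker) error {
	fmt.Printf("worker %s sleeping for %v\n", worker.Name, sj.Duration)
	time.Sleep(sj.Duration)
	return nil
}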

Once the Worker is there, the next thing is to create the pool and let it manage the workers.

import (
	"fmt"
	"sync"
)

// Pool ...
type Pool struct {
	Name string

	Size    int
	Workers []*Worker

	QueueSize int
	Queue     chan Job
}

// Initialize ...
func (p *Pool) Initialize() {
	// maintain minimum 1 worker
	if p.Size < 1 {
		p.Size = 1
	}
	p.Workers = []*Worker{}
	for i := 1; i <= p.Size; i++ {
		worker := &Worker{
			ID:   i - 1,
			Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1),
		}
		p.Workers = append(p.Workers, worker)
	}

	// maintain min queue size as 1
	if p.QueueSize < 1 {
		p.QueueSize = 1
	}
	p.Queue = make(chan Job, p.QueueSize)
}

// Start ...
func (p *Pool) Start() {
	for _, worker := range p.Workers {
		worker.Start(p.Queue)
	}
	fmt.Println("all workers started")
}

// Stop ...
func (p *Pool) Stop() {
	close(p.Queue) // close the queue channel

	var wg sync.WaitGroup
	for _, worker := range p.Workers {
		wg.Add(1)
		go func(w *Worker) {
			defer wg.Done()

			w.Stop()
		}(worker)
	}
	wg.Wait()
	fmt.Println("all workers stopped")
}

The Pool contains a slice of Worker values and a Queue channel for holding the jobs. The number of workers can be defined when initializing the pool.
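
Note that Initialize() falls back to sensible minimums, so even a zero-valued configuration ends up with one worker and a queue of capacity 1. A quick illustrative check (the function name here is made up for the example):

// quick illustrative check of the Initialize() fallbacks
func checkPoolDefaults() {
	pool := &Pool{Name: "fallback"} // Size and QueueSize left at zero
	pool.Initialize()
	fmt.Println(len(pool.Workers), cap(pool.Queue)) // prints: 1 1
}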

The key is the logic in the Stop() function. When it is called, it closes the job queue; after that, each worker eventually reads nil from the job queue and shuts itself down. The for loop then waits for all the workers to stop concurrently, returning only after the last worker has stopped.
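
The behavior relied on here is standard Go channel semantics: a closed channel first delivers any values still buffered in it, and every receive after that returns the element type's zero value, which is nil for an interface type. A tiny standalone illustration, independent of the pool code:

package main

import "fmt"

func main() {
	ch := make(chan interface{}, 2)
	ch <- "buffered value"
	close(ch)

	fmt.Println(<-ch)        // prints: buffered value
	fmt.Println(<-ch == nil) // prints: true, the channel is now drained
}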

To demonstrate the whole flow, below is an example which simply prints each job.

import "fmt"

func main() {
	pool := &Pool{
		Name:      "test",
		Size:      5,
		QueueSize: 20,
	}
	pool.Initialize()
	pool.Start()
	defer pool.Stop()

	for i := 1; i <= 100; i++ {
		job := &PrintJob{
			Index: i,
		}
		pool.Queue <- job
	}
}

// PrintJob ...
type PrintJob struct {
	Index int
}

func (pj *PrintJob) Start(worker *Worker) error {
	fmt.Printf("job %s - %d\n", worker.Name, pj.Index)
	return nil
}

The logic is pretty straightforward: it creates a Pool with 5 workers and a job queue of size 20. It then simulates job creation and handling; once a job is created, it is pushed onto the job queue, and the workers waiting on the queue pick it up and run it. A variant that mixes different job types is sketched after the sample output below.

Output will be something like:

all workers started
job test-worker-3 - 4
job test-worker-3 - 6
job test-worker-3 - 7
job test-worker-3 - 8
job test-worker-3 - 9
job test-worker-3 - 10
job test-worker-3 - 11
job test-worker-3 - 12
job test-worker-3 - 13
job test-worker-3 - 14
job test-worker-3 - 15
job test-worker-3 - 16
job test-worker-3 - 17
job test-worker-3 - 18
job test-worker-3 - 19
job test-worker-3 - 20
worker test-worker-3 to be stopped
job test-worker-4 - 5
job test-worker-0 - 1
worker test-worker-3 stopped
job test-worker-2 - 3
worker test-worker-2 to be stopped
worker test-worker-2 stopped
worker test-worker-4 to be stopped
worker test-worker-4 stopped
worker test-worker-0 to be stopped
worker test-worker-0 stopped
job test-worker-1 - 2
worker test-worker-1 to be stopped
worker test-worker-1 stopped
all workers stopped
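
Because the queue holds the Job interface, different job types can be mixed in the same run. As an illustration, assuming the hypothetical SleepJob sketched earlier and an extra time import, the producer loop in main() could interleave both job types:

import "time"

// a variant of the main() above that mixes job types in the same queue
func main() {
	pool := &Pool{
		Name:      "mixed",
		Size:      5,
		QueueSize: 20,
	}
	pool.Initialize()
	pool.Start()
	defer pool.Stop()

	for i := 1; i <= 100; i++ {
		pool.Queue <- &PrintJob{Index: i}
		// every tenth iteration, also queue the hypothetical SleepJob
		if i%10 == 0 {
			pool.Queue <- &SleepJob{Duration: 10 * time.Millisecond}
		}
	}
}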

In the next post, we will demonstrate how to create workers only when needed instead of creating them all up front, since pre-created workers also consume resources.

GOLANG  GOROUTINE  WORKER POOL