Service discovery with etcd

  sonic0002        2021-03-08 05:36:29       5,361        0    

In a previous post, we talked about etcd and its usage. In this post, we will cover how to implement service discovery with etcd.

Service discovery solves one of the most common problems in distributed systems: how to find the target service to talk to. In short, it finds a server that one can talk to, based on a service name.

A complete service discovery system includes the following three key functions:

  • Service registration: A service must register itself to some common place so that others can discover it
  • Health check: An instance must report its health status to the common place so that others can still talk to and discover it
  • Service discovery: A service initiates the service discovery process by initializing a client with the name of the target service it wants to find

Service registration and health check

With the etcd v3 API, a newly started service registers itself by writing its server address to etcd under a key that is bound to a lease. As long as the service is still running, it keeps the lease alive; this keep-alive acts as the health check.

A sample Go code snippet for this process can be found below.

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// ServiceRegister holds the etcd client and the lease state used to register
// a single key/value pair and keep it alive.
type ServiceRegister struct {
	// cli is the etcd client used for all requests.
	cli *clientv3.Client
	// leaseID is the ID of the lease bound to the registered key.
	leaseID clientv3.LeaseID
	// keepAliveChan delivers the keep-alive responses for the lease.
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	// key is the etcd key the service is registered under.
	key string
	// val is the value stored at key.
	val string
}

// NewServiceRegister connects to etcd at endpoints and registers key=val
// under a lease whose TTL is lease (passed straight to the etcd Grant call).
//
// It returns an error when the client cannot be created or when the
// registration fails. In the latter case the client is closed before
// returning so that the connection is not leaked.
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		// Report the failure to the caller instead of exiting the process.
		return nil, err
	}

	ser := &ServiceRegister{
		cli: cli,
		key: key,
		val: val,
	}

	if err := ser.putKeyWithLease(lease); err != nil {
		// Best-effort cleanup: the registration error is the one worth
		// reporting, so a failure to close is deliberately ignored.
		_ = cli.Close()
		return nil, err
	}

	return ser, nil
}

// putKeyWithLease grants a lease, writes s.key/s.val bound to that lease and
// starts the keep-alive stream for it. The lease ID and keep-alive channel
// are stored on s only after all three steps have succeeded.
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
	ctx := context.Background()

	// Step 1: obtain a lease with the requested TTL.
	grant, err := s.cli.Grant(ctx, lease)
	if err != nil {
		return err
	}
	id := grant.ID

	// Step 2: store the key with the lease attached.
	if _, err := s.cli.Put(ctx, s.key, s.val, clientv3.WithLease(id)); err != nil {
		return err
	}

	// Step 3: start the keep-alive stream for the lease.
	alive, err := s.cli.KeepAlive(ctx, id)
	if err != nil {
		return err
	}

	s.leaseID = id
	log.Println(s.leaseID)
	s.keepAliveChan = alive
	log.Printf("Put key:%s  val:%s  success!", s.key, s.val)
	return nil
}

// ListenLeaseRespChan logs every keep-alive response received on
// s.keepAliveChan and returns once that channel has been closed.
func (s *ServiceRegister) ListenLeaseRespChan() {
	ch := s.keepAliveChan
	for {
		resp, open := <-ch
		if !open {
			break
		}
		// Message reads "lease renewed".
		log.Println("续约成功", resp)
	}
	// Message reads "lease renewal stopped".
	log.Println("关闭续租")
}

// Close revokes the lease bound to the registered key and closes the etcd
// client.
//
// The client is closed even when the revoke fails, so the connection is
// never leaked. If both steps fail, the revoke error is returned.
func (s *ServiceRegister) Close() error {
	_, revokeErr := s.cli.Revoke(context.Background(), s.leaseID)
	if revokeErr == nil {
		// Message reads "lease revoked".
		log.Println("撤销租约")
	}

	closeErr := s.cli.Close()
	if revokeErr != nil {
		return revokeErr
	}
	return closeErr
}

// main registers /web/node1 -> localhost:8000 with a lease of 5 and then
// blocks forever while keep-alive responses are logged in the background.
func main() {
	endpoints := []string{"localhost:2379"}

	ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
	if err != nil {
		log.Fatalln(err)
	}

	// Drain the keep-alive channel in the background.
	go ser.ListenLeaseRespChan()

	// Block forever. To stop after a fixed time instead, wait on
	// time.After(20 * time.Second) and then call ser.Close().
	select {}
}

When the service shuts down, it should call Close() to revoke the lease.

Service discovery

Service discovery can leverage the Watch mechanism provided by the v3 API, which can be used to detect when servers are added, removed, and so on.

package main

import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/coreos/etcd/mvcc/mvccpb"
	"go.etcd.io/etcd/clientv3"
)

// ServiceDiscovery keeps an in-memory copy of the key/value pairs it has
// read from etcd.
type ServiceDiscovery struct {
	// cli is the etcd client used to read and watch keys.
	cli *clientv3.Client
	// serverList maps each etcd key to its stored value.
	serverList map[string]string
	// lock guards serverList.
	lock sync.Mutex
}

// NewServiceDiscovery creates a discovery client connected to endpoints with
// an empty server list. If the etcd client cannot be created, the process
// exits via log.Fatal.
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
	cfg := clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: 5 * time.Second,
	}

	cli, err := clientv3.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	sd := new(ServiceDiscovery)
	sd.cli = cli
	sd.serverList = make(map[string]string)
	return sd
}

// WatchService loads every key under prefix into the server list and then
// starts a background goroutine that keeps the list up to date.
//
// The watch starts at the revision right after the initial read, so changes
// made between the read and the start of the watch are not missed. It
// returns an error if the initial read fails; no watcher is started then.
func (s *ServiceDiscovery) WatchService(prefix string) error {
	resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
	if err != nil {
		return err
	}

	for _, kv := range resp.Kvs {
		s.SetServiceList(string(kv.Key), string(kv.Value))
	}

	// resp.Header.Revision is the store revision the snapshot was read at;
	// watching from the next revision closes the gap between Get and Watch.
	go s.watcher(prefix, clientv3.WithRev(resp.Header.Revision+1))
	return nil
}

// watcher applies PUT and DELETE events for keys under prefix to the server
// list until the watch channel is closed. Any opts are passed to Watch in
// addition to WithPrefix; calling it without opts watches from the current
// revision.
//
// NOTE(review): mvccpb is imported from github.com/coreos/etcd while clientv3
// comes from go.etcd.io/etcd — confirm both resolve to the same event type in
// your module setup.
func (s *ServiceDiscovery) watcher(prefix string, opts ...clientv3.OpOption) {
	watchOpts := append([]clientv3.OpOption{clientv3.WithPrefix()}, opts...)
	rch := s.cli.Watch(context.Background(), prefix, watchOpts...)
	log.Printf("watching prefix:%s now...", prefix)
	for wresp := range rch {
		if err := wresp.Err(); err != nil {
			// Surface watch failures instead of letting the watcher end silently.
			log.Printf("watch prefix:%s error: %v", prefix, err)
			continue
		}
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.PUT: // key created or updated
				s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
			case mvccpb.DELETE: // key removed
				s.DelServiceList(string(ev.Kv.Key))
			}
		}
	}
}

// SetServiceList stores val under key in the server list, replacing any
// previous value, and logs the change. The list is locked for the duration
// of the call.
func (s *ServiceDiscovery) SetServiceList(key, val string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	s.serverList[key] = val
	log.Println("put key :", key, "val:", val)
}

// DelServiceList removes key from the server list and logs the removal.
// The list is locked for the duration of the call.
func (s *ServiceDiscovery) DelServiceList(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	delete(s.serverList, key)
	log.Println("del key:", key)
}

// GetServices returns the values currently held in the server list as a new
// slice. The order follows map iteration and is therefore unspecified; the
// result is empty (not nil) when the list holds no entries.
func (s *ServiceDiscovery) GetServices() []string {
	s.lock.Lock()
	defer s.lock.Unlock()

	addrs := make([]string, 0, len(s.serverList))
	for key := range s.serverList {
		addrs = append(addrs, s.serverList[key])
	}
	return addrs
}

// Close closes the underlying etcd client and returns its error, if any.
func (s *ServiceDiscovery) Close() error {
	err := s.cli.Close()
	return err
}

// main watches the /web/ and /gRPC/ prefixes and logs the known service
// values every 10 seconds.
func main() {
	endpoints := []string{"localhost:2379"}
	ser := NewServiceDiscovery(endpoints)
	defer ser.Close()

	for _, prefix := range []string{"/web/", "/gRPC/"} {
		// A failed initial read means no watcher runs for this prefix, so
		// report it instead of dropping the error.
		if err := ser.WatchService(prefix); err != nil {
			log.Printf("watch service %s: %v", prefix, err)
		}
	}

	// One reusable ticker: calling time.Tick inside the loop would create a
	// new ticker on every iteration.
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		log.Println(ser.GetServices())
	}
}

Sample output:

$go run discovery.go
watching prefix:/web/ now...
put key : /web/node1 val:localhost:8000
[localhost:8000]

$go run register.go
Put key:/web/node1 val:localhost:8000 success!
续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7 
续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7 
...

Reference: etcd实现服务发现 - 烟花易冷人憔悴 - 博客园 (cnblogs.com)

TUTORIAL  DEMO  ETCD  SERVICE DISCOVERY 

       

  RELATED


  0 COMMENT


No comment for this article.



  RANDOM FUN

When called to fix a production issue