In previous post, we have talked about etcd and its usage. This post we will cover how to implement server discovery with etcd.
Service discovery is to solve one of the most commonly seen scenarios in distributed system where how to find the corresponding target service to talk to. In short, it is to find some server which one can talk to based on some service name.
A complete service discovery system include below three key functions:
- Service registration: A service must register itself to some common place so that others can discover it
- Health check: An instance must report its health status to the common place so that others can still talk to and discover it
- Service discovery: A service initiates the service discovery process by initializing client with specified target service name to find
Service registration and health check
According to etcd v3 API, when a new service is started, it needs to register itself to etcd by writing the server address and also grant a lease and keep the lease alive when it's still providing service to achieve health check.
A sample Go code snippet for this process can be found below.
package main
import (
"context"
"log"
"time"
"go.etcd.io/etcd/clientv3"
)
//ServiceRegister create and register service
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.LeaseID //lease ID
// lease keep-alive chan
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister create new service
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}
return ser, nil
}
// set lease
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
// grant lease time
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
// register service and bind lease
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
// set keep-alive logic
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}
//ListenLeaseRespChan listena and watch
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("ç»çº¦æˆåŠŸ", leaseKeepResp)
}
log.Println("å…³é—ç»ç§Ÿ")
}
// Close
func (s *ServiceRegister) Close() error {
// revoke lease
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser, err := NewServiceRegister(endpoints, "/web/node1", "localhost:8000", 5)
if err != nil {
log.Fatalln(err)
}
// listen to keep alive chan
go ser.ListenLeaseRespChan()
select {
// case <-time.After(20 * time.Second):
// ser.Close()
}
}
When the service is shutdown, should call Close()
to revoke the lease.
Service discovery
The service discovery can leverage the Watch mechanism provided by v3 API and it can be used to discover server addition, deletion etc.
package main
import (
"context"
"log"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
)
//ServiceDiscovery service discovery
type ServiceDiscovery struct {
cli *clientv3.Client // etcd client
serverList map[string]string // server list
lock sync.Mutex
}
//NewServiceDiscovery new service discovery client
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}
//WatchService initialize service and watch service
func (s *ServiceDiscovery) WatchService(prefix string) error {
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}
// watch prefix
go s.watcher(prefix)
return nil
}
//watcher ...
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //åˆ é™¤
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}
//SetServiceList add server to list
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}
//DelServiceList ...
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}
//GetServices get server list
func (s *ServiceDiscovery) GetServices() []string {
s.lock.Lock()
defer s.lock.Unlock()
addrs := make([]string, 0)
for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}
//Close close service
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}
func main() {
var endpoints = []string{"localhost:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
ser.WatchService("/web/")
ser.WatchService("/gRPC/")
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
}
}
}
Sample output:
$go run discovery.go
watching prefix:/web/ now...
put key : /web/node1 val:localhost:8000
[localhost:8000]
$go run register.go
Put key:/web/node1 val:localhost:8000 success!
ç»çº¦æˆåŠŸ cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7
ç»çº¦æˆåŠŸ cluster_id:14841639068965178418 member_id:10276657743932975437 revision:29 raft_term:7
...
Reference: etcd实现æœåŠ¡å‘现 - 烟花易冷人憔悴 - åšå®¢å› (cnblogs.com)