go-etcd实战

go-etcd实战,第1张

etcd 简介

etcd is a strongly consistent, distributed key-value store that provides a reliable way to store data that needs to be accessed by a distributed system or cluster of machines. It gracefully handles leader elections during network partitions and can tolerate machine failure, even in the leader node.
贴了官网的介绍。大致的意思就是etcd 是一个强一致性的分布式kv存储。提供了可信赖的手段去保证分布式系统中的数据存储。在网络抖动分裂和机器坏死的情况下,都可以优雅的处理leader选举问题。

学习参考的资料
官网
中文文档
本文代码收录

环境搭建

本文是伪分布式集群的方式搭建etcd集群,3个server结点。etcd集群结点之间通讯使用的是grpc协议。启动一个etcd实例需要占用两个端口号,一个端口号用于对外提供服务的(2379),另外一个端口号用于集群内部之间进行通讯的(2380)

说明

本文的实 *** 环境是基于虚拟机完成的,虚拟机通过静态ip与本机进行连接,虚拟机局域网 ip地址固定为 10.248.174.155。etcd的版本为 v3.5.0。

下载安装

etcd 直接下载对应的二进制包进行解压即可使用。

# 下载
wget https://github.com/etcd-io/etcd/releases/download/v3.5.0/etcd-v3.5.0-linux-amd64.tar.gz
# 解压
tar -zxvf etcd-v3.5.0-linux-amd64.tar.gz

解压后的目录如下所示

root@n248-174-155:~/file/etcd-v3.5.0-linux-amd64$ ll
total 56304
drwxr-xr-x 3 root root     4096 Jun 16  2021 Documentation
-rwxr-xr-x 1 root root 23560192 Jun 16  2021 etcd
-rwxr-xr-x 1 root root 17969152 Jun 16  2021 etcdctl
-rwxr-xr-x 1 root root 16048128 Jun 16  2021 etcdutl
-rw-r--r-- 1 root root    42066 Jun 16  2021 README-etcdctl.md
-rw-r--r-- 1 root root     7359 Jun 16  2021 README-etcdutl.md
-rw-r--r-- 1 root root     9394 Jun 16  2021 README.md
-rw-r--r-- 1 root root     7896 Jun 16  2021 READMEv2-etcdctl.md
命令实战

一般来说,官方提供各种语言相关的sdk都与原生提供的命令有相似之处,而etcd使用go语言编写的,原生api对go的支持更为友好。先了解一下基本的一些 *** 作命令。

本 地 e t c d 相 关 的 *** 作 命 令 都 是 在 解 压 后 的 根 目 录 下 进 行 的 \color{#FF0000}{本地etcd相关的 *** 作命令都是在解压后的根目录下进行的} etcd ***

cd  /home/root/etcd-v3.5.0-linux-amd64/
启动etcd集群 启动参数说明 listen-peer-urls: 集群内部通讯的端点listen-client-urls: 集群对外提供服务的端点initial-cluster-token:三个结点都是一个集群 etcd-cluster-1initial-cluste:集群中的node信息最后以 nohup 后台的方式启动三个实例,并指定了日志的输出目录
nohup etcd --name etcd01 --initial-advertise-peer-urls http://10.248.174.155:2370 \
--listen-peer-urls http://0.0.0.0:2370 \
--listen-client-urls http://0.0.0.0:2371 \
--advertise-client-urls http://10.248.174.155:2371 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd01=http://10.248.174.155:2370,etcd02=http://10.248.174.155:2380
,etcd03=http://10.248.174.155:2390 \
--initial-cluster-state new > /home/root/etcd-v3.5.0-linux-amd64/etcd01.etcd/etcd01.log 2>&1 &

nohup etcd --name etcd02 --initial-advertise-peer-urls http://10.248.174.155:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--listen-client-urls http://0.0.0.0:2381 \
--advertise-client-urls http://10.248.174.155:2381 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd01=http://10.248.174.155:2370,etcd02=http://10.248.174.155:2380
,etcd03=http://10.248.174.155:2390 \
--initial-cluster-state new > /home/root/etcd-v3.5.0-linux-amd64/etcd02.etcd/etcd02.log 2>&1 &

nohup etcd --name etcd03 --initial-advertise-peer-urls http://10.248.174.155:2390 \
--listen-peer-urls http://0.0.0.0:2390 \
--listen-client-urls http://0.0.0.0:2391 \
--advertise-client-urls http://10.248.174.155:2391 \
--initial-cluster-token etcd-cluster-1 \
--initial-cluster etcd01=http://10.248.174.155:2370,etcd02=http://10.248.174.155:2380
,etcd03=http://10.248.174.155:2390 \
--initial-cluster-state new > /home/root/etcd-v3.5.0-linux-amd64/etcd03.etcd/etcd03.log 2>&1 &

三个结点都启动成功

root@n248-174-155:~/file/etcd-v3.5.0-linux-amd64$ netstat -ntpl|grep etcd
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2370                 :::*                    LISTEN      3785847/etcd
tcp6       0      0 :::2371                 :::*                    LISTEN      3785847/etcd
tcp6       0      0 :::2380                 :::*                    LISTEN      3785815/etcd
tcp6       0      0 :::2381                 :::*                    LISTEN      3785815/etcd
tcp6       0      0 :::2390                 :::*                    LISTEN      3785781/etcd
tcp6       0      0 :::2391                 :::*                    LISTEN      3785781/etcd
测试命令
# 查看etcd版本
root@n248-174-155:~/file/etcd-v3.5.0-linux-amd64$ etcdctl version
etcdctl version: 3.5.0
API version: 3.5
# 查看集群的成员
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 member list -w table
# 查看集群的状态
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 endpoint status -w table 
kv *** 作命令 添加key
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 put services/svr-user/001 192.168.3.2:5780 
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 put services/svr-user/002 192.168.3.3:5643 
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 put services/svr-user/003 192.168.172.8:3456
查询key
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 get services/svr-user/003
根据前缀查询
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 get --prefix services/svr-user/
获取所有key
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 get --from-key ''
删除单个key
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 del services/svr-user/003
删除所有的key
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 del --prefix ""
创建一个租约60s 得到一个租约id
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 lease grant 60 

lease 600f7e04ffa10330 granted with TTL(60s)

查询某个租约的具体信息
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 lease timetolive 738c7e05014d164d 

lease 600f7e04ffa10330 granted with TTL(60s), remaining(45s)

将租约绑定到具体的键值对上,时间过期后,k,v也就失效了
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 put k1 v1 --lease=600f7e04ffa10330
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 put k2 v2 --lease=600f7e04ffa10330
查询这个租约绑定了哪些key
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 lease timetolive 26a37e0501d46f26 --keys

lease 600f7e04ffa10330 granted with TTL(60s), remaining(14s), attached keys([k1 k2])

查询当前被激活的所有租约
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 lease list

found 4 leases
738c7e05014d161f
26a37e0501d46f20
26a37e0501d46f22
26a37e0501d46f24

一直续约租约

比如一开始是60,续约后将租约又刷新到60,不过是每隔20s刷新一次,实际测试这个刷新间隔时间会根据你的租约时间而改变,测试租约20s的时候,10s刷新间隔, 租约10s,5s刷新间隔, 租约5s,3s刷新间隔。 保证一定能续约上。

etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 lease keep-alive 26a37e0501d46f28 lease

26a37e0501d46f26 keepalived with TTL(60)
lease 26a37e0501d46f26 keepalived with TTL(60)
lease 26a37e0501d46f26 keepalived with TTL(60)
lease 26a37e0501d46f26 keepalived with TTL(60)
lease 26a37e0501d46f26 keepalived with TTL(60)

只续约一次
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 lease keep-alive 26a37e0501d46f28 --once
监听某个前缀的key的改变

终端会一直卡住接收事件,删除和新增都能收到,如果是key绑定了租约,租约到期了,会接收到一个删除事件

etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 watch --prefix aa/
PUT
aa/002
192.168.4.2:5780
PUT 
aa/002 
192.168.4.2:5780 
DELETE 
aa/002 
PUT 
aa/002 
192.168.4.2:5780 
DELETE 
aa/002
终端一直卡在这里,监听某一个key的改变,每次改变,会将key之前的值和最新的值都推送过来
etcdctl --endpoints=10.248.174.155:2371,10.248.174.155:2381,10.248.174.155:2391 watch --prev-kv aa/
go-etcd 实战

etcd 的常用应用场景是微服务体系中的注册中心,维护服务的注册信息,提供服务发现能力。下面做一个服务注册与发现的例子,仅供参考。

服务注册发现kv设计 思路一

key : /services/XXX/ip:port 三段式,第一段固定,第二段为服务名称(唯一),第三段是服务的ip和端口组成的字符串
value:就为空字符串就行了

思路二

key: /services/XXX/nodeID 三段式,第一段固定,第二段为服务名称(唯一),第三段是实例id,这里我用一个随机函数生成的一个随机字符串
value:则包含服务的id和端口,value最后序列化为json字符串

这 里 采 用 思 路 二 去 设 计 的 整 个 流 程 \color{#FF0000}{这里采用思路二去设计的整个流程}

基本数据结构设计

NodeInfo 用于描述结点具体信息,序列化为json后以某个服务实例的 value信息

package etcd

import (
	"fmt"
	"log"
	"math/rand"
	"sync"
	"time"
)

// NodeInfo kv中的 v 存具体结点信息
type NodeInfo struct {
	IpAddr  string `json:"ip_addr"`
	Port    int    `json:"port"`
	SvrID   string `json:"svr_id"`
	SvrName string `json:"svr_name"`
}

// RandomNum 生成随机数,用来模拟随机算法做服务发现时随机挑选一个服务实例
func RandomNum(s int64, e int64) int64 {
	//随机数如果 Seed不变 则生成的随机数一直不变
	rand.Seed(time.Now().UnixNano())
	r := rand.Int63n(e - s)
	return s + r
}


func getRegKeyPrefix(svrName string) string {
	return fmt.Sprintf("%s/%s/", "services", svrName)
}

func getRegKey(svrName string, svrID string) string {
	return fmt.Sprintf("%s%s", getRegKeyPrefix(svrName), svrID)
}

// LocalNodeCache 服务发现时将使用的结点信息缓存到本地,这里有一点不好就是如果调用过一次就会一直存在本地缓存了,需要设计一条缓存淘汰机制
type LocalNodeCache struct {
	sync.RWMutex
	// 
	nodes map[string][]*NodeInfo
}

// 查询本地缓存是否有某个服务
func (n *LocalNodeCache) hasSvr(svrName string) bool {
	_, exist := n.nodes[svrName]
	return exist
}

// AddNode 向本地缓存中添加结点信息
func (n *LocalNodeCache) AddNode(node *NodeInfo) {
	if node == nil {
		return
	}
	n.Lock()
	defer n.Unlock()
	if !n.hasSvr(node.SvrName) {
		n.nodes[node.SvrName] = make([]*NodeInfo, 0, 3)
		n.nodes[node.SvrName] = append(n.nodes[node.SvrName], node)
		return
	}
	for idx, oldNode := range n.nodes[node.SvrName] {
		if oldNode.SvrID == node.SvrID {
			n.nodes[node.SvrName][idx] = node
			return
		}
	}
	n.nodes[node.SvrName] = append(n.nodes[node.SvrName], node)
}

// DelNode 本地缓存删除服务实例信息
func (n *LocalNodeCache) DelNode(svrName string, svrID string) {
	n.Lock()
	defer n.Unlock()
	if !n.hasSvr(svrName) {
		return
	}
	for idx, node := range n.nodes[svrName] {
		if node.SvrID == svrID {
			n.nodes[svrName] = append(n.nodes[svrName][:idx], n.nodes[svrName][idx+1:]...)
			return
		}
	}
}

// GetNode 获取服务实例,一个服务有多个实例,随机挑选一个
func (n *LocalNodeCache) GetNode(svrName string) *NodeInfo {
	nodes, exist := n.nodes[svrName]
	if !exist || len(nodes) == 0 {
		return nil
	}
	log.Printf("LocalNodeCache svrName:%v nodes len:%+v \n", svrName, len(nodes))
	pickIdx := RandomNum(0, int64(len(nodes)))

	log.Printf("LocalNodeCache pick index:%v\n", pickIdx)
	return nodes[pickIdx]
}

服务注册
package etcd

import (
	"context"
	"encoding/json"
	uuid "github.com/satori/go.uuid"
	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
	clientv3 "go.etcd.io/etcd/client/v3"
	"log"
	"strings"
	"time"
)

// Register 定义一个注册器
type Register struct {
	client3   *clientv3.Client
	stop      chan bool
	interval  time.Duration
	leaseTime int64
	leaseID   clientv3.LeaseID
	node      *NodeInfo
}

// RandomStr 随机生成一个字符串用户表示服务的nodeID
func RandomStr(len int) string {
	nUid := uuid.NewV4().String()
	str := strings.Replace(nUid, "-", "", -1)
	if len < 0 || len >= 32 {
		return str
	}
	return str[:len]
}

// NewRegister 注册器构造函数
func NewRegister(svrName string, cli *clientv3.Client, ipAddr string, port int) *Register {
	return &Register{
		client3:   cli,
		interval:  3 * time.Second,
		leaseTime: 5,
		stop:      make(chan bool, 1),
		node: &NodeInfo{
			SvrID:   RandomStr(32),
			SvrName: svrName,
			IpAddr:  ipAddr,
			Port:    port,
		},
	}
}

// Reg 服务注册 v1版本 配合租约一起使用,每次心跳检测时候,是都新建一个租约将key绑定到新的租约上
func (r *Register) Reg() chan error {
	// 启动协程检测心跳, 定期续租
	errChan := make(chan error, 1)
	go func() {
		t := time.NewTicker(r.interval)
		err := r.doReg()
		if err != nil {
			errChan <- err
			log.Println("注册失败,退出")
			return
		}
		for {
			select {
			//case ttl := <-t.C:
			//	r.doReg()
			//	log.Printf("heartbeat check for etcd k:%v t:[%v]\n", r.node.SvrName, ttl)
			case <-t.C:
				r.doReg()
				//log.Printf("heartbeat check for etcd k:%v t:[%v]\n", r.node.SvrName, ttl)
			case <-r.stop:
				log.Println("程序退出,心跳检测协程结束")
				return
			}
		}
	}()
	return errChan
}

// v1 版本,心跳续约如果失败,直接返回,如果网络一旦恢复,可以直接恢复注册 一次心跳检测需要 新建一个租约新put一次 2次io
func (r *Register) doReg() error {
	key := getRegKey(r.node.SvrName, r.node.SvrID)
	cli := r.client3
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	resp, err := cli.Grant(ctx, r.leaseTime)
	if err != nil {
		log.Printf("get grant err:%v\n", err)
		return err
	}
	//log.Printf("new lease:%v ttl:%v\n", resp.ID, resp.TTL)
	r.leaseID = resp.ID
	nodeBy, err := json.Marshal(r.node)
	if err != nil {
		log.Printf("data:%v to json string err:%v", r.node, err)
		return err
	}
	log.Printf("key:%v,node:%v, val:%v\n", key, r.node, string(nodeBy))
	_, err = cli.Put(ctx, key, string(nodeBy), clientv3.WithLease(resp.ID))
	if err != nil {
		log.Printf("put register key err:%v\n", err)
		return err
	}
	return nil
}

// UnReg 服务卸载
func (r *Register) UnReg() {
	r.stop <- true
	r.stop = make(chan bool, 1)
	key := getRegKey(r.node.SvrName, r.node.SvrID)
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	if _, err := r.client3.Delete(ctx, key); err != nil {
		log.Fatalln(err)
	}
	log.Printf("服务:%v 摘除成功\n", r.node.SvrName)
}

// RegV2 服务注册v2版本 服务put kv的时候配合租约一起使用,心跳检测只是去续约租约
func (r *Register) RegV2() error {
	err := r.doReg()
	if err != nil {
		log.Println("注册失败,退出")
		return err
	}
	go func() {
		t := time.NewTicker(r.interval)
		for {
			select {
			case ttl := <-t.C:
				ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
				_, err := r.client3.KeepAliveOnce(ctx, r.leaseID)
				if err != nil && err == rpctypes.ErrLeaseNotFound {
					log.Println("某次超时,导致租约丢失,重新注册")
					r.doReg()
				} else if err != nil {
					log.Printf("租约续期失败:%v t:[%v]\n", err, ttl)
				}
				//log.Printf("heartbeat check for etcd r:%+v t:[%v]\n", r, ttl)
			case <-r.stop:
				log.Println("程序退出,心跳检测协程结束")
				return
			}
		}
	}()
	return nil
}

服务注册单元测试
package etcd

import (
	"context"
	clientv3 "go.etcd.io/etcd/client/v3"
	"log"
	"testing"
	"time"
)

func TestCreateCli(t *testing.T) {
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"10.248.174.155:2371", "10.248.174.155:2381", "10.248.174.155:2391"},
		//Endpoints:   []string{"127.0.0.1:2371"},
		DialTimeout: 5 * time.Second,
		Context:     ctx,
	})
	//ca()
	if err != nil {
		// handle error!
		log.Fatalf("get etcd client init error %v", err)
	}
	defer cli.Close()

	_, err = cli.Put(ctx, "k", "v")
	if err != nil {
		// handle error!
		log.Fatalf("put------------ etcd client init error %v", err)
	}
	log.Printf("完毕")
}

func TestRegister_Reg(t *testing.T) {
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:         []string{"10.248.174.155:2371", "10.248.174.155:2381", "10.248.174.155:2391"},
		DialTimeout:       5 * time.Second,
		DialKeepAliveTime: 1 * time.Minute,
		Context:           ctx,
	})
	if err != nil {
		// handle error!
		log.Fatalf("get etcd client init error %v", err)
	}
	defer cli.Close()
	register := NewRegister("svr1", cli, "127.0.0.1", 8080)
	// 主进程处理业务,另起一个协程进行定时心跳
	// Reg,RegV2 共同点如果首次注册失败(项目启动),则直接主进程退出
	// 使用第一种注册方式,是间隔3s创建一个新的租约,租约时长为5s,将kv,与新的租约进行绑定。每次心跳即续约有2次io 生成新租约与绑定租约。容错:由于每次都是创建新的租约,网络波动与否几乎不影响流程,弊端是可能存在某一时刻一个实例注册了多个结点
	//errChan = register.Reg()
	// 使用第二种注册方式,将首次注册的租约id记录下来,将kv,与租约进行绑定。后面每次心跳只需要进行租约续期,续期5s。容错:如果网络波动导致续约失败,等待网络恢复会重新注册新的租约
	err = register.RegV2()
	if err != nil {
		log.Fatalf("注册失败 error %v", err)
	}
	for true {
		// 制定一个定时器,模拟20s后摘除服务
		t := time.NewTicker(20 * time.Second)
		select {
		case tt := <-t.C:
			register.UnReg()
			log.Printf("服务卸载,tt:%v\n", tt)
			return
		}
	}
}

服务发现

服务发现的主要思路是使用服务名获取服务注册上去的信息,利用etcd提供的前缀查询api来实现

package etcd

import (
	"context"
	"encoding/json"
	"fmt"
	clientv3 "go.etcd.io/etcd/client/v3"
	"log"
	"strings"
	"time"
)

// Discovery 发现器
type Discovery struct {
	cli       *clientv3.Client
	info      *NodeInfo
	nodeCache *LocalNodeCache
}

func NewDiscovery(cli *clientv3.Client) *Discovery {
	return &Discovery{
		cli:  cli,
		info: &NodeInfo{},
		nodeCache: &LocalNodeCache{
			nodes: make(map[string][]*NodeInfo),
		},
	}
}

// Watch etcd watch功能监听结点信息变更更新本地缓存信息
func (d *Discovery) Watch() {
	watcher := clientv3.NewWatcher(d.cli)
	watchChan := watcher.Watch(context.Background(), "services/", clientv3.WithPrefix())
	for {
		select {
		case resp := <-watchChan:
			d.watchEvent(resp.Events)
		}
	}
}

func (d *Discovery) watchEvent(evs []*clientv3.Event) {
	for _, ev := range evs {
		log.Printf("event:%+v\n", ev)
		switch ev.Type {
		case clientv3.EventTypePut:
			node := &NodeInfo{}
			err := json.Unmarshal(ev.Kv.Value, node)
			if err != nil {
				log.Printf("Discovery watchEvent json.Unmarshal err:%+v\n", err)
				continue
			}
			if d.nodeCache.hasSvr(node.SvrName) {
				log.Printf("只缓存本地已存在的svrvice:%v\n", node)
				d.nodeCache.AddNode(node)
				return
			}
			log.Printf(fmt.Sprintf("watchEvent put svr:%v new node:%s", node.SvrName, string(ev.Kv.Value)))
		case clientv3.EventTypeDelete:
			key := string(ev.Kv.Key)
			keys := strings.Split(key, "/")
			if len(keys) != 3 {
				log.Printf("service key:%v illegal \n", key)
				continue
			}
			d.nodeCache.DelNode(keys[1], keys[2])
			log.Printf(fmt.Sprintf("watchEvent delete svr:%v", string(ev.Kv.Key)))
		}
	}
}

// GetNode 服务发现,根据服务名随机获取一个服务实例,优先去本地缓存查,如果本地缓存没有则去etcd上查询结点信息,并且缓存起来,再从缓存中获取
func (d *Discovery) GetNode(svrName string) *NodeInfo {
	node := d.nodeCache.GetNode(svrName)
	if node != nil {
		log.Printf("本地缓存找到node:%v\n", node)
		return node
	}
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	resp, err := d.cli.Get(ctx, getRegKeyPrefix(svrName), clientv3.WithPrefix())
	if err != nil {
		log.Printf("Discovery get kv err:%+v\n", err)
		return nil
	}
	for _, v := range resp.Kvs {
		node := &NodeInfo{}
		err = json.Unmarshal(v.Value, node)
		if err != nil {
			log.Printf("Discovery json.Unmarshal err:%+v", err)
			continue
		}
		d.nodeCache.AddNode(node)
		log.Printf("Discovery add node:%+v", node)
	}
	return d.nodeCache.GetNode(svrName)
}

服务发现单元测试参考
package etcd

import (
	"context"
	clientv3 "go.etcd.io/etcd/client/v3"
	"log"
	"testing"
	"time"
)

func TestDiscovery_GetNode(t *testing.T) {
	// 模拟现实场景根据 serviceKey 获取一个服务结点信息。主要思想为主进程处理业务,另起协程监听服务结点的变动
	// 1. 懒加载,发现本地缓存没有,则去etcd上拉取对应key的信息,如果还没有直接返回nil,否则拉取到的所有结点都存入本地缓存
	// 2. 然后从本地缓存中采用伪随机算法随机取一个结点
	// 3. 额外启动一个协程监听所有的服务结点信息的变动,如果有新增结点 *** 作,判断是否是需要本地缓存key是否有,没有就不 *** 作。这里的考虑是只需缓存和自己相关的结点信息
	// 4. 监听事件如果是 delete 判断本地缓存key是否存在,如果不存在则直接结束,如果存在,则根据结点的 serviceID 进行结点摘除
	// 缺陷
	// 1. 本地缓存无法做完全意义上的负载均衡,顶多只能是将打到当前实例上的流量均分给这个结点的下游
	// 2. 本地缓存缺少过期机制,1)有可能存在某个服务只调用一次,那就会一直缓存下来无法过期 2)如果出于某种异常情况,本地缓存的结点列表里面如果有坏死结点,无法进行剔除
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:         []string{"10.248.174.155:2371", "10.248.174.155:2381", "10.248.174.155:2391"},
		DialTimeout:       5 * time.Second,
		DialKeepAliveTime: 1 * time.Minute,
		Context:           ctx,
	})
	if err != nil {
		// handle error!
		log.Fatalf("get etcd client init error %v", err)
	}
	defer cli.Close()
	discovery := NewDiscovery(cli)
	go func() {
		discovery.Watch()
	}()
	for {
		ticker := time.NewTicker(10 * time.Second)
		select {
		case tt := <-ticker.C:
			svrName := "svr1"
			node := discovery.GetNode(svrName)
			log.Printf("ttl:%v,svr:%v, 获取node:%v", tt, svrName, node)
		}
	}

}

总结

本文没有深入介绍etcd中一些原理性东西,主要介绍了etcd的安装及使用,介绍了一些常用的api *** 作,最后用go语言简单实现了一遍服务注册与发现的流程。
这些流程都只是简单的模拟,还有很多设计不合理的地方可以一起探讨一下。

待优化点

上文的服务发现本地缓存没有过期机制,使用了一次发现就会一直存在实例的本地缓存中服务发现的负载均衡算法未实现,本文利用伪随机方式简单替代

欢迎分享,转载请注明来源:内存溢出

原文地址: https://www.outofmemory.cn/langs/993864.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存