add etcd module
This commit is contained in:
95
pkg/etcd/discovery.go
Normal file
95
pkg/etcd/discovery.go
Normal file
@@ -0,0 +1,95 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
clientV3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
type RemoteService struct {
|
||||
name string
|
||||
node map[string]string
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
type Resolver struct {
|
||||
v3 *clientV3.Client
|
||||
endpoint []string
|
||||
}
|
||||
|
||||
// NewResolver 构造 resolver 对象
|
||||
func NewResolver(endpoint []string) (*Resolver, error) {
|
||||
client, err := clientV3.New(clientV3.Config{
|
||||
Endpoints: endpoint,
|
||||
DialTimeout: 5 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Resolver{v3: client, endpoint: endpoint}, nil
|
||||
}
|
||||
|
||||
// Close 关闭client
|
||||
func (r *Resolver) Close() error {
|
||||
return r.v3.Close()
|
||||
}
|
||||
|
||||
// Discovery 发现服务
|
||||
func (r *Resolver) Discovery(serviceName string) (*RemoteService, error) {
|
||||
service := &RemoteService{
|
||||
name: serviceName,
|
||||
node: make(map[string]string, 1),
|
||||
}
|
||||
|
||||
kv := clientV3.NewKV(r.v3)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
resp, err := kv.Get(ctx, serviceName, clientV3.WithPrefix())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service.mutex.Lock()
|
||||
for _, kv := range resp.Kvs {
|
||||
service.node[string(kv.Key)] = string(kv.Value)
|
||||
}
|
||||
service.mutex.Unlock()
|
||||
|
||||
go r.watchServiceUpdate(service)
|
||||
|
||||
return service, nil
|
||||
}
|
||||
|
||||
// watchServiceUpdate 监控服务目录下的事件
|
||||
func (r *Resolver) watchServiceUpdate(service *RemoteService) {
|
||||
watcher := clientV3.NewWatcher(r.v3)
|
||||
// Watch 服务目录下的更新
|
||||
watchChan := watcher.Watch(context.TODO(), service.name, clientV3.WithPrefix())
|
||||
for watchResp := range watchChan {
|
||||
for _, event := range watchResp.Events {
|
||||
service.mutex.Lock()
|
||||
switch event.Type {
|
||||
case mvccpb.PUT: // PUT事件,目录下有了新key
|
||||
service.node[string(event.Kv.Key)] = string(event.Kv.Value)
|
||||
case mvccpb.DELETE: // DELETE事件,目录中有key被删掉(Lease过期,key 也会被删掉)
|
||||
delete(service.node, string(event.Kv.Key))
|
||||
}
|
||||
service.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetName 获取服务名称
|
||||
func (r *RemoteService) GetName() string {
|
||||
return r.name
|
||||
}
|
||||
|
||||
// GetNode 获取服务列表
|
||||
func (r *RemoteService) GetNode() map[string]string {
|
||||
return r.node
|
||||
}
|
||||
108
pkg/etcd/register.go
Normal file
108
pkg/etcd/register.go
Normal file
@@ -0,0 +1,108 @@
|
||||
package etcd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
clientV3 "go.etcd.io/etcd/client/v3"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
v3 *clientV3.Client
|
||||
lease *clientV3.LeaseGrantResponse
|
||||
interval int32
|
||||
serviceName string
|
||||
}
|
||||
|
||||
func NewClient(endpoint []string) (*Client, error) {
|
||||
client, err := clientV3.New(clientV3.Config{
|
||||
Endpoints: endpoint,
|
||||
DialTimeout: 5 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Client{v3: client, interval: 5}, nil
|
||||
}
|
||||
|
||||
// Close 关闭client
|
||||
func (e *Client) Close() error {
|
||||
return e.v3.Close()
|
||||
}
|
||||
|
||||
// Register 注册服务
|
||||
func (e *Client) Register(serviceName string, serviceAddress string) error {
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
e.lease, err = e.v3.Grant(ctx, int64(e.interval+1))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.serviceName = serviceName + "/" + strconv.Itoa(int(e.lease.ID))
|
||||
_, err = e.v3.Put(ctx, e.serviceName, getValue(serviceAddress), clientV3.WithLease(e.lease.ID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.keepAlive(serviceName, serviceAddress)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deregister 注销服务
|
||||
func (e *Client) Deregister() error {
|
||||
_, err := e.v3.Delete(context.Background(), e.serviceName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// keepAlive 异步续约
|
||||
func (e *Client) keepAlive(name string, addr string) {
|
||||
// 永久续约,续约成功后,etcd客户端和服务器会保持通讯,通讯成功会写数据到返回的通道中
|
||||
// 停止进程后,服务器链接不上客户端,相应key租约到期会被服务器自动删除
|
||||
c, err := e.v3.KeepAlive(e.v3.Ctx(), e.lease.ID)
|
||||
go func() {
|
||||
if err == nil {
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-c:
|
||||
if !ok { //续约失败
|
||||
e.v3.Revoke(e.v3.Ctx(), e.lease.ID)
|
||||
e.Register(name, addr)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
defer e.v3.Revoke(e.v3.Ctx(), e.lease.ID)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// getValue etcd服务发现时,底层解析的是一个json串,且包含Addr字段
|
||||
func getValue(addr string) string {
|
||||
// return "{\"Addr\":\"" + localIP() + addr + "\"}"
|
||||
|
||||
return localIP() + addr
|
||||
}
|
||||
|
||||
// localIP 获取当前ip
|
||||
func localIP() string {
|
||||
addrList, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
for _, address := range addrList {
|
||||
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
|
||||
if ipnet.IP.To4() != nil {
|
||||
return ipnet.IP.String()
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
Reference in New Issue
Block a user