grpc 使用 consul 服务发现

grpc 使用 consul 服务发现

grpc 可以自定义 Balancer,而在 Balancer 基础上可以通过实现自定义的 naming.Resolver 来达到使用 consul 等服务发现组件来发现服务的功能。

大概流程如下:

  1. grpc 在 Dial 的时候通过 WithBalancer 传入 Balancer
  2. Balancer 会通过 naming.Resolver 去解析(Resolve) Dial 传入的 target 得到一个 naming.Watcher
  3. naming.Watcher 持续监视 target 解析到地址列表的变更并通过 Next 返回给 Balancer

implement

提供一个简单实现,目录结构如下:

1
2
3
4
5
6
7
8
9
├── example
│   ├── helloclient
│   │   └── main.go
│   ├── helloproto
│   │   └── hello.proto
│   └── helloserver
│       └── main.go
└── grpcresolver
    └── consul.go

grpcresolver/consul.go

实现 consulResolver:把所依赖的服务名作为 Resolve 的 target,返回的 Watcher 负责持续监视该服务的地址变更

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package grpcresolver

import (
"net"
"strconv"
"sync"
"sync/atomic"

"github.com/hashicorp/consul/api"
"google.golang.org/grpc/naming"
)

// watchEntry is the bookkeeping record the watcher keeps for one service
// instance it has already reported to the balancer.
type watchEntry struct {
	// addr is the "host:port" string that was announced for the instance.
	addr string
	// modi is the catalog ModifyIndex seen when addr was last computed;
	// last is the catalog index of the most recent query result that still
	// contained the instance.
	modi, last uint64
}

// consulWatcher is the naming.Watcher returned by consulResolver.Resolve.
// It tracks the instances of a single Consul service and reports address
// changes through Next.
type consulWatcher struct {
// down is set to 1 by Close and read atomically by Next.
down int32
// c is the Consul client used to query the catalog.
c *api.Client
// service is the Consul service name being watched (the Resolve target).
service string
// mu is held by Next for the whole call, guarding watched and lastIndex.
mu sync.Mutex
// watched holds the instances already reported, keyed by service ID.
watched map[string]*watchEntry
// lastIndex is the catalog index returned by the last processed query;
// it is passed as WaitIndex on the next query.
lastIndex uint64
}

// Close marks the watcher as closed by atomically setting its down flag.
// It does not interrupt a catalog query that is already in flight; Next
// only consults the flag after a query returns with an unchanged index.
func (w *consulWatcher) Close() {
	const closed int32 = 1
	atomic.StoreInt32(&w.down, closed)
}

func (w *consulWatcher) Next() ([]*naming.Update, error) {
w.mu.Lock()
defer w.mu.Unlock()
watched := w.watched
lastIndex := w.lastIndex
retry:
services, meta, err := w.c.Catalog().Service(w.service, "", &api.QueryOptions{
WaitIndex: lastIndex,
})
if err != nil {
return nil, err
}
if lastIndex == meta.LastIndex {
if atomic.LoadInt32(&w.down) != 0 {
return nil, nil
}
goto retry
}
lastIndex = meta.LastIndex
var updating []*naming.Update
for _, s := range services {
ws := watched[s.ServiceID]
if ws == nil {
ws = &watchEntry{
addr: net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort)),
modi: s.ModifyIndex,
}
watched[s.ServiceID] = ws
updating = append(updating, &naming.Update{
Op: naming.Add,
Addr: ws.addr,
})
} else if ws.modi != s.ModifyIndex {
updating = append(updating, &naming.Update{
Op: naming.Delete,
Addr: ws.addr,
})
ws.addr = net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort))
ws.modi = s.ModifyIndex
updating = append(updating, &naming.Update{
Op: naming.Add,
Addr: ws.addr,
})
}
ws.last = lastIndex
}
for id, ws := range watched {
if ws.last != lastIndex {
delete(watched, id)
updating = append(updating, &naming.Update{
Op: naming.Delete,
Addr: ws.addr,
})
}
}
w.watched = watched
w.lastIndex = lastIndex
return updating, nil
}

// consulResolver is an api.Client viewed as a naming.Resolver.
type consulResolver api.Client

// Resolve returns a watcher for the Consul service named by target. No query
// is issued here; the watcher starts with an empty instance set and always
// comes back with a nil error.
func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
	w := new(consulWatcher)
	w.c = (*api.Client)(r)
	w.service = target
	w.watched = map[string]*watchEntry{}
	return w, nil
}

// ForConsul wraps the given Consul client as a naming.Resolver whose targets
// are Consul service names.
func ForConsul(reg *api.Client) naming.Resolver {
	var resolver naming.Resolver = (*consulResolver)(reg)
	return resolver
}

example/helloproto/hello.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Schema for the example Hello service.
syntax = "proto3";

package helloproto;

// Hello is a service with a single unary RPC.
service Hello {
// Say takes a HelloReq and returns a HelloResp.
rpc Say(HelloReq) returns (HelloResp);
}

// HelloReq is the request message for Hello.Say.
message HelloReq {
// text is the message sent by the client.
string text = 1;
}

// HelloResp is the response message for Hello.Say.
message HelloResp {
// text is the reply produced by the server.
string text = 1;
}

生成 hello.pb.go

protoc --go_out=plugins=grpc:. hello.proto

example/helloserver/main.go

服务端注册流程和大多数 consul 服务注册类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package main

import (
"context"
"crypto/rand"
"encoding/hex"
"flag"
"fmt"
"log"
"net"
"playground/gogrpc/example/helloproto"
"strconv"
"time"

"github.com/hashicorp/consul/api"
grpc "google.golang.org/grpc"
)

// helloserver implements the Hello service for one registered instance.
type helloserver struct {
	// id is the Consul service ID of this instance; it is prefixed to every
	// reply so the client can see which instance answered.
	id string
}

// Say logs the incoming text and returns it prefixed with this instance's id.
// It always returns a nil error.
func (h *helloserver) Say(ctx context.Context, req *helloproto.HelloReq) (*helloproto.HelloResp, error) {
	log.Printf("[%s] reply %s", h.id, req.Text)

	resp := new(helloproto.HelloResp)
	resp.Text = fmt.Sprintf("[%s] %s", h.id, req.Text)
	return resp, nil
}

// main starts one helloserver instance: it listens on the host/port given by
// the -h and -p flags, registers itself in Consul under the service name
// "helloserver" with a random ID and a TTL health check, keeps that check
// passing from a background goroutine, and then serves gRPC requests.
// Any failure along the way terminates the process.
func main() {
	// Heartbeat interval; the check's TTL is registered one second longer.
	const ttl = 30 * time.Second

	host := flag.String("h", "127.0.0.1", "host")
	port := flag.Int("p", 9876, "port")
	flag.Parse()

	l, err := net.Listen("tcp", net.JoinHostPort(*host, strconv.Itoa(*port)))
	if err != nil {
		log.Fatalln(err)
	}
	registry, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalln(err)
	}

	// A random suffix keeps the service ID unique across instances; without
	// the error check a failed read would silently give an all-zero ID.
	var h [16]byte
	if _, err := rand.Read(h[:]); err != nil {
		log.Fatalln(err)
	}
	id := fmt.Sprintf("helloserver-%s", hex.EncodeToString(h[:]))

	err = registry.Agent().ServiceRegister(&api.AgentServiceRegistration{
		ID:      id,
		Name:    "helloserver",
		Port:    *port,
		Address: *host,
		Check: &api.AgentServiceCheck{
			TTL:     (ttl + time.Second).String(),
			Timeout: time.Minute.String(),
		},
	})
	if err != nil {
		log.Fatalln(err)
	}

	go func() {
		checkid := "service:" + id
		// Report passing right away instead of waiting a full interval for
		// the first tick, then keep refreshing the TTL.
		if err := registry.Agent().PassTTL(checkid, ""); err != nil {
			log.Fatalln(err)
		}
		for range time.Tick(ttl) {
			if err := registry.Agent().PassTTL(checkid, ""); err != nil {
				log.Fatalln(err)
			}
		}
	}()

	svr := grpc.NewServer()
	helloproto.RegisterHelloServer(svr, &helloserver{
		id: id,
	})
	log.Println("serving", id, l.Addr())
	if err := svr.Serve(l); err != nil {
		log.Fatalln(err)
	}
}

example/helloclient/main.go

客户端使用自定义 resolver 和 grpc.RoundRobin 创建 Balancer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"context"
"log"
"playground/gogrpc/example/helloproto"
"playground/gogrpc/grpcresolver"
"time"

"github.com/hashicorp/consul/api"
"google.golang.org/grpc"
)

// main dials the "helloserver" service through a round-robin balancer backed
// by the Consul resolver and calls Say once per second, logging each reply
// or failure. It exits only if the Consul client or the dial fails.
func main() {
	consul, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatalln(err)
	}

	balancer := grpc.RoundRobin(grpcresolver.ForConsul(consul))
	conn, err := grpc.Dial("helloserver", grpc.WithInsecure(), grpc.WithBalancer(balancer))
	if err != nil {
		log.Fatalln(err)
	}

	hello := helloproto.NewHelloClient(conn)
	ticks := time.Tick(time.Second)
	for {
		<-ticks
		req := &helloproto.HelloReq{
			Text: "hello: " + time.Now().String(),
		}
		resp, err := hello.Say(context.Background(), req)
		if err != nil {
			// A failed call is logged and retried on the next tick.
			log.Println("say failed", err)
			continue
		}
		log.Println("server reply", resp)
	}
}