grpc 使用 consul 服务发现

grpc 使用 consul 服务发现

grpc 可以自定义 Balancer,而在 Balancer 基础上可以通过实现自定义的 naming.Resolver 来达到使用 consul 等服务发现组件来发现服务的功能。

大概流程如下:

  1. grpc 在 Dial 的时候通过 WithBalancer 传入 Balancer
  2. Balancer 会通过 naming.Resolver 去解析(Resolve) Dial 传入的 target 得到一个 naming.Watcher
  3. naming.Watcher 持续监视 target 解析到地址列表的变更并通过 Next 返回给 Balancer

implement

提供一个简单实现,目录结构如下:

├── example
│   ├── helloclient
│   │   └── main.go
│   ├── helloproto
│   │   └── hello.proto
│   └── helloserver
│       └── main.go
└── grpcresolver
    └── consul.go

grpcresolver/consul.go

实现 consulResolver 把依赖的服务名作为 Watch 的 target

package grpcresolver

import (
    "net"
    "strconv"
    "sync"
    "sync/atomic"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc/naming"
)

type watchEntry struct {
    addr string
    modi uint64
    last uint64
}

type consulWatcher struct {
    down      int32
    c         *api.Client
    service   string
    mu        sync.Mutex
    watched   map[string]*watchEntry
    lastIndex uint64
}

func (w *consulWatcher) Close() {
    atomic.StoreInt32(&w.down, 1)
}

func (w *consulWatcher) Next() ([]*naming.Update, error) {
    w.mu.Lock()
    defer w.mu.Unlock()
    watched := w.watched
    lastIndex := w.lastIndex
retry:
    services, meta, err := w.c.Catalog().Service(w.service, "", &api.QueryOptions{
        WaitIndex: lastIndex,
    })
    if err != nil {
        return nil, err
    }
    if lastIndex == meta.LastIndex {
        if atomic.LoadInt32(&w.down) != 0 {
            return nil, nil
        }
        goto retry
    }
    lastIndex = meta.LastIndex
    var updating []*naming.Update
    for _, s := range services {
        ws := watched[s.ServiceID]
        if ws == nil {
            ws = &watchEntry{
                addr: net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort)),
                modi: s.ModifyIndex,
            }
            watched[s.ServiceID] = ws
            updating = append(updating, &naming.Update{
                Op:   naming.Add,
                Addr: ws.addr,
            })
        } else if ws.modi != s.ModifyIndex {
            updating = append(updating, &naming.Update{
                Op:   naming.Delete,
                Addr: ws.addr,
            })
            ws.addr = net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort))
            ws.modi = s.ModifyIndex
            updating = append(updating, &naming.Update{
                Op:   naming.Add,
                Addr: ws.addr,
            })
        }
        ws.last = lastIndex
    }
    for id, ws := range watched {
        if ws.last != lastIndex {
            delete(watched, id)
            updating = append(updating, &naming.Update{
                Op:   naming.Delete,
                Addr: ws.addr,
            })
        }
    }
    w.watched = watched
    w.lastIndex = lastIndex
    return updating, nil
}

type consulResolver api.Client

func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
    return &consulWatcher{
        c:       (*api.Client)(r),
        service: target,
        watched: make(map[string]*watchEntry),
    }, nil
}

func ForConsul(reg *api.Client) naming.Resolver {
    return (*consulResolver)(reg)
}

example/helloproto/hello.proto

syntax = "proto3";

package helloproto;

service Hello {
    rpc Say(HelloReq) returns (HelloResp);
}

message HelloReq {
    string text = 1;
}

message HelloResp {
    string text = 1;
}

生成 hello.pb.go

protoc --go_out=plugins=grpc:. hello.proto

example/helloserver/main.go

服务端注册流程和大多数 consul 服务注册类似

package main

import (
    "context"
    "crypto/rand"
    "encoding/hex"
    "flag"
    "fmt"
    "log"
    "net"
    "playground/gogrpc/example/helloproto"
    "strconv"
    "time"

    "github.com/hashicorp/consul/api"
    grpc "google.golang.org/grpc"
)

type helloserver struct {
    id string
}

func (s *helloserver) Say(ctx context.Context, req *helloproto.HelloReq) (*helloproto.HelloResp, error) {
    log.Printf("[%s] reply %s", s.id, req.Text)
    return &helloproto.HelloResp{
        Text: fmt.Sprintf("[%s] %s", s.id, req.Text),
    }, nil
}

func main() {
    const ttl = 30 * time.Second
    host := flag.String("h", "127.0.0.1", "host")
    port := flag.Int("p", 9876, "port")
    flag.Parse()
    l, err := net.Listen("tcp", net.JoinHostPort(*host, strconv.Itoa(*port)))
    if err != nil {
        log.Fatalln(err)
    }
    registry, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatalln(err)
    }
    var h [16]byte
    rand.Read(h[:])
    id := fmt.Sprintf("helloserver-%s", hex.EncodeToString(h[:]))
    err = registry.Agent().ServiceRegister(&api.AgentServiceRegistration{
        ID:      id,
        Name:    "helloserver",
        Port:    *port,
        Address: *host,
        Check: &api.AgentServiceCheck{
            TTL:     (ttl + time.Second).String(),
            Timeout: time.Minute.String(),
        },
    })
    if err != nil {
        log.Fatalln(err)
    }
    go func() {
        checkid := "service:" + id
        for range time.Tick(ttl) {
            err := registry.Agent().PassTTL(checkid, "")
            if err != nil {
                log.Fatalln(err)
            }
        }
    }()
    svr := grpc.NewServer()
    helloproto.RegisterHelloServer(svr, &helloserver{
        id: id,
    })
    log.Println("serving", id, l.Addr())
    err = svr.Serve(l)
    if err != nil {
        log.Fatalln(err)
    }
}

example/helloclient/main.go

客户端使用自定义 resolver 和 grpc.RoundRobin 创建 Balancer

package main

import (
    "context"
    "log"
    "playground/gogrpc/example/helloproto"
    "playground/gogrpc/grpcresolver"
    "time"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc"
)

func main() {
    registry, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatalln(err)
    }
    lbrr := grpc.RoundRobin(grpcresolver.ForConsul(registry))
    cc, err := grpc.Dial("helloserver", grpc.WithInsecure(), grpc.WithBalancer(lbrr))
    if err != nil {
        log.Fatalln(err)
    }
    client := helloproto.NewHelloClient(cc)
    for range time.Tick(time.Second) {
        resp, err := client.Say(context.Background(), &helloproto.HelloReq{
            Text: "hello: " + time.Now().String(),
        })
        if err != nil {
            log.Println("say failed", err)
            continue
        }
        log.Println("server reply", resp)
    }
}

标签: go, grpc

添加新评论