grpc 一致性 hash 负载均衡

grpc 一致性 hash 负载均衡

go-grpc 库只提供了一个默认的轮询负载均衡器 grpc.RoundRobin,但通过实现 grpc.Balancer 接口可以自定义负载均衡规则,所以这里实现一个基于一致性 hash 的 grpc.Balancer。

Implement

在这篇文章的基础上,增加 grpclb 包,实现一致性 hash 负载均衡器。

├── example
│   ├── helloclient
│   │   ├── helloclient
│   │   └── main.go
│   ├── helloproto
│   │   ├── hello.pb.go
│   │   └── hello.proto
│   └── helloserver
│       └── main.go
├── grpclb
│   └── consistentlb.go
└── grpcresolver
    └── consul.go

grpclb/consistentlb.go

使用 github.com/vizee/consistent 的 ketama 实现 consistent hashing ring

package grpclb

import (
    "context"
    "errors"
    "strings"
    "sync"
    "sync/atomic"

    "github.com/vizee/consistent"
    "google.golang.org/grpc"
    "google.golang.org/grpc/naming"
)

const (
    // ketameNodeNum is the per-address count handed to Ketama.Add when the
    // ring is rebuilt; the ring is reset with len(addrs) * ketameNodeNum.
    // Presumably this is the number of virtual nodes per address — confirm
    // against the consistent package.
    ketameNodeNum = 64
)

// Sentinel errors returned by the consistent-hash balancer.
var (
    // ErrBalancerClosed is returned by Start, Get and Close once the
    // balancer has been closed.
    ErrBalancerClosed = errors.New("balancer closed")
    // ErrMissingHashKey is returned by Get when the context carries no
    // uint32 value under HashKey.
    ErrMissingHashKey = errors.New("missing hash key")
    // ErrNoNode is returned by a non-blocking Get when the hash ring yields
    // no node for the key.
    ErrNoNode         = errors.New("no node")
    // ErrUnavailable is returned by Get when the node selected for the key
    // is not currently marked as connected.
    ErrUnavailable    = errors.New("unavailable")
)

// hashKeyType is the unexported dynamic type of HashKey. Using a dedicated
// type guarantees the key cannot collide with context keys defined by other
// packages (a plain int such as 0 can).
type hashKeyType struct{}

var (
    // HashKey is the context key under which callers store the uint32 hash
    // used to pick a node, e.g.
    //
    //     ctx = context.WithValue(ctx, grpclb.HashKey, uint32(h))
    //
    // Get fails with ErrMissingHashKey when no uint32 is stored under it.
    HashKey interface{} = hashKeyType{}
)

// lbaddr is the balancer's record for one backend address.
type lbaddr struct {
    // ok is accessed atomically: Up sets it to 1 and the down callback
    // returned by Up resets it to 0; Get only hands out addresses whose
    // flag is non-zero.
    ok   int32
    // addr is the address string, identical to the key in consistentlb.addrs.
    addr string
}

// consistentlb is a grpc.Balancer that maps a per-request uint32 hash key
// (stored in the context under HashKey) onto a backend address through a
// ketama consistent-hash ring.
type consistentlb struct {
    // mu guards every field below; Get and Up take it for reading, while
    // Start, Close and the watcher goroutine take it for writing.
    mu     sync.RWMutex
    // closed is set by Close; afterwards Start, Get and Close fail with
    // ErrBalancerClosed.
    closed bool
    // addrs holds the currently known addresses, keyed by address string.
    addrs  map[string]*lbaddr
    // ketama is the hash ring rebuilt from addrs on every update; Close
    // sets it to nil.
    ketama *consistent.Ketama
    // wait is lazily created by a blocking Get that finds no node, and is
    // closed to release the callers blocked on it.
    wait   chan struct{}
    // notify carries the full address list after each update; it is the
    // channel returned by Notify.
    notify chan []grpc.Address
    // w is the watcher currently feeding updates; nil when no resolver is
    // used or after Close.
    w      naming.Watcher
    // r is the resolver passed to UseConsistent; when nil, Start treats its
    // target as a ';'-separated address list.
    r      naming.Resolver
}

// update applies a batch of resolver updates to the address table, rebuilds
// the ketama ring from the resulting address set, wakes Get calls that are
// blocked waiting for a node, and publishes the complete address list on
// b.notify.
//
// The caller must hold b.mu for writing.
func (b *consistentlb) update(updates []*naming.Update) {
    for _, u := range updates {
        switch u.Op {
        case naming.Add:
            // Keep the existing entry on a repeated Add: replacing it would
            // reset its connected flag, which is only set again by a later
            // Up call for this address.
            if _, exists := b.addrs[u.Addr]; !exists {
                b.addrs[u.Addr] = &lbaddr{
                    addr: u.Addr,
                }
            }
        case naming.Delete:
            delete(b.addrs, u.Addr)
        }
    }

    // Rebuild the ketama ring on every address change.
    b.ketama.Reset(len(b.addrs) * ketameNodeNum)
    open := make([]grpc.Address, 0, len(b.addrs))
    for addr := range b.addrs {
        open = append(open, grpc.Address{
            Addr: addr,
        })
        b.ketama.Add(addr, ketameNodeNum)
    }
    b.ketama.Build()

    // Release Get calls parked on b.wait now that there are addresses to
    // hash onto. Resetting the field to nil makes the next blocked Get
    // allocate a fresh channel and keeps Close from closing this one twice.
    if len(b.addrs) > 0 && b.wait != nil {
        close(b.wait)
        b.wait = nil
    }

    // Every notification carries the complete address list, so an unread
    // older one is stale. Dropping it keeps the send below from blocking
    // while b.mu is held; update is the only sender and always runs under
    // the write lock, so the buffered slot is free after the drain.
    select {
    case <-b.notify:
    default:
    }
    b.notify <- open
}

// watch pumps updates from w into the balancer until w.Next fails or w is no
// longer the balancer's current watcher. It is started as a goroutine by
// Start.
func (b *consistentlb) watch(w naming.Watcher) {
    for {
        batch, err := w.Next()
        if err != nil {
            return
        }

        b.mu.Lock()
        // A mismatch means this watcher has been replaced (or the balancer
        // closed), so its updates must be ignored.
        current := b.w == w
        if current {
            b.update(batch)
        }
        b.mu.Unlock()

        if !current {
            return
        }
    }
}

// Start initialises the balancer for target.
//
// Without a resolver, target is treated as a ';'-separated list of addresses
// that are all added immediately. With a resolver, target is resolved and a
// goroutine is started to follow the resulting watcher; any previous watcher
// is closed and all previously known addresses are discarded.
//
// It returns ErrBalancerClosed after Close, or the error from Resolve.
func (b *consistentlb) Start(target string, config grpc.BalancerConfig) error {
    b.mu.Lock()
    defer b.mu.Unlock()

    switch {
    case b.closed:
        return ErrBalancerClosed
    case b.r == nil:
        // Like grpc.RoundRobin, allow the address list to be given directly.
        parts := strings.Split(target, ";")
        static := make([]*naming.Update, 0, len(parts))
        for _, part := range parts {
            static = append(static, &naming.Update{
                Op:   naming.Add,
                Addr: part,
            })
        }
        b.update(static)
        return nil
    }

    watcher, err := b.r.Resolve(target)
    if err != nil {
        return err
    }
    // Only one watcher may exist at a time.
    if previous := b.w; previous != nil {
        previous.Close()
    }
    // Switching watchers invalidates every address learned so far.
    b.addrs = make(map[string]*lbaddr)
    b.ketama.Reset(0)
    b.w = watcher

    go b.watch(watcher)
    return nil
}

// Up marks addr as connected. It returns a callback that clears the mark
// again, or nil when addr is unknown to the balancer or already marked.
func (b *consistentlb) Up(addr grpc.Address) (down func(error)) {
    b.mu.RLock()
    defer b.mu.RUnlock()

    entry := b.addrs[addr.Addr]
    if entry == nil {
        return nil
    }
    // Flip the flag from 0 to 1; losing the swap means it was already set.
    if swapped := atomic.CompareAndSwapInt32(&entry.ok, 0, 1); !swapped {
        return nil
    }
    // The callback only touches the flag of this entry.
    markDown := func(error) {
        atomic.StoreInt32(&entry.ok, 0)
    }
    return markDown
}

// Get selects the address for the uint32 hash key stored in ctx under
// HashKey. The returned put callback is always nil.
//
// Errors:
//   - ErrMissingHashKey: ctx holds no uint32 under HashKey.
//   - ErrBalancerClosed: the balancer has been closed.
//   - ErrNoNode: the ring has no node for the key and opts.BlockingWait is
//     false.
//   - ErrUnavailable: the node chosen for the key is not marked connected.
//   - ctx.Err(): ctx ended while waiting for a node.
//
// When opts.BlockingWait is true and the ring has no node, Get blocks until
// it is woken through b.wait or ctx is done.
func (b *consistentlb) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr grpc.Address, put func(), err error) {
    key, ok := ctx.Value(HashKey).(uint32)
    if !ok {
        return grpc.Address{}, nil, ErrMissingHashKey
    }

    for {
        b.mu.RLock()
        if b.closed {
            b.mu.RUnlock()
            return grpc.Address{}, nil, ErrBalancerClosed
        }
        if node, found := b.ketama.Get32(key); found {
            // Unlike round robin, the key must always map to the same node,
            // so an unconnected node yields an error instead of a fallback
            // to another address. A ring node without a table entry is
            // treated the same way rather than dereferencing nil.
            a := b.addrs[node]
            if a == nil || atomic.LoadInt32(&a.ok) == 0 {
                b.mu.RUnlock()
                return grpc.Address{}, nil, ErrUnavailable
            }
            addr = grpc.Address{
                Addr: a.addr,
            }
            b.mu.RUnlock()
            return addr, nil, nil
        }
        b.mu.RUnlock()

        if !opts.BlockingWait {
            return grpc.Address{}, nil, ErrNoNode
        }

        // Re-check under the write lock before parking: the balancer may
        // have been closed or the ring populated between RUnlock and Lock,
        // and waiting on a freshly created channel would then miss that
        // wakeup.
        b.mu.Lock()
        if b.closed {
            b.mu.Unlock()
            return grpc.Address{}, nil, ErrBalancerClosed
        }
        if _, found := b.ketama.Get32(key); found {
            b.mu.Unlock()
            continue
        }
        if b.wait == nil {
            b.wait = make(chan struct{})
        }
        wait := b.wait
        b.mu.Unlock()

        select {
        case <-ctx.Done():
            return grpc.Address{}, nil, ctx.Err()
        case <-wait:
            // Woken up: loop and look the key up again.
        }
    }
}

// Notify returns the receive side of the channel on which the balancer
// publishes the full address list after each address update. The channel is
// closed by Close.
func (b *consistentlb) Notify() <-chan []grpc.Address {
    return b.notify
}

// Close shuts the balancer down: it closes the active watcher (if any),
// releases every Get blocked waiting for a node, closes the notify channel
// and drops the hash ring. A second call returns ErrBalancerClosed.
func (b *consistentlb) Close() error {
    b.mu.Lock()
    defer b.mu.Unlock()

    if b.closed {
        return ErrBalancerClosed
    }
    b.closed = true
    b.ketama = nil

    if watcher := b.w; watcher != nil {
        b.w = nil
        watcher.Close()
    }

    // Wake every waiter. The channel is created first when nobody is
    // waiting so that b.wait always ends up as a closed channel.
    waiters := b.wait
    if waiters == nil {
        waiters = make(chan struct{})
        b.wait = waiters
    }
    close(waiters)
    close(b.notify)
    return nil
}

// UseConsistent returns a grpc.Balancer that picks addresses with consistent
// hashing. r supplies address updates; when r is nil, the target passed to
// Start is interpreted as a ';'-separated address list instead.
func UseConsistent(r naming.Resolver) grpc.Balancer {
    lb := new(consistentlb)
    lb.r = r
    lb.addrs = map[string]*lbaddr{}
    lb.ketama = new(consistent.Ketama)
    // One buffered slot lets an address list be published before a reader
    // is receiving.
    lb.notify = make(chan []grpc.Address, 1)
    return lb
}

修改 example/helloclient/main.go

将 grpc.RoundRobin 替换为 grpclb.UseConsistent

package main

import (
    "context"
    "log"
    "playground/gogrpc/example/helloproto"
    "playground/gogrpc/grpclb"
    "playground/gogrpc/grpcresolver"
    "time"

    "github.com/hashicorp/consul/api"
    "google.golang.org/grpc"
)

// main dials "helloserver" through the consistent-hash balancer backed by a
// Consul resolver and sends one Say request per second.
func main() {
    registry, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatalln(err)
    }
    lb := grpclb.UseConsistent(grpcresolver.ForConsul(registry))
    cc, err := grpc.Dial("helloserver", grpc.WithInsecure(), grpc.WithBalancer(lb))
    if err != nil {
        log.Fatalln(err)
    }
    client := helloproto.NewHelloClient(cc)

    // Number of distinct hash keys the demo cycles through; requests that
    // share a key are routed to the same server.
    const keyCount = 4
    var seq uint32
    for range time.Tick(time.Second) {
        // The balancer reads a uint32 from the request context under
        // grpclb.HashKey; without it every call fails with
        // grpclb.ErrMissingHashKey.
        ctx := context.WithValue(context.Background(), grpclb.HashKey, seq%keyCount)
        seq++

        resp, err := client.Say(ctx, &helloproto.HelloReq{
            Text: "hello: " + time.Now().String(),
        })
        if err != nil {
            log.Println("say failed", err)
            continue
        }
        log.Println("server reply", resp)
    }
}

标签: grpc, go

添加新评论