package grpcresolver

import (
	"net"
	"strconv"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/consul/api"
	"google.golang.org/grpc/naming"
)

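// watchEntry records the last known state of one service instance.
type watchEntry struct {
	addr string // host:port most recently reported to gRPC
	modi uint64 // Consul ModifyIndex seen for this instance
	last uint64 // catalog index of the latest query that listed this instance
}
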
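// consulWatcher implements naming.Watcher on top of the Consul catalog.
type consulWatcher struct {
	down    int32 // set to 1 by Close; accessed atomically
	c       *api.Client
	service string

	mu        sync.Mutex             // guards watched and lastIndex
	watched   map[string]*watchEntry // keyed by Consul service ID
	lastIndex uint64                 // catalog index of the last processed query
}
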
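// Close marks the watcher as shut down. It does not interrupt a query that
// is already in flight.
func (w *consulWatcher) Close() {
	atomic.StoreInt32(&w.down, 1)
}
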
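// Next blocks until the set of service instances changes and returns the
// resulting Add and Delete updates. Once Close has been called, it returns
// nil, nil the next time a query comes back with an unchanged index.
func (w *consulWatcher) Next() ([]*naming.Update, error) {
	w.mu.Lock()
	defer w.mu.Unlock()

	watched := w.watched
	lastIndex := w.lastIndex

retry:
	// Blocking query: with a non-zero WaitIndex, Consul holds the request
	// until the catalog index changes or the wait time elapses.
	services, meta, err := w.c.Catalog().Service(w.service, "", &api.QueryOptions{
		WaitIndex: lastIndex,
	})
	if err != nil {
		return nil, err
	}
	if lastIndex == meta.LastIndex {
		// The query returned without a change.
		if atomic.LoadInt32(&w.down) != 0 {
			return nil, nil
		}
		goto retry
	}
	lastIndex = meta.LastIndex

	var updating []*naming.Update
	for _, s := range services {
		ws := watched[s.ServiceID]
		if ws == nil {
			// New instance: start tracking it and announce its address.
			ws = &watchEntry{
				addr: net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort)),
				modi: s.ModifyIndex,
			}
			watched[s.ServiceID] = ws
			updating = append(updating, &naming.Update{
				Op:   naming.Add,
				Addr: ws.addr,
			})
		} else if ws.modi != s.ModifyIndex {
			// Modified instance: withdraw the old address, then announce
			// the current one.
			updating = append(updating, &naming.Update{
				Op:   naming.Delete,
				Addr: ws.addr,
			})
			ws.addr = net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort))
			ws.modi = s.ModifyIndex
			updating = append(updating, &naming.Update{
				Op:   naming.Add,
				Addr: ws.addr,
			})
		}
		// Stamp the entry so that missing instances can be detected below.
		ws.last = lastIndex
	}

	// Entries not stamped with the current index were absent from this
	// result, so they have been deregistered.
	for id, ws := range watched {
		if ws.last != lastIndex {
			delete(watched, id)
			updating = append(updating, &naming.Update{
				Op:   naming.Delete,
				Addr: ws.addr,
			})
		}
	}

	w.watched = watched
	w.lastIndex = lastIndex
	return updating, nil
}
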
// consulResolver adapts an *api.Client to the naming.Resolver interface.
type consulResolver api.Client

// Resolve returns a watcher for the Consul service named by target.
func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
	return &consulWatcher{
		c:       (*api.Client)(r),
		service: target,
		watched: make(map[string]*watchEntry),
	}, nil
}

// ForConsul returns a naming.Resolver backed by the given Consul client.
func ForConsul(reg *api.Client) naming.Resolver {
	return (*consulResolver)(reg)
}