NATS 2.9 JetStream

从 STAN 说起

去年本想水一篇关于 NATS JetStream 的博客,当时写了一部分,结果翻资料翻到一篇博客 淺談 NATS、STAN 和 JetStream 兩三事,把我想说的基本都提到了,所以我当时弃坑了。最近用上了 NATS 2.9 版本,所以想着把新了解到的概念和之前想聊的一些内容都整理一下。如果你对 NATS 了解不多,非常推荐先看看我前面提到的那篇文章。

在升到 NATS JetStream 之前,我一直在用 STAN(aka NATS Streaming)。STAN 是 NATS 上一代持久化方案,是一个独立的 Server,它内嵌了 NATS Server,在这之上构建了一个持久化层,并且支持集群,集群节点之间使用 Raft 算法。STAN 在使用上基本接近 NATS,但是有个别要注意的地方。

一个是客户端连接时要提供 Cluster ID 和 Client ID。可以把 NATS 抽象看作 STAN 的网络层,客户端连入的是 NATS 的网络,访问这个网络上的 STAN Server,那么连接这个网络上的 Cluster 需要说明;Client ID 必须是唯一的,STAN 用 Client ID 和 Durable Name 区分 Consumer 持久化视图。

还有一个比较重要的是断线重连问题,也是我们踩过的坑。基于前面提到的抽象理解,STAN 构建于 NATS 网络层之上,客户端实际上没有真正连接到 STAN Server,所以断线重连机制变得复杂。记得当时是阿里云网络波动,STAN 客户端集体掉线后没有恢复,造成了不小影响。后来我封装 stan.go 实现了一个支持重连的客户端,通过 SetConnectionLostHandler 在连接断开时创建新连接,并且支持在连接切换后恢复订阅。

不过这些都不重要了,STAN 已经被标记为 deprecated,只维护到 2023.6。

JetStream

JetStream 是 NATS 2.2 开始内置的持久化机制,满足 At-Least-Once 语义,也提供了一些机制支持 Exactly-Once 语义。目前发展到 2.9.15 版本,已经很好用了。

Stream

Stream 是 JetStream 消息存储单元,在 Stream 配置消息的存储和管理规则,如果你熟悉 STAN Store 配置,会对 Stream 部分配置非常眼熟。挑几个配置谈谈我的理解:

  • Subjects:Stream 和 Subject 命名没有联系,当发现消息的 Subject 如果和 Stream 关联,那么就会进入到 Stream 的消息存储中。在同一个 Stream 中的所有 Subject 共享一个 Stream 配额。
  • Duplicate Window:消息重复窗口,配合 Nats-Msg-Id 可以保证窗口内消息不会重复投递。
  • Retention Policy:
    • LimitsPolicy:消息会一直保留,直到配额上限才会触发消息删除
    • InterestPolicy:在 LimitsPolicy 的前提下,消息在每个正在订阅的 Consumer 都消费后删除,如果没有 Consumer 立即删除消息
    • WorkQueuePolicy:在 LimitsPolicy 的前提下,消息在 Consumer 消费(Ack)后删除。同一个 Subject 最多只能有一个 Consumer
  • Discard Policy:
    • DiscardOld:当消息到达配额上限时,优先丢弃旧消息
    • DiscardNew:当消息到达配额上限时,拒收新消息
    • DiscardNewPerSubject:2.9 的新特征,是一个拓展配置,按 Subject 执行 DiscardNew 策略。官方给了一个用法叫 Infinite message deduplication,不依赖 Duplicate Window 和 Nats-Msg-Id,结合 Per Subject Messages Limit(例如设为 1) 和 DiscardNewPerSubject,把 Subject 看作一个事务或者消息 ID,来保证不会重复投递。

Consumer 和订阅

Consumer 是订阅的状态视图。Consumer 维护消费配置,跟踪消息投递和 ACK,会影响到 Stream 是否能删除一条消息。Stream 从理解上也算订阅的单元,即 Consumer 创建一个订阅,关联一个 Stream,在上面过滤感兴趣的 Subject。

Consumer 中比较重要的概念是 Durable/Ephemeral 和 Push-based/Pull-based 这两块。

Durable 和 Ephemeral

Consumer 配置的 Durable 字段是空的话,这个 Consumer 就是一个 Ephemeral Consumer,这里实际上又涉及到另一个字段:Name。大致流程是这样的:如果 Consumer DurableName 都为空,NATS Server 会生成一个唯一的 name 作为 Ephemeral Consumer 的 Name,用作 Consumer 视图名,这个 Name 会在 Consumer 创建成功以后,返回给客户端。如果你查看 nats-server 代码,甚至会发现 Durable is deprecated. 的注释。

客户端正常退出(Drain 或者 Unsubscribe)的话,会主动在 Stream 上删除自己创建的 Consumer,但客户端故障退出后,新客户端会认为自己不是 Consumer 的创建者,它在退出时不会删除 Consumer,再然后如果客户端退出后永远不上线,那么 Consumer 会一直存在,只能通过手动删除的方式清除。

InactiveThreshold 配置允许一个 Consumer 一定时间不活跃一定时间后由 Server 清理不活跃 Consumer。在 2.9 版本之前,Ephemeral Consumer 和 Durable Consumer 有一个非常大的区别是只有 Ephemeral Consumer 支持 InactiveThreshold,如果 Ephemeral Consumer 没指定 InactiveThreshold,Server 也会为 Ephemeral Consumer 提供一个预设的值(目前 5s)。2.9 以后 Durable Consumer 也支持了这个配置,但是必须显式指定。

Push-based 和 Pull-based

Push-based Consumer 和 NATS、STAN 的 Consumer 用法相同,Server 主动推送订阅的消息到客户端,Push-based Consumer 支持 queue,也就是 DeliverGroup

Pull-based Consumer 是 JetStream 新增的一个模式,模式上更接近其他 MQ 客户端用法。创建订阅后你需要主动拉取消息,消息必须手动 Ack,Pull 模式不支持 queue。

具体配置参考文档,不多介绍。

Cluster

NATS Server 拥有非常丰富的拓扑能力,这方面我不打算展开聊,想了解直接阅读文档 NATS Adaptive Deployment ArchitecturesJetStream Clustering。我就简单聊聊集群配置。

假设我有 3 个节点,那么我 nats-0 节点关于 cluster 的配置大概是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server_name: "dev-nats-0"

cluster: {
name: "dev"
listen: ":6222"
authorization {
user: replica
password: "WC49PtzPF4Tcfpf3CwNfhbWMRM47Fxmb"
timeout: 1
}
routes: [
# nats-route://replica:WC49PtzPF4Tcfpf3CwNfhbWMRM47Fxmb@nats-0:6222
nats-route://replica:WC49PtzPF4Tcfpf3CwNfhbWMRM47Fxmb@nats-1:6222
nats-route://replica:WC49PtzPF4Tcfpf3CwNfhbWMRM47Fxmb@nats-2:6222
]
}
  • server_name 是必须的,并且不能重复。
  • cluster 配置中
    • 相同集群 name 必须相同
    • authorization 里的 user 如果都相同,password 必须也相同
    • routes 配置里关于自己节点的路由可以不配置,对 NATS Server 来说,这是集群的 member 表。节点之间会交换 routes 列表,并且会从中删除重复的项。如果启用了 JetStream,那么 routes 列表不能为空,否则 Server 不能启动。

顺便一提,在 Stream 中其实有几个配置是和 Cluster 相关的,例如目前有 Replicas、Placement

Accounts

Accounts 是 NATS 关于多租户的能力。NATS 有一个默认的全局 Account,即 $G,如果仅配置了 authorization,登录到 Server 的客户端访问的是默认 Account。你可以在配置 accounts 启用多租户:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
accounts: {
biz_a: {
jetstream: enabled
users: [
{ user: "a_user", password: "CgqvKgzsM7qW9xbgknmvVhmdmHPJPznq" }
]
}
biz_b: {
jetstream: enabled
users: [
{ user: "b_user", password: "XpnLbc3VsbL9wgwcL7VVnpH3XssbnHwP" }
]
}
}

只要客户端分别通过,a_userb_user 登录,那么他们就会处于不同的 Account 空间,对同一个 Subject 的消息相互隔离。jetstream 可以是一个配置对象,针对不同的 Account 配置 JetStream 配额。

Account 还提供了跨租户访问的能力,你可以声明导出你的 Subject 提供给其他 Account 导入,这里不再细说。

Client

最后再聊聊客户端开发,我这里用的是 nats.go 客户端。

连接

连接主要通过 nats.Connect,然后提供一些选项,有几个选项可以了解一下:

  • RetryOnFailedConnect:如果设为 true,Connect 失败不会返回错误,而是在后台尝试连接 Server,连接状态为 Connecting
  • MaxReconnects:顾名思义,最大重连次数
  • ErrorHandlerErrSlowConsumer 是一个需要关注的特殊错误,需要通过设置 ErrorHandler 来接收这个错误,在收到 ErrSlowConsumer 错误时,可以同时执行相关 SubscriptionPending 函数获取消息堆积数量

JetStream 的客户端已经能做到自动重连后重新订阅了,所以不需要我们做额外的封装。

JetStream

JetStream 客户端要从 nats.Conn 中产生,只是增加了一些专用参数,用法接近。

Pull-based 订阅

Pull-based 订阅在 Fetch 时尽量带上 nats.MaxWait 或者 nats.Context。一方面是经验之谈,另外一方面是我的使用场景需要,nats.MaxWait 超时返回的错误是 nats.ErrTimeout,不是 context.DeadlineExceeded

退出

正如我前面提到的,一个异常关闭的客户端,会影响到 Consumer 的释放,不论是否配置了 InactiveThreshold,这都不是一个优雅的过程,所以最好时能做到先取消订阅,再退出客户端。

如果在关闭连接时使用 Drain,那么客户端会先取消所有订阅后再关闭连接,所以总是推荐使用。

如果你没有使用 InactiveThreshold,对于已经处于异常状态的 Durable Consumer,你可以选择从消费者集群里选择一个客户端,或者定期启动一个客户端来清理异常 Consumer。

Relevant Links