NATS 2.9 JetStream
从 STAN 说起
去年本想水一篇关于 NATS JetStream 的博客,当时写了一部分,结果翻资料翻到一篇博客 淺談 NATS、STAN 和 JetStream 兩三事,把我想说的基本都提到了,所以我当时弃坑了。最近用上了 NATS 2.9 版本,所以想着把新了解到的概念和之前想聊的一些内容都整理一下。如果你对 NATS 了解不多,非常推荐先看看我前面提到的那篇文章。
在升到 NATS JetStream 之前,我一直在用 STAN(aka NATS Streaming)。STAN 是 NATS 上一代持久化方案,是一个独立的 Server,它内嵌了 NATS Server,在这之上构建了一个持久化层,并且支持集群,集群节点之间使用 Raft 算法。STAN 在使用上基本接近 NATS,但是有个别要注意的地方。
一个是客户端连接时要提供 Cluster ID 和 Client ID。可以把 NATS 抽象看作 STAN 的网络层,客户端连入的是 NATS 的网络,访问这个网络上的 STAN Server,那么连接这个网络上的 Cluster 需要说明;Client ID 必须是唯一的,STAN 用 Client ID 和 Durable Name 区分 Consumer 持久化视图。
还有一个比较重要的是断线重连问题,也是我们踩过的坑。基于前面提到的抽象理解,STAN 构建于 NATS 网络层之上,客户端实际上没有真正连接到 STAN Server,所以断线重连机制变得复杂。记得当时是阿里云网络波动,STAN 客户端集体掉线后没有恢复,造成了不小影响。后来我封装 stan.go 实现了一个支持重连的客户端,通过 SetConnectionLostHandler
在连接断开时创建新连接,并且支持在连接切换后恢复订阅。
不过这些都不重要了,STAN 已经被标记为 deprecated,只维护到 2023.6。
JetStream
JetStream 是 NATS 2.2 开始内置的持久化机制,满足 At-Least-Once 语义,也提供了一些机制支持 Exactly-Once 语义。目前发展到 2.9.15 版本,已经很好用了。
Stream
Stream 是 JetStream 消息存储单元,在 Stream 配置消息的存储和管理规则,如果你熟悉 STAN Store 配置,会对 Stream 部分配置非常眼熟。挑几个配置谈谈我的理解:
- Subjects:Stream 和 Subject 命名没有联系,当发现消息的 Subject 如果和 Stream 关联,那么就会进入到 Stream 的消息存储中。在同一个 Stream 中的所有 Subject 共享一个 Stream 配额。
- Duplicate Window:消息重复窗口,配合
Nats-Msg-Id
可以保证窗口内消息不会重复投递。 - Retention Policy:
LimitsPolicy
:消息会一直保留,直到配额上限才会触发消息删除InterestPolicy
:在LimitsPolicy
的前提下,消息在每个正在订阅的 Consumer 都消费后删除,如果没有 Consumer 立即删除消息WorkQueuePolicy
:在LimitsPolicy
的前提下,消息在 Consumer 消费(Ack)后删除。同一个 Subject 最多只能有一个 Consumer
- Discard Policy:
DiscardOld
:当消息到达配额上限时,优先丢弃旧消息DiscardNew
:当消息到达配额上限时,拒收新消息DiscardNewPerSubject
:2.9 的新特征,是一个拓展配置,按 Subject 执行DiscardNew
策略。官方给了一个用法叫 Infinite message deduplication,不依赖 Duplicate Window 和Nats-Msg-Id
,结合 Per Subject Messages Limit(例如设为 1) 和DiscardNewPerSubject
,把 Subject 看作一个事务或者消息 ID,来保证不会重复投递。
Consumer 和订阅
Consumer 是订阅的状态视图。Consumer 维护消费配置,跟踪消息投递和 ACK,会影响到 Stream 是否能删除一条消息。Stream 从理解上也算订阅的单元,即 Consumer 创建一个订阅,关联一个 Stream,在上面过滤感兴趣的 Subject。
Consumer 中比较重要的概念是 Durable/Ephemeral 和 Push-based/Pull-based 这两块。
Durable 和 Ephemeral
Consumer 配置的 Durable
字段是空的话,这个 Consumer 就是一个 Ephemeral Consumer,这里实际上又涉及到另一个字段:Name
。大致流程是这样的:如果 Consumer Durable
和 Name
都为空,NATS Server 会生成一个唯一的 name 作为 Ephemeral Consumer 的 Name
,用作 Consumer 视图名,这个 Name
会在 Consumer 创建成功以后,返回给客户端。如果你查看 nats-server 代码,甚至会发现 Durable is deprecated. 的注释。
客户端正常退出(Drain 或者 Unsubscribe)的话,会主动在 Stream 上删除自己创建的 Consumer,但客户端故障退出后,新客户端会认为自己不是 Consumer 的创建者,它在退出时不会删除 Consumer,再然后如果客户端退出后永远不上线,那么 Consumer 会一直存在,只能通过手动删除的方式清除。
InactiveThreshold
配置允许一个 Consumer 一定时间不活跃一定时间后由 Server 清理不活跃 Consumer。在 2.9 版本之前,Ephemeral Consumer 和 Durable Consumer 有一个非常大的区别是只有 Ephemeral Consumer 支持 InactiveThreshold
,如果 Ephemeral Consumer 没指定 InactiveThreshold
,Server 也会为 Ephemeral Consumer 提供一个预设的值(目前 5s)。2.9 以后 Durable Consumer 也支持了这个配置,但是必须显式指定。
Push-based 和 Pull-based
Push-based Consumer 和 NATS、STAN 的 Consumer 用法相同,Server 主动推送订阅的消息到客户端,Push-based Consumer 支持 queue,也就是 DeliverGroup
。
Pull-based Consumer 是 JetStream 新增的一个模式,模式上更接近其他 MQ 客户端用法。创建订阅后你需要主动拉取消息,消息必须手动 Ack,Pull 模式不支持 queue。
具体配置参考文档,不多介绍。
Cluster
NATS Server 拥有非常丰富的拓扑能力,这方面我不打算展开聊,想了解直接阅读文档 NATS Adaptive Deployment Architectures 和 JetStream Clustering。我就简单聊聊集群配置。
假设我有 3 个节点,那么我 nats-0 节点关于 cluster 的配置大概是这样的:
1 | server_name: "dev-nats-0" |
server_name
是必须的,并且不能重复。cluster
配置中- 相同集群
name
必须相同 authorization
里的user
如果都相同,password
必须也相同routes
配置里关于自己节点的路由可以不配置,对 NATS Server 来说,这是集群的 member 表。节点之间会交换routes
列表,并且会从中删除重复的项。如果启用了 JetStream,那么routes
列表不能为空,否则 Server 不能启动。
- 相同集群
顺便一提,在 Stream 中其实有几个配置是和 Cluster 相关的,例如目前有 Replicas、Placement
Accounts
Accounts 是 NATS 关于多租户的能力。NATS 有一个默认的全局 Account,即 $G
,如果仅配置了 authorization
,登录到 Server 的客户端访问的是默认 Account。你可以在配置 accounts
启用多租户:
1 | accounts: { |
只要客户端分别通过,a_user
和 b_user
登录,那么他们就会处于不同的 Account 空间,对同一个 Subject 的消息相互隔离。jetstream
可以是一个配置对象,针对不同的 Account 配置 JetStream 配额。
Account 还提供了跨租户访问的能力,你可以声明导出你的 Subject 提供给其他 Account 导入,这里不再细说。
Client
最后再聊聊客户端开发,我这里用的是 nats.go 客户端。
连接
连接主要通过 nats.Connect
,然后提供一些选项,有几个选项可以了解一下:
RetryOnFailedConnect
:如果设为 true,Connect 失败不会返回错误,而是在后台尝试连接 Server,连接状态为 ConnectingMaxReconnects
:顾名思义,最大重连次数ErrorHandler
:ErrSlowConsumer
是一个需要关注的特殊错误,需要通过设置ErrorHandler
来接收这个错误,在收到ErrSlowConsumer
错误时,可以同时执行相关Subscription
的Pending
函数获取消息堆积数量
JetStream 的客户端已经能做到自动重连后重新订阅了,所以不需要我们做额外的封装。
JetStream
JetStream 客户端要从 nats.Conn
中产生,只是增加了一些专用参数,用法接近。
Pull-based 订阅
Pull-based 订阅在 Fetch
时尽量带上 nats.MaxWait
或者 nats.Context
。一方面是经验之谈,另外一方面是我的使用场景需要,nats.MaxWait
超时返回的错误是 nats.ErrTimeout
,不是 context.DeadlineExceeded
。
退出
正如我前面提到的,一个异常关闭的客户端,会影响到 Consumer 的释放,不论是否配置了 InactiveThreshold
,这都不是一个优雅的过程,所以最好时能做到先取消订阅,再退出客户端。
如果在关闭连接时使用 Drain
,那么客户端会先取消所有订阅后再关闭连接,所以总是推荐使用。
如果你没有使用 InactiveThreshold
,对于已经处于异常状态的 Durable Consumer,你可以选择从消费者集群里选择一个客户端,或者定期启动一个客户端来清理异常 Consumer。