
JetStream Model Deep Dive

Stream Limits, Retention, and Policy

Streams store data on disk, but we cannot store all data forever, so we need ways to control their size automatically.

Three features come into play when Streams decide how long to store data.

The Retention Policy describes the criteria by which a Stream evicts messages from its storage:

| Retention Policy | Description |
| --- | --- |
| LimitsPolicy | Limits are set for how many messages are kept, how much storage they may use, and how old they may be. |
| WorkQueuePolicy | Messages are kept until they are consumed: delivered to a subscribing application (by the consumer filtering on the message's subject) and explicitly acknowledged by that application. In this mode you cannot define overlapping consumers on the Stream; each subject captured by the stream can have only one consumer at a time. |
| InterestPolicy | Messages are kept as long as there are Consumers on the stream (matching the message's subject if they are filtered consumers) for which the message has not yet been ACKed. Once all currently defined consumers have received an explicit acknowledgement from a subscribing application for the message, it is removed from the stream. |

In all Retention Policies the basic limits apply as upper bounds. These are MaxMsgs for how many messages are kept in total, MaxBytes for how big the Stream can be in total, and MaxAge for the age of the oldest message that will be kept. These are the only limits in play with LimitsPolicy retention.

One can then define additional ways a message may be removed from the Stream earlier than these limits. In WorkQueuePolicy, a message is removed as soon as the Consumer receives an Acknowledgement for it. In InterestPolicy, a message is removed as soon as all Consumers of the stream for that subject have received an Acknowledgement for it.

In both WorkQueuePolicy and InterestPolicy the age, size and count limits still apply as upper bounds.

A final control is the maximum size any single message may have. NATS has its own limit for maximum message size (1 MiB by default), but you can restrict a Stream further, for example to accept only messages of up to 1024 bytes, using MaxMsgSize.

The Discard Policy sets how messages are discarded when limits set by LimitsPolicy are reached. The DiscardOld option removes old messages making space for new, while DiscardNew refuses any new messages.

The WorkQueuePolicy mode is a specialized mode where a message, once consumed and acknowledged, is removed from the Stream.
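To make the difference between the two discard options concrete, here is a minimal sketch in plain Python. It is a toy in-memory model, not the NATS API: the `ToyStream` class, its `max_msgs` argument and the `"old"`/`"new"` strings are names invented for this illustration, standing in for MaxMsgs, DiscardOld and DiscardNew.

```python
from collections import deque


class ToyStream:
    """Toy in-memory model of a stream with a MaxMsgs-style limit (not the NATS API)."""

    def __init__(self, max_msgs, discard="old"):
        self.max_msgs = max_msgs
        self.discard = discard  # "old" stands in for DiscardOld, "new" for DiscardNew
        self.messages = deque()
        self.next_seq = 1

    def publish(self, payload):
        """Store the payload and return its sequence, or None if it was refused."""
        if len(self.messages) >= self.max_msgs:
            if self.discard == "new":
                return None  # DiscardNew: refuse the new message
            self.messages.popleft()  # DiscardOld: evict the oldest message
        seq = self.next_seq
        self.next_seq += 1
        self.messages.append((seq, payload))
        return seq


old = ToyStream(max_msgs=2, discard="old")
new = ToyStream(max_msgs=2, discard="new")
for body in ("a", "b", "c"):
    old.publish(body)
    new.publish(body)

print([p for _, p in old.messages])  # ['b', 'c']
print([p for _, p in new.messages])  # ['a', 'b']
```

With a limit of two messages, the DiscardOld-style stream keeps the two newest payloads, while the DiscardNew-style stream keeps the first two and refuses the third.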

Message Deduplication

JetStream supports idempotent message writes by ignoring duplicate messages, as indicated by the Nats-Msg-Id header.

nats req -H Nats-Msg-Id:1 ORDERS.new hello1
nats req -H Nats-Msg-Id:1 ORDERS.new hello2
nats req -H Nats-Msg-Id:1 ORDERS.new hello3
nats req -H Nats-Msg-Id:1 ORDERS.new hello4

Here we set a Nats-Msg-Id:1 header, which tells JetStream to ensure we do not have duplicates of this message. Only the message ID is consulted, not the body.

nats stream info ORDERS

In the output you can see that the duplicate publications were detected and only one message (the first one) is actually stored in the stream:

....
State:

            Messages: 1
               Bytes: 67 B

The default window for tracking duplicates is 2 minutes. It can be set on the command line using --dupe-window when creating a stream, though we would caution against large windows.
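The behaviour described in this section can be sketched as a small toy model in plain Python. This is not how the server implements it; `ToyDedupStream`, `window_seconds` and the explicit `now` argument are assumptions made for the illustration, with the 120-second default mirroring the 2-minute window mentioned above.

```python
class ToyDedupStream:
    """Toy model of duplicate detection keyed on a message ID (not the NATS API)."""

    def __init__(self, window_seconds=120.0):
        self.window = window_seconds
        self.seen = {}    # message ID -> time it was first stored
        self.stored = []  # payloads actually kept

    def publish(self, msg_id, payload, now):
        """Return True if the payload was stored, False if dropped as a duplicate."""
        # Forget IDs that have aged out of the tracking window.
        self.seen = {i: t for i, t in self.seen.items() if now - t < self.window}
        if msg_id in self.seen:
            return False  # same ID inside the window: ignore, whatever the body is
        self.seen[msg_id] = now
        self.stored.append(payload)
        return True


stream = ToyDedupStream()
results = [
    stream.publish("1", body, now=float(t))
    for t, body in enumerate(["hello1", "hello2", "hello3", "hello4"])
]
print(results)        # [True, False, False, False]
print(stream.stored)  # ['hello1']

# Once the window has passed, the same ID is accepted again.
late = stream.publish("1", "hello5", now=200.0)
print(late)           # True
```

The last call shows why the window size matters: an ID is only remembered for as long as the window lasts, and a larger window means more IDs to track.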

Acknowledgement Models

Streams support acknowledging receipt of a message: if you send a Request() to a subject covered by the configuration of the Stream, the service will reply to you once it has stored the message. If you just publish, it will not. A Stream can be set to disable Acknowledgements by setting NoAck to true in its configuration.

Consumers have 3 acknowledgement modes:

| Mode | Description |
| --- | --- |
| AckExplicit | Requires every message to be specifically acknowledged. It is the only supported option for pull-based Consumers. |
| AckAll | In this mode, acknowledging message 100 also acknowledges messages 1 through 99. This is good for processing batches and for reducing ack overhead. |
| AckNone | No acknowledgements are supported. |

To understand how Consumers track messages we will start with a clean ORDERS Stream and DISPATCH Consumer.

nats str info ORDERS
...
Statistics:

            Messages: 0
               Bytes: 0 B
            FirstSeq: 0
             LastSeq: 0
    Active Consumers: 1

The Stream is entirely empty.

nats con info ORDERS DISPATCH
...
State:

  Last Delivered Message: Consumer sequence: 1 Stream sequence: 1
    Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
        Pending Messages: 0
    Redelivered Messages: 0

The Consumer has no messages outstanding and has never had any (Consumer sequence is 1).

We publish one message to the Stream and see that the Stream received it:

nats pub ORDERS.processed "order 4"
Published 7 bytes to ORDERS.processed
$ nats str info ORDERS
...
Statistics:

            Messages: 1
               Bytes: 53 B
            FirstSeq: 1
             LastSeq: 1
    Active Consumers: 1

As the Consumer is pull-based, we can fetch the message, ack it, and check the Consumer state:

nats con next ORDERS DISPATCH
--- received on ORDERS.processed
order 4

Acknowledged message

$ nats con info ORDERS DISPATCH
...
State:

  Last Delivered Message: Consumer sequence: 2 Stream sequence: 2
    Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
        Pending Messages: 0
    Redelivered Messages: 0

The message got delivered and acknowledged - the Acknowledgement floor is 1 and 1, and the sequence of the Consumer is 2, which means it has had only the one message through and that message was acked. Since it was acked, nothing is pending or being redelivered.

We'll publish another message, fetch it but not Ack it this time and see the status:

nats pub ORDERS.processed "order 5"
Published 7 bytes to ORDERS.processed

Get the next message from the consumer (but do not acknowledge it)

nats consumer next ORDERS DISPATCH --no-ack
--- received on ORDERS.processed
order 5

Show the consumer info

nats consumer info ORDERS DISPATCH
State:

  Last Delivered Message: Consumer sequence: 3 Stream sequence: 3
    Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
        Pending Messages: 1
    Redelivered Messages: 0

Now we can see the Consumer has processed 2 messages (the Consumer sequence is 3, so the next message will be 3) but the Ack floor is still 1 - thus 1 message is pending acknowledgement. This is confirmed by the Pending Messages count.

If we fetch it again and still do not ack it:

nats consumer next ORDERS DISPATCH --no-ack
--- received on ORDERS.processed
order 5

Show the consumer info again

nats consumer info ORDERS DISPATCH
State:

  Last Delivered Message: Consumer sequence: 4 Stream sequence: 3
    Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
        Pending Messages: 1
    Redelivered Messages: 1

The Consumer sequence increases - each delivery attempt increases the sequence - and our redelivered count also goes up.

Finally, if I then fetch it again and ack it this time:

nats consumer next ORDERS DISPATCH 
--- received on ORDERS.processed
order 5

Acknowledged message

Show the consumer info

nats consumer info ORDERS DISPATCH
State:

  Last Delivered Message: Consumer sequence: 5 Stream sequence: 3
    Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
        Pending Messages: 0
    Redelivered Messages: 0

Having now Acked the message there are no more pending.
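The bookkeeping in this walkthrough can be mimicked with a toy model in plain Python. It is only a sketch: `ToyConsumer` and its fields are invented for this illustration, it counts delivery attempts from zero, and it does not try to reproduce the exact sequence numbers the CLI prints. What it does show is the pattern above: every delivery attempt bumps the counter, an unacknowledged message stays pending, and a second delivery of the same message counts as a redelivery.

```python
class ToyConsumer:
    """Toy model of consumer delivery bookkeeping (not the NATS API)."""

    def __init__(self):
        self.delivery_attempts = 0  # grows on every delivery, including redeliveries
        self.unacked = {}           # stream sequence -> deliveries so far

    def deliver(self, stream_seq, ack=True):
        """Record one delivery attempt; ack=False mimics fetching with --no-ack."""
        self.delivery_attempts += 1
        self.unacked[stream_seq] = self.unacked.get(stream_seq, 0) + 1
        if ack:
            del self.unacked[stream_seq]

    def info(self):
        return {
            "delivery_attempts": self.delivery_attempts,
            "pending": len(self.unacked),
            "redelivered": sum(1 for n in self.unacked.values() if n > 1),
        }


consumer = ToyConsumer()
history = []
# First message fetched and acked; second fetched twice without ack, then acked.
for stream_seq, ack in [(1, True), (2, False), (2, False), (2, True)]:
    consumer.deliver(stream_seq, ack=ack)
    history.append(consumer.info())

for state in history:
    print(state)
```

The pending and redelivered counts in `history` follow the same 0/0, 1/0, 1/1, 0/0 progression as the Pending Messages and Redelivered Messages lines in the outputs above.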

Additionally, there are a few types of acknowledgements:

| Type | Bytes | Description |
| --- | --- | --- |
| AckAck | nil, +ACK | Acknowledges that a message was completely handled. |
| AckNak | -NAK | Signals that the message will not be processed now and processing can move on to the next message. The NAK'd message will be retried. |
| AckProgress | +WPI | When sent before the AckWait period expires, indicates that work is ongoing and the period should be extended by another interval equal to AckWait. |
| AckNext | +NXT | Acknowledges that the message was handled and requests delivery of the next message to the reply subject. Only applies to pull mode. |
| AckTerm | +TERM | Instructs the server to stop redelivery of a message without acknowledging it as successfully processed. |

So far, all of the examples used the AckAck type of acknowledgement. By replying to the message with the body indicated in the Bytes column, you can pick which type of acknowledgement you want.

All of these acknowledgement types, except AckNext, support double acknowledgement: if you set a reply subject when acknowledging, the server will in turn acknowledge having received your ACK.

The +NXT acknowledgement accepts a few formats: +NXT 10 requests 10 messages, and +NXT {"no_wait": true} carries the same JSON data that can be sent in a Pull Request.
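As an illustration of the acknowledgement bodies listed above, the following sketch builds the byte payload for each type in plain Python. The `ACK_BODIES` mapping and the `ack_body` helper are hypothetical names for this example, not part of any NATS client library; a real client would send these bytes to the reply subject of the delivered message.

```python
import json

# Acknowledgement type -> body, as listed in this section.
ACK_BODIES = {
    "AckAck": b"+ACK",
    "AckNak": b"-NAK",
    "AckProgress": b"+WPI",
    "AckNext": b"+NXT",
    "AckTerm": b"+TERM",
}


def ack_body(kind, next_request=None):
    """Return the bytes to send for an acknowledgement type.

    next_request is only meaningful for AckNext: an int asks for that many
    messages, a dict is sent as JSON (the same data as a Pull Request).
    """
    body = ACK_BODIES[kind]
    if next_request is None:
        return body
    if kind != "AckNext":
        raise ValueError("only AckNext carries a request for more messages")
    if isinstance(next_request, int):
        return body + b" " + str(next_request).encode()
    return body + b" " + json.dumps(next_request).encode()


print(ack_body("AckTerm"))                       # b'+TERM'
print(ack_body("AckNext", 10))                   # b'+NXT 10'
print(ack_body("AckNext", {"no_wait": True}))    # b'+NXT {"no_wait": true}'
```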

Exactly Once Semantics

JetStream supports Exactly Once publication and consumption by combining Message Deduplication and double acks.

Consumers can be 100% sure a message was correctly processed by asking the server to acknowledge receipt of their acknowledgement (sometimes referred to as double-acking). To do so, call the message's AckSync() function rather than Ack(); it sets a reply subject on the Ack and waits for the server's response confirming that the acknowledgement was received and processed. If that response indicates success, you can be sure the message will never be redelivered by the consumer because of a lost acknowledgement.

Consumer Starting Position

When setting up a Consumer you can decide where it starts. The system supports the following values for the DeliverPolicy:

| Policy | Description |
| --- | --- |
| all | Delivers all messages that are available. |
| last | Delivers the latest message, like a tail -n 1 -f. |
| new | Delivers only new messages that arrive after subscribe time. |
| by_start_time | Delivers from a specific time onward. Requires OptStartTime to be set. |
| by_start_sequence | Delivers from a specific stream sequence. Requires OptStartSeq to be set. |

Regardless of what mode you set, this is only the starting point. Once started it will always give you what you have not seen or acknowledged. So this is merely how it picks the very first message.
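The effect of each policy on the very first message can be sketched in plain Python. This is a toy model under stated assumptions: `first_message` is a hypothetical helper, messages are simple `(stream_seq, timestamp)` tuples, and `start_seq` and `start_time` stand in for OptStartSeq and OptStartTime.

```python
def first_message(messages, policy, start_seq=None, start_time=None):
    """Return the stream sequence a new consumer would see first, or None.

    messages is a list of (stream_seq, timestamp) tuples sorted by sequence.
    None means nothing already stored qualifies, so the consumer waits.
    """
    if policy == "all":
        candidates = messages
    elif policy == "last":
        candidates = messages[-1:]
    elif policy == "new":
        candidates = []  # only messages arriving after creation are delivered
    elif policy == "by_start_sequence":
        candidates = [m for m in messages if m[0] >= start_seq]
    elif policy == "by_start_time":
        candidates = [m for m in messages if m[1] >= start_time]
    else:
        raise ValueError(f"unknown policy: {policy}")
    return candidates[0][0] if candidates else None


# 100 stored messages; message N arrived at time N (arbitrary units).
stored = [(seq, float(seq)) for seq in range(1, 101)]

print(first_message(stored, "all"))                              # 1
print(first_message(stored, "last"))                             # 100
print(first_message(stored, "new"))                              # None
print(first_message(stored, "by_start_sequence", start_seq=10))  # 10
print(first_message(stored, "by_start_time", start_time=50.5))   # 51
```

The helper only picks the starting point; after that, a consumer keeps receiving whatever it has not yet seen or acknowledged, as described above.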

Let's look at each of these. First we make a new Stream ORDERS and add 100 messages to it.

Now create a DeliverAll pull-based Consumer:

nats consumer add ORDERS ALL --pull --filter ORDERS.processed --ack none --replay instant --deliver all 
nats consumer next ORDERS ALL
--- received on ORDERS.processed
order 1

Acknowledged message

Now create a DeliverLast pull-based Consumer:

nats consumer add ORDERS LAST --pull --filter ORDERS.processed --ack none --replay instant --deliver last
nats consumer next ORDERS LAST
--- received on ORDERS.processed
order 100

Acknowledged message

Now create a pull-based Consumer that starts at a specific stream sequence (by_start_sequence):

nats consumer add ORDERS TEN --pull --filter ORDERS.processed --ack none --replay instant --deliver 10
nats consumer next ORDERS TEN
--- received on ORDERS.processed
order 10

Acknowledged message

And finally a time-based Consumer. Let's add some messages a minute apart:

nats stream purge ORDERS
for i in 1 2 3
do
  nats pub ORDERS.processed "order ${i}"
  sleep 60
done

Then create a Consumer that starts 2 minutes ago:

nats consumer add ORDERS 2MIN --pull --filter ORDERS.processed --ack none --replay instant --deliver 2m
nats consumer next ORDERS 2MIN
--- received on ORDERS.processed
order 2

Acknowledged message

Ephemeral Consumers

So far, all the Consumers you have seen were Durable, meaning they exist even after you disconnect from JetStream. In our ORDERS scenario, though, the MONITOR Consumer could very well be a short-lived thing that exists only while an operator is debugging the system. There is no need to remember the last seen position if all you want is to observe the real-time state.

In this case, we can make an Ephemeral Consumer by first subscribing to the delivery subject, then creating a Consumer and giving it no durable name. An Ephemeral Consumer exists as long as any subscription is active on its delivery subject. It is automatically removed, after a short grace period to handle restarts, when there are no subscribers.

Terminal 1:

nats sub my.monitor

Terminal 2:

nats consumer add ORDERS --filter '' --ack none --target 'my.monitor' --deliver last --replay instant --ephemeral

The --ephemeral switch tells the system to make an Ephemeral Consumer.

Consumer Message Rates

Typically, when a new Consumer is created, you want the selected messages delivered to you as quickly as possible. You might instead want to replay messages at the rate they originally arrived: if messages first arrived 1 minute apart, a new Consumer will receive them a minute apart.

This is useful in load-testing scenarios, for example. This setting is called the ReplayPolicy and has the values ReplayInstant and ReplayOriginal.

You can only set ReplayPolicy on push-based Consumers.

nats consumer add ORDERS REPLAY --target out.original --filter ORDERS.processed --ack none --deliver all --sample 100 --replay original
...
     Replay Policy: original
...

Now let's publish messages into the Stream 10 seconds apart:

for i in 1 2 3
do
  nats pub ORDERS.processed "order ${i}"
  sleep 10
done
Published [ORDERS.processed] : 'order 1'
Published [ORDERS.processed] : 'order 2'
Published [ORDERS.processed] : 'order 3'

And when we consume them they will come to us 10 seconds apart:

nats sub -t out.original
Listening on [out.original]
2020/01/03 15:17:26 [#1] Received on [ORDERS.processed]: 'order 1'
2020/01/03 15:17:36 [#2] Received on [ORDERS.processed]: 'order 2'
2020/01/03 15:17:46 [#3] Received on [ORDERS.processed]: 'order 3'
^C
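The difference between the two replay policies comes down to how long the consumer waits between deliveries. A minimal sketch in plain Python, assuming a hypothetical `replay_delays` helper that takes the original arrival times in seconds and the strings `"instant"` and `"original"` in place of ReplayInstant and ReplayOriginal:

```python
def replay_delays(arrival_times, policy="original"):
    """Return the seconds to wait before delivering each message.

    arrival_times are the original arrival times in seconds, in order.
    """
    if policy == "instant":
        return [0.0] * len(arrival_times)
    if policy != "original":
        raise ValueError(f"unknown policy: {policy}")
    delays = [0.0]  # the first message is delivered straight away
    for previous, current in zip(arrival_times, arrival_times[1:]):
        delays.append(current - previous)  # keep the original spacing
    return delays[:len(arrival_times)]


arrivals = [0.0, 10.0, 20.0]  # three messages published 10 seconds apart
print(replay_delays(arrivals, "original"))  # [0.0, 10.0, 10.0]
print(replay_delays(arrivals, "instant"))   # [0.0, 0.0, 0.0]
```

With the original policy the 10-second gaps between the three messages are preserved, which matches the timestamps in the output above.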

Ack Sampling

In the earlier sections we saw that samples are being sent to a monitoring system. Let's look at that in depth; how the monitoring system works and what it contains.

As messages pass through a Consumer you'd be interested in knowing how many are being redelivered and how many times but also how long it takes for messages to be acknowledged.

Configuration

You can configure a Consumer for sampling by passing the --sample 80 option to nats consumer add. This tells the system to sample 80% of Acknowledgements.

When viewing info of a Consumer you can tell if it's sampled or not:

nats consumer info ORDERS NEW

Output contains

...
     Sampling Rate: 100
...

Storage Overhead

JetStream file storage is very efficient, storing as little extra information about the message as possible.

We do store some message data with each message, namely:

  • Message headers

  • The subject it was received on

  • The time it was received

  • The message payload

  • A hash of the message

  • The message sequence

  • A few other bits like the length of the subject and the length of headers

Without any headers the size is:

length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)

A 5-byte hello message without headers takes 35 bytes plus the length of its subject, for example 39 bytes on a 4-byte subject.

With headers:

length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + hdr_len(4) + hdr + msg + hash(8)

So if you are publishing many small messages the overhead will be, relatively speaking, quite large, but for larger messages the overhead is very small. If you publish many small messages it's worth trying to optimize the subject length.
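The two formulas above translate directly into a few lines of Python. This is a sketch of the arithmetic only: `record_size` is a name chosen for the example, and the subject `test` is a made-up 4-byte subject. The second call uses the subject and payload from the ORDERS example earlier on this page, where the stream info reported 53 B for one stored message.

```python
def record_size(subject, payload, headers=b""):
    """Size in bytes of one stored message, following the formulas above."""
    size = 4                       # length of the message record
    size += 8                      # seq
    size += 8                      # ts
    size += 2                      # subj_len
    size += len(subject.encode())  # subj
    if headers:
        size += 4 + len(headers)   # hdr_len + hdr, only when headers are present
    size += len(payload)           # msg
    size += 8                      # hash
    return size


small = record_size("test", b"hello")
order = record_size("ORDERS.processed", b"order 4")
print(small)  # 39
print(order)  # 53

# Overhead relative to the payload shrinks as messages get larger.
for payload_len in (5, 1024):
    total = record_size("test", b"x" * payload_len)
    print(payload_len, total - payload_len)  # overhead is 34 bytes in both cases
```

The fixed part of the record is 30 bytes; everything else scales with the subject, headers and payload, which is why a shorter subject helps most when messages are small.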


On the publishing side you can avoid duplicate message ingestion using the Message Deduplication feature.

Consumers can sample Ack'ed messages for you and publish samples so your monitoring system can observe the health of a Consumer. We will add support for this to NATS Surveyor.