
JetStream API Reference

The normal way to use JetStream is through the NATS client libraries, which expose a set of JetStream functions that you can use directly in your programs. That is not the only way to interact with the JetStream infrastructure programmatically, however. Just as core NATS has a wire protocol on top of TCP, JetStream-enabled nats-server instances expose a set of services over core NATS.

Reference

All of these subjects are defined as constants in the NATS Server source. For example, the subject $JS.API.STREAM.LIST is represented by the api.JSApiStreamList constant. The tables below reference these constants and the data structures used for payloads.

Error Handling

The APIs used by administrative tools all respond with standardised JSON, and this includes errors.

nats req '$JS.API.STREAM.INFO.nonexisting' ''
Published 0 bytes to $JS.API.STREAM.INFO.nonexisting
Received  [_INBOX.lcWgjX2WgJLxqepU0K9pNf.mpBW9tHK] : {
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "error": {
    "code": 404,
    "description": "stream not found"
  }
}
nats req '$JS.API.STREAM.INFO.ORDERS' ''
Published 0 bytes to $JS.API.STREAM.INFO.ORDERS
Received  [_INBOX.fwqdpoWtG8XFXHKfqhQDVA.vBecyWmF] : {
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "config": {
    "name": "ORDERS",
  ...
}

Here the responses include a type which can be used to find the JSON Schema for each response.
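As a rough illustration of how a client might handle these typed replies, the sketch below decodes the type field and turns a populated error object into a Go error. The names apiResponse, apiError, and parseAPIResponse are made up for this example and are not taken from jsm.go; only the JSON field names come from the sample responses above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// apiError mirrors the "error" object in the sample response.
// The type name is illustrative, not the one used in jsm.go.
type apiError struct {
	Code        int    `json:"code"`
	Description string `json:"description"`
}

// apiResponse holds only the fields shared by the sample responses.
type apiResponse struct {
	Type  string    `json:"type"`
	Error *apiError `json:"error,omitempty"`
}

// parseAPIResponse decodes a raw reply and converts a populated
// "error" object into a Go error.
func parseAPIResponse(data []byte) (apiResponse, error) {
	var resp apiResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return resp, fmt.Errorf("invalid JSON reply: %w", err)
	}
	if resp.Error != nil {
		return resp, fmt.Errorf("%s (code %d)", resp.Error.Description, resp.Error.Code)
	}
	return resp, nil
}

func main() {
	reply := []byte(`{
  "type": "io.nats.jetstream.api.v1.stream_info_response",
  "error": {"code": 404, "description": "stream not found"}
}`)
	resp, err := parseAPIResponse(reply)
	fmt.Println(resp.Type)
	fmt.Println(err)
}
```

Checking the error object first keeps the caller from reading fields such as config that are absent from a failed reply.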

Non-admin APIs, such as those for adding a message to a stream, respond with -ERR or +OK, optionally followed by a reason.

Admin API

All the admin actions the nats CLI can perform fall into the sections below. The API structures are kept in the api package in the jsm.go repository.

Constants whose names end in T, like api.JSApiConsumerCreateT, are format strings: the Stream name, and in some cases the Consumer name, must be interpolated into them. For example, t := fmt.Sprintf(api.JSApiConsumerCreateT, streamName) yields the final subject.
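To make the interpolation concrete without pulling in the api package, here is a minimal sketch that uses local stand-ins for two of the format constants. The constant names and their values are assumptions modelled on the subject patterns listed on this page; the real definitions in jsm.go may differ.

```go
package main

import "fmt"

// Hypothetical local copies of two format strings. The real constants
// live in the api package and may differ in name or value.
const (
	consumerCreateT = "$JS.API.CONSUMER.CREATE.%s"
	consumerInfoT   = "$JS.API.CONSUMER.INFO.%s.%s"
)

// consumerCreateSubject fills in the Stream name.
func consumerCreateSubject(stream string) string {
	return fmt.Sprintf(consumerCreateT, stream)
}

// consumerInfoSubject fills in both the Stream and the Consumer name.
func consumerInfoSubject(stream, consumer string) string {
	return fmt.Sprintf(consumerInfoT, stream, consumer)
}

func main() {
	fmt.Println(consumerCreateSubject("ORDERS"))
	fmt.Println(consumerInfoSubject("ORDERS", "NEW"))
}
```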

The command nats events shows an audit log of all API access events, including the full content of each admin request. Use it to view the structure of the messages the nats command sends.

The API uses JSON for inputs and outputs. Every response carries a type field that identifies its schema. A JSON Schema repository can be found in nats-io/jsm.go/schemas.

General Info

| Subject | Constant | Description | Request Payload | Response Payload |
| --- | --- | --- | --- | --- |
| $JS.API.INFO | api.JSApiAccountInfo | Retrieves stats and limits about your account | empty payload | api.JetStreamAccountStats |

Streams

| Subject | Constant | Description | Request Payload | Response Payload |
| --- | --- | --- | --- | --- |
| $JS.API.STREAM.LIST | api.JSApiStreamList | Paged list of known Streams including all their current information | api.JSApiStreamListRequest | api.JSApiStreamListResponse |
| $JS.API.STREAM.NAMES | api.JSApiStreamNames | Paged list of Stream names | api.JSApiStreamNamesRequest | api.JSApiStreamNamesResponse |
| $JS.API.STREAM.CREATE.* | api.JSApiStreamCreateT | Creates a new Stream | api.StreamConfig | api.JSApiStreamCreateResponse |
| $JS.API.STREAM.UPDATE.* | api.JSApiStreamUpdateT | Updates an existing Stream with new config | api.StreamConfig | api.JSApiStreamUpdateResponse |
| $JS.API.STREAM.INFO.* | api.JSApiStreamInfoT | Information about config and state of a Stream | empty payload, Stream name in subject | api.JSApiStreamInfoResponse |
| $JS.API.STREAM.DELETE.* | api.JSApiStreamDeleteT | Deletes a Stream and all its data | empty payload, Stream name in subject | api.JSApiStreamDeleteResponse |
| $JS.API.STREAM.PURGE.* | api.JSApiStreamPurgeT | Purges all of the data in a Stream, leaves the Stream | empty payload, Stream name in subject | api.JSApiStreamPurgeResponse |
| $JS.API.STREAM.MSG.DELETE.* | api.JSApiMsgDeleteT | Deletes a specific message in the Stream by sequence, useful for GDPR compliance | api.JSApiMsgDeleteRequest | api.JSApiMsgDeleteResponse |
| $JS.API.STREAM.MSG.GET.* | api.JSApiMsgGetT | Retrieves a specific message from the Stream | api.JSApiMsgGetRequest | api.JSApiMsgGetResponse |
| $JS.API.STREAM.SNAPSHOT.* | api.JSApiStreamSnapshotT | Initiates a streaming backup of a Stream's data | api.JSApiStreamSnapshotRequest | api.JSApiStreamSnapshotResponse |
| $JS.API.STREAM.RESTORE.* | api.JSApiStreamRestoreT | Initiates a streaming restore of a Stream | {} | api.JSApiStreamRestoreResponse |

Consumers

| Subject | Constant | Description | Request Payload | Response Payload |
| --- | --- | --- | --- | --- |
| $JS.API.CONSUMER.CREATE.* | api.JSApiConsumerCreateT | Creates an ephemeral Consumer | api.ConsumerConfig, Stream name in subject | api.JSApiConsumerCreateResponse |
| $JS.API.CONSUMER.DURABLE.CREATE.*.* | api.JSApiDurableCreateT | Creates a durable Consumer | api.ConsumerConfig, Stream name in subject | api.JSApiConsumerCreateResponse |
| $JS.API.CONSUMER.LIST.* | api.JSApiConsumerListT | Paged list of known Consumers including their current info | api.JSApiConsumerListRequest | api.JSApiConsumerListResponse |
| $JS.API.CONSUMER.NAMES.* | api.JSApiConsumerNamesT | Paged list of known Consumer names | api.JSApiConsumerNamesRequest | api.JSApiConsumerNamesResponse |
| $JS.API.CONSUMER.INFO.*.* | api.JSApiConsumerInfoT | Information about a Consumer | empty payload, Stream and Consumer names in subject | api.JSApiConsumerInfoResponse |
| $JS.API.CONSUMER.DELETE.*.* | api.JSApiConsumerDeleteT | Deletes a Consumer | empty payload, Stream and Consumer names in subject | api.JSApiConsumerDeleteResponse |
| $JS.FC.> | N/A | Consumer to subscriber flow control messages (also needed for stream mirroring) | | |

ACLs

It may be hard to notice at first, but there is a clear pattern in these subjects. Let's look at the various JetStream-related subjects:

General information

$JS.API.INFO

Stream and Consumer Admin

$JS.API.STREAM.CREATE.<stream>
$JS.API.STREAM.UPDATE.<stream>
$JS.API.STREAM.DELETE.<stream>
$JS.API.STREAM.INFO.<stream>
$JS.API.STREAM.PURGE.<stream>
$JS.API.STREAM.LIST
$JS.API.STREAM.NAMES
$JS.API.STREAM.MSG.DELETE.<stream>
$JS.API.STREAM.MSG.GET.<stream>
$JS.API.STREAM.SNAPSHOT.<stream>
$JS.API.STREAM.RESTORE.<stream>
$JS.API.CONSUMER.CREATE.<stream>
$JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>
$JS.API.CONSUMER.DELETE.<stream>.<consumer>
$JS.API.CONSUMER.INFO.<stream>.<consumer>
$JS.API.CONSUMER.LIST.<stream>
$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
$JS.API.CONSUMER.NAMES.<stream>

Stream and Consumer Use

$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
$JS.ACK.<stream>.<consumer>.x.x.x
$JS.SNAPSHOT.ACK.<stream>.<msg id>
$JS.SNAPSHOT.RESTORE.<stream>.<msg id>
$JS.FC.>

Events and Advisories

$JS.EVENT.METRIC.CONSUMER_ACK.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.MSG_TERMINATED.<stream>.<consumer>
$JS.EVENT.ADVISORY.STREAM.CREATED.<stream>
$JS.EVENT.ADVISORY.STREAM.DELETED.<stream>
$JS.EVENT.ADVISORY.STREAM.UPDATED.<stream>
$JS.EVENT.ADVISORY.CONSUMER.CREATED.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.DELETED.<stream>.<consumer>
$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.<stream>
$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.<stream>
$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.<stream>
$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.<stream>
$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.<stream>
$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST.<stream>
$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED.<stream>.<consumer>
$JS.EVENT.ADVISORY.CONSUMER.QUORUM_LOST.<stream>.<consumer>
$JS.EVENT.ADVISORY.API

This design lets you easily create ACL rules that limit users to a specific Stream or Consumer, and to specific verbs for administration purposes. To ensure that only the receiver of a message can Ack it, response permissions restrict you to publishing to the Reply subject of messages you have received.
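To see why the subject layout lends itself to ACLs, the sketch below checks subjects against an allow list using NATS wildcard rules, where * matches exactly one token and a trailing > matches one or more remaining tokens. This is a simplified matcher written for illustration, not the server's implementation, and the readOnlyOrders allow list is a hypothetical policy for a user who may inspect and pull from the ORDERS Stream but not modify it.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern. "*" matches
// exactly one token; ">" must be the last token of the pattern and
// matches one or more remaining tokens.
func subjectMatches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// readOnlyOrders is a hypothetical publish allow list for one user.
var readOnlyOrders = []string{
	"$JS.API.STREAM.INFO.ORDERS",
	"$JS.API.CONSUMER.INFO.ORDERS.*",
	"$JS.API.CONSUMER.MSG.NEXT.ORDERS.*",
}

// allowed reports whether any pattern in allow matches subject.
func allowed(allow []string, subject string) bool {
	for _, pattern := range allow {
		if subjectMatches(pattern, subject) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(allowed(readOnlyOrders, "$JS.API.CONSUMER.INFO.ORDERS.NEW"))
	fmt.Println(allowed(readOnlyOrders, "$JS.API.STREAM.DELETE.ORDERS"))
}
```

Because the verb and the Stream name sit at fixed token positions, a single pattern per verb is enough to scope a user to one Stream.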

Acknowledging Messages

Messages that need acknowledgement have a Reply subject set, something like $JS.ACK.ORDERS.test.1.2.2. This is the prefix defined in api.JetStreamAckPre followed by <stream>.<consumer>.<delivered count>.<stream sequence>.<consumer sequence>.<timestamp>.<pending messages>; the example shows only the first three of the numeric tokens.

In all of the Synadia-maintained APIs you can simply call msg.Respond(nil) (or the language equivalent), which sends nil to the reply subject.
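If you need the metadata carried in the Reply subject, it can be recovered by splitting on the dots. The sketch below assumes the full seven-token layout described above; ackInfo and parseAckSubject are illustrative names, the sample subject in main uses made-up timestamp and pending values, and the timestamp is kept as a raw number because its unit is not stated here.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// ackInfo holds the tokens that follow the $JS.ACK prefix.
// The type and field names are illustrative.
type ackInfo struct {
	Stream      string
	Consumer    string
	Delivered   uint64
	StreamSeq   uint64
	ConsumerSeq uint64
	Timestamp   uint64 // raw value; its unit is not interpreted here
	Pending     uint64
}

// parseAckSubject splits a reply subject of the form
// $JS.ACK.<stream>.<consumer>.<delivered>.<stream seq>.<consumer seq>.<timestamp>.<pending>.
func parseAckSubject(reply string) (ackInfo, error) {
	tokens := strings.Split(reply, ".")
	if len(tokens) != 9 || tokens[0] != "$JS" || tokens[1] != "ACK" {
		return ackInfo{}, fmt.Errorf("unexpected ack subject %q", reply)
	}
	var nums [5]uint64
	for i, tok := range tokens[4:] {
		n, err := strconv.ParseUint(tok, 10, 64)
		if err != nil {
			return ackInfo{}, fmt.Errorf("token %q is not a number: %w", tok, err)
		}
		nums[i] = n
	}
	return ackInfo{
		Stream:      tokens[2],
		Consumer:    tokens[3],
		Delivered:   nums[0],
		StreamSeq:   nums[1],
		ConsumerSeq: nums[2],
		Timestamp:   nums[3],
		Pending:     nums[4],
	}, nil
}

func main() {
	// Hypothetical reply subject with all seven tokens present.
	info, err := parseAckSubject("$JS.ACK.ORDERS.test.1.2.2.1588770000000000000.0")
	fmt.Println(info, err)
}
```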

Fetching The Next Message From a Pull-based Consumer

If you have a pull-based Consumer you can send a standard NATS Request to $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>. The format is defined in api.JetStreamRequestNextT and must be populated using fmt.Sprintf().

nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.test' '1'
Published 1 bytes to $JS.API.CONSUMER.MSG.NEXT.ORDERS.test
Received  [js.1] : 'message 1'

Here we ask for just 1 message (nats req only shows 1), but you can fetch a batch of messages by varying the argument. This combines well with the AckAll Ack policy.

The above request for the next message stays on the server for as long as the client is connected, and further pulls from the same client accumulate on the server. If you ask for 1 message 100 times and 1000 messages arrive, you will be sent 100 messages, not 1.

This is often not desired, so pull consumers also support a mode in which a JSON document describing the pull request is sent.

{
  "expires": 7000000000,
  "batch": 10
}

This requests 10 messages and asks the server to keep the request for 7 seconds (expires is given in nanoseconds). This is useful when you poll the server frequently and do not want pull requests to accumulate on the server. Set expires to match your poll frequency.

{
  "batch": 10,
  "no_wait": true
}

This second format of the pull request is not stored on the server's queue at all. When there are no messages to deliver, the server sends an empty message with a Status header of 404, so you can tell, for example, when you have reached the end of the stream. A 409 is returned if the Consumer has reached its MaxAckPending limit.

nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.NEW' '{"no_wait": true, "batch": 10}'
13:45:30 Sending request on "$JS.API.CONSUMER.MSG.NEXT.ORDERS.NEW"
13:45:30 Received on "_INBOX.UKQGqq0W1EKl8inzXU1naH.XJiawTRM" rtt 594.908µs
13:45:30 Status: 404
13:45:30 Description: No Messages

Fetching From a Stream By Sequence

If you know the Stream sequence of a message you can fetch it directly; this does not support acks. Do a Request() to $JS.API.STREAM.MSG.GET.ORDERS with a JSON payload containing the message sequence. The subject format is defined in api.JetStreamMsgBySeqT, which also requires populating using fmt.Sprintf().

nats req '$JS.API.STREAM.MSG.GET.ORDERS' '{"seq": 1}'
Published 10 bytes to $JS.API.STREAM.MSG.GET.ORDERS
Received  [_INBOX.cJrbzPJfZrq8NrFm1DsZuH.k91Gb4xM] : '{
  "type": "io.nats.jetstream.api.v1.stream_msg_get_response",
  "message": {
    "subject": "x",
    "seq": 1,
    "data": "aGVsbG8=",
    "time": "2020-05-06T13:18:58.115424+02:00"
  }
}'

Here subject is the subject the message was received on, data is base64 encoded, and time is when the message was received.
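A reply like the one above could be decoded in Go roughly as follows. The type names storedMsg and msgGetResponse are invented for this sketch; the JSON keys are the ones in the sample reply. Go's encoding/json base64-decodes string values into []byte fields, so the data field needs no manual decoding.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// storedMsg mirrors the "message" object in the sample reply.
type storedMsg struct {
	Subject string    `json:"subject"`
	Seq     uint64    `json:"seq"`
	Data    []byte    `json:"data"` // base64 in JSON, raw bytes after decoding
	Time    time.Time `json:"time"`
}

// msgGetResponse is an illustrative wrapper for the typed reply.
type msgGetResponse struct {
	Type    string     `json:"type"`
	Message *storedMsg `json:"message"`
}

// decodeMsgGet extracts the stored message from a raw reply.
func decodeMsgGet(raw []byte) (*storedMsg, error) {
	var resp msgGetResponse
	if err := json.Unmarshal(raw, &resp); err != nil {
		return nil, fmt.Errorf("invalid JSON reply: %w", err)
	}
	if resp.Message == nil {
		return nil, fmt.Errorf("reply of type %q carries no message", resp.Type)
	}
	return resp.Message, nil
}

func main() {
	raw := []byte(`{
  "type": "io.nats.jetstream.api.v1.stream_msg_get_response",
  "message": {
    "subject": "x",
    "seq": 1,
    "data": "aGVsbG8=",
    "time": "2020-05-06T13:18:58.115424+02:00"
  }
}`)
	msg, err := decodeMsgGet(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%s seq=%d data=%q\n", msg.Subject, msg.Seq, msg.Data)
}
```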

Consumer Samples

Samples are published to a specific subject per Consumer, something like $JS.EVENT.METRIC.CONSUMER_ACK.<stream>.<consumer>. Subscribe to that subject to receive api.ConsumerAckMetric messages in JSON format. The prefix is defined in api.JetStreamMetricConsumerAckPre.
