NATS文档
  • 欢迎
  • 发行备注
    • 最新情况
      • NATS 2.2
      • NATS 2.0
  • NATS 概念
    • 概览
      • 比较 NATS
    • 什么是NATS
      • 演练安装
    • 基于主题的消息
    • 核心NATS
      • 发布和订阅
        • 发布/订阅演 练
      • 请求和响应
        • 请求/响应 演练
      • 队列组
        • 队列 演练
    • JetStream
      • 流
      • 消费者
        • 示例
      • JetStream 演练
      • 键值对存储
        • 键值对存储演练
      • 对象存储
        • 对象存储演练
    • 主题映射与分区
    • NATS服务器基础架构
      • NATS部署架构适配
    • 安全
    • 连接性
  • 使用 NATS
    • NATS工具
      • nats
        • nats基准测试
      • nk
      • nsc
        • 基础
        • 流
        • 服务
        • 签名密钥
        • 撤销
        • 管理操作
      • nats-top
        • 教程
    • 用NATS开发
      • 一个NATS应用的解剖
      • 连接
        • 连接到默认服务器
        • 连接到特定服务器
        • 连接到群集
        • 连接名称
        • 用用户名和密码做认证
        • 用令牌做认证
        • 用NKey做认证
        • 用一个可信文件做认证
        • 用TLS加密连接
        • 设置连接超时
        • 乒乓协议
        • 关闭响应消息
        • 杂技功能
        • 自动恢复
          • 禁用自动重连
          • 设置自动重新连接的最大次数
          • 随机
          • 重连尝试之间暂停
          • 关注重连事件
          • 重连尝试期间缓存消息
        • 监视连接
          • 关注连接事件
          • 低速消费者
      • 接收消息
        • 同步订阅
        • 异步订阅
        • 取消订阅
        • N个消息后取消订阅
        • 回复一个消息
        • 通配符订阅
        • 队列订阅
        • 断开连接前清除消息
        • 接收结构化数据
      • 发送消息
        • 包含一个回复主题
        • 请求回复语义
        • 缓存刷入和乒
        • 发送结构化数据
      • JetStream
        • 深入JetStream模型
        • 管理流和消费者
        • 消费者详情
        • 发布到流
        • 使用键值对存储
        • 使用对象存储
      • 教程
        • 用go做个自定义拨号器
  • 运行一个NATS服务
    • 安装、运行和部署NATS服务
      • 安装一个NATS服务
      • 运行和部署一个NATS服务
      • Windows服务
      • 信号
    • 环境约束
    • NATS和Docker
      • 教程
      • Docker Swarm
      • Python 和 NGS 运行在Docker
      • JetStream
    • NATS和Kubernetes
      • 用Helm 部署NATS
      • 创建一个Kubernetes群集
      • NATS群集和认证管理
      • 用cfssl保护NATS群集
      • 用负载均衡来保护外部的NATS访问
      • 在Digital Ocean用Helm创建超级NATS群集
      • 使用Helm从0到K8S再到叶子节点
    • NATS服务的客户端
    • 配置 NATS服务
      • 配置 JetStream
        • 配置管理 Management
          • NATS管理命令行
          • 地形
          • GitHub Actions
          • Kubernetes控制器
      • 群集
        • 群集配置
        • JetStream 群集
          • 管理
      • 网关超级群集
        • 配置
      • 叶子节点
        • 配置
        • JetStream在叶子节点
      • 安全加固NATS
        • 使用 TLS
        • 认证
          • 令牌
          • 用户名/密码
          • TLS认证
            • 群集中的TLS认证
          • NKeys
          • 认证超时
          • 去中心化的 JWT 认证/授权
            • 使用解析器查找帐户
            • 内存解析器教程
            • 混合认证/授权安装
        • 授权
        • 基于账户的多租户
        • OCSP Stapling
      • 日志
      • 使用监控
      • MQTT
        • 配置
      • 配置主题映射
      • 系统事件
        • 系统时间和去中心化的JWT教程
      • WebSocket
        • 配置
    • 管理和监控你的NATS服务基础架构
      • 监控
        • 监控 JetStream
      • 管理 JetStream
        • 账号信息
        • 命名流,消费者和账号
        • 流
        • 消费者
        • 数据复制
        • 灾难回复
        • 加密Rest
      • 管理JWT安全
        • 深入JWT指南
      • 升级一个群集
      • 慢消费者
      • 信号
      • 跛脚鸭模式
  • 参考
    • 常见问题
    • NATS协议
      • 协议演示
      • 客户端协议
        • 开发一个客户端
      • NATS群集协议
      • JetStream API参考
  • 遗产
    • STAN='NATS流'
      • STAN概念
        • 和NATS的关系
        • 客户端连接
        • 频道
          • 消息日志
          • 订阅
            • 通常的
            • 持久化的
            • 队列组
            • 重新投递
        • 存储接口
        • 存储加密
        • 群集
          • Supported Stores
          • Configuration
          • Auto Configuration
          • Containers
        • Fault Tolerance
          • Active Server
          • Standby Servers
          • Shared State
          • Failover
        • Partitioning
        • Monitoring
          • Endpoints
      • Developing With STAN
        • Connecting to NATS Streaming Server
        • Publishing to a Channel
        • Receiving Messages from a Channel
        • Durable Subscriptions
        • Queue Subscriptions
        • Acknowledgements
        • The Streaming Protocol
      • STAN NATS Streaming Server
        • Installing
        • Running
        • Configuring
          • Command Line Arguments
          • Configuration File
          • Store Limits
          • Persistence
            • File Store
            • SQL Store
          • Securing
        • Process Signaling
        • Windows Service
        • Embedding NATS Streaming Server
        • Docker Swarm
        • Kubernetes
          • NATS Streaming with Fault Tolerance.
    • nats账号服务
      • Basics
      • Inspecting JWTs
      • Directory Store
      • Update Notifications
由 GitBook 提供支持
在本页
  • Slow consumers identified in the client
  • Slow consumers identified by the server
  • Handling slow consumers
  • Server Configuration
  • Client Configuration
  1. 运行一个NATS服务
  2. 管理和监控你的NATS服务基础架构

慢消费者

上一页升级一个群集下一页信号

最后更新于2年前

To support resiliency and high availability, NATS provides built-in mechanisms to automatically prune the registered listener interest graph that is used to keep track of subscribers, including slow consumers and lazy listeners. NATS automatically handles a slow consumer. If a client is not processing messages quick enough, the NATS server cuts it off. To support scaling, NATS provides for auto-pruning of client connections. If a subscriber does not respond to ping requests from the server within the , the client is cut off (disconnected). The client will need to have reconnect logic to reconnect with the server.

In core NATS, consumers that cannot keep up are handled differently from many other messaging systems: NATS favors the approach of protecting the system as a whole over accommodating a particular consumer to ensure message delivery.

What is a slow consumer?

A slow consumer is a subscriber that cannot keep up with the message flow delivered from the NATS server. This is a common case in distributed systems because it is often easier to generate data than it is to process it. When consumers cannot process data fast enough, back pressure is applied to the rest of the system. NATS has mechanisms to reduce this back pressure.

NATS identifies slow consumers in the client or the server, providing notification through registered callbacks, log messages, and statistics in the server's monitoring endpoints.

What happens to slow consumers?

When detected at the client, the application is notified and messages are dropped to allow the consumer to continue and reduce potential back pressure. When detected in the server, the server will disconnect the connection with the slow consumer to protect itself and the integrity of the messaging system.

Slow consumers identified in the client

A on a local connection and notify the application through use of the asynchronous error callback. It is better to catch a slow consumer locally in the client rather than to allow the server to detect this condition. This example demonstrates how to define and register an asynchronous error handler that will handle slow consumer errors.

func natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
    fmt.Printf("error: %v\n", natsErr)
    if natsErr == nats.ErrSlowConsumer {
        pendingMsgs, _, err := sub.Pending()
        if err != nil {
            fmt.Printf("couldn't get pending messages: %v", err)
            return
        }
        fmt.Printf("Falling behind with %d pending messages on subject %q.\n",
            pendingMsgs, sub.Subject)
        // Log error, notify operations...
    }
    // check for other errors
}

// Set the error handler when creating a connection.
nc, err := nats.Connect("nats://localhost:4222",
  nats.ErrorHandler(natsErrHandler))

With this example code and default settings, a slow consumer error would generate output something like this:

error: nats: slow consumer, messages dropped
Falling behind with 65536 pending messages on subject "foo".

Note that if you are using a synchronous subscriber, Subscription.NextMsg(timeout time.Duration) will also return an error indicating there was a slow consumer and messages have been dropped.

Slow consumers identified by the server

When a client does not process messages fast enough, the server will buffer messages in the outbound connection to the client. When this happens and the server cannot write data fast enough to the client, in order to protect itself, it will designate a subscriber as a "slow consumer" and may drop the associated connection.

When the server initiates a slow consumer error, you'll see the following in the server output:

[54083] 2017/09/28 14:45:18.001357 [INF] ::1:63283 - cid:7 - Slow Consumer Detected

The server will also keep count of the number of slow consumer errors encountered, available through the monitoring varz endpoint in the slow_consumers field.

Handling slow consumers

Scaling with queue subscribers

Create a subject namespace that can scale

You can distribute work further through the subject namespace, with some forethought in design. This approach is useful if you need to preserve message order. The general idea is to publish to a deep subject namespace, and consume with wildcard subscriptions while giving yourself room to expand and distribute work in the future.

For a simple example, if you have a service that receives telemetry data from IoT devices located throughout a city, you can publish to a subject namespace like Sensors.North, Sensors.South, Sensors.East and Sensors.West. Initially, you'll subscribe to Sensors.> to process everything in one consumer. As your enterprise grows and data rates exceed what one consumer can handle, you can replace your single consumer with four consuming applications to subscribe to each subject representing a smaller segment of your data. Note that your publishing applications remain untouched.

Meter the publisher

A less favorable option may be to meter the publisher. There are several ways to do this varying from simply slowing down your publisher to a more complex approach periodically issuing a blocking request-reply to match subscriber rates.

Tune NATS through configuration

The NATS server can be tuned to determine how much data can be buffered before a consumer is considered slow, and some officially supported clients allow buffer sizes to be adjusted. Decreasing buffer sizes will let you identify slow consumers more quickly. Increasing buffer sizes is not typically recommended unless you are handling temporary bursts of data. Often, increasing buffer capacity will only postpone slow consumer problems.

Server Configuration

The NATS server has a write deadline it uses to write to a connection. When this write deadline is exceeded, a client is considered to have a slow consumer. If you are encountering slow consumer errors in the server, you can increase the write deadline to buffer more data.

The write_deadline configuration option in the NATS server configuration file will tune this:

write_deadline: 2s

Tuning this parameter is ideal when you have bursts of data to accommodate. Be sure you are not just postponing a slow consumer error.

Client Configuration

Most officially supported clients have an internal buffer of pending messages and will notify your application through an asynchronous error callback if a local subscription is not catching up. Receiving an error locally does not necessarily mean that the server will have identified a subscription as a slow consumer.

This buffer can be configured through setting the pending limits after a subscription has been created:

if err := sub.SetPendingLimits(1024*500, 1024*5000); err != nil {
  log.Fatalf("Unable to set pending limits: %v", err)
}

The default subscriber pending message limit is 65536, and the default subscriber pending byte limit is 65536*1024

If the client reaches this internal limit, it will drop messages and continue to process new messages. This is aligned with NATS at most once delivery. It is up to your application to detect the missing messages and recover from this condition.

Apart from using or optimizing your consuming application, there are a few options available: scale, meter, or tune NATS to your environment.

This is ideal if you do not rely on message order. Ensure your NATS subscription belongs to a , then scale as required by creating more instances of your service or application. This is a great approach for microservices - each instance of your microservice will receive a portion of the messages to process, and simply add more instances of your service to scale. No code changes, configuration changes, or downtime whatsoever.

NATS streaming
queue group
ping-pong interval
client can detect it is a slow consumer