NATS文档
  • 欢迎
  • 发行备注
    • 最新情况
      • NATS 2.2
      • NATS 2.0
  • NATS 概念
    • 概览
      • 比较 NATS
    • 什么是NATS
      • 演练安装
    • 基于主题的消息
    • 核心NATS
      • 发布和订阅
        • 发布/订阅演 练
      • 请求和响应
        • 请求/响应 演练
      • 队列组
        • 队列 演练
    • JetStream
      • 流
      • 消费者
        • 示例
      • JetStream 演练
      • 键值对存储
        • 键值对存储演练
      • 对象存储
        • 对象存储演练
    • 主题映射与分区
    • NATS服务器基础架构
      • NATS部署架构适配
    • 安全
    • 连接性
  • 使用 NATS
    • NATS工具
      • nats
        • nats基准测试
      • nk
      • nsc
        • 基础
        • 流
        • 服务
        • 签名密钥
        • 撤销
        • 管理操作
      • nats-top
        • 教程
    • 用NATS开发
      • 一个NATS应用的解剖
      • 连接
        • 连接到默认服务器
        • 连接到特定服务器
        • 连接到群集
        • 连接名称
        • 用用户名和密码做认证
        • 用令牌做认证
        • 用NKey做认证
        • 用一个可信文件做认证
        • 用TLS加密连接
        • 设置连接超时
        • 乒乓协议
        • 关闭响应消息
        • 杂技功能
        • 自动恢复
          • 禁用自动重连
          • 设置自动重新连接的最大次数
          • 随机
          • 重连尝试之间暂停
          • 关注重连事件
          • 重连尝试期间缓存消息
        • 监视连接
          • 关注连接事件
          • 低速消费者
      • 接收消息
        • 同步订阅
        • 异步订阅
        • 取消订阅
        • N个消息后取消订阅
        • 回复一个消息
        • 通配符订阅
        • 队列订阅
        • 断开连接前清除消息
        • 接收结构化数据
      • 发送消息
        • 包含一个回复主题
        • 请求回复语义
        • 缓存刷入和乒
        • 发送结构化数据
      • JetStream
        • 深入JetStream模型
        • 管理流和消费者
        • 消费者详情
        • 发布到流
        • 使用键值对存储
        • 使用对象存储
      • 教程
        • 用go做个自定义拨号器
  • 运行一个NATS服务
    • 安装、运行和部署NATS服务
      • 安装一个NATS服务
      • 运行和部署一个NATS服务
      • Windows服务
      • 信号
    • 环境约束
    • NATS和Docker
      • 教程
      • Docker Swarm
      • Python 和 NGS 运行在Docker
      • JetStream
    • NATS和Kubernetes
      • 用Helm 部署NATS
      • 创建一个Kubernetes群集
      • NATS群集和认证管理
      • 用cfssl保护NATS群集
      • 用负载均衡来保护外部的NATS访问
      • 在Digital Ocean用Helm创建超级NATS群集
      • 使用Helm从0到K8S再到叶子节点
    • NATS服务的客户端
    • 配置 NATS服务
      • 配置 JetStream
        • 配置管理 Management
          • NATS管理命令行
          • 地形
          • GitHub Actions
          • Kubernetes控制器
      • 群集
        • 群集配置
        • JetStream 群集
          • 管理
      • 网关超级群集
        • 配置
      • 叶子节点
        • 配置
        • JetStream在叶子节点
      • 安全加固NATS
        • 使用 TLS
        • 认证
          • 令牌
          • 用户名/密码
          • TLS认证
            • 群集中的TLS认证
          • NKeys
          • 认证超时
          • 去中心化的 JWT 认证/授权
            • 使用解析器查找帐户
            • 内存解析器教程
            • 混合认证/授权安装
        • 授权
        • 基于账户的多租户
        • OCSP Stapling
      • 日志
      • 使用监控
      • MQTT
        • 配置
      • 配置主题映射
      • 系统事件
        • 系统时间和去中心化的JWT教程
      • WebSocket
        • 配置
    • 管理和监控你的NATS服务基础架构
      • 监控
        • 监控 JetStream
      • 管理 JetStream
        • 账号信息
        • 命名流,消费者和账号
        • 流
        • 消费者
        • 数据复制
        • 灾难回复
        • 加密Rest
      • 管理JWT安全
        • 深入JWT指南
      • 升级一个群集
      • 慢消费者
      • 信号
      • 跛脚鸭模式
  • 参考
    • 常见问题
    • NATS协议
      • 协议演示
      • 客户端协议
        • 开发一个客户端
      • NATS群集协议
      • JetStream API参考
  • 遗产
    • STAN='NATS流'
      • STAN概念
        • 和NATS的关系
        • 客户端连接
        • 频道
          • 消息日志
          • 订阅
            • 通常的
            • 持久化的
            • 队列组
            • 重新投递
        • 存储接口
        • 存储加密
        • 群集
          • Supported Stores
          • Configuration
          • Auto Configuration
          • Containers
        • Fault Tolerance
          • Active Server
          • Standby Servers
          • Shared State
          • Failover
        • Partitioning
        • Monitoring
          • Endpoints
      • Developing With STAN
        • Connecting to NATS Streaming Server
        • Publishing to a Channel
        • Receiving Messages from a Channel
        • Durable Subscriptions
        • Queue Subscriptions
        • Acknowledgements
        • The Streaming Protocol
      • STAN NATS Streaming Server
        • Installing
        • Running
        • Configuring
          • Command Line Arguments
          • Configuration File
          • Store Limits
          • Persistence
            • File Store
            • SQL Store
          • Securing
        • Process Signaling
        • Windows Service
        • Embedding NATS Streaming Server
        • Docker Swarm
        • Kubernetes
          • NATS Streaming with Fault Tolerance.
    • nats账号服务
      • Basics
      • Inspecting JWTs
      • Directory Store
      • Update Notifications
由 GitBook 提供支持
在本页
  1. 使用 NATS
  2. 用NATS开发
  3. 接收消息

断开连接前清除消息

This feature is the ability to drain connections or subscriptions and then close the connection. Closing a connection (using close()), or unsubscribing from a subscription, are generally considered immediate requests. When you close or unsubscribe the library will halt messages in any pending queue or cache for subscribers. When you drain a subscription or connection, it will process any inflight and cached/pending messages before closing.

Drain provides clients that use queue subscriptions with a way to bring down applications without losing any messages. A client can bring up a new queue member, drain and shut down the old queue member, all without losing messages sent to the old client. Without drain, there is the possibility of lost messages due to delivery timing.

The libraries can provide drain on a connection or on a subscriber, or both.

For a connection the process is essentially:

  1. Drain all subscriptions

  2. Stop new messages from being published

  3. Flush any remaining published messages

  4. Close

The API for drain can generally be used instead of close:

As an example of draining a connection:

wg := sync.WaitGroup{}
wg.Add(1)

errCh := make(chan error, 1)

// To simulate a timeout, you would set the DrainTimeout()
// to a value less than the time spent in the message callback,
// so say: nats.DrainTimeout(10*time.Millisecond).

nc, err := nats.Connect("demo.nats.io",
    nats.DrainTimeout(10*time.Second),
    nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
        errCh <- err
    }),
    nats.ClosedHandler(func(_ *nats.Conn) {
        wg.Done()
    }))
if err != nil {
    log.Fatal(err)
}

// Just to not collide using the demo server with other users.
subject := nats.NewInbox()

// Subscribe, but add some delay while processing.
if _, err := nc.Subscribe(subject, func(_ *nats.Msg) {
    time.Sleep(200 * time.Millisecond)
}); err != nil {
    log.Fatal(err)
}

// Publish a message
if err := nc.Publish(subject, []byte("hello")); err != nil {
    log.Fatal(err)
}

// Drain the connection, which will close it when done.
if err := nc.Drain(); err != nil {
    log.Fatal(err)
}

// Wait for the connection to be closed.
wg.Wait()

// Check if there was an error
select {
case e := <-errCh:
    log.Fatal(e)
default:
}
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe
d.subscribe("updates");

// Wait for a message to come in
latch.await();

// Drain the connection, which will close it
CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));

// Wait for the drain to complete
drained.get();
const nc = await connect({ servers: "demo.nats.io" });
const sub = nc.subscribe(createInbox(), () => {});
nc.publish(sub.getSubject());
await nc.drain();
import asyncio
from nats.aio.client import Client as NATS

async def example(loop):
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222", loop=loop)

    async def handler(msg):
        print("[Received] ", msg)
        await nc.publish(msg.reply, b'I can help')

        # Can check whether client is in draining state
        if nc.is_draining:
            print("Connection is draining")

    await nc.subscribe("help", "workers", cb=handler)
    await nc.flush()

    requests = []
    for i in range(0, 10):
        request = nc.request("help", b'help!', timeout=1)
        requests.append(request)

    # Wait for all the responses
    responses = []
    responses = await asyncio.gather(*requests)

    # Gracefully close the connection.
    await nc.drain()

    print("Received {} responses".format(len(responses)))
NATS.start(drain_timeout: 1) do |nc|
  NATS.subscribe('foo', queue: "workers") do |msg, reply, sub|
    nc.publish(reply, "ACK:#{msg}")
  end

  NATS.subscribe('bar', queue: "workers") do |msg, reply, sub|
    nc.publish(reply, "ACK:#{msg}")
  end

  NATS.subscribe('quux', queue: "workers") do |msg, reply, sub|
    nc.publish(reply, "ACK:#{msg}")
  end

  EM.add_timer(2) do
    next if NATS.draining?

    # Drain gracefully closes the connection.
    NATS.drain do
      puts "Done draining. Connection is closed."
    end
  end
end
static void
onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
{
    printf("Received msg: %s - %.*s\n",
           natsMsg_GetSubject(msg),
           natsMsg_GetDataLength(msg),
           natsMsg_GetData(msg));

    // Add some delay while processing
    nats_Sleep(200);

    // Need to destroy the message!
    natsMsg_Destroy(msg);
}

static void
closeHandler(natsConnection *conn, void *closure)
{
    cond_variable cv = (cond_variable) closure;

    notify_cond_variable(cv);
}

(...)


natsConnection      *conn      = NULL;
natsOptions         *opts      = NULL;
natsSubscription    *sub       = NULL;
natsStatus          s          = NATS_OK;
cond_variable       cv         = new_cond_variable(); // some fictuous way to notify between threads.

s = natsOptions_Create(&opts);
if (s == NATS_OK)
    // Setup a close handler and pass a reference to our condition variable.
    s = natsOptions_SetClosedCB(opts, closeHandler, (void*) cv);
if (s == NATS_OK)
    s = natsConnection_Connect(&conn, opts);

// Subscribe
if (s == NATS_OK)
    s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);

// Publish a message
if (s == NATS_OK)
    s = natsConnection_PublishString(conn, "foo", "hello");

// Drain the connection, which will close it when done.
if (s == NATS_OK)
    s = natsConnection_Drain(conn);

// Wait for the connection to be closed
if (s == NATS_OK)
    cond_variable_wait(cv);

(...)

// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);
natsOptions_Destroy(opts);

The mechanics of drain for a subscription are simpler:

  1. Unsubscribe

  2. Process all cached or inflight messages

  3. Clean up

The API for drain can generally be used instead of unsubscribe:

    nc, err := nats.Connect("demo.nats.io")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    done := sync.WaitGroup{}
    done.Add(1)

    count := 0
    errCh := make(chan error, 1)

    msgAfterDrain := "not this one"

    // Just to not collide using the demo server with other users.
    subject := nats.NewInbox()

    // This callback will process each message slowly
    sub, err := nc.Subscribe(subject, func(m *nats.Msg) {
        if string(m.Data) == msgAfterDrain {
            errCh <- fmt.Errorf("Should not have received this message")
            return
        }
        time.Sleep(100 * time.Millisecond)
        count++
        if count == 2 {
            done.Done()
        }
    })

    // Send 2 messages
    for i := 0; i < 2; i++ {
        nc.Publish(subject, []byte("hello"))
    }

    // Call Drain on the subscription. It unsubscribes but
    // wait for all pending messages to be processed.
    if err := sub.Drain(); err != nil {
        log.Fatal(err)
    }

    // Send one more message, this message should not be received
    nc.Publish(subject, []byte(msgAfterDrain))

    // Wait for the subscription to have processed the 2 messages.
    done.Wait()

    // Now check that the 3rd message was not received
    select {
    case e := <-errCh:
        log.Fatal(e)
    case <-time.After(200 * time.Millisecond):
        // OK!
    }
Connection nc = Nats.connect("nats://demo.nats.io:4222");

// Use a latch to wait for a message to arrive
CountDownLatch latch = new CountDownLatch(1);

// Create a dispatcher and inline message handler
Dispatcher d = nc.createDispatcher((msg) -> {
    String str = new String(msg.getData(), StandardCharsets.UTF_8);
    System.out.println(str);
    latch.countDown();
});

// Subscribe
d.subscribe("updates");

// Wait for a message to come in
latch.await();

// Messages that have arrived will be processed
CompletableFuture<Boolean> drained = d.drain(Duration.ofSeconds(10));

// Wait for the drain to complete
drained.get();

// Close the connection
nc.close();
const sub = nc.subscribe(subj, { callback: (_err, _msg) => {} });
nc.publish(subj);
nc.publish(subj);
nc.publish(subj);
await sub.drain();
import asyncio
from nats.aio.client import Client as NATS

async def example(loop):
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222", loop=loop)

    async def handler(msg):
        print("[Received] ", msg)
        await nc.publish(msg.reply, b'I can help')

        # Can check whether client is in draining state
        if nc.is_draining:
            print("Connection is draining")

    sid = await nc.subscribe("help", "workers", cb=handler)
    await nc.flush()

    # Gracefully unsubscribe the subscription
    await nc.drain(sid)
# There is currently no API to drain a single subscription, the whole connection can be drained though via NATS.drain
natsConnection      *conn      = NULL;
natsSubscription    *sub       = NULL;
natsStatus          s          = NATS_OK;

s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);

// Subscribe
if (s == NATS_OK)
    s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);

// Publish 2 messages
if (s == NATS_OK)
{
    int i;
    for (i=0; (s == NATS_OK) && (i<2); i++)
    {
        s = natsConnection_PublishString(conn, "foo", "hello");
    }
}

// Call Drain on the subscription. It unsubscribes but
// wait for all pending messages to be processed.
if (s == NATS_OK)
    s = natsSubscription_Drain(sub);

(...)

// Destroy objects that were created
natsSubscription_Destroy(sub);
natsConnection_Destroy(conn);

Because draining can involve messages flowing to the server, for a flush and asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request-reply or similar.

上一页队列订阅下一页接收结构化数据

最后更新于2年前