This feature is the ability to drain connections or subscriptions and then close the connection. Closing a connection (using close()), or unsubscribing from a subscription, are generally considered immediate requests. When you close or unsubscribe, the library will halt messages in any pending queue or cache for subscribers. When you drain a subscription or connection, it will process any in-flight and cached/pending messages before closing.
Drain provides clients that use queue subscriptions with a way to bring down applications without losing any messages. A client can bring up a new queue member, drain and shut down the old queue member, all without losing messages sent to the old client. Without drain, there is the possibility of lost messages due to delivery timing.
The libraries can provide drain on a connection or on a subscriber, or both.
For a connection the process is essentially:
Drain all subscriptions
Stop new messages from being published
Flush any remaining published messages
Close
The API for drain can generally be used instead of close:
As an example of draining a connection:
// Example: drain a connection. The connection is configured with a drain
// timeout, an error handler that forwards asynchronous errors to errCh, and a
// closed handler that releases the WaitGroup once the connection is closed.
wg := sync.WaitGroup{}
wg.Add(1)

errCh := make(chan error, 1)

// To simulate a timeout, you would set the DrainTimeout()
// to a value less than the time spent in the message callback,
// so say: nats.DrainTimeout(10*time.Millisecond).

nc, err := nats.Connect("demo.nats.io",
	nats.DrainTimeout(10*time.Second),
	nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
		errCh <- err
	}),
	nats.ClosedHandler(func(_ *nats.Conn) {
		wg.Done()
	}))
if err != nil {
	log.Fatal(err)
}

// Just to not collide using the demo server with other users.
subject := nats.NewInbox()

// Subscribe, but add some delay while processing.
if _, err := nc.Subscribe(subject, func(_ *nats.Msg) {
	time.Sleep(200 * time.Millisecond)
}); err != nil {
	log.Fatal(err)
}

// Publish a message
if err := nc.Publish(subject, []byte("hello")); err != nil {
	log.Fatal(err)
}

// Drain the connection, which will close it when done.
if err := nc.Drain(); err != nil {
	log.Fatal(err)
}

// Wait for the connection to be closed.
wg.Wait()

// Check if there was an error
select {
case e := <-errCh:
	log.Fatal(e)
default:
}
Connection nc =Nats.connect("nats://demo.nats.io:4222");// Use a latch to wait for a message to arriveCountDownLatch latch =newCountDownLatch(1);// Create a dispatcher and inline message handlerDispatcher d =nc.createDispatcher((msg) -> {String str =newString(msg.getData(),StandardCharsets.UTF_8);System.out.println(str);latch.countDown();});// Subscribed.subscribe("updates");// Wait for a message to come inlatch.await();// Drain the connection, which will close itCompletableFuture<Boolean> drained =nc.drain(Duration.ofSeconds(10));// Wait for the drain to completedrained.get();
staticvoidonMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg,void*closure){printf("Received msg: %s - %.*s\n", natsMsg_GetSubject(msg), natsMsg_GetDataLength(msg), natsMsg_GetData(msg));// Add some delay while processingnats_Sleep(200);// Need to destroy the message!natsMsg_Destroy(msg);}staticvoidcloseHandler(natsConnection *conn,void*closure){ cond_variable cv = (cond_variable) closure;notify_cond_variable(cv);}(...)natsConnection *conn =NULL;natsOptions *opts =NULL;natsSubscription *sub =NULL;natsStatus s = NATS_OK;cond_variable cv =new_cond_variable(); // some fictuous way to notify between threads.s =natsOptions_Create(&opts);if (s == NATS_OK)// Setup a close handler and pass a reference to our condition variable. s =natsOptions_SetClosedCB(opts, closeHandler, (void*) cv);if (s == NATS_OK) s =natsConnection_Connect(&conn, opts);// Subscribeif (s == NATS_OK) s =natsConnection_Subscribe(&sub, conn,"foo", onMsg,NULL);// Publish a messageif (s == NATS_OK) s =natsConnection_PublishString(conn,"foo","hello");// Drain the connection, which will close it when done.if (s == NATS_OK) s =natsConnection_Drain(conn);// Wait for the connection to be closedif (s == NATS_OK)cond_variable_wait(cv);(...)// Destroy objects that were creatednatsSubscription_Destroy(sub);natsConnection_Destroy(conn);natsOptions_Destroy(opts);
The mechanics of drain for a subscription are simpler:
Unsubscribe
Process all cached or inflight messages
Clean up
The API for drain can generally be used instead of unsubscribe:
// Example: drain a single subscription. Two messages are published before the
// drain and one after; the callback reports an error on errCh if it ever sees
// the message that was published after the drain.
nc, err := nats.Connect("demo.nats.io")
if err != nil {
	log.Fatal(err)
}
defer nc.Close()

done := sync.WaitGroup{}
done.Add(1)

count := 0
errCh := make(chan error, 1)

msgAfterDrain := "not this one"

// Just to not collide using the demo server with other users.
subject := nats.NewInbox()

// This callback will process each message slowly
sub, err := nc.Subscribe(subject, func(m *nats.Msg) {
	if string(m.Data) == msgAfterDrain {
		errCh <- fmt.Errorf("Should not have received this message")
		return
	}
	time.Sleep(100 * time.Millisecond)
	count++
	if count == 2 {
		done.Done()
	}
})
// Without this check a failed Subscribe would leave sub nil and the
// sub.Drain() call below would dereference it.
if err != nil {
	log.Fatal(err)
}

// Send 2 messages
for i := 0; i < 2; i++ {
	if err := nc.Publish(subject, []byte("hello")); err != nil {
		log.Fatal(err)
	}
}

// Call Drain on the subscription. It unsubscribes but
// waits for all pending messages to be processed.
if err := sub.Drain(); err != nil {
	log.Fatal(err)
}

// Send one more message, this message should not be received
if err := nc.Publish(subject, []byte(msgAfterDrain)); err != nil {
	log.Fatal(err)
}

// Wait for the subscription to have processed the 2 messages.
done.Wait()

// Now check that the 3rd message was not received
select {
case e := <-errCh:
	log.Fatal(e)
case <-time.After(200 * time.Millisecond):
	// OK!
}
Connection nc =Nats.connect("nats://demo.nats.io:4222");// Use a latch to wait for a message to arriveCountDownLatch latch =newCountDownLatch(1);// Create a dispatcher and inline message handlerDispatcher d =nc.createDispatcher((msg) -> {String str =newString(msg.getData(),StandardCharsets.UTF_8);System.out.println(str);latch.countDown();});// Subscribed.subscribe("updates");// Wait for a message to come inlatch.await();// Messages that have arrived will be processedCompletableFuture<Boolean> drained =d.drain(Duration.ofSeconds(10));// Wait for the drain to completedrained.get();// Close the connectionnc.close();
import asyncio
from nats.aio.client import Client as NATS


async def example(loop):
    """Subscribe to "help" in the "workers" queue group, then drain that subscription."""
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222", loop=loop)

    async def handler(msg):
        print("[Received] ", msg)
        await nc.publish(msg.reply, b'I can help')

        # Can check whether client is in draining state
        if nc.is_draining:
            print("Connection is draining")

    sid = await nc.subscribe("help", "workers", cb=handler)
    await nc.flush()

    # Gracefully unsubscribe the subscription
    await nc.drain(sid)
# There is currently no API to drain a single subscription; the whole connection can be drained, though, via NATS.drain
natsConnection *conn =NULL;natsSubscription *sub =NULL;natsStatus s = NATS_OK;s =natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);// Subscribeif (s == NATS_OK) s =natsConnection_Subscribe(&sub, conn,"foo", onMsg,NULL);// Publish 2 messagesif (s == NATS_OK){int i;for (i=0; (s == NATS_OK) && (i<2); i++) { s =natsConnection_PublishString(conn,"foo","hello"); }}// Call Drain on the subscription. It unsubscribes but// wait for all pending messages to be processed.if (s == NATS_OK) s =natsSubscription_Drain(sub);(...)// Destroy objects that were creatednatsSubscription_Destroy(sub);natsConnection_Destroy(conn);
Because draining can involve both messages flowing to the server (for a flush) and asynchronous message processing, the timeout for drain should generally be higher than the timeout for a simple message request-reply or similar.