
Using the Key Value Store

Last updated 2 years ago

Because the Key Value Store is built on top of the JetStream persistence layer, you obtain a KeyValueManager object from your JetStream context.

Creating and deleting KV buckets

You can create as many independent key/value store instances, called 'buckets', as you need. Buckets are typically created, purged or deleted administratively (e.g. using the nats CLI tool), but this can also be done using one of the following KeyValueManager calls:

// KeyValue will lookup and bind to an existing KeyValue store.
KeyValue(bucket string) (KeyValue, error)
// CreateKeyValue will create a KeyValue store with the following configuration.
CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
DeleteKeyValue(bucket string) error
/**
 * Create a key value store.
 * @param config the key value configuration
 * @return bucket info
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException;

/**
* Get the list of bucket names.
* @return list of bucket names
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
List<String> getBucketNames() throws IOException, JetStreamApiException, InterruptedException;

/**
* Gets the info for an existing bucket.
* @param bucketName the bucket name to use
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @return the bucket status object
*/
KeyValueStatus getBucketInfo(String bucketName) throws IOException, JetStreamApiException;

/**
* Deletes an existing bucket. Will throw a JetStreamApiException if the delete fails.
* @param bucketName the stream name to use.
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void delete(String bucketName) throws IOException, JetStreamApiException;
  static async create(
    js: JetStreamClient,
    name: string,
    opts: Partial<KvOptions> = {},
  ): Promise<KV>

static async bind(
    js: JetStreamClient,
    name: string,
    opts: Partial<{ codec: KvCodecs }> = {},
): Promise<KV>

destroy(): Promise<boolean>
# from the JetStreamContext

async def key_value(self, bucket: str) -> KeyValue:

async def create_key_value(
    self,
    config: Optional[api.KeyValueConfig] = None,
    **params,
) -> KeyValue:
    """
    create_key_value takes an api.KeyValueConfig and creates a KV in JetStream.
    """
    
async def delete_key_value(self, bucket: str) -> bool:
    """
    delete_key_value deletes a JetStream KeyValue store by destroying
    the associated stream.
    """  
NATS_EXTERN natsStatus 	kvConfig_Init (kvConfig *cfg)
 	Initializes a KeyValue configuration structure.
 
NATS_EXTERN natsStatus 	js_CreateKeyValue (kvStore **new_kv, jsCtx *js, kvConfig *cfg)
 	Creates a KeyValue store with a given configuration.
 
NATS_EXTERN natsStatus 	js_KeyValue (kvStore **new_kv, jsCtx *js, const char *bucket)
 	Looks-up and binds to an existing KeyValue store.
 
NATS_EXTERN natsStatus 	js_DeleteKeyValue (jsCtx *js, const char *bucket)
 	Deletes a KeyValue store.
 
NATS_EXTERN void 	kvStore_Destroy (kvStore *kv)
 	Destroys a KeyValue store object.
/// <summary>
/// Create a key value store.
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="config">the key value configuration</param>
/// <returns></returns>
KeyValueStatus Create(KeyValueConfiguration config);

/// <summary>
/// Get the list of bucket names.
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <returns>list of bucket names</returns>
IList<string> GetBucketNames();

/// <summary>
/// Gets the info for an existing bucket.
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="bucketName">the bucket name to use</param>
/// <returns>the bucket status object</returns>
KeyValueStatus GetBucketInfo(string bucketName);

/// <summary>
/// Deletes an existing bucket. Will throw a NATSJetStreamException if the delete fails.
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="bucketName"></param>
void Delete(string bucketName);

Getting

You can get the current value of a key, or request a specific revision of that value.

// Get returns the latest value for the key.
Get(key string) (entry KeyValueEntry, err error)
// GetRevision returns a specific revision value for the key.
GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
/**
* Get the entry for a key
* @param key the key
* @return the KvEntry object or null if not found.
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
KeyValueEntry get(String key) throws IOException, JetStreamApiException;

/**
* Get the specific revision of an entry for a key.
* @param key the key
* @param revision the revision
* @return the KvEntry object or null if not found.
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws IllegalArgumentException the server is not JetStream enabled
*/
KeyValueEntry get(String key, long revision) throws IOException, JetStreamApiException;
async get(k: string): Promise<KvEntry | null>
async def get(self, key: str) -> Entry:
   """
   get returns the latest value for the key.
   """
NATS_EXTERN natsStatus 	kvStore_Get (kvEntry **new_entry, kvStore *kv, const char *key)
 	Returns the latest entry for the key.
 
NATS_EXTERN natsStatus 	kvStore_GetRevision (kvEntry **new_entry, kvStore *kv, const char *key, uint64_t revision)
 	Returns the entry at the specific revision for the key.
/// <summary>
/// Get the entry for a key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <returns>The entry</returns>
KeyValueEntry Get(string key);

/// <summary>
/// Get the specific revision of an entry for a key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <param name="revision">the specific revision</param>
/// <returns>The entry</returns>
KeyValueEntry Get(string key, ulong revision);
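
To make the difference between "latest value" and "specific revision" concrete, here is a toy in-memory model. It is not the NATS client API: `ToyBucket` and its methods are hypothetical names, and the sketch assumes revisions come from a single counter shared by the whole bucket. It also keeps every revision, whereas a real bucket only retains as many as its history depth allows.

```python
from typing import Dict, List, Optional, Tuple


class ToyBucket:
    """Hypothetical in-memory stand-in for a bucket; not the NATS client API."""

    def __init__(self) -> None:
        self._seq = 0  # assumption: one revision counter for the whole bucket
        self._log: Dict[str, List[Tuple[int, bytes]]] = {}

    def put(self, key: str, value: bytes) -> int:
        # Every write gets the next revision number.
        self._seq += 1
        self._log.setdefault(key, []).append((self._seq, value))
        return self._seq

    def get(self, key: str, revision: Optional[int] = None) -> Optional[bytes]:
        entries = self._log.get(key, [])
        if not entries:
            return None
        if revision is None:
            return entries[-1][1]  # latest value for the key
        for rev, value in entries:
            if rev == revision:
                return value
        return None  # that revision does not belong to this key


bucket = ToyBucket()
first = bucket.put("color", b"red")
second = bucket.put("color", b"blue")
latest = bucket.get("color")
original = bucket.get("color", revision=first)
```

Here `latest` is the most recently written value, while `original` is the value that was stored at revision `first`.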

Putting

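The key is always a string. Use Put to store a byte array, or the convenience method PutString to store a string. For 'compare and set' functionality, use Create (which only succeeds if the key does not exist) and Update (which only succeeds if the latest revision matches the one you expect).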

// Put will place the new value for the key into the store.
Put(key string, value []byte) (revision uint64, err error)
// PutString will place the string for the key into the store.
PutString(key string, value string) (revision uint64, err error)
// Create will add the key/value pair if it does not exist.
Create(key string, value []byte) (revision uint64, err error)
// Update will update the value if the latest revision matches.
Update(key string, value []byte, last uint64) (revision uint64, err error)
/**
 * Put a byte[] as the value for a key
 * @param key the key
 * @param value the bytes of the value
 * @return the revision number for the key
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
long put(String key, byte[] value) throws IOException, JetStreamApiException;

/**
 * Put a string as the value for a key
 * @param key the key
 * @param value the UTF-8 string
 * @return the revision number for the key
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
long put(String key, String value) throws IOException, JetStreamApiException;

/**
 * Put a long as the value for a key
 * @param key the key
 * @param value the number
 * @return the revision number for the key
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
long put(String key, Number value) throws IOException, JetStreamApiException;

/**
 * Put as the value for a key iff the key does not exist (there is no history)
 * or is deleted (history shows the key is deleted)
 * @param key the key
 * @param value the bytes of the value
 * @return the revision number for the key
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
long create(String key, byte[] value) throws IOException, JetStreamApiException;

/**
 * Put as the value for a key iff the key exists and its last revision matches the expected
 * @param key the key
 * @param value the bytes of the value
 * @param expectedRevision the expected last revision
 * @return the revision number for the key
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws IllegalArgumentException the server is not JetStream enabled
 */
long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException;
  async put(
    k: string,
    data: Uint8Array,
    opts: Partial<KvPutOptions> = {},
  ): Promise<number>

create(k: string, data: Uint8Array): Promise<number>    
    
update(k: string, data: Uint8Array, version: number): Promise<number>
async def put(self, key: str, value: bytes) -> int:
    """
    put will place the new value for the key into the store
    and return the revision number.
    """
    
async def update(self, key: str, value: bytes, last: int) -> int:
    """
    update will update the value iff the latest revision matches.
    """    
NATS_EXTERN natsStatus 	kvStore_Put (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
 	Places the new value for the key into the store.
 
NATS_EXTERN natsStatus 	kvStore_PutString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
 	Places the new value (as a string) for the key into the store.
 
NATS_EXTERN natsStatus 	kvStore_Create (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len)
 	Places the value for the key into the store if and only if the key does not exist.
 
NATS_EXTERN natsStatus 	kvStore_CreateString (uint64_t *rev, kvStore *kv, const char *key, const char *data)
 	Places the value (as a string) for the key into the store if and only if the key does not exist.
 
NATS_EXTERN natsStatus 	kvStore_Update (uint64_t *rev, kvStore *kv, const char *key, const void *data, int len, uint64_t last)
 	Updates the value for the key into the store if and only if the latest revision matches.
 
NATS_EXTERN natsStatus 	kvStore_UpdateString (uint64_t *rev, kvStore *kv, const char *key, const char *data, uint64_t last)
 	Updates the value (as a string) for the key into the store if and only if the latest revision matches.
/// <summary>
/// Put a byte[] as the value for a key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <param name="value">the bytes of the value</param>
/// <returns>the revision number for the key</returns>
ulong Put(string key, byte[] value);

/// <summary>
/// Put a string as the value for a key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <param name="value">the UTF-8 string</param>
/// <returns>the revision number for the key</returns>
ulong Put(string key, string value);

/// <summary>
/// Put a long as the value for a key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <param name="value">the number</param>
/// <returns>the revision number for the key</returns>
ulong Put(string key, long value);

/// <summary>
/// Put as the value for a key iff the key does not exist (there is no history)
/// or is deleted (history shows the key is deleted)
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <param name="value">the bytes of the value</param>
/// <returns>the revision number for the key</returns>
ulong Create(string key, byte[] value);

/// <summary>
/// Put as the value for a key iff the key exists and its last revision matches the expected
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <param name="value">the bytes of the value</param>
/// <param name="expectedRevision"></param>
/// <returns>the revision number for the key</returns>
ulong Update(string key, byte[] value, ulong expectedRevision);
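
The 'compare and set' behaviour of Create and Update can be sketched with a toy in-memory model. This is not the NATS client API: `ToyCasStore` and `RevisionMismatch` are hypothetical names, revisions are assumed to come from one counter for the whole store, and deletes are ignored (so, unlike the Java description above, `create` here never succeeds for a key that was written before).

```python
from typing import Dict, Tuple


class RevisionMismatch(Exception):
    """Raised by this toy store when a conditional write is rejected."""


class ToyCasStore:
    """Hypothetical in-memory model; not the NATS client API."""

    def __init__(self) -> None:
        self._seq = 0  # assumption: one revision counter for the whole store
        self._latest: Dict[str, Tuple[int, bytes]] = {}

    def put(self, key: str, value: bytes) -> int:
        # Unconditional write: always succeeds and returns the new revision.
        self._seq += 1
        self._latest[key] = (self._seq, value)
        return self._seq

    def create(self, key: str, value: bytes) -> int:
        # Conditional write: only succeeds if the key has never been written.
        if key in self._latest:
            raise RevisionMismatch(f"{key!r} already exists")
        return self.put(key, value)

    def update(self, key: str, value: bytes, last: int) -> int:
        # Conditional write: only succeeds if `last` is the current revision.
        current = self._latest.get(key)
        if current is None or current[0] != last:
            raise RevisionMismatch(f"{key!r} is not at revision {last}")
        return self.put(key, value)


store = ToyCasStore()
rev1 = store.create("counter", b"0")
rev2 = store.update("counter", b"1", last=rev1)

try:
    store.create("counter", b"0")  # rejected: the key already exists
    duplicate_rejected = False
except RevisionMismatch:
    duplicate_rejected = True

try:
    store.update("counter", b"2", last=rev1)  # rejected: rev1 is stale
    stale_rejected = False
except RevisionMismatch:
    stale_rejected = True
```

A writer whose update is rejected would typically re-read the key, recompute its value from the fresh data, and retry with the new revision.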

Deleting

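You can delete a specific key, which places a delete marker and leaves earlier revisions in place, or purge it, which places a marker and also removes all of the key's previous revisions.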

// Delete will place a delete marker and leave all revisions.
Delete(key string) error
// Purge will place a delete marker and remove all previous revisions.
Purge(key string) error
/**
* Soft deletes the key by placing a delete marker.
* @param key the key
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void delete(String key) throws IOException, JetStreamApiException;

/**
* Purge all values/history from the specific key
* @param key the key
* @throws IOException covers various communication issues with the NATS
*         server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
void purge(String key) throws IOException, JetStreamApiException;
delete(k: string): Promise<void>
    
purge(k: string): Promise<void>
async def delete(self, key: str) -> bool:
    """
    delete will place a delete marker and leave all previous revisions.
    """
    
async def purge(self, key: str) -> bool:
    """
    purge will remove the key and all revisions.
    """    
NATS_EXTERN natsStatus 	kvStore_Delete (kvStore *kv, const char *key)
 	Deletes a key by placing a delete marker and leaving all revisions.
 
NATS_EXTERN natsStatus 	kvStore_Purge (kvStore *kv, const char *key, kvPurgeOptions *opts)
 	Deletes a key by placing a purge marker and removing all revisions.
 	
NATS_EXTERN natsStatus 	kvStore_PurgeDeletes (kvStore *kv, kvPurgeOptions *opts)
 	Purge and removes delete markers.
/// <summary>
/// Soft deletes the key by placing a delete marker. 
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
void Delete(string key);

/// <summary>
/// Purge all values/history from the specific key. 
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
void Purge(string key);

/// <summary>
/// Remove history from all keys that currently are deleted or purged,
/// using a default KeyValuePurgeOptions
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
void PurgeDeletes();

/// <summary>
/// Remove history from all keys that currently are deleted or purged
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
void PurgeDeletes(KeyValuePurgeOptions options);
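
The difference between a delete and a purge is easiest to see by counting what is left behind. The following toy model is not the NATS client API: `ToyDeletableStore` is a hypothetical name, a `None` entry stands in for the marker, and history is assumed to be unbounded.

```python
from typing import Dict, List, Optional

MARKER = None  # in this toy, a None entry stands in for a delete/purge marker


class ToyDeletableStore:
    """Hypothetical in-memory model; not the NATS client API."""

    def __init__(self) -> None:
        self._revisions: Dict[str, List[Optional[bytes]]] = {}

    def put(self, key: str, value: bytes) -> None:
        self._revisions.setdefault(key, []).append(value)

    def delete(self, key: str) -> None:
        # Append a marker; every earlier revision stays where it is.
        self._revisions.setdefault(key, []).append(MARKER)

    def purge(self, key: str) -> None:
        # Drop every earlier revision and keep only a marker.
        self._revisions[key] = [MARKER]

    def get(self, key: str) -> Optional[bytes]:
        revisions = self._revisions.get(key)
        return revisions[-1] if revisions else None

    def revision_count(self, key: str) -> int:
        return len(self._revisions.get(key, []))


store = ToyDeletableStore()
for key in ("a", "b"):
    store.put(key, b"1")
    store.put(key, b"2")

store.delete("a")  # two values plus one marker remain
store.purge("b")   # only the marker remains
```

After both calls neither key has a current value, but key `a` still carries its old revisions while key `b` does not.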

Getting all the keys

You can get the list of all the keys that currently have a value associated with them using Keys().

// Keys will return all keys.
Keys(opts ...WatchOpt) ([]string, error)
/**
 * Get a list of the keys in a bucket.
 * @return List of keys
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws InterruptedException if the thread is interrupted
 */
List<String> keys() throws IOException, JetStreamApiException, InterruptedException;
async keys(k = ">"): Promise<QueuedIterator<string>>
NATS_EXTERN natsStatus 	kvStore_Keys (kvKeysList *list, kvStore *kv, kvWatchOptions *opts)
 	Returns all keys in the bucket.
 
NATS_EXTERN void 	kvKeysList_Destroy (kvKeysList *list)
 	Destroys this list of KeyValue store key strings.
/// <summary>
/// Get a list of the keys in a bucket.
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <returns>The list of keys</returns>
IList<string> Keys();
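
"Currently having a value" means that a key whose most recent operation is a delete is not listed. A minimal sketch of that filtering, using a hypothetical operation log of `(key, operation)` pairs rather than anything from the NATS client API:

```python
from typing import Dict, Iterable, List, Tuple


def live_keys(log: Iterable[Tuple[str, str]]) -> List[str]:
    """Return the keys whose most recent operation is a PUT.

    `log` is a hypothetical, ordered list of (key, operation) pairs where
    operation is "PUT" or "DEL". Keys are returned in first-seen order.
    """
    latest: Dict[str, str] = {}
    for key, operation in log:
        latest[key] = operation  # later operations overwrite earlier ones
    return [key for key, operation in latest.items() if operation == "PUT"]


log = [
    ("a", "PUT"),
    ("b", "PUT"),
    ("a", "DEL"),  # "a" no longer has a value
    ("c", "PUT"),
    ("b", "PUT"),  # "b" was overwritten, so it still has a value
]
keys = live_keys(log)
```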

Getting the history for a key

The JetStream key/value store has a feature you don't usually find in key/value stores: the ability to keep a history of the values associated with a key (rather than just the current value). The depth of the history is specified when the key/value bucket is created, and the default is a history depth of 1 (i.e. no history).

// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
/**
 * Get the history (list of KeyValueEntry) for a key
 * @param key the key
 * @return List of KvEntry
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws InterruptedException if the thread is interrupted
 */
List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;
async history(
    opts: { key?: string; headers_only?: boolean } = {},
  ): Promise<QueuedIterator<KvEntry>>
NATS_EXTERN natsStatus 	kvStore_History (kvEntryList *list, kvStore *kv, const char *key, kvWatchOptions *opts)
 	Returns all historical entries for the key.
 
NATS_EXTERN void 	kvEntryList_Destroy (kvEntryList *list)
 	Destroys this list of KeyValue store entries.
/// <summary>
/// Get the history (list of KeyValueEntry) for a key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// </summary>
/// <param name="key">the key</param>
/// <returns>The list of KeyValueEntry</returns>
IList<KeyValueEntry> History(string key);
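
The effect of the history depth can be sketched with a bounded queue per key. This is a toy model, not the NATS client API: `ToyHistoryBucket` and its `history` constructor argument are hypothetical names chosen for illustration.

```python
from collections import deque
from typing import Deque, Dict, List


class ToyHistoryBucket:
    """Hypothetical in-memory model; not the NATS client API."""

    def __init__(self, history: int = 1) -> None:
        if history < 1:
            raise ValueError("history depth must be at least 1")
        self._history = history
        self._entries: Dict[str, Deque[bytes]] = {}

    def put(self, key: str, value: bytes) -> None:
        # deque(maxlen=n) silently drops the oldest item once n is exceeded.
        queue = self._entries.setdefault(key, deque(maxlen=self._history))
        queue.append(value)

    def history(self, key: str) -> List[bytes]:
        # Oldest retained value first, current value last.
        return list(self._entries.get(key, ()))


shallow = ToyHistoryBucket()        # default depth of 1: current value only
deep = ToyHistoryBucket(history=3)  # keeps the three most recent values

for value in (b"v1", b"v2", b"v3", b"v4"):
    shallow.put("config", value)
    deep.put("config", value)

shallow_history = shallow.history("config")
deep_history = deep.history("config")
```

With the default depth only the current value survives. With a depth of 3, the oldest of the four writes has been dropped.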

Watching for changes

Watching a key/value bucket is like subscribing to updates: you provide a callback and you can watch all of the keys in the bucket or specify which specific key(s) you want to be kept updated about.

// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
/**
 * Watch updates for a specific key
 * @param key the key
 * @param watcher the watcher
 * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last option wins.
 * @return The KeyValueWatchSubscription
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws InterruptedException if the thread is interrupted
 */
NatsKeyValueWatchSubscription watch(String key, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;

/**
 * Watch updates for all keys
 * @param watcher the watcher
 * @param watchOptions the watch options to apply. If multiple conflicting options are supplied, the last option wins.
 * @return The KeyValueWatchSubscription
 * @throws IOException covers various communication issues with the NATS
 *         server such as timeout or interruption
 * @throws JetStreamApiException the request had an error related to the data
 * @throws InterruptedException if the thread is interrupted
 */
NatsKeyValueWatchSubscription watchAll(KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException, InterruptedException;
  async watch(
    opts: {
      key?: string;
      headers_only?: boolean;
      initializedFn?: callbackFn;
    } = {},
  ): Promise<QueuedIterator<KvEntry>>
NATS_EXTERN natsStatus 	kvStore_Watch (kvWatcher **new_watcher, kvStore *kv, const char *keys, kvWatchOptions *opts)
 	Returns a watcher for any updates to keys that match the keys argument.
 
NATS_EXTERN natsStatus 	kvStore_WatchAll (kvWatcher **new_watcher, kvStore *kv, kvWatchOptions *opts)
 	Returns a watcher for any updates to any keys of the KeyValue store bucket.
/// <summary>
/// Watch updates for a specific key
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// WARNING: This api requires an internal consumer that enforces ordering of messages.
/// This portion of the implementation is not complete yet. If there was some sort of
/// error from the server and messages were skipped or came out of order the data received
/// would be incomplete. While this is an edge case, it can still technically happen. 
/// </summary>
/// <param name="key">the key</param>
/// <param name="watcher">the watcher</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last option wins.</param>
/// <returns></returns>
KeyValueWatchSubscription Watch(string key, IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);

/// <summary>
/// Watch updates for all keys
/// THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
/// WARNING: This api requires an internal consumer that enforces ordering of messages.
/// This portion of the implementation is not complete yet. If there was some sort of
/// error from the server and messages were skipped or came out of order the data received
/// would be incomplete. While this is an edge case, it can still technically happen. 
/// </summary>
/// <param name="watcher">the watcher</param>
/// <param name="watchOptions">the watch options to apply. If multiple conflicting options are supplied, the last option wins.</param>
/// <returns>The KeyValueWatchSubscription</returns>
KeyValueWatchSubscription WatchAll(IKeyValueWatcher watcher, params KeyValueWatchOption[] watchOptions);
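
The callback-plus-wildcard idea can be sketched without a server. The toy below is not the NATS client API: `ToyWatchedBucket` and `matches` are hypothetical names, and the matcher assumes the usual NATS subject wildcards (`*` for exactly one dot-separated token, `>` for one or more trailing tokens). Unlike a real watcher, it only reports writes made after the watch was registered and does not replay initial values.

```python
from typing import Callable, Dict, List, Tuple

Callback = Callable[[str, bytes], None]


def matches(pattern: str, key: str) -> bool:
    """Token-wise match of a key against a pattern with '*' and '>' wildcards."""
    pattern_tokens = pattern.split(".")
    key_tokens = key.split(".")
    for i, token in enumerate(pattern_tokens):
        if token == ">":
            # '>' must be the last token and must cover at least one key token.
            return i == len(pattern_tokens) - 1 and len(key_tokens) > i
        if i >= len(key_tokens):
            return False
        if token != "*" and token != key_tokens[i]:
            return False
    return len(pattern_tokens) == len(key_tokens)


class ToyWatchedBucket:
    """Hypothetical in-memory model; not the NATS client API."""

    def __init__(self) -> None:
        self._values: Dict[str, bytes] = {}
        self._watchers: List[Tuple[str, Callback]] = []

    def watch(self, pattern: str, callback: Callback) -> None:
        self._watchers.append((pattern, callback))

    def watch_all(self, callback: Callback) -> None:
        self.watch(">", callback)  # '>' matches every key

    def put(self, key: str, value: bytes) -> None:
        self._values[key] = value
        for pattern, callback in self._watchers:
            if matches(pattern, key):
                callback(key, value)


bucket = ToyWatchedBucket()
kitchen_updates: List[Tuple[str, bytes]] = []
all_keys: List[str] = []

bucket.watch("sensors.kitchen.*", lambda k, v: kitchen_updates.append((k, v)))
bucket.watch_all(lambda k, v: all_keys.append(k))

bucket.put("sensors.kitchen.temp", b"21")
bucket.put("sensors.garage.temp", b"9")
```

The key-specific watcher only sees the kitchen update, while the watch-all callback sees both writes.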