Managing Streams and Consumers

Streams and durable consumers can be defined administratively outside the application, typically with the NATS CLI tool. In that case the application only needs to know the well-known names of the durable consumers it wants to use. You can also manage streams and consumers programmatically.

Common stream management operations are:

  • Add a stream. Adding a stream is idempotent: if the stream does not exist, it is created; if it already exists, the add operation succeeds only if the existing stream exactly matches the attributes specified in the 'add' call.

  • Delete a stream.

  • Purge a stream (delete all the messages stored in the stream).

  • Get or remove a specific message from a stream by sequence number.

  • Add, update, or delete a consumer.

  • Get information and statistics on streams, consumers, and the account, as well as on individual messages stored in a stream.
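
The idempotent 'add' behavior described in the list above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not the server or client implementation: the `StreamConfig`, `Registry`, and `ErrConfigMismatch` names below are hypothetical stand-ins defined for this example, and they are not the nats.go types.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// StreamConfig is a hypothetical, trimmed-down stand-in for a stream
// configuration. It is not the nats.go type.
type StreamConfig struct {
	Name     string
	Subjects []string
	MaxBytes int64
}

// ErrConfigMismatch is returned when a stream with the same name exists
// but was created with different attributes.
var ErrConfigMismatch = errors.New("stream exists with a different configuration")

// Registry is a toy in-memory model of a table of streams keyed by name.
type Registry struct {
	streams map[string]StreamConfig
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{streams: make(map[string]StreamConfig)}
}

// Add creates the stream if it is missing, succeeds without changes if an
// identical stream already exists, and fails if the attributes differ.
func (r *Registry) Add(cfg StreamConfig) error {
	existing, ok := r.streams[cfg.Name]
	if !ok {
		r.streams[cfg.Name] = cfg
		return nil
	}
	if !reflect.DeepEqual(existing, cfg) {
		return ErrConfigMismatch
	}
	return nil
}

func main() {
	r := NewRegistry()
	cfg := StreamConfig{Name: "FOO", Subjects: []string{"foo"}, MaxBytes: 1024}

	fmt.Println(r.Add(cfg)) // first call creates the stream
	fmt.Println(r.Add(cfg)) // identical call changes nothing and still succeeds

	cfg.MaxBytes = 2048
	fmt.Println(r.Add(cfg)) // same name, different attributes: rejected
}
```

In practice this means an application can call 'add' on every startup, provided the configuration it passes stays the same. To change the attributes of an existing stream, use an update operation instead.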

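import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func ExampleJetStreamManager() {
	// Connect to a NATS server on localhost and get a JetStream context.
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Create a stream
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:     "FOO",
		Subjects: []string{"foo"},
		MaxBytes: 1024,
	}); err != nil {
		log.Fatal(err)
	}

	// Update a stream. The new config replaces the old one, so Subjects
	// is repeated here to leave it unchanged.
	if _, err := js.UpdateStream(&nats.StreamConfig{
		Name:     "FOO",
		Subjects: []string{"foo"},
		MaxBytes: 2048,
	}); err != nil {
		log.Fatal(err)
	}

	// Create a durable consumer
	if _, err := js.AddConsumer("FOO", &nats.ConsumerConfig{
		Durable: "BAR",
	}); err != nil {
		log.Fatal(err)
	}

	// Get information about all streams (with Context JSOpt)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	for info := range js.StreamsInfo(nats.Context(ctx)) {
		fmt.Println("stream name:", info.Config.Name)
	}

	// Get information about all consumers (with MaxWait JSOpt)
	for info := range js.ConsumersInfo("FOO", nats.MaxWait(10*time.Second)) {
		fmt.Println("consumer name:", info.Name)
	}

	// Delete a consumer
	if err := js.DeleteConsumer("FOO", "BAR"); err != nil {
		log.Fatal(err)
	}

	// Delete a stream
	if err := js.DeleteStream("FOO"); err != nil {
		log.Fatal(err)
	}
}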
