Skip to content

Commit

Permalink
[IMPROVED] Documentation of jetstream options and Fetch clarification
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jan 11, 2025
1 parent 1e1519f commit 42660b0
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 44 deletions.
71 changes: 35 additions & 36 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,37 @@

This doc covers the basic usage of the `jetstream` package in `nats.go` client.

- [JetStream Simplified Client](#jetstream-simplified-client)
- [Overview](#overview)
- [Basic usage](#basic-usage)
- [Streams](#streams)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Consumers](#consumers)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer
names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from the
consumer](#receiving-messages-from-the-consumer)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a
callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming
messages](#using-messages-to-iterate-over-incoming-messages)
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Object Store](#object-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Examples](#examples)
- [Overview](#overview)
- [Basic usage](#basic-usage)
- [Streams](#streams)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Consumers](#consumers)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer
names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from the
consumer](#receiving-messages-from-the-consumer)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a
callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming
messages](#using-messages-to-iterate-over-incoming-messages)
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Object Store](#object-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Examples](#examples)

## Overview

Expand Down Expand Up @@ -118,15 +117,15 @@ func main() {
if err != nil {
// handle error
}

for msg := range msgs.Messages() {
msg.Ack()
fmt.Printf("Received a JetStream message via fetch: %s\n", string(msg.Data()))
messageCounter++
}

fmt.Printf("received %d messages\n", messageCounter)

if msgs.Error() != nil {
fmt.Println("Error during Fetch(): ", msgs.Error())
}
Expand Down Expand Up @@ -400,7 +399,7 @@ of messages/bytes. By default, `Fetch()` will wait 30 seconds before timing out
// receive up to 10 messages from the stream
msgs, err := c.Fetch(10)
if err != nil {
// handle error
// handle error
}

for msg := range msgs.Messages() {
Expand Down
12 changes: 9 additions & 3 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ type (
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)

// FetchBytes is used to retrieve up to a provided bytes from the
Expand All @@ -82,7 +84,9 @@ type (
// for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, error)

// FetchNoWait is used to retrieve up to a provided number of messages
Expand All @@ -94,7 +98,9 @@ type (
// channel for delivered messages.
//
// Messages channel is always closed, thus it is safe to range over it
// without additional checks.
// without additional checks. After the channel is closed,
// MessageBatch.Error() should be checked to see if there was an error
// during message delivery (e.g. missing heartbeat).
FetchNoWait(batch int) (MessageBatch, error)

// Consume will continuously receive messages and handle them
Expand Down
27 changes: 24 additions & 3 deletions jetstream/jetstream_options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 The NATS Authors
// Copyright 2022-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -104,6 +104,9 @@ func WithGetMsgSubject(subject string) GetMsgOpt {
// PullMaxMessages limits the number of messages to be buffered in the client.
// If not provided, a default of 500 messages will be used.
// This option is exclusive with PullMaxBytes.
//
// PullMaxMessages implements PullConsumeOpt and PullMessagesOpt and thus can
// be used to configure both Consumer.Consume and Consumer.Messages.
type PullMaxMessages int

func (max PullMaxMessages) configureConsume(opts *consumeOpts) error {
Expand All @@ -125,6 +128,9 @@ func (max PullMaxMessages) configureMessages(opts *consumeOpts) error {
// PullExpiry sets timeout on a single pull request, waiting until at least one
// message is available.
// If not provided, a default of 30 seconds will be used.
//
// PullExpiry implements PullConsumeOpt and PullMessagesOpt and thus can
// be used to configure both Consumer.Consume and Consumer.Messages.
type PullExpiry time.Duration

func (exp PullExpiry) configureConsume(opts *consumeOpts) error {
Expand All @@ -148,6 +154,9 @@ func (exp PullExpiry) configureMessages(opts *consumeOpts) error {
// PullMaxBytes limits the number of bytes to be buffered in the client.
// If not provided, the limit is not set (max messages will be used instead).
// This option is exclusive with PullMaxMessages.
//
// PullMaxBytes implements PullConsumeOpt and PullMessagesOpt and thus can
// be used to configure both Consumer.Consume and Consumer.Messages.
type PullMaxBytes int

func (max PullMaxBytes) configureConsume(opts *consumeOpts) error {
Expand All @@ -166,8 +175,11 @@ func (max PullMaxBytes) configureMessages(opts *consumeOpts) error {
return nil
}

// PullThresholdMessages sets the message count on which Consume will trigger
// PullThresholdMessages sets the message count on which consuming will trigger
// new pull request to the server. Defaults to 50% of MaxMessages.
//
// PullThresholdMessages implements PullConsumeOpt and PullMessagesOpt and thus
// can be used to configure both Consumer.Consume and Consumer.Messages.
type PullThresholdMessages int

func (t PullThresholdMessages) configureConsume(opts *consumeOpts) error {
Expand All @@ -180,8 +192,11 @@ func (t PullThresholdMessages) configureMessages(opts *consumeOpts) error {
return nil
}

// PullThresholdBytes sets the byte count on which Consume will trigger
// PullThresholdBytes sets the byte count on which consuming will trigger
// new pull request to the server. Defaults to 50% of MaxBytes (if set).
//
// PullThresholdBytes implements PullConsumeOpt and PullMessagesOpt and thus
// can be used to configure both Consumer.Consume and Consumer.Messages.
type PullThresholdBytes int

func (t PullThresholdBytes) configureConsume(opts *consumeOpts) error {
Expand All @@ -199,6 +214,9 @@ func (t PullThresholdBytes) configureMessages(opts *consumeOpts) error {
// than the idle heartbeat setting, the subscription will be removed
// and error will be passed to the message handler.
// If not provided, a default PullExpiry / 2 will be used (capped at 30 seconds)
//
// PullHeartbeat implements PullConsumeOpt and PullMessagesOpt and thus can
// be used to configure both Consumer.Consume and Consumer.Messages.
type PullHeartbeat time.Duration

func (hb PullHeartbeat) configureConsume(opts *consumeOpts) error {
Expand All @@ -221,6 +239,9 @@ func (hb PullHeartbeat) configureMessages(opts *consumeOpts) error {

// StopAfter sets the number of messages after which the consumer is
// automatically stopped and no more messages are pulled from the server.
//
// StopAfter implements PullConsumeOpt and PullMessagesOpt and thus can
// be used to configure both Consumer.Consume and Consumer.Messages.
type StopAfter int

func (nMsgs StopAfter) configureConsume(opts *consumeOpts) error {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/pull.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2024 The NATS Authors
// Copyright 2022-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down
2 changes: 1 addition & 1 deletion micro/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# NATS micro
# NATS micro [![GoDoc](https://pkg.go.dev/badge/github.com/nats-io/nats.go/micro.svg)](https://pkg.go.dev/github.com/nats-io/nats.go/micro)

- [Overview](#overview)
- [Basic usage](#basic-usage)
Expand Down

0 comments on commit 42660b0

Please sign in to comment.