Skip to content

Commit

Permalink
Added JS Stream interface (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk authored Nov 9, 2023
1 parent 6a026a5 commit e02402c
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 13 deletions.
8 changes: 4 additions & 4 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ ValueTask<PubAckResponse> PublishAsync(
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default);
ValueTask<INatsJSStream> CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default);

/// <summary>
/// Creates a new stream if it doesn't exist or returns an existing stream with the same name.
Expand All @@ -164,7 +164,7 @@ ValueTask<PubAckResponse> PublishAsync(
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> CreateStreamAsync(
ValueTask<INatsJSStream> CreateStreamAsync(
StreamConfiguration request,
CancellationToken cancellationToken = default);

Expand Down Expand Up @@ -217,7 +217,7 @@ ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> GetStreamAsync(
ValueTask<INatsJSStream> GetStreamAsync(
string stream,
StreamInfoRequest? request = null,
CancellationToken cancellationToken = default);
Expand Down Expand Up @@ -245,7 +245,7 @@ ValueTask<NatsJSStream> UpdateStreamAsync(
/// <remarks>
/// Note that paging isn't implemented. You might receive only a partial list of streams if there are a lot of them.
/// </remarks>
IAsyncEnumerable<NatsJSStream> ListStreamsAsync(
IAsyncEnumerable<INatsJSStream> ListStreamsAsync(
string? subject = default,
CancellationToken cancellationToken = default);
}
121 changes: 121 additions & 0 deletions src/NATS.Client.JetStream/INatsJSStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using NATS.Client.Core;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;

public interface INatsJSStream
{
/// <summary>
/// Stream info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed.
/// </summary>
StreamInfo Info { get; }

/// <summary>
/// Delete this stream.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Whether delete was successful or not.</returns>
/// <exception cref="NatsJSException">There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <remarks>After deletion this object can't be used anymore.</remarks>
ValueTask<bool> DeleteAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Purge data from this stream. Leaves the stream.
/// </summary>
/// <param name="request">Purge request.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Whether delete was successful or not.</returns>
/// <exception cref="NatsJSException">There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <remarks>After deletion this object can't be used anymore.</remarks>
ValueTask<StreamPurgeResponse> PurgeAsync(StreamPurgeRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Deletes a message from a stream.
/// </summary>
/// <param name="request">Delete message request.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Delete message response</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(StreamMsgDeleteRequest request, CancellationToken cancellationToken = default);

/// <summary>
/// Update stream properties on the server.
/// </summary>
/// <param name="request">Stream update request to be sent to the server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="NatsJSException">There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask UpdateAsync(
StreamUpdateRequest request,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name.
/// </summary>
/// <param name="consumer">Name of the consumer.</param>
/// <param name="ackPolicy">Ack policy to use. Must not be set to <c>none</c>. Default is <c>explicit</c>.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer for this stream if it doesn't exists or returns an existing one with the same name.
/// </summary>
/// <param name="request">Consumer creation request to be sent to NATS JetStream server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default);

ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Gets consumer information from the server and creates a NATS JetStream consumer <see cref="NatsJSConsumer"/>.
/// </summary>
/// <param name="consumer">Consumer name.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> GetConsumerAsync(string consumer, CancellationToken cancellationToken = default);

/// <summary>
/// Enumerates through consumers belonging to this stream.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Async enumerable of consumer objects. Can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSException">There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <remarks>
/// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them.
/// </remarks>
IAsyncEnumerable<ConsumerInfo> ListConsumersAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Delete a consumer from this stream.
/// </summary>
/// <param name="consumer">Consumer name to be deleted.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Whether the deletion was successful.</returns>
/// <exception cref="NatsJSException">There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<bool> DeleteConsumerAsync(string consumer, CancellationToken cancellationToken = default);

/// <summary>
/// Retrieve the stream info from the server and update this stream.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask RefreshAsync(CancellationToken cancellationToken = default);

ValueTask<NatsMsg<T>> GetDirectAsync<T>(StreamMsgGetRequest request, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

ValueTask<StreamMsgGetResponse> GetAsync(StreamMsgGetRequest request, CancellationToken cancellationToken = default);
}
8 changes: 4 additions & 4 deletions src/NATS.Client.JetStream/NatsJSContext.Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public partial class NatsJSContext
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public ValueTask<NatsJSStream> CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default) =>
public ValueTask<INatsJSStream> CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default) =>
CreateStreamAsync(new StreamCreateRequest { Name = stream, Subjects = subjects }, cancellationToken);

/// <summary>
Expand All @@ -25,7 +25,7 @@ public ValueTask<NatsJSStream> CreateStreamAsync(string stream, string[] subject
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public async ValueTask<NatsJSStream> CreateStreamAsync(
public async ValueTask<INatsJSStream> CreateStreamAsync(
StreamConfiguration request,
CancellationToken cancellationToken = default)
{
Expand Down Expand Up @@ -106,7 +106,7 @@ public async ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
public async ValueTask<NatsJSStream> GetStreamAsync(
public async ValueTask<INatsJSStream> GetStreamAsync(
string stream,
StreamInfoRequest? request = null,
CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -148,7 +148,7 @@ public async ValueTask<NatsJSStream> UpdateStreamAsync(
/// <remarks>
/// Note that paging isn't implemented. You might receive only a partial list of streams if there are a lot of them.
/// </remarks>
public async IAsyncEnumerable<NatsJSStream> ListStreamsAsync(
public async IAsyncEnumerable<INatsJSStream> ListStreamsAsync(
string? subject = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/NatsJSStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace NATS.Client.JetStream;
/// <summary>
/// Represents a NATS JetStream stream.
/// </summary>
public class NatsJSStream
public class NatsJSStream : INatsJSStream
{
private readonly NatsJSContext _context;
private readonly string _name;
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public class NatsKVStore : INatsKVStore
private const string NatsSequence = "Nats-Sequence";
private const string NatsTimeStamp = "Nats-Time-Stamp";
private readonly NatsJSContext _context;
private readonly NatsJSStream _stream;
private readonly INatsJSStream _stream;

internal NatsKVStore(string bucket, NatsJSContext context, NatsJSStream stream)
internal NatsKVStore(string bucket, NatsJSContext context, INatsJSStream stream)
{
Bucket = bucket;
_context = context;
Expand Down
4 changes: 2 additions & 2 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ public class NatsObjStore : INatsObjStore

private readonly NatsObjContext _objContext;
private readonly NatsJSContext _context;
private readonly NatsJSStream _stream;
private readonly INatsJSStream _stream;

internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, NatsJSStream stream)
internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, INatsJSStream stream)
{
Bucket = config.Bucket;
_objContext = objContext;
Expand Down

0 comments on commit e02402c

Please sign in to comment.