Skip to content

Commit

Permalink
Initial JetStream support (nats-io#86)
Browse files Browse the repository at this point in the history
* Started JetStream project

* Test projects tidy-up and minor fixes

* Seperated JetStream tests
* Naming fix for JS
* Exclude JetStream from NuGet for now

* Adding generated records (nats-io#88)

* Adding records to jumpstart jetstream

* by hand changes

* Fixed stream model names

* Set minimum stream config defaults

* Fixed JSON types

* better conversion (nats-io#89)

* Increased logging on flaky test

* generate JetStream models with NJsonSchema (nats-io#90)

Signed-off-by: Caleb Lloyd <[email protected]>

* Trace another flaky test

---------

Signed-off-by: Caleb Lloyd <[email protected]>
Co-authored-by: Scott Fauerbach <[email protected]>
Co-authored-by: Caleb Lloyd <[email protected]>
  • Loading branch information
3 people authored Jul 13, 2023
1 parent 5341478 commit 0aaccd7
Show file tree
Hide file tree
Showing 157 changed files with 6,802 additions and 86 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@ jobs:
- name: Build
run: dotnet build -c Debug

- name: Test
- name: Test Core
run: dotnet test -c Debug --no-build tests/NATS.Client.Core.Tests/NATS.Client.Core.Tests.csproj

- name: Test JetStream
run: dotnet test -c Debug --no-build tests/NATS.Client.JetStream.Tests/NATS.Client.JetStream.Tests.csproj

memory_test:
name: memory test
strategy:
Expand Down
30 changes: 30 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core.PublishHeaders
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.Core.SubscribeHeaders", "sandbox\Example.Core.SubscribeHeaders\Example.Core.SubscribeHeaders.csproj", "{A96660DB-DAEB-4C57-8096-F236AC4FA927}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.JetStream", "src\NATS.Client.JetStream\NATS.Client.JetStream.csproj", "{56A9B885-6B7E-4BC6-AED0-ADC67C9A68DB}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.JetStream.Tests", "tests\NATS.Client.JetStream.Tests\NATS.Client.JetStream.Tests.csproj", "{2D39F649-C512-4EE5-9DFA-7BD4D9E4F145}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.TestUtilities", "tests\NATS.Client.TestUtilities\NATS.Client.TestUtilities.csproj", "{90E5BF38-70C1-460A-9177-CE42815BDBF5}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{BD234E2E-F51A-4B18-B8BE-8AF6D546BF87}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Schema.Generation", "tools\Schema.Generation\Schema.Generation.csproj", "{B7DD4A9C-2D24-4772-951E-86A665C59ADF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -127,6 +137,22 @@ Global
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A96660DB-DAEB-4C57-8096-F236AC4FA927}.Release|Any CPU.Build.0 = Release|Any CPU
{56A9B885-6B7E-4BC6-AED0-ADC67C9A68DB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{56A9B885-6B7E-4BC6-AED0-ADC67C9A68DB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{56A9B885-6B7E-4BC6-AED0-ADC67C9A68DB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{56A9B885-6B7E-4BC6-AED0-ADC67C9A68DB}.Release|Any CPU.Build.0 = Release|Any CPU
{2D39F649-C512-4EE5-9DFA-7BD4D9E4F145}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2D39F649-C512-4EE5-9DFA-7BD4D9E4F145}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2D39F649-C512-4EE5-9DFA-7BD4D9E4F145}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2D39F649-C512-4EE5-9DFA-7BD4D9E4F145}.Release|Any CPU.Build.0 = Release|Any CPU
{90E5BF38-70C1-460A-9177-CE42815BDBF5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{90E5BF38-70C1-460A-9177-CE42815BDBF5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90E5BF38-70C1-460A-9177-CE42815BDBF5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90E5BF38-70C1-460A-9177-CE42815BDBF5}.Release|Any CPU.Build.0 = Release|Any CPU
{B7DD4A9C-2D24-4772-951E-86A665C59ADF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B7DD4A9C-2D24-4772-951E-86A665C59ADF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B7DD4A9C-2D24-4772-951E-86A665C59ADF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B7DD4A9C-2D24-4772-951E-86A665C59ADF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -150,6 +176,10 @@ Global
{B26DE6AC-A4D5-4427-8453-EE3514E4B513} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{B0C82F24-BDEC-4420-A02A-F74E2423D755} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{A96660DB-DAEB-4C57-8096-F236AC4FA927} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{56A9B885-6B7E-4BC6-AED0-ADC67C9A68DB} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{2D39F649-C512-4EE5-9DFA-7BD4D9E4F145} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{90E5BF38-70C1-460A-9177-CE42815BDBF5} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{B7DD4A9C-2D24-4772-951E-86A665C59ADF} = {BD234E2E-F51A-4B18-B8BE-8AF6D546BF87}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {8CBB7278-D093-448E-B3DE-B5991209A1AA}
Expand Down
1 change: 1 addition & 0 deletions NATS.Client.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ASCII/@EntryIndexedValue">ASCII</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=CR/@EntryIndexedValue">CR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JS/@EntryIndexedValue">JS</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=LF/@EntryIndexedValue">LF</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HMSG/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=HPUB/@EntryIndexedValue">True</s:Boolean>
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.Core/NATS.Client.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@
<InternalsVisibleTo Include="$(AssemblyName).Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="$(AssemblyName).MemoryTests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NatsBenchmark, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
<InternalsVisibleTo Include="NATS.Client.JetStream, PublicKey=0024000004800000940000000602000000240000525341310004000001000100db7da1f2f89089327b47d26d69666fad20861f24e9acdb13965fb6c64dfee8da589b495df37a75e934ddbacb0752a42c40f3dbc79614eec9bb2a0b6741f9e2ad2876f95e74d54c23eef0063eb4efb1e7d824ee8a695b647c113c92834f04a3a83fb60f435814ddf5c4e5f66a168139c4c1b1a50a3e60c164d180e265b1f000cd" />
</ItemGroup>
</Project>
10 changes: 10 additions & 0 deletions src/NATS.Client.Core/NatsRequestExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public static class NatsRequestExtensions
throw new OperationCanceledException("Inbox subscription cancelled");
}

if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null })
{
throw sub.Exception;
}

return null;
}

Expand Down Expand Up @@ -125,6 +130,11 @@ public static class NatsRequestExtensions
throw new OperationCanceledException("Inbox subscription cancelled");
}

if (sub is { EndReason: NatsSubEndReason.Exception, Exception: not null })
{
throw sub.Exception;
}

return null;
}

Expand Down
60 changes: 50 additions & 10 deletions src/NATS.Client.Core/NatsSub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal enum NatsSubEndReason
IdleTimeout,
StartUpTimeout,
Cancelled,
Exception,
}

public abstract class NatsSubBase : INatsSub
Expand All @@ -30,6 +31,7 @@ public abstract class NatsSubBase : INatsSub
private bool _endSubscription;
private int _endReasonRaw;
private int _pendingMsgs;
private Exception? _exception = null;

internal NatsSubBase(
NatsConnection connection,
Expand Down Expand Up @@ -170,6 +172,14 @@ ValueTask INatsSub.ReceiveAsync(

protected abstract ValueTask ReceiveInternalAsync(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payloadBuffer);

public Exception? Exception => Volatile.Read(ref _exception);

protected void SetException(Exception exception)
{
Interlocked.Exchange(ref _exception, exception);
EndSubscription(NatsSubEndReason.Exception);
}

protected void ResetIdleTimeout()
{
_idleTimeoutTimer?.Change(dueTime: _idleTimeout, period: Timeout.InfiniteTimeSpan);
Expand Down Expand Up @@ -275,19 +285,49 @@ protected override async ValueTask ReceiveInternalAsync(string subject, string?
// deserialization exceptions. Currently only way for a user to find out is
// to check the logs created by the client. If the logger isn't hooked up
// they would be quietly ignored and the message would be lost either way.
var natsMsg = NatsMsg<T?>.Build(
subject,
replyTo,
headersBuffer,
payloadBuffer,
Connection,
Connection.HeaderParser,
Serializer);
try
{
var natsMsg = NatsMsg<T?>.Build(
subject,
replyTo,
headersBuffer,
payloadBuffer,
Connection,
Connection.HeaderParser,
Serializer);

await _msgs.Writer.WriteAsync(natsMsg).ConfigureAwait(false);

DecrementMaxMsgs();
}
catch (Exception e)
{
var payload = new Memory<byte>(new byte[payloadBuffer.Length]);
payloadBuffer.CopyTo(payload.Span);

await _msgs.Writer.WriteAsync(natsMsg).ConfigureAwait(false);
Memory<byte> headers = default;
if (headersBuffer != null)
{
headers = new Memory<byte>(new byte[headersBuffer.Value.Length]);
}

DecrementMaxMsgs();
SetException(new NatsSubException($"Message error: {e.Message}", e, payload, headers));
}
}

protected override void TryComplete() => _msgs.Writer.TryComplete();
}

public class NatsSubException : NatsException
{
public NatsSubException(string message, Exception exception, Memory<byte> payload, Memory<byte> headers)
: base(message, exception)
{
Payload = payload;
Headers = headers;
}

public Memory<byte> Payload { get; }

public Memory<byte> Headers { get; }
}
64 changes: 64 additions & 0 deletions src/NATS.Client.JetStream/JSContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.ComponentModel.DataAnnotations;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;

public class JSContext
{
private readonly NatsConnection _nats;
private readonly JSOptions _options;

public JSContext(NatsConnection nats, JSOptions options)
{
_nats = nats;
_options = options;
}

public async ValueTask<JSStream> CreateStream(Action<StreamCreateRequest> request)
{
var requestObj = new StreamCreateRequest();
request(requestObj);

Validator.ValidateObject(requestObj, new ValidationContext(requestObj));

var response =
await _nats.RequestAsync<StreamCreateRequest, StreamCreateResponse>(
$"{_options.Prefix}.STREAM.CREATE.{requestObj.Name}",
requestObj);

// TODO: Better error handling
if (response?.Data == null)
throw new NatsJetStreamException("No response received");

return new JSStream(response.Value.Data);
}
}

public class NatsJetStreamException : NatsException
{
public NatsJetStreamException(string message)
: base(message)
{
}

public NatsJetStreamException(string message, Exception exception)
: base(message, exception)
{
}
}

public record JSOptions
{
public string Prefix { get; init; } = "$JS.API";
}

public class JSStream
{
public JSStream(StreamCreateResponse response)
{
Response = response;
}

public StreamCreateResponse Response { get; }
}
9 changes: 9 additions & 0 deletions src/NATS.Client.JetStream/Models/AccountInfoResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace NATS.Client.JetStream.Models;

/// <summary>
/// A response from the JetStream $JS.API.INFO API
/// </summary>

public record AccountInfoResponse : AccountStats
{
}
66 changes: 66 additions & 0 deletions src/NATS.Client.JetStream/Models/AccountLimits.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
namespace NATS.Client.JetStream.Models;

public record AccountLimits
{
/// <summary>
/// The maximum amount of Memory storage Stream Messages may consume
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("max_memory")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Range(-1, int.MaxValue)]
public int MaxMemory { get; set; } = default!;

/// <summary>
/// The maximum amount of File storage Stream Messages may consume
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("max_storage")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Range(-1, int.MaxValue)]
public int MaxStorage { get; set; } = default!;

/// <summary>
/// The maximum number of Streams an account can create
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("max_streams")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Range(-1, int.MaxValue)]
public int MaxStreams { get; set; } = default!;

/// <summary>
/// The maximum number of Consumer an account can create
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("max_consumers")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.Never)]
[System.ComponentModel.DataAnnotations.Range(-1, int.MaxValue)]
public int MaxConsumers { get; set; } = default!;

/// <summary>
/// Indicates if Streams created in this account requires the max_bytes property set
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("max_bytes_required")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool MaxBytesRequired { get; set; } = false;

/// <summary>
/// The maximum number of outstanding ACKs any consumer may configure
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("max_ack_pending")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public int MaxAckPending { get; set; } = default!;

/// <summary>
/// The maximum size any single memory stream may be
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("memory_max_stream_bytes")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.ComponentModel.DataAnnotations.Range(-1, int.MaxValue)]
public int MemoryMaxStreamBytes { get; set; } = -1;

/// <summary>
/// The maximum size any single storage based stream may be
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("storage_max_stream_bytes")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
[System.ComponentModel.DataAnnotations.Range(-1, int.MaxValue)]
public int StorageMaxStreamBytes { get; set; } = -1;
}
15 changes: 15 additions & 0 deletions src/NATS.Client.JetStream/Models/AccountPurgeResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace NATS.Client.JetStream.Models;

/// <summary>
/// A response from the JetStream $JS.API.ACCOUNT.PURGE API
/// </summary>

public record AccountPurgeResponse
{
/// <summary>
/// If the purge operation was succesfully started
/// </summary>
[System.Text.Json.Serialization.JsonPropertyName("initiated")]
[System.Text.Json.Serialization.JsonIgnore(Condition = System.Text.Json.Serialization.JsonIgnoreCondition.WhenWritingDefault)]
public bool Initiated { get; set; } = false;
}
Loading

0 comments on commit 0aaccd7

Please sign in to comment.