Skip to content

Commit

Permalink
Changed subscribe API to async enumerable
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Nov 7, 2023
1 parent 342c3ea commit 7c12697
Show file tree
Hide file tree
Showing 32 changed files with 270 additions and 232 deletions.
15 changes: 7 additions & 8 deletions sandbox/BlazorWasm/Server/NatsServices/WeatherForecastService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@ public class WeatherForecastService : IHostedService, IAsyncDisposable

private readonly ILogger<WeatherForecastService> _logger;
private readonly INatsConnection _natsConnection;
private INatsSub<object>? _replySubscription;
private Task? _replyTask;
private CancellationTokenSource? _cts;

public WeatherForecastService(ILogger<WeatherForecastService> logger, INatsConnection natsConnection)
{
_logger = logger;
_natsConnection = natsConnection;
}

public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
_replySubscription = await _natsConnection.SubscribeAsync<object>("weather", cancellationToken: cancellationToken);
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_replyTask = Task.Run(
async () =>
{
await foreach (var msg in _replySubscription.Msgs.ReadAllAsync(cancellationToken))
await foreach (var msg in _natsConnection.SubscribeAsync<object>("weather", cancellationToken: cancellationToken))
{
var forecasts = Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Expand All @@ -40,21 +40,20 @@ public async Task StartAsync(CancellationToken cancellationToken)
},
cancellationToken);
_logger.LogInformation("Weather Forecast Services is running");
return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Weather Forecast Services is stopping");
if (_replySubscription != null)
await _replySubscription.UnsubscribeAsync();
_cts?.Cancel();
if (_replyTask != null)
await _replyTask;
}

public async ValueTask DisposeAsync()
{
if (_replySubscription != null)
await _replySubscription.DisposeAsync();
_cts?.Cancel();
if (_replyTask != null)
await _replyTask;
}
Expand Down
14 changes: 5 additions & 9 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
};

// Server
var sub = await conn.SubscribeAsync<int>("foobar");
var cts = new CancellationTokenSource();
var replyTask = Task.Run(async () =>
{
await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in conn.SubscribeAsync<int>("foobar", cancellationToken: cts.Token))
{
await msg.ReplyAsync($"Hello {msg.Data}");
}
Expand All @@ -35,15 +35,13 @@
// Client(response: "Hello 100")
var response = await conn.RequestAsync<int, string>("foobar", 100);

await sub.UnsubscribeAsync();
cts.Cancel();
await replyTask;

// subscribe
var subscription = await conn.SubscribeAsync<Person>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
await foreach (var msg in conn.SubscribeAsync<Person>("foo"))
{
Console.WriteLine($"Received {msg.Data}");
}
Expand Down Expand Up @@ -84,11 +82,9 @@ public Runner(INatsConnection connection)
[RootCommand]
public async Task Run()
{
var subscription = await _connection.SubscribeAsync<string>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
await foreach (var msg in _connection.SubscribeAsync<string>("foo"))
{
Console.WriteLine("Yeah");
}
Expand Down
4 changes: 1 addition & 3 deletions sandbox/Example.Core.SubscribeHeaders/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in connection.SubscribeAsync<byte[]>(subject))
{
Print($"[RCV] {msg.Subject}: {Encoding.UTF8.GetString(msg.Data!)}\n");
if (msg.Headers != null)
Expand Down
4 changes: 1 addition & 3 deletions sandbox/Example.Core.SubscribeModel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync<Bar>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in connection.SubscribeAsync<Bar>(subject))
{
Print($"[RCV] {msg.Subject}: {msg.Data}\n");
}
Expand Down
12 changes: 6 additions & 6 deletions sandbox/Example.Core.SubscribeQueueGroup/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
await using var connection1 = new NatsConnection(options);

Print($"[1][SUB] Subscribing to subject '{subject}'...\n");
var sub1 = await connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var cts1 = new CancellationTokenSource();
var task1 = Task.Run(async () =>
{
await foreach (var msg in sub1.Msgs.ReadAllAsync())
await foreach (var msg in connection1.SubscribeAsync<string>(subject, queueGroup: "My-Workers", cancellationToken: cts1.Token))
{
Print($"[1][RCV] {msg.Subject}: {msg.Data}\n");
}
Expand All @@ -26,10 +26,10 @@
await using var connection2 = new NatsConnection(options);

Print($"[2][SUB] Subscribing to subject '{subject}'...\n");
var sub2 = await connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers");
var cts2 = new CancellationTokenSource();
var task2 = Task.Run(async () =>
{
await foreach (var msg in sub2.Msgs.ReadAllAsync())
await foreach (var msg in connection2.SubscribeAsync<string>(subject, queueGroup: "My-Workers", cancellationToken: cts2.Token))
{
Print($"[2][RCV] {msg.Subject}: {msg.Data}\n");
}
Expand All @@ -40,10 +40,10 @@
// ---
// Clean-up
Print($"[1][SUB] Unsubscribing '{subject}'...\n");
await sub1.DisposeAsync();
cts1.Cancel();

Print($"[2][SUB] Unsubscribing '{subject}'...\n");
await sub2.DisposeAsync();
cts2.Cancel();

await Task.WhenAll(task1, task2);

Expand Down
4 changes: 1 addition & 3 deletions sandbox/Example.Core.SubscribeRaw/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@

Print($"[SUB] Subscribing to subject '{subject}'...\n");

var sub = await connection.SubscribeAsync<byte[]>(subject);

await foreach (var msg in sub.Msgs.ReadAllAsync())
await foreach (var msg in connection.SubscribeAsync<byte[]>(subject))
{
var data = Encoding.UTF8.GetString(msg.Data!);
Print($"[RCV] {msg.Subject}: {data}\n");
Expand Down
113 changes: 75 additions & 38 deletions sandbox/Example.NativeAot/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,22 @@

await using var nats = new NatsConnection(natsOpts);

await using var sub = await nats.SubscribeAsync<string>(subject: "foo");
var sub = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<string>("foo"))
{
// Outputs 'Hello World'
Console.WriteLine(msg.Data);
break;
}
});

// Flush the the network buffers to make sure the subscription request has been processed.
await nats.PingAsync();

await nats.PublishAsync<string>(subject: "foo", data: "Hello World");

var msg = await sub.Msgs.ReadAsync();

// Outputs 'Hello World'
Console.WriteLine(msg.Data);
await sub;
}

// custom JSON
Expand All @@ -30,17 +35,22 @@

await using var nats = new NatsConnection(natsOpts);

await using var sub = await nats.SubscribeAsync<MyData>(subject: "foo");
var sub = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<MyData>("foo"))
{
// Outputs 'MyData { Id = 1, Name = bar }'
Console.WriteLine(msg.Data);
break;
}
});

// Flush the the network buffers to make sure the subscription request has been processed.
await nats.PingAsync();

await nats.PublishAsync<MyData>(subject: "foo", data: new MyData { Id = 1, Name = "bar" });

var msg = await sub.Msgs.ReadAsync();

// Outputs 'MyData { Id = 1, Name = bar }'
Console.WriteLine(msg.Data);
await sub;
}

// custom JSON
Expand All @@ -49,17 +59,22 @@

var serializer = new NatsJsonContextSerializer<MyData>(MyJsonContext.Default);

await using var sub = await nats.SubscribeAsync<MyData>(subject: "foo", serializer: serializer);
var sub = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<MyData>("foo"))
{
// Outputs 'MyData { Id = 1, Name = bar }'
Console.WriteLine(msg.Data);
break;
}
});

// Flush the the network buffers to make sure the subscription request has been processed.
await nats.PingAsync();

await nats.PublishAsync<MyData>(subject: "foo", data: new MyData { Id = 1, Name = "bar" }, serializer: serializer);

var msg = await sub.Msgs.ReadAsync();

// Outputs 'MyData { Id = 1, Name = bar }'
Console.WriteLine(msg.Data);
await sub;
}

// Protobuf
Expand All @@ -68,17 +83,22 @@

await using var nats = new NatsConnection(natsOpts);

await using var sub = await nats.SubscribeAsync<Greeting>(subject: "foo");
var sub = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<Greeting>("foo"))
{
// Outputs '{ "id": 42, "name": "Marvin" }'
Console.WriteLine(msg.Data);
break;
}
});

// Flush the the network buffers to make sure the subscription request has been processed.
await nats.PingAsync();

await nats.PublishAsync(subject: "foo", data: new Greeting { Id = 42, Name = "Marvin" });

var msg = await sub.Msgs.ReadAsync();

// Outputs '{ "id": 42, "name": "Marvin" }'
Console.WriteLine(msg.Data);
await sub;
}

// Protobuf/JSON
Expand All @@ -88,23 +108,34 @@

await using var nats = new NatsConnection(natsOpts);

await using var sub1 = await nats.SubscribeAsync<Greeting>(subject: "greet");
await using var sub2 = await nats.SubscribeAsync<MyData>(subject: "data");
var sub1 = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<Greeting>("greet"))
{
// Outputs '{ "id": 42, "name": "Marvin" }'
Console.WriteLine(msg.Data);
break;
}
});

var sub2 = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<MyData>("data"))
{
// Outputs 'MyData { Id = 1, Name = bar }'
Console.WriteLine(msg.Data);
break;
}
});

// Flush the the network buffers to make sure the subscription request has been processed.
await nats.PingAsync();

await nats.PublishAsync(subject: "greet", data: new Greeting { Id = 42, Name = "Marvin" });
await nats.PublishAsync(subject: "data", data: new MyData { Id = 1, Name = "Bob" });

var msg1 = await sub1.Msgs.ReadAsync();
var msg2 = await sub2.Msgs.ReadAsync();

// Outputs '{ "id": 42, "name": "Marvin" }'
Console.WriteLine(msg1.Data);

// Outputs 'MyData { Id = 1, Name = bar }'
Console.WriteLine(msg2.Data);
await sub1;
await sub2;
}

// Binary
Expand All @@ -114,7 +145,19 @@

await using var nats = new NatsConnection(natsOpts);

await using var sub = await nats.SubscribeAsync<NatsMemoryOwner<byte>>(subject: "foo");
var sub = Task.Run(async () =>
{
await foreach (var msg in nats.SubscribeAsync<NatsMemoryOwner<byte>>(subject: "foo"))
{
using (var memoryOwner = msg.Data)
{
// Outputs 'Hi'
Console.WriteLine(Encoding.ASCII.GetString(memoryOwner.Memory.Span));
}

break;
}
});

// Flush the the network buffers to make sure the subscription request has been processed.
await nats.PingAsync();
Expand All @@ -127,13 +170,7 @@

await nats.PublishAsync(subject: "foo", data: bw);

var msg = await sub.Msgs.ReadAsync();

using (var memoryOwner = msg.Data)
{
// Outputs 'Hi'
Console.WriteLine(Encoding.ASCII.GetString(memoryOwner.Memory.Span));
}
await sub;
}

public class MixedSerializerRegistry : INatsSerializerRegistry
Expand Down
8 changes: 4 additions & 4 deletions sandbox/MinimumWebApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@

var app = builder.Build();

app.MapGet("/subscribe", async (INatsConnection command) =>
app.MapGet("/subscribe", (INatsConnection command) =>
{
var subscription = await command.SubscribeAsync<int>("foo");

_ = Task.Run(async () =>
{
await foreach (var msg in subscription.Msgs.ReadAllAsync())
await foreach (var msg in command.SubscribeAsync<int>("foo"))
{
Console.WriteLine($"Received {msg.Data}");
}
});

return Task.CompletedTask;
});

app.MapGet("/publish", async (INatsConnection command) => await command.PublishAsync("foo", 99));
Expand Down
Loading

0 comments on commit 7c12697

Please sign in to comment.