Skip to content

Commit

Permalink
refactoring to use rawSocket and pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed Mar 22, 2022
1 parent 291e0a8 commit 7a7662d
Show file tree
Hide file tree
Showing 26 changed files with 902 additions and 341 deletions.
17 changes: 17 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
root = true

[*]
insert_final_newline = true
indent_style = space
indent_size = 4
trim_trailing_whitespace = true

[*{_AssemblyInfo.cs,.notsupported.cs}]
generated_code = true

# C# files
[*.cs]
charset = utf-8-bom

csharp_style_namespace_declarations = file_scoped
dotnet_style_require_accessibility_modifiers = never
2 changes: 2 additions & 0 deletions sandbox/ConsoleApp/ConsoleApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

<ItemGroup>
<PackageReference Include="MessagePack" Version="2.3.85" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="ZLogger" Version="1.6.1" />
</ItemGroup>

<ItemGroup>
Expand Down
47 changes: 21 additions & 26 deletions sandbox/ConsoleApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,34 @@

using AlterNats;
using MessagePack;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Buffers;
using System.IO.Pipelines;
using System.Reflection;
using System.Text;
using System.Threading.Tasks.Sources;
using ZLogger;

var provider = new ServiceCollection()
.AddLogging(x =>
{
x.ClearProviders();
x.SetMinimumLevel(LogLevel.Trace);
x.AddZLoggerConsole();
})
.BuildServiceProvider();

var loggerFactory = provider.GetRequiredService<ILoggerFactory>();

var conn = new NatsConnection(NatsOptions.Default with { LoggerFactory = loggerFactory });


for (int i = 0; i < 10000; i++)
{
conn.Ping();
}

await using var conn = new NatsConnection();

//Console.WriteLine("GO PING");
var t = Foo();
//var b = conn.PingAsync();
//var c = conn.PingAsync();
//var d = conn.PingAsync();
//var e = conn.PingAsync();
//// await a;
//await b;
//await c;
//await d;
//await e;
//Console.WriteLine("END PING");


Console.ReadLine();

await t;

Console.WriteLine("READ STOP");
Console.ReadLine();


async Task Foo()
{
await conn.PingAsync();
throw new Exception("YEAH?");
}
21 changes: 12 additions & 9 deletions src/AlterNats/AlterNats.csproj
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.2" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.2" />
</ItemGroup>

<ItemGroup>
<InternalsVisibleTo Include="$(AssemblyName).Tests" />
</ItemGroup>
</Project>
4 changes: 3 additions & 1 deletion src/AlterNats/Commands/CommandBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ internal abstract class CommandBase<TSelf> : ICommand, IObjectPoolNode<TSelf>
TSelf? nextNode;
public ref TSelf? NextNode => ref nextNode;

public abstract string WriteTraceMessage { get; }

public virtual void Return()
{
pool.TryPush(Unsafe.As<TSelf>(this));
}

public abstract void Write(ILogger logger, ProtocolWriter writer);
public abstract void Write(ProtocolWriter writer);
}
55 changes: 55 additions & 0 deletions src/AlterNats/Commands/FixedArrayBufferWriter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System.Buffers;
using System.Runtime.CompilerServices;

namespace AlterNats.Commands;

internal sealed class FixedArrayBufferWriter : IBufferWriter<byte>
{
byte[] buffer;
int written;

public ReadOnlyMemory<byte> WrittenMemory => buffer.AsMemory(0, written);

public FixedArrayBufferWriter()
{
buffer = new byte[65535];
written = 0;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Reset()
{
written = 0;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Advance(int count)
{
written += count;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Memory<byte> GetMemory(int sizeHint = 0)
{
if (buffer.Length - written < sizeHint)
{
Resize(sizeHint);
}
return buffer.AsMemory(written);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Span<byte> GetSpan(int sizeHint = 0)
{
if (buffer.Length - written < sizeHint)
{
Resize(sizeHint);
}
return buffer.AsSpan(written);
}

void Resize(int sizeHint)
{
Array.Resize(ref buffer, Math.Max(sizeHint, buffer.Length * 2));
}
}
3 changes: 2 additions & 1 deletion src/AlterNats/Commands/ICommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ namespace AlterNats.Commands;
internal interface ICommand
{
void Return();
void Write(ILogger logger, ProtocolWriter writer);
void Write(ProtocolWriter writer);
string WriteTraceMessage { get; }
}
30 changes: 27 additions & 3 deletions src/AlterNats/Commands/PingCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public static PingCommand Create(List<PingCommand> queue)
return result;
}

public override void Write(ILogger logger, ProtocolWriter writer)
{
logger.LogTrace("Write PONG Command to buffer.");
public override string WriteTraceMessage => "Write PING Command to buffer.";

public override void Write(ProtocolWriter writer)
{
if (queue != null)
{
lock (queue)
Expand Down Expand Up @@ -114,3 +114,27 @@ void IThreadPoolWorkItem.Execute()
core.SetResult(null!);
}
}


internal sealed class LightPingCommand : CommandBase<LightPingCommand>
{
LightPingCommand()
{
}

public static LightPingCommand Create()
{
if (!pool.TryPop(out var result))
{
result = new LightPingCommand();
}
return result;
}

public override string WriteTraceMessage => "Write PING Command to buffer.";

public override void Write(ProtocolWriter writer)
{
writer.WritePing();
}
}
5 changes: 3 additions & 2 deletions src/AlterNats/Commands/PongCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ public static PongCommand Create()
return result;
}

public override void Write(ILogger logger, ProtocolWriter writer)
public override string WriteTraceMessage => "Write PONG Command to buffer.";

public override void Write(ProtocolWriter writer)
{
logger.LogTrace("Write PONG Command to buffer.");
writer.WritePong();
}
}
8 changes: 3 additions & 5 deletions src/AlterNats/Commands/ProtocolWriter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System.Buffers.Binary;
using System.Buffers.Text;
using System.IO.Pipelines;
using System.Buffers.Text;
using System.Text;
using System.Text.Json;

Expand All @@ -11,9 +9,9 @@ internal sealed class ProtocolWriter
const int MaxIntStringLength = 10; // int.MaxValue.ToString().Length
const int NewLineLength = 2; // \r\n

readonly PipeWriter writer;
readonly FixedArrayBufferWriter writer; // where T : IBufferWriter<byte>

public ProtocolWriter(PipeWriter writer)
public ProtocolWriter(FixedArrayBufferWriter writer)
{
this.writer = writer;
}
Expand Down
6 changes: 4 additions & 2 deletions src/AlterNats/Commands/SubscribeCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ public static SubscribeCommand Create(int subscriptionId, NatsKey subject)
return result;
}

public override void Write(ILogger logger, ProtocolWriter writer)
public override string WriteTraceMessage => throw new NotImplementedException();

public override void Write(ProtocolWriter writer)
{
logger.LogTrace("Write SUB Command to buffer.");
// logger.LogTrace("Write SUB Command to buffer.");
writer.WriteSubscribe(subscriptionId, subject!);
}

Expand Down
63 changes: 63 additions & 0 deletions src/AlterNats/Internal/AppendableSequence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System.Buffers;

namespace AlterNats.Internal;

// TODO: for reuse, add reset.
internal sealed class AppendableSequence
{
Segment? first;
Segment? last;
int length;

public ReadOnlySequence<byte> AsReadOnlySequence() => new ReadOnlySequence<byte>(first!, 0, last!, last!.Memory.Length);

public int Count => length;

public AppendableSequence()
{
}

public void Slice(long start)
{
// TODO:impl slice
throw new NotImplementedException();
}

public void Append(ReadOnlyMemory<byte> buffer)
{
if (length == 0)
{
var first = new Segment(buffer);
this.first = first;
last = first;
length = buffer.Length;
}
else
{
var newLast = new Segment(buffer);
last!.SetNextSegment(newLast);
newLast.SetRunningIndex(length);
last = newLast;
length += buffer.Length;
}
}

// TODO:Segment should uses ObjectPool
internal class Segment : ReadOnlySequenceSegment<byte>
{
public Segment(ReadOnlyMemory<byte> buffer)
{
Memory = buffer;
}

internal void SetRunningIndex(long runningIndex)
{
this.RunningIndex = runningIndex;
}

internal void SetNextSegment(Segment? nextSegment)
{
this.Next = nextSegment;
}
}
}
Loading

0 comments on commit 7a7662d

Please sign in to comment.