Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client compatibility tests #159

Merged
merged 7 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions NATS.Client.sln
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.ObjectStore.Tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Example.ObjectStore", "sandbox\Example.ObjectStore\Example.ObjectStore.csproj", "{51882883-A66E-4F95-A1AB-CFCBF71B4376}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Nats.Client.Compat", "tests\Nats.Client.Compat\Nats.Client.Compat.csproj", "{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services", "src\NATS.Client.Services\NATS.Client.Services.csproj", "{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NATS.Client.Services.Tests", "tests\NATS.Client.Services.Tests\NATS.Client.Services.Tests.csproj", "{749CAE39-4C1E-4627-9E31-A36B987BC453}"
Expand Down Expand Up @@ -217,6 +219,10 @@ Global
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Debug|Any CPU.Build.0 = Debug|Any CPU
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.ActiveCfg = Release|Any CPU
{51882883-A66E-4F95-A1AB-CFCBF71B4376}.Release|Any CPU.Build.0 = Release|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Debug|Any CPU.Build.0 = Debug|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Release|Any CPU.ActiveCfg = Release|Any CPU
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024}.Release|Any CPU.Build.0 = Release|Any CPU
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
Expand Down Expand Up @@ -284,6 +290,7 @@ Global
{3F8840BA-4F91-4359-AA53-6B26823E7F55} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{BB2F4EEE-1AB3-43F7-B004-6C9B3D52353E} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{51882883-A66E-4F95-A1AB-CFCBF71B4376} = {95A69671-16CA-4133-981C-CC381B7AAA30}
{35578296-FF6C-4BA4-BEE5-C1A6E7DB7024} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{050C63EE-8F1C-4535-9C6C-E12E62A1FF1D} = {4827B3EC-73D8-436D-AE2A-5E29AC95FD0C}
{749CAE39-4C1E-4627-9E31-A36B987BC453} = {C526E8AB-739A-48D7-8FC4-048978C9B650}
{DD0AB72A-D6CD-4054-A9C9-0DCA3EDBA00F} = {95A69671-16CA-4133-981C-CC381B7AAA30}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal static class NatsObjJsonSerializer

[JsonSerializable(typeof(ObjectMetadata))]
[JsonSerializable(typeof(MetaDataOptions))]
[JsonSerializable(typeof(NatsObjLink))]
internal partial class NatsObjJsonSerializerContext : JsonSerializerContext
{
}
45 changes: 41 additions & 4 deletions src/NATS.Client.ObjectStore/Models/ObjectMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ public record ObjectMetadata
[JsonPropertyName("name")]
public string Name { get; set; } = default!;

/// <summary>
/// Object description
/// </summary>
[JsonPropertyName("description")]
public string Description { get; set; } = default!;

/// <summary>
/// Bucket name
/// </summary>
Expand Down Expand Up @@ -52,9 +58,16 @@ public record ObjectMetadata
/// <summary>
/// Object metadata
/// </summary>
[JsonPropertyName("meta")]
[JsonPropertyName("metadata")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public Dictionary<string, string> Metadata { get; set; } = default!;

/// <summary>
/// Object metadata
/// </summary>
[JsonPropertyName("headers")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public Dictionary<string, string> Meta { get; set; } = default!;
public Dictionary<string, string> Headers { get; set; } = default!;

/// <summary>
/// Object deleted
Expand All @@ -68,15 +81,39 @@ public record ObjectMetadata
/// </summary>
[JsonPropertyName("options")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public MetaDataOptions Options { get; set; } = default!;
public MetaDataOptions? Options { get; set; } = default!;
}

public record MetaDataOptions
{
/// <summary>
/// Link
/// </summary>
[JsonPropertyName("link")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public NatsObjLink? Link { get; set; } = default!;

/// <summary>
/// Max chunk size
/// </summary>
[JsonPropertyName("max_chunk_size")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public int MaxChunkSize { get; set; } = default!;
public int? MaxChunkSize { get; set; } = default!;
}

public record NatsObjLink
{
/// <summary>
/// Link name
/// </summary>
[JsonPropertyName("name")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string Name { get; set; } = default!;

/// <summary>
/// Bucket name
/// </summary>
[JsonPropertyName("bucket")]
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
public string Bucket { get; set; } = default!;
}
2 changes: 1 addition & 1 deletion src/NATS.Client.ObjectStore/NatsObjConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public record NatsObjConfig(string Bucket)
/// <summary>
/// Type of backing storage to use.
/// </summary>
public NatsObjStorageType? Storage { get; init; }
public NatsObjStorageType Storage { get; init; }

/// <summary>
/// How many replicas to keep for each key.
Expand Down
10 changes: 9 additions & 1 deletion src/NATS.Client.ObjectStore/NatsObjContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,18 @@ public async ValueTask<NatsObjStore> CreateObjectStore(NatsObjConfig config, Can
AllowDirect = true,
Metadata = config.Metadata!,
Retention = StreamConfigurationRetention.limits,
Compression = StreamConfigurationCompression.none,
};

var stream = await _context.CreateStreamAsync(streamConfiguration, cancellationToken);
return new NatsObjStore(config, _context, stream);
return new NatsObjStore(config, this, _context, stream);
}

public async ValueTask<NatsObjStore> GetObjectStoreAsync(string bucket, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucket);
var stream = await _context.GetStreamAsync($"OBJ_{bucket}", cancellationToken: cancellationToken);
return new NatsObjStore(new NatsObjConfig(bucket), this, _context, stream);
}

/// <summary>
Expand Down
160 changes: 150 additions & 10 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text.Json;
using System.Text.RegularExpressions;
using NATS.Client.Core;
using NATS.Client.Core.Internal;
Expand All @@ -21,15 +23,18 @@ public class NatsObjStore
private const string RollupSubject = "sub";

private static readonly NatsHeaders NatsRollupHeaders = new() { { NatsRollup, RollupSubject } };

private static readonly Regex ValidObjectRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);

private readonly string _bucket;
private readonly NatsObjContext _objContext;
private readonly NatsJSContext _context;
private readonly NatsJSStream _stream;

internal NatsObjStore(NatsObjConfig config, NatsJSContext context, NatsJSStream stream)
internal NatsObjStore(NatsObjConfig config, NatsObjContext objContext, NatsJSContext context, NatsJSStream stream)
{
_bucket = config.Bucket;
_objContext = objContext;
_context = context;
_stream = stream;
}
Expand Down Expand Up @@ -62,11 +67,20 @@ public async ValueTask<ObjectMetadata> GetAsync(string key, Stream stream, bool

var info = await GetInfoAsync(key, cancellationToken: cancellationToken);

if (info.Options?.Link is { } link)
{
var store = await _objContext.GetObjectStoreAsync(link.Bucket, cancellationToken).ConfigureAwait(false);
return await store.GetAsync(link.Name, stream, leaveOpen, cancellationToken).ConfigureAwait(false);
}

await using var pushConsumer = new NatsJSOrderedPushConsumer<IMemoryOwner<byte>>(
_context,
$"OBJ_{_bucket}",
GetChunkSubject(info.Nuid),
new NatsJSOrderedPushConsumerOpts { DeliverPolicy = ConsumerConfigurationDeliverPolicy.all },
new NatsJSOrderedPushConsumerOpts
{
DeliverPolicy = ConsumerConfigurationDeliverPolicy.all,
},
new NatsSubOpts(),
cancellationToken);

Expand Down Expand Up @@ -180,19 +194,19 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
meta.Nuid = nuid;
meta.MTime = DateTimeOffset.UtcNow;

if (meta.Options == null!)
meta.Options ??= new MetaDataOptions
{
meta.Options = new MetaDataOptions { MaxChunkSize = DefaultChunkSize };
}
MaxChunkSize = DefaultChunkSize,
};

if (meta.Options.MaxChunkSize == 0)
if (meta.Options.MaxChunkSize is null or <= 0)
{
meta.Options.MaxChunkSize = DefaultChunkSize;
}

var size = 0;
var chunks = 0;
var chunkSize = meta.Options.MaxChunkSize;
var chunkSize = meta.Options.MaxChunkSize!.Value;

string digest;
using (var sha256 = SHA256.Create())
Expand Down Expand Up @@ -266,7 +280,10 @@ public async ValueTask<ObjectMetadata> PutAsync(ObjectMetadata meta, Stream stre
{
await _context.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
subject: $"{_context.Opts.Prefix}.STREAM.PURGE.OBJ_{_bucket}",
request: new StreamPurgeRequest { Filter = GetChunkSubject(info.Nuid) },
request: new StreamPurgeRequest
{
Filter = GetChunkSubject(info.Nuid),
},
cancellationToken);
}
catch (NatsJSApiException e)
Expand All @@ -279,6 +296,82 @@ await _context.JSRequestResponseAsync<StreamPurgeRequest, StreamPurgeResponse>(
return meta;
}

public async ValueTask<ObjectMetadata> UpdateMetaAsync(string key, ObjectMetadata meta, CancellationToken cancellationToken = default)
{
ValidateObjectName(meta.Name);

var info = await GetInfoAsync(key, cancellationToken: cancellationToken).ConfigureAwait(false);

if (key != meta.Name)
{
// Make sure the new name is available
try
{
await GetInfoAsync(meta.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
throw new NatsObjException($"Object already exists: {meta.Name}");
}
catch (NatsObjNotFoundException)
{
}
}

info.Name = meta.Name;
info.Description = meta.Description;
info.Metadata = meta.Metadata;
info.Headers = meta.Headers;

await PublishMeta(info, cancellationToken);

return info;
}

public async ValueTask<ObjectMetadata> AddLinkAsync(string link, ObjectMetadata target, CancellationToken cancellationToken = default)
{
ValidateObjectName(link);
ValidateObjectName(target.Name);

if (target.Deleted)
{
throw new NatsObjException("Can't link to a deleted object");
}

if (target.Options?.Link is not null)
{
throw new NatsObjException("Can't link to a linked object");
}

try
{
var checkLink = await GetInfoAsync(link, showDeleted: true, cancellationToken: cancellationToken).ConfigureAwait(false);
if (checkLink.Options?.Link is null)
{
throw new NatsObjException("Object already exists");
}
}
catch (NatsObjNotFoundException)
{
}

var info = new ObjectMetadata
{
Name = link,
Bucket = _bucket,
Nuid = NewNuid(),
Options = new MetaDataOptions
{
Link = new NatsObjLink
{
Name = target.Name,
Bucket = target.Bucket,
},
},
};

await PublishMeta(info, cancellationToken);

return info;
}

/// <summary>
/// Get object metadata by key.
/// </summary>
Expand All @@ -291,12 +384,16 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
{
ValidateObjectName(key);

var request = new StreamMsgGetRequest { LastBySubj = GetMetaSubject(key) };
var request = new StreamMsgGetRequest
{
LastBySubj = GetMetaSubject(key),
};
try
{
var response = await _stream.GetAsync(request, cancellationToken);

var data = NatsObjJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(Convert.FromBase64String(response.Message.Data))) ?? throw new NatsObjException("Can't deserialize object metadata");
var base64String = Convert.FromBase64String(response.Message.Data);
var data = NatsObjJsonSerializer.Default.Deserialize<ObjectMetadata>(new ReadOnlySequence<byte>(base64String)) ?? throw new NatsObjException("Can't deserialize object metadata");

if (!showDeleted && data.Deleted)
{
Expand All @@ -316,6 +413,44 @@ public async ValueTask<ObjectMetadata> GetInfoAsync(string key, bool showDeleted
}
}

public async IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts = default, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
opts ??= new NatsObjWatchOpts();

var deliverPolicy = ConsumerConfigurationDeliverPolicy.all;
if (opts.UpdatesOnly)
{
deliverPolicy = ConsumerConfigurationDeliverPolicy.@new;
}

await using var pushConsumer = new NatsJSOrderedPushConsumer<NatsMemoryOwner<byte>>(
_context,
$"OBJ_{_bucket}",
$"$O.{_bucket}.M.>",
new NatsJSOrderedPushConsumerOpts
{
DeliverPolicy = deliverPolicy,
},
new NatsSubOpts(),
cancellationToken);

pushConsumer.Init();

await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken))
{
if (pushConsumer.IsDone)
continue;
using (msg.Data)
{
var info = JsonSerializer.Deserialize(msg.Data.Memory.Span, NatsObjJsonSerializerContext.Default.ObjectMetadata);
if (info != null)
{
yield return info;
}
}
}
}

/// <summary>
/// Delete an object by key.
/// </summary>
Expand Down Expand Up @@ -382,3 +517,8 @@ private void ValidateObjectName(string name)
}
}
}

public record NatsObjWatchOpts
{
public bool UpdatesOnly { get; init; }
}
2 changes: 1 addition & 1 deletion src/NATS.Client.Services/NatsSvcConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ public NatsSvcConfig(string name, string version)
/// Stats handler. JSON object returned by this handler will be included in
/// the service stats <c>data</c> property.
/// </summary>
public Func<JsonNode>? StatsHandler { get; init; }
public Func<INatsSvcEndpoint, JsonNode>? StatsHandler { get; init; }
}
Loading