Skip to content

Commit

Permalink
CavernPipe beta implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
VoidXH committed Jan 15, 2025
1 parent 4200ea4 commit 20a1948
Show file tree
Hide file tree
Showing 14 changed files with 358 additions and 23 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ CavernSamples/CavernizeGUI/bin/
CavernSamples/CavernizeGUI/obj/
CavernSamples/CavernizeLive/bin/
CavernSamples/CavernizeLive/obj/
CavernSamples/CavernPipeClient/bin/
CavernSamples/CavernPipeClient/obj/
CavernSamples/CavernPipeServer/bin/
CavernSamples/CavernPipeServer/obj/
CavernSamples/Deconvolver/bin/
Expand Down
12 changes: 12 additions & 0 deletions Cavern.Format/Common/_Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,18 @@ public UnsupportedCodecException(bool needAudio, Codec codec) :
base(string.Format(message, needAudio ? audio : video, codec)) { }
}

/// <summary>
/// Tells if the container can't be determined by file type.
/// </summary>
public class UnsupportedContainerForWriteException : Exception {
const string message = "The {0} container is not supported for writing.";

/// <summary>
/// Tells if the container can't be determined by file type.
/// </summary>
public UnsupportedContainerForWriteException(string fileType) : base(string.Format(message, fileType)) { }
}

/// <summary>
/// Tells if a required feature in the codec is unsupported.
/// </summary>
Expand Down
11 changes: 8 additions & 3 deletions Cavern.Format/Input/EnhancedAC3Reader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,24 @@ public class EnhancedAC3Reader : AudioReader, IMetadataSupplier {
/// Enhanced AC-3 file reader.
/// </summary>
/// <param name="reader">File reader object</param>
public EnhancedAC3Reader(Stream reader) : base(reader) => fileSize = reader.Length;
public EnhancedAC3Reader(Stream reader) : base(reader) => fileSize = GetFileSize(reader);

/// <summary>
/// Enhanced AC-3 file reader.
/// </summary>
/// <param name="path">Input file name</param>
public EnhancedAC3Reader(string path) : base(path) => fileSize = reader.Length;
public EnhancedAC3Reader(string path) : base(path) => fileSize = GetFileSize(reader);

/// <summary>
/// Parse the stream/file length if possible for length calculation and seek support.
/// </summary>
static long GetFileSize(Stream reader) => reader.CanSeek ? reader.Length : -1;

/// <summary>
/// Read the file header.
/// </summary>
public override void ReadHeader() {
decoder = new EnhancedAC3Decoder(BlockBuffer<byte>.Create(reader, FormatConsts.blockSize), fileSize);
decoder = new EnhancedAC3Decoder(BlockBuffer<byte>.CreateForConstantPacketSize(reader, FormatConsts.blockSize), fileSize);
ChannelCount = decoder.ChannelCount;
Length = decoder.Length;
SampleRate = decoder.SampleRate;
Expand Down
2 changes: 1 addition & 1 deletion Cavern.Format/Output/AudioWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static AudioWriter Create(string path, int channelCount, long length, int
path[^3..] switch {
"laf" => new LimitlessAudioFormatWriter(path, length, sampleRate, bits, Listener.Channels),
"wav" => new RIFFWaveWriter(path, channelCount, length, sampleRate, bits),
_ => null,
_ => throw new UnsupportedContainerForWriteException(path[^3..])
};

/// <summary>
Expand Down
17 changes: 17 additions & 0 deletions Cavern.Format/Utilities/BlockBuffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.IO;

using Cavern.Format.Common;
using Cavern.Format.Consts;

namespace Cavern.Format.Utilities {
/// <summary>
Expand Down Expand Up @@ -79,6 +80,22 @@ public static BlockBuffer<byte> Create(Stream reader, int blockSize) {
return new BlockBuffer<byte>(() => reader.ReadBytes(blockSizeCopy), pos => readerCopy.Position = pos);
}

/// <summary>
/// For network streams where there is a constant packet size before the client requires a reply, use that packet size, if it's
/// smaller than the <paramref name="maxBlockSize"/>.
/// </summary>
public static BlockBuffer<byte> CreateForConstantPacketSize(Stream reader, int maxBlockSize) {
int blockSize = maxBlockSize;
try {
if (reader.Length < blockSize) {
blockSize = (int)reader.Length;
}
} catch {
// Stream doesn't support the Length property
}
return Create(reader, blockSize);
}

/// <summary>
/// Flush the current cache and read a new block. This should be called when a stream position changes.
/// </summary>
Expand Down
83 changes: 83 additions & 0 deletions Cavern.Format/Utilities/QueueStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using System;
using System.IO;
using System.Collections.Generic;

namespace Cavern.Format.Utilities {
/// <summary>
/// A thread-safe FIFO <see cref="MemoryStream"/>.
/// </summary>
public class QueueStream : Stream {
/// <summary>
/// The underlying FIFO.
/// </summary>
Queue<byte> queue = new Queue<byte>();

/// <inheritdoc/>
public override bool CanRead {
get {
lock (queue) {
return queue.Count != 0;
}
}
}

/// <inheritdoc/>
public override bool CanSeek => false;

/// <inheritdoc/>
public override bool CanWrite => true;

/// <inheritdoc/>
public override long Length {
get {
lock (queue) {
return queue.Count;
}
}
}

/// <inheritdoc/>
public override long Position {
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}

/// <inheritdoc/>
public override void Flush() { }

/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count) {
int read = 0;
while (read < count) {
lock (queue) {
int readUntil = Math.Min(read + queue.Count, count);
for (; read < readUntil; read++) {
buffer[offset + read] = queue.Dequeue();
}
}
}
return count;
}

/// <inheritdoc/>
public override void Write(byte[] buffer, int offset, int count) {
lock (queue) {
for (int i = 0; i < count; i++) {
queue.Enqueue(buffer[offset + i]);
}
}
}

/// <inheritdoc/>
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

/// <inheritdoc/>
public override void SetLength(long value) => throw new NotSupportedException();

/// <inheritdoc/>
protected override void Dispose(bool disposing) {
queue = null;
base.Dispose(disposing);
}
}
}
26 changes: 26 additions & 0 deletions Cavern.Format/Utilities/StreamExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ public static void Read(this Stream reader, byte[] buffer, long start, long leng
}
}

/// <summary>
/// Read more than 2 GB into a buffer, wait until all bytes are available.
/// </summary>
public static void ReadAll(this Stream reader, byte[] buffer, long start, long length) {
long position = start;
length += start;
while (position != length) {
int step = (int)Math.Min(length - position, int.MaxValue);
position += reader.Read(buffer, 0, step);
}
}

/// <summary>
/// Read a fixed-length ASCII string from the stream.
/// </summary>
Expand All @@ -44,13 +56,27 @@ public static string ReadASCII(this Stream reader, int length) {
/// <summary>
/// Read a number of bytes from the stream.
/// </summary>
/// <remarks>Doesn't wait until all bytes are available. For that case, use <see cref="ReadAllBytes(Stream, int)"/> instead.</remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static byte[] ReadBytes(this Stream reader, int length) {
byte[] bytes = new byte[length];
reader.Read(bytes);
return bytes;
}

/// <summary>
/// Read a number of bytes from the stream, wait until all bytes are available.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static byte[] ReadAllBytes(this Stream reader, int length) {
byte[] bytes = new byte[length];
int totalRead = 0;
while (totalRead < length) {
totalRead += reader.Read(bytes, totalRead, length - totalRead);
}
return bytes;
}

/// <summary>
/// Read a number of bytes from the stream.
/// </summary>
Expand Down
20 changes: 20 additions & 0 deletions CavernSamples/CavernPipeClient/CavernPipeClient.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>disable</Nullable>
<Authors>VoidX</Authors>
<Product>CavernPipe Client</Product>
<Description>Reference implementation of the CavernPipe protocol with rendering content according to Cavern settings on the network.</Description>
<Copyright>Copyright © Bence Sgánetz 2016-2025</Copyright>
<PackageProjectUrl>https://cavern.sbence.hu/</PackageProjectUrl>
<RepositoryUrl>https://github.com/VoidXH/Cavern/</RepositoryUrl>
<RepositoryType>git</RepositoryType>
<NeutralLanguage>en</NeutralLanguage>
<ApplicationIcon>..\Icon.ico</ApplicationIcon>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\Cavern.Format\Cavern.Format.csproj" />
</ItemGroup>
</Project>
50 changes: 50 additions & 0 deletions CavernSamples/CavernPipeClient/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.IO.Pipes;

using Cavern;
using Cavern.Format;
using Cavern.Format.Utilities;

if (args.Length < 2) {
Console.WriteLine("Usage: CavernPipeClient.exe <input file name> <output file name>");
return;
}

Listener listener = new();
AudioReader source = AudioReader.Open(args[0]);
source.ReadHeader();
using AudioWriter target = AudioWriter.Create(args[1], Listener.Channels.Length, source.Length, source.SampleRate, BitDepth.Float32);
source.Dispose();
target.WriteHeader();

// Connection
using NamedPipeClientStream pipe = new("CavernPipe");
pipe.Connect();
byte[] pipeHeader = new byte[8]; // Assembled CavernPipe control header
pipeHeader[0] = (byte)target.Bits;
pipeHeader[1] = 6; // Mandatory frames
BitConverter.GetBytes((ushort)target.ChannelCount).CopyTo(pipeHeader, 2);
BitConverter.GetBytes(listener.UpdateRate).CopyTo(pipeHeader, 4);
pipe.Write(pipeHeader, 0, pipeHeader.Length);

// Sending the file or part to the pipe
using FileStream reader = File.OpenRead(args[0]);
long sent = 0;
float[] writeBuffer = [];
byte[] sendBuffer = new byte[1024 * 1024],
receiveBuffer = [];
while (sent < reader.Length) {
int toSend = reader.Read(sendBuffer, 0, sendBuffer.Length);
pipe.Write(BitConverter.GetBytes(toSend));
pipe.Write(sendBuffer, 0, toSend);
sent += toSend;

// If there is incoming data, write it to file
int toReceive = pipe.ReadInt32();
if (receiveBuffer.Length < toReceive) {
receiveBuffer = new byte[toReceive];
writeBuffer = new float[toReceive / sizeof(float)];
}
pipe.ReadAll(receiveBuffer, 0, toReceive);
Buffer.BlockCopy(receiveBuffer, 0, writeBuffer, 0, toReceive);
target.WriteBlock(writeBuffer, 0, toReceive / sizeof(float));
}
48 changes: 48 additions & 0 deletions CavernSamples/CavernPipeServer/CavernPipeProtocol.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System.IO;

using Cavern.Format;
using Cavern.Format.Common;
using Cavern.Format.Utilities;

namespace CavernPipeServer {
/// <summary>
/// Reads CavernPipe control messages.
/// </summary>
public class CavernPipeProtocol {
/// <summary>
/// The PCM format in which the connected client expects the data.
/// </summary>
public BitDepth OutputFormat { get; }

/// <summary>
/// Calculated from byte 2 (number of frames to always render before sending a reply), this is the number of bytes in those frames.
/// If this many bytes are not available, the client must wait for data.
/// </summary>
public int MandatoryBytesToSend { get; }

/// <summary>
/// Number of output channels of the client. If Cavern renders less according to user settings, additional channels are filled with silence.
/// If Cavern renders more, excess channels will be cut off and a warning shall be shown.
/// </summary>
public int OutputChannels { get; }

/// <summary>
/// Number of samples expected in a reply PCM stream.
/// </summary>
public int UpdateRate { get; }

/// <summary>
/// Reads CavernPipe control messages.
/// </summary>
public CavernPipeProtocol(Stream source) {
OutputFormat = (BitDepth)source.ReadByte();
int mandatoryFrames = source.ReadByte();
OutputChannels = source.ReadUInt16();
UpdateRate = source.ReadInt32();
if (UpdateRate <= 0) {
throw new SyncException(); // Extension point for later
}
MandatoryBytesToSend = mandatoryFrames * OutputChannels * UpdateRate * ((int)OutputFormat / 8);
}
}
}
Loading

0 comments on commit 20a1948

Please sign in to comment.