diff --git a/src/Playground/Test1.cs b/src/Playground/Test1.cs index 5b54b7a..7be4e0d 100644 --- a/src/Playground/Test1.cs +++ b/src/Playground/Test1.cs @@ -108,7 +108,7 @@ public static void TestReverseIterator( { x += " ooops!"; return true; - }, out _); + }); } maintainer.WaitForBackgroundThreads(); } diff --git a/src/ZoneTree.UnitTests/AtomicUpdateTests.cs b/src/ZoneTree.UnitTests/AtomicUpdateTests.cs index 1382481..156f681 100644 --- a/src/ZoneTree.UnitTests/AtomicUpdateTests.cs +++ b/src/ZoneTree.UnitTests/AtomicUpdateTests.cs @@ -49,7 +49,9 @@ public void IntIntAtomicIncrement(WriteAheadLogMode walMode) ++y; return true; }, - out _ + (in int _, long _, OperationResult result) => + { + } ); Interlocked.Increment(ref off); } @@ -117,7 +119,7 @@ public void IntIntAtomicIncrementForBTree(WriteAheadLogMode walMode) { ++y; return true; - }, out _); + }); Interlocked.Increment(ref off); } @@ -183,7 +185,7 @@ public void IntIntMutableSegmentOnlyAtomicIncrement(WriteAheadLogMode walMode) { ++y; return true; - }, out _); + }); Interlocked.Increment(ref off); } diff --git a/src/ZoneTree.UnitTests/ReplicatorTests.cs b/src/ZoneTree.UnitTests/ReplicatorTests.cs new file mode 100644 index 0000000..8ae1fd4 --- /dev/null +++ b/src/ZoneTree.UnitTests/ReplicatorTests.cs @@ -0,0 +1,78 @@ +using Tenray.ZoneTree.Core; + +namespace Tenray.ZoneTree.UnitTests; + +public sealed class ReplicatorTests +{ + [Test] + public void TestReplicator() + { + var dataPath = "data/TestReplicator"; + if (Directory.Exists(dataPath)) + Directory.Delete(dataPath, true); + var recordCount = 50_000; + var keyCount = 15_000; + var maxMemory = 10_000; + void CreateData() + { + using var zoneTree = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/source") + .SetMutableSegmentMaxItemCount(maxMemory) + .OpenOrCreate(); + + using var replica = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/replica") + .SetMutableSegmentMaxItemCount(maxMemory) + .OpenOrCreate(); + + using var replicator = new Replicator(replica, dataPath + "/replica-op-index"); + using var maintainer1 = zoneTree.CreateMaintainer(); + using var maintainer2 = replica.CreateMaintainer(); + var random = new Random(); + int replicated = 0; + Parallel.For(0, recordCount, (i) => + { + var key = i % keyCount; + var value = random.Next(); + var opIndex = zoneTree.Upsert(key, value); + Task.Run(() => + { + replicator.OnUpsert(key, value, opIndex); + Interlocked.Increment(ref replicated); + }); + }); + while (replicated < recordCount) Task.Delay(500).Wait(); + maintainer1.EvictToDisk(); + maintainer2.EvictToDisk(); + maintainer1.WaitForBackgroundThreads(); + maintainer2.WaitForBackgroundThreads(); + } + + void TestEqual() + { + using var zoneTree = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/source") + .Open(); + + using var replica = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/replica") + .Open(); + + using var iterator1 = zoneTree.CreateIterator(); + using var iterator2 = replica.CreateIterator(); + while (true) + { + var n1 = iterator1.Next(); + var n2 = iterator2.Next(); + Assert.That(n2, Is.EqualTo(n1)); + if (!n1) break; + Assert.That(iterator2.Current, Is.EqualTo(iterator1.Current)); + } + zoneTree.Maintenance.Drop(); + replica.Maintenance.Drop(); + } + + CreateData(); + TestEqual(); + } +} diff --git a/src/ZoneTree.UnitTests/StringTreeTests.cs b/src/ZoneTree.UnitTests/StringTreeTests.cs index 030498a..0a8a505 100644 --- a/src/ZoneTree.UnitTests/StringTreeTests.cs +++ b/src/ZoneTree.UnitTests/StringTreeTests.cs @@ -114,7 +114,7 @@ public void HelloWorldTest2() { x += "b"; return true; - }, out _); + }); zoneTree.TryGet(39, out value); Assert.That(value, Is.EqualTo("Hello Zone Tree!b")); } diff --git a/src/ZoneTree/Collections/BTree/BTree.NodeIterator.cs b/src/ZoneTree/Collections/BTree/BTree.NodeIterator.cs index c430e4b..0fd2ec4 100644 --- a/src/ZoneTree/Collections/BTree/BTree.NodeIterator.cs +++ b/src/ZoneTree/Collections/BTree/BTree.NodeIterator.cs @@ -12,8 +12,6 @@ public sealed class NodeIterator public TValue[] Values { get; } - public long[] OpIndexes { get; } - public TKey CurrentKey => Keys[CurrentIndex]; public TValue CurrentValue => Values[CurrentIndex]; @@ -28,14 +26,12 @@ public NodeIterator( BTree tree, LeafNode leafNode, TKey[] keys, - TValue[] values, - long[] opIndexes = null) + TValue[] values) { Tree = tree; Node = leafNode; Keys = keys; Values = values; - OpIndexes = opIndexes; } public NodeIterator GetPreviousNodeIterator() diff --git a/src/ZoneTree/Collections/BTree/BTree.Write.OpIndex.cs b/src/ZoneTree/Collections/BTree/BTree.Write.OpIndex.cs new file mode 100644 index 0000000..9973a1b --- /dev/null +++ b/src/ZoneTree/Collections/BTree/BTree.Write.OpIndex.cs @@ -0,0 +1,101 @@ +using Tenray.ZoneTree.Exceptions; + +namespace Tenray.ZoneTree.Collections.BTree; + +public delegate TValue GetValueDelegate(long opIndex); + +/// +/// In memory B+Tree. +/// This class is thread-safe. +/// +/// Key Type +/// Value Type +public sealed partial class BTree +{ + public bool Upsert(in TKey key, GetValueDelegate valueGetter, out TValue value, out long opIndex) + { + if (IsReadOnly) + throw new BTreeIsReadOnlyException(); + try + { + WriteLock(); + while (true) + { + var root = Root; + root.WriteLock(); + if (root != Root) + { + root.WriteUnlock(); + continue; + } + + if (!root.IsFull) + { + return UpsertNonFull(root, in key, valueGetter, out value, out opIndex); + } + var newRoot = new Node(GetNodeLocker(), NodeSize); + newRoot.Children[0] = root; + newRoot.WriteLock(); + TrySplitChild(newRoot, 0, root); + var result = UpsertNonFull(newRoot, in key, valueGetter, out value, out opIndex); + Root = newRoot; + root.WriteUnlock(); + return result; + } + } + catch (Exception) + { + Root.WriteUnlock(); + throw; + } + finally + { + WriteUnlock(); + } + } + + bool UpsertNonFull(Node node, in TKey key, GetValueDelegate valueGetter, out TValue value, out long opIndex) + { + while (true) + { + var found = node.TryGetPosition(Comparer, in key, out var position); + if (node is LeafNode leaf) + { + opIndex = OpIndexProvider.NextId(); + if (found) + { + value = valueGetter(opIndex); + leaf.Update(position, in key, value); + node.WriteUnlock(); + return false; + } + value = valueGetter(opIndex); + leaf.Insert(position, in key, value); + Interlocked.Increment(ref _length); + node.WriteUnlock(); + return true; + } + if (found) + ++position; + var child = node.Children[position]; + child.WriteLock(); + if (child.IsFull) + { + var splitted = TrySplitChild(node, position, child); + child.WriteUnlock(); + if (!splitted) + { + continue; + } + + if (Comparer.Compare(in key, in node.Keys[position]) >= 0) + ++position; + + child = node.Children[position]; + child.WriteLock(); + } + node.WriteUnlock(); + node = child; + } + } +} \ No newline at end of file diff --git a/src/ZoneTree/Core/Replicator.cs b/src/ZoneTree/Core/Replicator.cs new file mode 100644 index 0000000..5750179 --- /dev/null +++ b/src/ZoneTree/Core/Replicator.cs @@ -0,0 +1,63 @@ +namespace Tenray.ZoneTree.Core; + +using System.Collections.Concurrent; +using System.Collections.Generic; +using Tenray.ZoneTree.Collections; + +public sealed class Replicator : IDisposable +{ + readonly IZoneTree Replica; + + readonly IZoneTree LatestOpIndexes; + + readonly IMaintainer Maintainer; + + bool isDisposed; + + public Replicator( + IZoneTree replica, + string dataPath, + Action> configure = null) + { + this.Replica = replica; + var factory = new ZoneTreeFactory() + .SetDataDirectory(dataPath); + if (configure != null) configure(factory); + LatestOpIndexes = factory.OpenOrCreate(); + Maintainer = LatestOpIndexes.CreateMaintainer(); + Maintainer.EnableJobForCleaningInactiveCaches = true; + } + + public void OnUpsert(TKey key, TValue value, long opIndex) + { + LatestOpIndexes.TryAtomicAddOrUpdate( + key, + (ref long newOpIndex) => + { + newOpIndex = opIndex; + return true; + }, + (ref long existingOpIndex) => + { + if (opIndex < existingOpIndex) + return false; + existingOpIndex = opIndex; + return true; + }, + (in long _, long _, OperationResult result) => + { + if (result == OperationResult.Cancelled) return; + Replica.Upsert(key, value); + }); + } + + public void Dispose() + { + if (isDisposed) return; + isDisposed = true; + Maintainer.EvictToDisk(); + Maintainer.WaitForBackgroundThreads(); + Maintainer.Dispose(); + LatestOpIndexes.Dispose(); + } +} diff --git a/src/ZoneTree/Core/ZoneTree.ReadWrite.cs b/src/ZoneTree/Core/ZoneTree.ReadWrite.cs index 55c486c..d3def6d 100644 --- a/src/ZoneTree/Core/ZoneTree.ReadWrite.cs +++ b/src/ZoneTree/Core/ZoneTree.ReadWrite.cs @@ -1,4 +1,5 @@ using Tenray.ZoneTree.Collections; +using Tenray.ZoneTree.Collections.BTree; using Tenray.ZoneTree.Exceptions; using Tenray.ZoneTree.Segments; @@ -170,17 +171,25 @@ public bool TryAtomicUpdate(in TKey key, in TValue value, out long opIndex) } } + static OperationResultDelegate EmptyOperationResultDelegate = + (in TValue value, long opIndex, OperationResult result) => + { + }; + public bool TryAtomicAddOrUpdate( in TKey key, in TValue valueToAdd, ValueUpdaterDelegate valueUpdater, - out long opIndex) + OperationResultDelegate result) { if (IsReadOnly) throw new ZoneTreeIsReadOnlyException(); AddOrUpdateResult status; IMutableSegment mutableSegment; - opIndex = 0; + var opIndex = 0L; + if (result == null) + result = EmptyOperationResultDelegate; + while (true) { lock (AtomicUpdateLock) @@ -192,17 +201,40 @@ public bool TryAtomicAddOrUpdate( } else if (mutableSegment.TryGet(in key, out var existing)) { - if (!valueUpdater(ref existing)) return false; + if (!valueUpdater(ref existing)) + { + result(in existing, 0, OperationResult.Cancelled); + return false; + } status = mutableSegment.Upsert(key, existing, out opIndex); + if (status == AddOrUpdateResult.UPDATED) + { + result(in existing, opIndex, OperationResult.Updated); + return false; + } } else if (TryGetFromReadonlySegments(in key, out existing)) { - if (!valueUpdater(ref existing)) return false; + if (!valueUpdater(ref existing)) + { + result(in existing, 0, OperationResult.Cancelled); + return false; + } status = mutableSegment.Upsert(key, existing, out opIndex); + if (status == AddOrUpdateResult.ADDED) + { + result(in existing, opIndex, OperationResult.Updated); + return false; + } } else { status = mutableSegment.Upsert(key, valueToAdd, out opIndex); + if (status == AddOrUpdateResult.ADDED) + { + result(in existing, opIndex, OperationResult.Added); + return true; + } } } switch (status) @@ -213,7 +245,85 @@ public bool TryAtomicAddOrUpdate( MoveMutableSegmentForward(mutableSegment); continue; default: - return status == AddOrUpdateResult.ADDED; + throw new Exception("Impossible."); + } + } + } + + public bool TryAtomicAddOrUpdate( + in TKey key, + ValueAdderDelegate valueAdder, + ValueUpdaterDelegate valueUpdater, + OperationResultDelegate result) + { + if (IsReadOnly) + throw new ZoneTreeIsReadOnlyException(); + AddOrUpdateResult status; + IMutableSegment mutableSegment; + var opIndex = 0L; + if (result == null) + result = EmptyOperationResultDelegate; + while (true) + { + lock (AtomicUpdateLock) + { + mutableSegment = MutableSegment; + if (mutableSegment.IsFrozen) + { + status = AddOrUpdateResult.RETRY_SEGMENT_IS_FULL; + } + else if (mutableSegment.TryGet(in key, out var existing)) + { + if (!valueUpdater(ref existing)) + { + result(in existing, 0, OperationResult.Cancelled); + return false; + } + status = mutableSegment.Upsert(key, existing, out opIndex); + if (status == AddOrUpdateResult.UPDATED) + { + result(in existing, opIndex, OperationResult.Updated); + return false; + } + } + else if (TryGetFromReadonlySegments(in key, out existing)) + { + if (!valueUpdater(ref existing)) + { + result(in existing, 0, OperationResult.Cancelled); + return false; + } + status = mutableSegment.Upsert(key, existing, out opIndex); + if (status == AddOrUpdateResult.ADDED) + { + result(in existing, opIndex, OperationResult.Updated); + return false; + } + } + else + { + if (!valueAdder(ref existing)) + { + result(in existing, 0, OperationResult.Cancelled); + return false; + } + status = mutableSegment.Upsert(key, existing, out opIndex); + if (status == AddOrUpdateResult.ADDED) + { + result(in existing, opIndex, OperationResult.Added); + return true; + } + } + } + switch (status) + { + case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + continue; + case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: + MoveMutableSegmentForward(mutableSegment); + continue; + default: + throw new Exception("Impossible."); } } } @@ -249,6 +359,27 @@ public long Upsert(in TKey key, in TValue value) } } + public long Upsert(in TKey key, GetValueDelegate valueGetter) + { + if (IsReadOnly) + throw new ZoneTreeIsReadOnlyException(); + while (true) + { + var mutableSegment = MutableSegment; + var status = mutableSegment.Upsert(key, valueGetter, out var opIndex); + switch (status) + { + case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + continue; + case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: + MoveMutableSegmentForward(mutableSegment); + continue; + default: + return opIndex; + } + } + } + public bool TryDelete(in TKey key, out long opIndex) { if (IsReadOnly) diff --git a/src/ZoneTree/Directory.Build.props b/src/ZoneTree/Directory.Build.props index 4017992..4b303f9 100644 --- a/src/ZoneTree/Directory.Build.props +++ b/src/ZoneTree/Directory.Build.props @@ -5,8 +5,8 @@ Ahmed Yasin Koculu ZoneTree ZoneTree - 1.8.2.0 - 1.8.2.0 + 1.8.3.0 + 1.8.3.0 Ahmed Yasin Koculu ZoneTree ZoneTree is a persistent, high-performance, transactional, ACID-compliant ordered key-value database for NET. It can operate in memory or on local/cloud storage. diff --git a/src/ZoneTree/IZoneTree.cs b/src/ZoneTree/IZoneTree.cs index 32950af..ab5a461 100644 --- a/src/ZoneTree/IZoneTree.cs +++ b/src/ZoneTree/IZoneTree.cs @@ -1,4 +1,5 @@ -using Tenray.ZoneTree.Comparers; +using Tenray.ZoneTree.Collections.BTree; +using Tenray.ZoneTree.Comparers; using Tenray.ZoneTree.Logger; using Tenray.ZoneTree.Serializers; @@ -99,19 +100,37 @@ bool TryAtomicGetAndUpdate( bool TryAtomicUpdate(in TKey key, in TValue value, out long opIndex); /// - /// Attempts to add or update the specified key and value atomically across LSM-Tree segments. + /// Attempts to add or update the specified key and value atomically across LSM-Tree segments and calls the result delegate atomically. + /// valueUpdater can be called one or more times. /// /// The key of the element to add. /// The value of the element to add. It can be null. /// The delegate function that updates the value. - /// The operation index. + /// The operation result delegate. /// true if the key/value pair was added; /// false, if the key/value pair was updated. bool TryAtomicAddOrUpdate( in TKey key, in TValue valueToAdd, ValueUpdaterDelegate valueUpdater, - out long opIndex); + OperationResultDelegate result = null); + + /// + /// Attempts to add or update the specified key and value atomically across LSM-Tree segments and calls the result delegate atomically. + /// valueAdder can be called one or more times. + /// valueUpdater can be called one or more times. + /// + /// The key of the element to add. + /// he delegate function that adds the value. + /// The delegate function that updates the value. + /// The operation result delegate. + /// true if the key/value pair was added; + /// false, if the key/value pair was updated. + bool TryAtomicAddOrUpdate( + in TKey key, + ValueAdderDelegate valueAdder, + ValueUpdaterDelegate valueUpdater, + OperationResultDelegate result = null); /// /// Adds or updates the specified key/value pair atomically across LSM-Tree segments. @@ -153,6 +172,16 @@ bool TryAtomicAddOrUpdate( /// The operation index. It can be used to distrubute the operations in stable order. long Upsert(in TKey key, in TValue value); + /// + /// Adds or updates the specified key with a value getter. + /// Value getter receives the operation index as an argument. + /// It is useful when the user wants to save the operation index into the record. + /// + /// The key of the element to upsert. + /// The delegate provides the value to upsert. + /// The operation index. It can be used to distrubute the operations in stable order. + long Upsert(in TKey key, GetValueDelegate valueGetter); + /// /// Attempts to delete the specified key. /// @@ -266,4 +295,28 @@ IZoneTreeIterator CreateReverseIterator( /// The value type /// The value as a reference to be updated. /// true if the value is updated, false otherwise. -public delegate bool ValueUpdaterDelegate(ref TValue value); \ No newline at end of file +public delegate bool ValueUpdaterDelegate(ref TValue value); + +/// +/// Value adder delegate. +/// +/// The value type +/// The value as a reference to be added. +/// true if the value is added, false otherwise. +public delegate bool ValueAdderDelegate(ref TValue value); + +public enum OperationResult +{ + Added, + Updated, + Cancelled +} + +/// +/// Value adder delegate. +/// +/// The value type +/// The value that has been updated or added. +/// The operation index. +/// The operation result. +public delegate void OperationResultDelegate(in TValue value, long opIndex, OperationResult operationResult); \ No newline at end of file diff --git a/src/ZoneTree/Segments/IMutableSegment.cs b/src/ZoneTree/Segments/IMutableSegment.cs index f659de8..1869b5d 100644 --- a/src/ZoneTree/Segments/IMutableSegment.cs +++ b/src/ZoneTree/Segments/IMutableSegment.cs @@ -1,4 +1,5 @@ using Tenray.ZoneTree.Collections; +using Tenray.ZoneTree.Collections.BTree; using Tenray.ZoneTree.Core; namespace Tenray.ZoneTree.Segments; @@ -13,6 +14,11 @@ public interface IMutableSegment : IReadOnlySegment AddOrUpdateResult Upsert(in TKey key, in TValue value, out long opIndex); + AddOrUpdateResult Upsert( + in TKey key, + GetValueDelegate valueGetter, + out long opIndex); + AddOrUpdateResult Delete(in TKey key, out long opIndex); void Freeze(); diff --git a/src/ZoneTree/Segments/InMemory/MutableSegment.cs b/src/ZoneTree/Segments/InMemory/MutableSegment.cs index 87b82a7..fb134ef 100644 --- a/src/ZoneTree/Segments/InMemory/MutableSegment.cs +++ b/src/ZoneTree/Segments/InMemory/MutableSegment.cs @@ -172,6 +172,36 @@ public AddOrUpdateResult Upsert(in TKey key, in TValue value, out long opIndex) } } + public AddOrUpdateResult Upsert( + in TKey key, + GetValueDelegate valueGetter, + out long opIndex) + { + try + { + Interlocked.Increment(ref WritesInProgress); + + if (IsFrozenFlag) + { + opIndex = 0; + return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; + } + + if (BTree.Length >= MutableSegmentMaxItemCount) + { + opIndex = 0; + return AddOrUpdateResult.RETRY_SEGMENT_IS_FULL; + } + var result = BTree.Upsert(in key, valueGetter, out var value, out opIndex); + WriteAheadLog.Append(in key, in value, opIndex); + return result ? AddOrUpdateResult.ADDED : AddOrUpdateResult.UPDATED; + } + finally + { + Interlocked.Decrement(ref WritesInProgress); + } + } + public AddOrUpdateResult Delete(in TKey key, out long opIndex) { try