From 0d688242c6fc0f5ad913165942ca1c3fa380228b Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Thu, 4 Apr 2024 11:10:40 -0700 Subject: [PATCH] Vendor ReadWriteLock from ConcurrentUtils --- Project.toml | 3 +- src/MemPool.jl | 1 + src/datastore.jl | 7 +- src/read_write_lock.jl | 157 +++++++++++++++++++++++++++++++++++++++++ src/storage.jl | 4 +- 5 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 src/read_write_lock.jl diff --git a/Project.toml b/Project.toml index 80a8629..a0e2204 100644 --- a/Project.toml +++ b/Project.toml @@ -6,7 +6,6 @@ version = "0.4.7" [deps] ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd" -ConcurrentUtils = "3df5f688-6c4c-4767-8685-17f5ad261477" DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" Mmap = "a63ad114-7e13-5084-954f-fe012c677804" @@ -14,10 +13,10 @@ Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" +UnsafeAtomics = "013be700-e6cd-48c3-b4a1-df204f14c38f" [compat] ConcurrentCollections = "0.1" -ConcurrentUtils = "0.1" DataStructures = "0.18" ScopedValues = "1" julia = "1.8" diff --git a/src/MemPool.jl b/src/MemPool.jl index 98d0219..974de48 100644 --- a/src/MemPool.jl +++ b/src/MemPool.jl @@ -54,6 +54,7 @@ approx_size(f::FileRef) = f.size include("io.jl") include("lock.jl") +include("read_write_lock.jl") include("datastore.jl") """ diff --git a/src/datastore.jl b/src/datastore.jl index bbfed27..ad38b6b 100644 --- a/src/datastore.jl +++ b/src/datastore.jl @@ -1,5 +1,4 @@ using Distributed -import ConcurrentUtils as CU mutable struct DRef owner::Int @@ -515,7 +514,7 @@ function poolget(ref::DRef) original_ref = ref # Check global redirect cache - ref = CU.lock_read(REDIRECT_CACHE_LOCK) do + ref = lock_read(REDIRECT_CACHE_LOCK) do get(REDIRECT_CACHE, ref, ref) end @@ -546,7 +545,7 @@ end function _getlocal(id, remote) state = with_lock(()->datastore[id], datastore_lock) - CU.lock_read(state.lock) do + lock_read(state.lock) do if state.redirect !== nothing return RedirectTo(state.redirect) end @@ -623,7 +622,7 @@ struct RedirectTo end const REDIRECT_CACHE = WeakKeyDict{DRef,DRef}() -const REDIRECT_CACHE_LOCK = CU.ReadWriteLock() +const REDIRECT_CACHE_LOCK = ReadWriteLock() ## Default data directory diff --git a/src/read_write_lock.jl b/src/read_write_lock.jl new file mode 100644 index 0000000..9174604 --- /dev/null +++ b/src/read_write_lock.jl @@ -0,0 +1,157 @@ +# Adapted from ConcurrentUtils/src/read_write_lock.jl + +import UnsafeAtomics + +abstract type AbstractReadWriteLock <: Base.AbstractLock end + +const NOTLOCKED = UInt64(0) +const NREADERS_INC = UInt64(2) +const WRITELOCK_MASK = UInt64(1) + +const NReadersAndWritelock = UInt64 + +mutable struct ReadWriteLock <: AbstractReadWriteLock + @atomic nreaders_and_writelock::NReadersAndWritelock + # TODO: use condition variables with lock-free notify + const lock::ReentrantLock + const cond_read::Threads.Condition + const cond_write::Threads.Condition +end + +function fieldoffset_by_name(T, field) + for idx in 1:nfields(T) + if fieldnames(T)[idx] == field + return fieldoffset(T, idx) + end + end + error("No such field for $T: $field") +end +const OFFSET_NREADERS_AND_WRITELOCK = + fieldoffset_by_name(ReadWriteLock, :nreaders_and_writelock) + +function ReadWriteLock() + lock = ReentrantLock() + cond_read = Threads.Condition(lock) + cond_write = Threads.Condition(lock) + return ReadWriteLock(NOTLOCKED, lock, cond_read, cond_write) +end + +# Not very efficient but lock-free +function trylock_read(rwlock::ReadWriteLock; nspins = -∞, ntries = -∞) + local ns::Int = 0 + local nt::Int = 0 + while true + old = @atomic :monotonic rwlock.nreaders_and_writelock + if iszero(old & WRITELOCK_MASK) + # Try to acquire reader lock without the responsibility to receive or send the + # notification: + old, success = @atomicreplace( + :acquire_release, + :monotonic, + rwlock.nreaders_and_writelock, + old => old + NREADERS_INC, + ) + success && return true + nt += 1 + nt < ntries || return false + end + ns += 1 + ns < nspins || return false + end +end + +function lock_read(rwlock::ReadWriteLock) + + # Using hardware FAA + ptr = Ptr{NReadersAndWritelock}( + pointer_from_objref(rwlock) + OFFSET_NREADERS_AND_WRITELOCK, + ) + GC.@preserve rwlock begin + _, n = UnsafeAtomics.modify!(ptr, +, NREADERS_INC, UnsafeAtomics.acq_rel) + end + # n = @atomic :acquire_release rwlock.nreaders_and_writelock += NREADERS_INC + + if iszero(n & WRITELOCK_MASK) + return + end + lock(rwlock.lock) do + while true + local n = @atomic :acquire rwlock.nreaders_and_writelock + if iszero(n & WRITELOCK_MASK) + @assert n > 0 + return + end + wait(rwlock.cond_read) + end + end +end + +function unlock_read(rwlock::ReadWriteLock) + + # Using hardware FAA + ptr = Ptr{NReadersAndWritelock}( + pointer_from_objref(rwlock) + OFFSET_NREADERS_AND_WRITELOCK, + ) + GC.@preserve rwlock begin + _, n = UnsafeAtomics.modify!(ptr, -, NREADERS_INC, UnsafeAtomics.acq_rel) + end + # n = @atomic :acquire_release rwlock.nreaders_and_writelock -= NREADERS_INC + + @assert iszero(n & WRITELOCK_MASK) + if iszero(n) + lock(rwlock.lock) do + notify(rwlock.cond_write; all = false) + end + end + return +end + +function Base.trylock(rwlock::ReadWriteLock) + _, success = @atomicreplace( + :acquire_release, + :monotonic, + rwlock.nreaders_and_writelock, + NOTLOCKED => WRITELOCK_MASK, + ) + return success::Bool +end + +function Base.lock(rwlock::ReadWriteLock) + if trylock(rwlock) + return + end + lock(rwlock.lock) do + while true + if trylock(rwlock) + return + end + wait(rwlock.cond_write) + end + end +end + +function Base.unlock(rwlock::ReadWriteLock) + @assert !iszero(rwlock.nreaders_and_writelock & WRITELOCK_MASK) + @atomic :acquire_release rwlock.nreaders_and_writelock &= ~WRITELOCK_MASK + lock(rwlock.lock) do + notify(rwlock.cond_read) + notify(rwlock.cond_write; all = false) + end + return +end + +### +### High-level APIs +### + +lock_read(lck) = lock(lck) +unlock_read(lck) = unlock(lck) + +function lock_read(f, lock) + lock_read(lock) + try + return f() + finally + unlock_read(lock) + end +end diff --git a/src/storage.jl b/src/storage.jl index 9b7f6c5..89289aa 100644 --- a/src/storage.jl +++ b/src/storage.jl @@ -316,7 +316,7 @@ mutable struct RefState # Destructor, if any destructor::Any # A Reader-Writer lock to protect access to this struct - lock::CU.ReadWriteLock + lock::ReadWriteLock # The DRef that this value may be redirecting to redirect::Union{DRef,Nothing} end @@ -326,7 +326,7 @@ RefState(storage::StorageState, size::Integer; RefState(storage, size, tag, leaf_tag, destructor, - CU.ReadWriteLock(), nothing) + ReadWriteLock(), nothing) function Base.getproperty(state::RefState, field::Symbol) if field === :storage throw(ArgumentError("Cannot directly read `:storage` field of `RefState`\nUse `storage_read(state)` instead"))