From 12c9407b14c13a5366d13e667aad6569c9eab417 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Mon, 13 Sep 2021 13:53:41 -0400 Subject: [PATCH 1/4] Make worker state variable threadsafe --- src/cluster.jl | 61 +++++++++++++++++++++++++++++--------- src/managers.jl | 2 +- test/distributed_exec.jl | 1 + test/threads.jl | 63 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 14 deletions(-) create mode 100644 test/threads.jl diff --git a/src/cluster.jl b/src/cluster.jl index d8cc052..bdc16e8 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -100,9 +100,9 @@ mutable struct Worker add_msgs::Array{Any,1} @atomic gcflag::Bool state::WorkerState - c_state::Condition # wait for state changes - ct_time::Float64 # creation time - conn_func::Any # used to setup connections lazily + c_state::Threads.Condition # wait for state changes, lock for state + ct_time::Float64 # creation time + conn_func::Any # used to setup connections lazily r_stream::IO w_stream::IO @@ -134,7 +134,7 @@ mutable struct Worker if haskey(map_pid_wrkr, id) return map_pid_wrkr[id] end - w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func) + w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func) w.initialized = Event() register_worker(w) w @@ -144,12 +144,16 @@ mutable struct Worker end function set_worker_state(w, state) - w.state = state - notify(w.c_state; all=true) + lock(w.c_state) do + w.state = state + notify(w.c_state; all=true) + end end function check_worker_state(w::Worker) + lock(w.c_state) if w.state === W_CREATED + unlock(w.c_state) if !isclusterlazy() if PGRP.topology === :all_to_all # Since higher pids connect with lower pids, the remote worker @@ -169,6 +173,8 @@ function check_worker_state(w::Worker) errormonitor(t) wait_for_conn(w) end + else + unlock(w.c_state) end end @@ -187,13 +193,25 @@ function exec_conn_func(w::Worker) end function wait_for_conn(w) + 
lock(w.c_state) if w.state === W_CREATED + unlock(w.c_state) timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") - @async (sleep(timeout); notify(w.c_state; all=true)) - wait(w.c_state) - w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") + T = Threads.@spawn begin + sleep($timeout) + lock(w.c_state) do + notify(w.c_state; all=true) + end + end + errormonitor(T) + lock(w.c_state) do + wait(w.c_state) + w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") + end + else + unlock(w.c_state) end nothing end @@ -491,7 +509,10 @@ function addprocs_locked(manager::ClusterManager; kwargs...) while true if isempty(launched) istaskdone(t_launch) && break - @async (sleep(1); notify(launch_ntfy)) + @async begin + sleep(1) + notify(launch_ntfy) + end wait(launch_ntfy) end @@ -645,7 +666,12 @@ function create_worker(manager, wconfig) # require the value of config.connect_at which is set only upon connection completion for jw in PGRP.workers if (jw.id != 1) && (jw.id < w.id) - (jw.state === W_CREATED) && wait(jw.c_state) + # wait for wl to join + lock(jw.c_state) do + if jw.state === W_CREATED + wait(jw.c_state) + end + end push!(join_list, jw) end end @@ -668,7 +694,12 @@ function create_worker(manager, wconfig) end for wl in wlist - (wl.state === W_CREATED) && wait(wl.c_state) + lock(wl.c_state) do + if wl.state === W_CREATED + # wait for wl to join + wait(wl.c_state) + end + end push!(join_list, wl) end end @@ -685,7 +716,11 @@ function create_worker(manager, wconfig) @async manage(w.manager, w.id, w.config, :register) # wait for rr_ntfy_join with timeout timedout = false - @async (sleep($timeout); timedout = true; put!(rr_ntfy_join, 1)) + @async begin + sleep($timeout) + timedout = true + put!(rr_ntfy_join, 1) + end wait(rr_ntfy_join) if timedout error("worker did not connect within $timeout seconds") diff --git 
a/src/managers.jl b/src/managers.jl index 57f5859..1f85e79 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -176,7 +176,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: # Wait for all launches to complete. @sync for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt - @async try + @async try launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) catch e print(stderr, "exception launching on machine $(machine) : $(e)\n") diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 648cde5..06600a8 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -1925,4 +1925,5 @@ end # Run topology tests last after removing all workers, since a given # cluster at any time only supports a single topology. rmprocs(workers()) +include("threads.jl") include("topology.jl") diff --git a/test/threads.jl b/test/threads.jl new file mode 100644 index 0000000..57d99b7 --- /dev/null +++ b/test/threads.jl @@ -0,0 +1,63 @@ +using Test +using Distributed, Base.Threads +using Base.Iterators: product + +exeflags = ("--startup-file=no", + "--check-bounds=yes", + "--depwarn=error", + "--threads=2") + +function call_on(f, wid, tid) + remotecall(wid) do + t = Task(f) + ccall(:jl_set_task_tid, Cvoid, (Any, Cint), t, tid - 1) + schedule(t) + @assert threadid(t) == tid + t + end +end + +# Run function on process holding the data to only serialize the result of f. +# This becomes useful for things that cannot be serialized (e.g. running tasks) +# or that would be unnecessarily big if serialized. +fetch_from_owner(f, rr) = remotecall_fetch(f ∘ fetch, rr.where, rr) + +isdone(rr) = fetch_from_owner(istaskdone, rr) +isfailed(rr) = fetch_from_owner(istaskfailed, rr) + +@testset "RemoteChannel allows put!/take! 
from thread other than 1" begin + ws = ts = product(1:2, 1:2) + @testset "from worker $w1 to $w2 via 1" for (w1, w2) in ws + @testset "from thread $w1.$t1 to $w2.$t2" for (t1, t2) in ts + # We want (the default) laziness, so that we wait for `Worker.c_state`! + procs_added = addprocs(2; exeflags, lazy=true) + @everywhere procs_added using Base.Threads + + p1 = procs_added[w1] + p2 = procs_added[w2] + chan_id = first(procs_added) + chan = RemoteChannel(chan_id) + send = call_on(p1, t1) do + put!(chan, nothing) + end + recv = call_on(p2, t2) do + take!(chan) + end + + # Wait on the spawned tasks on the owner + @sync begin + Threads.@spawn fetch_from_owner(wait, recv) + Threads.@spawn fetch_from_owner(wait, send) + end + + # Check the tasks + @test isdone(send) + @test isdone(recv) + + @test !isfailed(send) + @test !isfailed(recv) + + rmprocs(procs_added) + end + end +end From 5ced7b84268fab1b53c63177d38f233502839120 Mon Sep 17 00:00:00 2001 From: Valentin Churavy Date: Tue, 28 Sep 2021 23:51:17 -0400 Subject: [PATCH 2/4] use atomic field --- src/cluster.jl | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index bdc16e8..ac6e3f2 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -99,7 +99,7 @@ mutable struct Worker del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels? 
add_msgs::Array{Any,1} @atomic gcflag::Bool - state::WorkerState + @atomic state::WorkerState c_state::Threads.Condition # wait for state changes, lock for state ct_time::Float64 # creation time conn_func::Any # used to setup connections lazily @@ -145,15 +145,13 @@ end function set_worker_state(w, state) lock(w.c_state) do - w.state = state + @atomic w.state = state notify(w.c_state; all=true) end end function check_worker_state(w::Worker) - lock(w.c_state) if w.state === W_CREATED - unlock(w.c_state) if !isclusterlazy() if PGRP.topology === :all_to_all # Since higher pids connect with lower pids, the remote worker @@ -173,9 +171,8 @@ function check_worker_state(w::Worker) errormonitor(t) wait_for_conn(w) end - else - unlock(w.c_state) end + return nothing end exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker) @@ -193,9 +190,7 @@ function exec_conn_func(w::Worker) end function wait_for_conn(w) - lock(w.c_state) if w.state === W_CREATED - unlock(w.c_state) timeout = worker_timeout() - (time() - w.ct_time) timeout <= 0 && error("peer $(w.id) has not connected to $(myid())") @@ -210,8 +205,6 @@ function wait_for_conn(w) wait(w.c_state) w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds") end - else - unlock(w.c_state) end nothing end @@ -667,8 +660,8 @@ function create_worker(manager, wconfig) for jw in PGRP.workers if (jw.id != 1) && (jw.id < w.id) # wait for wl to join - lock(jw.c_state) do - if jw.state === W_CREATED + if jw.state === W_CREATED + lock(jw.c_state) do wait(jw.c_state) end end From a58c1dc456a7489672e13b7c6d84a68e7b8e65fc Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sat, 14 Oct 2023 10:18:05 -0700 Subject: [PATCH 3/4] init_multi: Be more thread-safe --- src/cluster.jl | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/cluster.jl b/src/cluster.jl index ac6e3f2..e676af7 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -1318,18 +1318,16 
@@ end using Random: randstring -let inited = false - # do initialization that's only needed when there is more than 1 processor - global function init_multi() - if !inited - inited = true - push!(Base.package_callbacks, _require_callback) - atexit(terminate_all_workers) - init_bind_addr() - cluster_cookie(randstring(HDR_COOKIE_LEN)) - end - return nothing +# do initialization that's only needed when there is more than 1 processor +const inited = Threads.Atomic{Bool}(false) +function init_multi() + if !Threads.atomic_cas!(inited, false, true) + push!(Base.package_callbacks, _require_callback) + atexit(terminate_all_workers) + init_bind_addr() + cluster_cookie(randstring(HDR_COOKIE_LEN)) end + return nothing end function init_parallel() From 88680df3f1a2d88f7cfc867dfe95ad9b71968fdf Mon Sep 17 00:00:00 2001 From: Julian P Samaroo Date: Sat, 14 Oct 2023 10:18:56 -0700 Subject: [PATCH 4/4] Add gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..df02284 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +Manifest.toml +*.swp