diff --git a/README.md b/README.md index 40ae042b..ab451e08 100644 --- a/README.md +++ b/README.md @@ -24,20 +24,20 @@ In addition, some concurrency and parallelism primitives are provided: ## Usage -The user can create several thread pools. These pools use regular posix threads, -but the threads are spread across multiple domains (on OCaml 5), which enables -parallelism. +The user can create several thread pools (implementing the interface `Runner.t`). +These pools use regular posix threads, but the threads are spread across +multiple domains (on OCaml 5), which enables parallelism. -The function `Pool.run_async pool task` runs `task()` on one of the workers -of `pool`, as soon as one is available. No result is returned. +The function `Runner.run_async pool task` schedules `task()` to run on one of +the workers of `pool`, as soon as one is available. No result is returned by `run_async`. ```ocaml # #require "threads";; -# let pool = Moonpool.Pool.create ~min:4 ();; +# let pool = Moonpool.Fifo_pool.create ~min:4 ();; val pool : Moonpool.Runner.t = # begin - Moonpool.Pool.run_async pool + Moonpool.Runner.run_async pool (fun () -> Thread.delay 0.1; print_endline "running from the pool"); @@ -49,11 +49,13 @@ running from the pool - : unit = () ``` -To wait until the task is done, you can use `Pool.run_wait_block` instead: +To wait until the task is done, you can use `Runner.run_wait_block`[^1] instead: + +[^1]: beware of deadlock! See documentation for more details. ```ocaml # begin - Moonpool.Pool.run_wait_block pool + Moonpool.Runner.run_wait_block pool (fun () -> Thread.delay 0.1; print_endline "running from the pool"); @@ -155,7 +157,7 @@ val expected_sum : int = 5050 On OCaml 5, again using effect handlers, the module `Fork_join` implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model). -It must run on a pool (using [Pool.run] or inside a future via [Future.spawn]). +It must run on a pool (using [Runner.run_async] or inside a future via [Fut.spawn]). ```ocaml # let rec select_sort arr i len = @@ -257,7 +259,7 @@ This works for OCaml >= 4.08. the same pool, too — this is useful for threads blocking on IO). A useful analogy is that each domain is a bit like a CPU core, and `Thread.t` is a logical thread running on a core. - Multiple threads have to share a single core and do not run in parallel on it[^1]. + Multiple threads have to share a single core and do not run in parallel on it[^2]. We can therefore build pools that spread their worker threads on multiple cores to enable parallelism within each pool. TODO: actually use https://github.com/haesbaert/ocaml-processor to pin domains to cores, @@ -273,4 +275,4 @@ MIT license. $ opam install moonpool ``` -[^1]: let's not talk about hyperthreading. +[^2]: let's not talk about hyperthreading. diff --git a/benchs/fib_rec.ml b/benchs/fib_rec.ml index 385bfed4..06341ce1 100644 --- a/benchs/fib_rec.ml +++ b/benchs/fib_rec.ml @@ -21,7 +21,7 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let create_pool ~psize ~kind () = match kind with | "fifo" -> Fifo_pool.create ~min:psize () - | "pool" -> Pool.create ~min:psize () + | "pool" -> Ws_pool.create ~min:psize () | _ -> assert false let run ~psize ~n ~seq ~niter ~kind () : unit = @@ -38,7 +38,7 @@ let run ~psize ~n ~seq ~niter ~kind () : unit = in Printf.printf "fib %d = %d\n%!" n res done; - if not seq then Pool.shutdown (Lazy.force pool) + if not seq then Ws_pool.shutdown (Lazy.force pool) let () = let n = ref 40 in diff --git a/benchs/pi.ml b/benchs/pi.ml index 01017ae9..65304a80 100644 --- a/benchs/pi.ml +++ b/benchs/pi.ml @@ -21,9 +21,9 @@ let with_pool ~kind f = match kind with | "pool" -> if !j = 0 then - Pool.with_ ~per_domain:1 f + Ws_pool.with_ ~per_domain:1 f else - Pool.with_ ~min:!j f + Ws_pool.with_ ~min:!j f | "fifo" -> if !j = 0 then Fifo_pool.with_ ~per_domain:1 f @@ -35,7 +35,7 @@ let with_pool ~kind f = let run_par1 ~kind (num_steps : int) : float = let@ pool = with_pool ~kind () in - let num_tasks = Pool.size pool in + let num_tasks = Ws_pool.size pool in let step = 1. /. float num_steps in let global_sum = Lock.create 0. in @@ -64,12 +64,12 @@ let run_par1 ~kind (num_steps : int) : float = let run_fork_join ~kind num_steps : float = let@ pool = with_pool ~kind () in - let num_tasks = Pool.size pool in + let num_tasks = Ws_pool.size pool in let step = 1. /. float num_steps in let global_sum = Lock.create 0. in - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> Fork_join.for_ ~chunk_size:(3 + (num_steps / num_tasks)) num_steps diff --git a/src/fut.ml b/src/fut.ml index 42767b61..0a5332ed 100644 --- a/src/fut.ml +++ b/src/fut.ml @@ -97,7 +97,7 @@ let spawn ~on f : _ t = fulfill promise res in - Pool.run_async on task; + Runner.run_async on task; fut let reify_error (f : 'a t) : 'a or_error t = @@ -131,7 +131,7 @@ let map ?on ~f fut : _ t = match on with | None -> map_and_fulfill () - | Some on -> Pool.run_async on map_and_fulfill); + | Some on -> Runner.run_async on map_and_fulfill); fut2 @@ -158,14 +158,14 @@ let bind ?on ~f fut : _ t = | None -> apply_f_to_res r | Some on -> let fut2, promise = make () in - Pool.run_async on (bind_and_fulfill r promise); + Runner.run_async on (bind_and_fulfill r promise); fut2) | None -> let fut2, promise = make () in on_result fut (fun r -> match on with | None -> bind_and_fulfill r promise () - | Some on -> Pool.run_async on (bind_and_fulfill r promise)); + | Some on -> Runner.run_async on (bind_and_fulfill r promise)); fut2 @@ -403,7 +403,7 @@ module type INFIX = sig end module Infix_ (X : sig - val pool : Pool.t option + val pool : Runner.t option end) : INFIX = struct let[@inline] ( >|= ) x f = map ?on:X.pool ~f x let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x @@ -420,7 +420,7 @@ end) include Infix_local module Infix (X : sig - val pool : Pool.t + val pool : Runner.t end) = Infix_ (struct let pool = Some X.pool diff --git a/src/moonpool.ml b/src/moonpool.ml index fb0a3661..ed1af755 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -9,7 +9,8 @@ module Chan = Chan module Fork_join = Fork_join module Fut = Fut module Lock = Lock -module Pool = Pool +module Pool = Fifo_pool +module Ws_pool = Ws_pool module Runner = Runner module Fifo_pool = Fifo_pool diff --git a/src/moonpool.mli b/src/moonpool.mli index 6edeb1e7..4028e858 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -9,10 +9,14 @@ primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}). *) -module Pool = Pool +module Ws_pool = Ws_pool module Fifo_pool = Fifo_pool module Runner = Runner +module Pool = Fifo_pool +[@@deprecated "use Fifo_pool or Ws_pool"] +(** Default pool. Please explicitly pick an implementation instead. *) + val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t (** Similar to {!Thread.create}, but it picks a background domain at random to run the thread. This ensures that we don't always pick the same domain diff --git a/src/pool.ml b/src/ws_pool.ml similarity index 91% rename from src/pool.ml rename to src/ws_pool.ml index 01a093b3..fcf5eed9 100644 --- a/src/pool.ml +++ b/src/ws_pool.ml @@ -4,9 +4,6 @@ include Runner let ( let@ ) = ( @@ ) -type thread_loop_wrapper = - thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit - type worker_state = { mutable thread: Thread.t; q: task WSQ.t; (** Work stealing queue *) @@ -227,7 +224,6 @@ let shutdown_ ~wait (self : state) : unit = type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> - ?thread_wrappers:thread_loop_wrapper list -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?min:int -> @@ -236,9 +232,8 @@ type ('a, 'b) create_args = (** Arguments used in {!create}. See {!create} for explanations. *) let create ?(on_init_thread = default_thread_init_exit_) - ?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = []) - ?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1) - ?(per_domain = 0) () : t = + ?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ()) + ?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t = (* wrapper *) let around_task = match around_task with @@ -294,16 +289,9 @@ let create ?(on_init_thread = default_thread_init_exit_) on_init_thread ~dom_id:dom_idx ~t_id (); let run () = worker_thread_ pool runner w ~on_exn ~around_task in - (* the actual worker loop is [worker_thread_], with all - wrappers for this pool and for all pools (global_thread_wrappers_) *) - let run' = - List.fold_left - (fun run f -> f ~thread ~pool:runner run) - run thread_wrappers - in (* now run the main loop *) - Fun.protect run' ~finally:(fun () -> + Fun.protect run ~finally:(fun () -> (* on termination, decrease refcount of underlying domain *) D_pool_.decr_on dom_idx); on_exit_thread ~dom_id:dom_idx ~t_id () @@ -335,11 +323,11 @@ let create ?(on_init_thread = default_thread_init_exit_) runner -let with_ ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task - ?min ?per_domain () f = +let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain + () f = let pool = - create ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task - ?min ?per_domain () + create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain + () in let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in f pool diff --git a/src/pool.mli b/src/ws_pool.mli similarity index 76% rename from src/pool.mli rename to src/ws_pool.mli index f7a42633..4775024c 100644 --- a/src/pool.mli +++ b/src/ws_pool.mli @@ -1,7 +1,13 @@ -(** Thread pool. +(** Work-stealing thread pool. - A pool of threads. The pool contains a fixed number of threads that - wait for work items to come, process these, and loop. + A pool of threads with a worker-stealing scheduler. + The pool contains a fixed number of threads that wait for work + items to come, process these, and loop. + + This is good for CPU-intensive tasks that feature a lot of small tasks. + Note that tasks will not always be processed in the order they are + scheduled, so this is not great for workloads where the latency + of individual tasks matter (for that see {!Fifo_pool}). This implements {!Runner.t} since 0.3. @@ -15,18 +21,9 @@ include module type of Runner -type thread_loop_wrapper = - thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit -(** A thread wrapper [f] takes the current thread, the current pool, - and the worker function [loop : unit -> unit] which is - the worker's main loop, and returns a new loop function. - By default it just returns the same loop function but it can be used - to install tracing, effect handlers, etc. *) - type ('a, 'b) create_args = ?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) -> ?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) -> - ?thread_wrappers:thread_loop_wrapper list -> ?on_exn:(exn -> Printexc.raw_backtrace -> unit) -> ?around_task:(t -> 'b) * (t -> 'b -> unit) -> ?min:int -> @@ -47,8 +44,6 @@ val create : (unit -> t, _) create_args If both [min] and [per_domain] are specified, the maximum of both [min] and [per_domain * num_of_domains] is used. @param on_exit_thread called at the end of each thread in the pool - @param thread_wrappers a list of {!thread_loop_wrapper} functions - to use for this pool's workers. @param around_task a pair of [before, after], where [before pool] is called before a task is processed, on the worker thread about to run it, and returns [x]; and [after pool x] is called by diff --git a/test/effect-based/t_fib1.ml b/test/effect-based/t_fib1.ml index e8d2f534..ca3f2861 100644 --- a/test/effect-based/t_fib1.ml +++ b/test/effect-based/t_fib1.ml @@ -26,13 +26,13 @@ let fib ~on x : int Fut.t = let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let fib_40 : int = - let pool = Pool.create ~min:8 () in + let pool = Ws_pool.create ~min:8 () in fib ~on:pool 40 |> Fut.wait_block_exn let () = Printf.printf "fib 40 = %d\n%!" fib_40 let run_test () = - let pool = Pool.create ~min:8 () in + let pool = Ws_pool.create ~min:8 () in assert ( List.init 10 (fib ~on:pool) @@ -42,7 +42,7 @@ let run_test () = let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in let res = Fut.join_array fibs |> Fut.wait_block in - Pool.shutdown pool; + Ws_pool.shutdown pool; assert (res = Ok (Array.make 3 fib_40)) diff --git a/test/effect-based/t_fib_fork_join.ml b/test/effect-based/t_fib_fork_join.ml index c6898833..bdf60337 100644 --- a/test/effect-based/t_fib_fork_join.ml +++ b/test/effect-based/t_fib_fork_join.ml @@ -27,13 +27,13 @@ let fib ~on x : int Fut.t = let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let fib_40 : int = - let pool = Pool.create ~min:8 () in + let pool = Ws_pool.create ~min:8 () in fib ~on:pool 40 |> Fut.wait_block_exn let () = Printf.printf "fib 40 = %d\n%!" fib_40 let run_test () = - let pool = Pool.create ~min:8 () in + let pool = Ws_pool.create ~min:8 () in assert ( List.init 10 (fib ~on:pool) @@ -43,7 +43,7 @@ let run_test () = let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in let res = Fut.join_array fibs |> Fut.wait_block in - Pool.shutdown pool; + Ws_pool.shutdown pool; assert (res = Ok (Array.make 3 fib_40)) diff --git a/test/effect-based/t_fib_fork_join_all.ml b/test/effect-based/t_fib_fork_join_all.ml index e1ae83f4..ed82902e 100644 --- a/test/effect-based/t_fib_fork_join_all.ml +++ b/test/effect-based/t_fib_fork_join_all.ml @@ -22,13 +22,13 @@ let rec fib x : int = ) let fib_40 : int = - let@ pool = Pool.with_ ~min:8 () in + let@ pool = Ws_pool.with_ ~min:8 () in Fut.spawn ~on:pool (fun () -> fib 40) |> Fut.wait_block_exn let () = Printf.printf "fib 40 = %d\n%!" fib_40 let run_test () = - let@ pool = Pool.with_ ~min:8 () in + let@ pool = Ws_pool.with_ ~min:8 () in let fut = Fut.spawn ~on:pool (fun () -> @@ -37,7 +37,7 @@ let run_test () = in let res = Fut.wait_block_exn fut in - Pool.shutdown pool; + Ws_pool.shutdown pool; assert (res = (Array.make 3 fib_40 |> Array.to_list)) diff --git a/test/effect-based/t_fork_join.ml b/test/effect-based/t_fork_join.ml index 7fc8fa31..5b467187 100644 --- a/test/effect-based/t_fork_join.ml +++ b/test/effect-based/t_fork_join.ml @@ -5,11 +5,11 @@ let ( let@ ) = ( @@ ) open! Moonpool -let pool = Pool.create ~min:4 () +let pool = Ws_pool.create ~min:4 () let () = let x = - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> let x, y = Fork_join.both (fun () -> @@ -25,7 +25,7 @@ let () = let () = try - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> Fork_join.both_ignore (fun () -> Thread.delay 0.005) (fun () -> @@ -36,21 +36,21 @@ let () = let () = let par_sum = - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> Fork_join.all_init 42 (fun i -> i * i) |> List.fold_left ( + ) 0) in let exp_sum = List.init 42 (fun x -> x * x) |> List.fold_left ( + ) 0 in assert (par_sum = exp_sum) let () = - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> Fork_join.for_ 0 (fun _ _ -> assert false)); () let () = let total_sum = Atomic.make 0 in - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> Fork_join.for_ ~chunk_size:5 100 (fun low high -> (* iterate on the range sequentially. The range should have 5 items or less. *) let local_sum = ref 0 in @@ -63,7 +63,7 @@ let () = let () = let total_sum = Atomic.make 0 in - Pool.run_wait_block pool (fun () -> + Ws_pool.run_wait_block pool (fun () -> Fork_join.for_ ~chunk_size:1 100 (fun low high -> assert (low = high); ignore (Atomic.fetch_and_add total_sum low : int))); @@ -270,7 +270,7 @@ end let t_eval = let arb = Q.set_stats [ "size", Evaluator.size ] Evaluator.arb in Q.Test.make ~name:"same eval" arb (fun e -> - let@ pool = Pool.with_ ~min:4 () in + let@ pool = Ws_pool.with_ ~min:4 () in (* Printf.eprintf "eval %s\n%!" (Evaluator.show e); *) let x = Evaluator.eval_seq e in let y = Evaluator.eval_fork_join ~pool e in @@ -288,8 +288,8 @@ let t_for_nested ~min ~chunk_size () = let ref_l2 = List.map (List.map neg) ref_l1 in let l1, l2 = - let@ pool = Pool.with_ ~min () in - let@ () = Pool.run_wait_block pool in + let@ pool = Ws_pool.with_ ~min () in + let@ () = Ws_pool.run_wait_block pool in let l1 = Fork_join.map_list ~chunk_size (Fork_join.map_list ~chunk_size neg) l in @@ -310,8 +310,8 @@ let t_map ~chunk_size () = Q.Test.make ~name:"map1" Q.(small_list small_int |> Q.set_stats [ "len", List.length ]) (fun l -> - let@ pool = Pool.with_ ~min:4 () in - let@ () = Pool.run_wait_block pool in + let@ pool = Ws_pool.with_ ~min:4 () in + let@ () = Ws_pool.run_wait_block pool in let a1 = Fork_join.map_list ~chunk_size string_of_int l |> Array.of_list diff --git a/test/effect-based/t_fork_join_heavy.ml b/test/effect-based/t_fork_join_heavy.ml index be86299a..ad9f7044 100644 --- a/test/effect-based/t_fork_join_heavy.ml +++ b/test/effect-based/t_fork_join_heavy.ml @@ -27,8 +27,8 @@ let run ~min () = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "step" in let l1, l2 = - let@ pool = Pool.with_ ~min () in - let@ () = Pool.run_wait_block pool in + let@ pool = Ws_pool.with_ ~min () in + let@ () = Ws_pool.run_wait_block pool in let l1, l2 = Fork_join.both diff --git a/test/effect-based/t_futs1.ml b/test/effect-based/t_futs1.ml index be58f50b..182ca9d5 100644 --- a/test/effect-based/t_futs1.ml +++ b/test/effect-based/t_futs1.ml @@ -2,7 +2,7 @@ open! Moonpool -let pool = Pool.create ~min:4 () +let pool = Ws_pool.create ~min:4 () let () = let fut = Array.init 10 (fun i -> Fut.spawn ~on:pool (fun () -> i)) in diff --git a/test/effect-based/t_many.ml b/test/effect-based/t_many.ml index 23e1a929..4362932c 100644 --- a/test/effect-based/t_many.ml +++ b/test/effect-based/t_many.ml @@ -34,15 +34,15 @@ let () = run ~pool ()); (print_endline "with WS(1)"; - let@ pool = Pool.with_ ~min:1 () in + let@ pool = Ws_pool.with_ ~min:1 () in run ~pool ()); (print_endline "with WS(2)"; - let@ pool = Pool.with_ ~min:2 () in + let@ pool = Ws_pool.with_ ~min:2 () in run ~pool ()); (print_endline "with WS(4)"; - let@ pool = Pool.with_ ~min:4 () in + let@ pool = Ws_pool.with_ ~min:4 () in run ~pool ()); () diff --git a/test/effect-based/t_sort.ml b/test/effect-based/t_sort.ml index a732c740..8d3fe17c 100644 --- a/test/effect-based/t_sort.ml +++ b/test/effect-based/t_sort.ml @@ -59,7 +59,7 @@ let rec quicksort arr i len : unit = (fun () -> quicksort arr !low (len - (!low - i))) ) -let pool = Moonpool.Pool.create ~min:8 () +let pool = Moonpool.Ws_pool.create ~min:8 () let () = let arr = Array.init 400_000 (fun _ -> Random.int 300_000) in diff --git a/test/t_bench1.ml b/test/t_bench1.ml index abf4a7f2..95cd87a5 100644 --- a/test/t_bench1.ml +++ b/test/t_bench1.ml @@ -8,7 +8,7 @@ let rec fib x = let run ~psize ~n ~j () : _ Fut.t = Printf.printf "pool size=%d, n=%d, j=%d\n%!" psize n j; - let pool = Pool.create ~min:psize ~per_domain:0 () in + let pool = Ws_pool.create ~min:psize ~per_domain:0 () in (* TODO: a ppx for tracy so we can use instrumentation *) let loop () = diff --git a/test/t_chan_train.ml b/test/t_chan_train.ml index 5d1c40ef..bb3e24f7 100644 --- a/test/t_chan_train.ml +++ b/test/t_chan_train.ml @@ -1,7 +1,7 @@ open Moonpool (* large pool, some of our tasks below are long lived *) -let pool = Pool.create ~min:30 () +let pool = Ws_pool.create ~min:30 () open (val Fut.infix pool) diff --git a/test/t_fib.ml b/test/t_fib.ml index f54d7118..3a98e395 100644 --- a/test/t_fib.ml +++ b/test/t_fib.ml @@ -5,7 +5,7 @@ let ( let@ ) = ( @@ ) let with_pool ~kind () f = match kind with | `Fifo_pool -> Fifo_pool.with_ ~min:4 () f - | `Pool -> Pool.with_ ~min:4 () f + | `Ws_pool -> Ws_pool.with_ ~min:4 () f let rec fib x = if x <= 1 then @@ -18,7 +18,7 @@ let () = assert (List.init 10 fib = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let run_test ~pool () = let fibs = Array.init 30 (fun n -> Fut.spawn ~on:pool (fun () -> fib n)) in let res = Fut.join_array fibs |> Fut.wait_block in - Pool.shutdown pool; + Ws_pool.shutdown pool; assert ( res @@ -74,5 +74,5 @@ let run ~kind () = Array.iter Thread.join jobs let () = - run ~kind:`Pool (); + run ~kind:`Ws_pool (); run ~kind:`Fifo_pool () diff --git a/test/t_fib_rec.ml b/test/t_fib_rec.ml index 01cd7c16..de704f34 100644 --- a/test/t_fib_rec.ml +++ b/test/t_fib_rec.ml @@ -25,9 +25,9 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ]) let fib_40 : int lazy_t = lazy (let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "fib40" in - let pool = Pool.create ~min:8 () in + let pool = Ws_pool.create ~min:8 () in let r = fib ~on:pool 40 |> Fut.wait_block_exn in - Pool.shutdown pool; + Ws_pool.shutdown pool; r) let run_test ~pool () = @@ -49,7 +49,7 @@ let run_test ~pool () = let run_test_size ~size () = Printf.printf "test pool(%d)\n%!" size; - let@ pool = Pool.with_ ~min:size () in + let@ pool = Ws_pool.with_ ~min:size () in run_test ~pool () let run_test_fifo ~size () = diff --git a/test/t_futs1.ml b/test/t_futs1.ml index 930c8bdc..ee2d96a6 100644 --- a/test/t_futs1.ml +++ b/test/t_futs1.ml @@ -1,7 +1,7 @@ open! Moonpool -let pool = Pool.create ~min:4 () -let pool2 = Pool.create ~min:2 () +let pool = Ws_pool.create ~min:4 () +let pool2 = Ws_pool.create ~min:2 () let () = let fut = Fut.return 1 in diff --git a/test/t_props.ml b/test/t_props.ml index be586251..9fa64fbe 100644 --- a/test/t_props.ml +++ b/test/t_props.ml @@ -8,7 +8,7 @@ let add_test t = tests := t :: !tests let with_pool ~kind () f = match kind with | `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f - | `Pool -> Pool.with_ ~min:4 ~per_domain:1 () f + | `Ws_pool -> Ws_pool.with_ ~min:4 ~per_domain:1 () f let () = add_test @@ fun ~kind -> @@ -48,7 +48,7 @@ let () = let () = let tests = - List.map (fun t -> [ t ~kind:`Fifo_pool; t ~kind:`Pool ]) !tests + List.map (fun t -> [ t ~kind:`Fifo_pool; t ~kind:`Ws_pool ]) !tests |> List.flatten in QCheck_base_runner.run_tests_main tests diff --git a/test/t_resource.ml b/test/t_resource.ml index 005ed4c3..c990f708 100644 --- a/test/t_resource.ml +++ b/test/t_resource.ml @@ -5,7 +5,7 @@ let ( let@ ) = ( @@ ) let with_pool ~kind () f = match kind with | `Fifo_pool -> Fifo_pool.with_ ~min:4 ~per_domain:1 () f - | `Pool -> Pool.with_ ~min:4 ~per_domain:1 () f + | `Ws_pool -> Ws_pool.with_ ~min:4 ~per_domain:1 () f (* test proper resource handling *) let run ~kind () = @@ -18,10 +18,10 @@ let run ~kind () = (* allocate a new pool at each iteration *) let@ p = with_pool ~kind () in - Pool.run_wait_block p (fun () -> Atomic.incr a) + Ws_pool.run_wait_block p (fun () -> Atomic.incr a) done; assert (Atomic.get a = 1_000) let () = - run ~kind:`Pool (); + run ~kind:`Ws_pool (); run ~kind:`Fifo_pool () diff --git a/test/t_tree_futs.ml b/test/t_tree_futs.ml index 83a9d80c..3507be0a 100644 --- a/test/t_tree_futs.ml +++ b/test/t_tree_futs.ml @@ -5,7 +5,7 @@ let ( let@ ) = ( @@ ) let with_pool ~kind ~j () f = match kind with | `Fifo_pool -> Fifo_pool.with_ ~min:j () f - | `Pool -> Pool.with_ ~min:j () f + | `Ws_pool -> Ws_pool.with_ ~min:j () f type 'a tree = | Leaf of 'a @@ -88,5 +88,5 @@ let () = (* Tracy_client_trace.setup (); *) - run_main ~kind:`Pool (); + run_main ~kind:`Ws_pool (); run_main ~kind:`Fifo_pool () diff --git a/test/t_unfair.ml b/test/t_unfair.ml index b6dc5884..f535a450 100644 --- a/test/t_unfair.ml +++ b/test/t_unfair.ml @@ -15,32 +15,32 @@ let run ~kind () = let on_init_thread ~dom_id:_ ~t_id () = Trace.set_thread_name (Printf.sprintf "pool worker %d" t_id) and around_task = - ( (fun self -> Trace.counter_int "n_tasks" (Pool.num_tasks self)), - fun self () -> Trace.counter_int "n_tasks" (Pool.num_tasks self) ) + ( (fun self -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self)), + fun self () -> Trace.counter_int "n_tasks" (Ws_pool.num_tasks self) ) in match kind with | `Simple -> Fifo_pool.create ~min:3 ~on_init_thread ~around_task () - | `Pool -> Pool.create ~min:3 ~on_init_thread ~around_task () + | `Ws_pool -> Ws_pool.create ~min:3 ~on_init_thread ~around_task () in (* make all threads busy *) - Pool.run_async pool (sleep_for 0.01); - Pool.run_async pool (sleep_for 0.01); - Pool.run_async pool (sleep_for 0.05); + Ws_pool.run_async pool (sleep_for 0.01); + Ws_pool.run_async pool (sleep_for 0.01); + Ws_pool.run_async pool (sleep_for 0.05); let t = Unix.gettimeofday () in for _i = 1 to 100 do let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "schedule step" in - Pool.run_async pool (sleep_for 0.001); - Pool.run_async pool (sleep_for 0.001); - Pool.run_async pool (sleep_for 0.01) + Ws_pool.run_async pool (sleep_for 0.001); + Ws_pool.run_async pool (sleep_for 0.001); + Ws_pool.run_async pool (sleep_for 0.01) done; - Printf.printf "pool size: %d\n%!" (Pool.num_tasks pool); + Printf.printf "pool size: %d\n%!" (Ws_pool.num_tasks pool); (let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "shutdown" in - Pool.shutdown pool); - Printf.printf "pool size after shutdown: %d\n%!" (Pool.num_tasks pool); + Ws_pool.shutdown pool); + Printf.printf "pool size after shutdown: %d\n%!" (Ws_pool.num_tasks pool); let elapsed = Unix.gettimeofday () -. t in Printf.printf "elapsed: %.4fs\n%!" elapsed @@ -49,4 +49,4 @@ let () = let@ () = Trace_tef.with_setup () in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in run ~kind:`Simple (); - run ~kind:`Pool () + run ~kind:`Ws_pool ()