Skip to content

Commit

Permalink
rename Pool to Ws_pool; deprecated Moonpool.Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Oct 26, 2023
1 parent 30035fa commit 3e614ec
Show file tree
Hide file tree
Showing 25 changed files with 108 additions and 118 deletions.
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ In addition, some concurrency and parallelism primitives are provided:

## Usage

The user can create several thread pools. These pools use regular posix threads,
but the threads are spread across multiple domains (on OCaml 5), which enables
parallelism.
The user can create several thread pools (implementing the interface `Runner.t`).
These pools use regular posix threads, but the threads are spread across
multiple domains (on OCaml 5), which enables parallelism.

The function `Pool.run_async pool task` runs `task()` on one of the workers
of `pool`, as soon as one is available. No result is returned.
The function `Runner.run_async pool task` schedules `task()` to run on one of
the workers of `pool`, as soon as one is available. No result is returned by `run_async`.

```ocaml
# #require "threads";;
# let pool = Moonpool.Pool.create ~min:4 ();;
# let pool = Moonpool.Fifo_pool.create ~min:4 ();;
val pool : Moonpool.Runner.t = <abstr>
# begin
Moonpool.Pool.run_async pool
Moonpool.Runner.run_async pool
(fun () ->
Thread.delay 0.1;
print_endline "running from the pool");
Expand All @@ -49,11 +49,13 @@ running from the pool
- : unit = ()
```

To wait until the task is done, you can use `Pool.run_wait_block` instead:
To wait until the task is done, you can use `Runner.run_wait_block`[^1] instead:

[^1]: beware of deadlock! See documentation for more details.

```ocaml
# begin
Moonpool.Pool.run_wait_block pool
Moonpool.Runner.run_wait_block pool
(fun () ->
Thread.delay 0.1;
print_endline "running from the pool");
Expand Down Expand Up @@ -155,7 +157,7 @@ val expected_sum : int = 5050

On OCaml 5, again using effect handlers, the module `Fork_join`
implements the [fork-join model](https://en.wikipedia.org/wiki/Fork%E2%80%93join_model).
It must run on a pool (using [Pool.run] or inside a future via [Future.spawn]).
It must run on a pool (using [Runner.run_async] or inside a future via [Fut.spawn]).

```ocaml
# let rec select_sort arr i len =
Expand Down Expand Up @@ -257,7 +259,7 @@ This works for OCaml >= 4.08.
the same pool, too — this is useful for threads blocking on IO).

A useful analogy is that each domain is a bit like a CPU core, and `Thread.t` is a logical thread running on a core.
Multiple threads have to share a single core and do not run in parallel on it[^1].
Multiple threads have to share a single core and do not run in parallel on it[^2].
We can therefore build pools that spread their worker threads on multiple cores to enable parallelism within each pool.

TODO: actually use https://github.com/haesbaert/ocaml-processor to pin domains to cores,
Expand All @@ -273,4 +275,4 @@ MIT license.
$ opam install moonpool
```

[^1]: let's not talk about hyperthreading.
[^2]: let's not talk about hyperthreading.
4 changes: 2 additions & 2 deletions benchs/fib_rec.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])
let create_pool ~psize ~kind () =
match kind with
| "fifo" -> Fifo_pool.create ~min:psize ()
| "pool" -> Pool.create ~min:psize ()
| "pool" -> Ws_pool.create ~min:psize ()
| _ -> assert false

let run ~psize ~n ~seq ~niter ~kind () : unit =
Expand All @@ -38,7 +38,7 @@ let run ~psize ~n ~seq ~niter ~kind () : unit =
in
Printf.printf "fib %d = %d\n%!" n res
done;
if not seq then Pool.shutdown (Lazy.force pool)
if not seq then Ws_pool.shutdown (Lazy.force pool)

let () =
let n = ref 40 in
Expand Down
10 changes: 5 additions & 5 deletions benchs/pi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ let with_pool ~kind f =
match kind with
| "pool" ->
if !j = 0 then
Pool.with_ ~per_domain:1 f
Ws_pool.with_ ~per_domain:1 f
else
Pool.with_ ~min:!j f
Ws_pool.with_ ~min:!j f
| "fifo" ->
if !j = 0 then
Fifo_pool.with_ ~per_domain:1 f
Expand All @@ -35,7 +35,7 @@ let with_pool ~kind f =
let run_par1 ~kind (num_steps : int) : float =
let@ pool = with_pool ~kind () in

let num_tasks = Pool.size pool in
let num_tasks = Ws_pool.size pool in

let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in
Expand Down Expand Up @@ -64,12 +64,12 @@ let run_par1 ~kind (num_steps : int) : float =
let run_fork_join ~kind num_steps : float =
let@ pool = with_pool ~kind () in

let num_tasks = Pool.size pool in
let num_tasks = Ws_pool.size pool in

let step = 1. /. float num_steps in
let global_sum = Lock.create 0. in

Pool.run_wait_block pool (fun () ->
Ws_pool.run_wait_block pool (fun () ->
Fork_join.for_
~chunk_size:(3 + (num_steps / num_tasks))
num_steps
Expand Down
12 changes: 6 additions & 6 deletions src/fut.ml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ let spawn ~on f : _ t =
fulfill promise res
in

Pool.run_async on task;
Runner.run_async on task;
fut

let reify_error (f : 'a t) : 'a or_error t =
Expand Down Expand Up @@ -131,7 +131,7 @@ let map ?on ~f fut : _ t =

match on with
| None -> map_and_fulfill ()
| Some on -> Pool.run_async on map_and_fulfill);
| Some on -> Runner.run_async on map_and_fulfill);

fut2

Expand All @@ -158,14 +158,14 @@ let bind ?on ~f fut : _ t =
| None -> apply_f_to_res r
| Some on ->
let fut2, promise = make () in
Pool.run_async on (bind_and_fulfill r promise);
Runner.run_async on (bind_and_fulfill r promise);
fut2)
| None ->
let fut2, promise = make () in
on_result fut (fun r ->
match on with
| None -> bind_and_fulfill r promise ()
| Some on -> Pool.run_async on (bind_and_fulfill r promise));
| Some on -> Runner.run_async on (bind_and_fulfill r promise));

fut2

Expand Down Expand Up @@ -403,7 +403,7 @@ module type INFIX = sig
end

module Infix_ (X : sig
val pool : Pool.t option
val pool : Runner.t option
end) : INFIX = struct
let[@inline] ( >|= ) x f = map ?on:X.pool ~f x
let[@inline] ( >>= ) x f = bind ?on:X.pool ~f x
Expand All @@ -420,7 +420,7 @@ end)
include Infix_local

module Infix (X : sig
val pool : Pool.t
val pool : Runner.t
end) =
Infix_ (struct
let pool = Some X.pool
Expand Down
3 changes: 2 additions & 1 deletion src/moonpool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ module Chan = Chan
module Fork_join = Fork_join
module Fut = Fut
module Lock = Lock
module Pool = Pool
module Pool = Fifo_pool
module Ws_pool = Ws_pool
module Runner = Runner
module Fifo_pool = Fifo_pool

Expand Down
6 changes: 5 additions & 1 deletion src/moonpool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@
primitives such as guarding locks ({!Lock.t}) and futures ({!Fut.t}).
*)

module Pool = Pool
module Ws_pool = Ws_pool
module Fifo_pool = Fifo_pool
module Runner = Runner

module Pool = Fifo_pool
[@@deprecated "use Fifo_pool or Ws_pool"]
(** Default pool. Please explicitly pick an implementation instead. *)

val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t
(** Similar to {!Thread.create}, but it picks a background domain at random
to run the thread. This ensures that we don't always pick the same domain
Expand Down
26 changes: 7 additions & 19 deletions src/pool.ml → src/ws_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ include Runner

let ( let@ ) = ( @@ )

type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit

type worker_state = {
mutable thread: Thread.t;
q: task WSQ.t; (** Work stealing queue *)
Expand Down Expand Up @@ -227,7 +224,6 @@ let shutdown_ ~wait (self : state) : unit =
type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?thread_wrappers:thread_loop_wrapper list ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int ->
Expand All @@ -236,9 +232,8 @@ type ('a, 'b) create_args =
(** Arguments used in {!create}. See {!create} for explanations. *)

let create ?(on_init_thread = default_thread_init_exit_)
?(on_exit_thread = default_thread_init_exit_) ?(thread_wrappers = [])
?(on_exn = fun _ _ -> ()) ?around_task ?min:(min_threads = 1)
?(per_domain = 0) () : t =
?(on_exit_thread = default_thread_init_exit_) ?(on_exn = fun _ _ -> ())
?around_task ?min:(min_threads = 1) ?(per_domain = 0) () : t =
(* wrapper *)
let around_task =
match around_task with
Expand Down Expand Up @@ -294,16 +289,9 @@ let create ?(on_init_thread = default_thread_init_exit_)
on_init_thread ~dom_id:dom_idx ~t_id ();
let run () = worker_thread_ pool runner w ~on_exn ~around_task in
(* the actual worker loop is [worker_thread_], with all
wrappers for this pool and for all pools (global_thread_wrappers_) *)
let run' =
List.fold_left
(fun run f -> f ~thread ~pool:runner run)
run thread_wrappers
in
(* now run the main loop *)
Fun.protect run' ~finally:(fun () ->
Fun.protect run ~finally:(fun () ->
(* on termination, decrease refcount of underlying domain *)
D_pool_.decr_on dom_idx);
on_exit_thread ~dom_id:dom_idx ~t_id ()
Expand Down Expand Up @@ -335,11 +323,11 @@ let create ?(on_init_thread = default_thread_init_exit_)
runner
let with_ ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task
?min ?per_domain () f =
let with_ ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
() f =
let pool =
create ?on_init_thread ?on_exit_thread ?thread_wrappers ?on_exn ?around_task
?min ?per_domain ()
create ?on_init_thread ?on_exit_thread ?on_exn ?around_task ?min ?per_domain
()
in
let@ () = Fun.protect ~finally:(fun () -> shutdown pool) in
f pool
23 changes: 9 additions & 14 deletions src/pool.mli → src/ws_pool.mli
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
(** Thread pool.
(** Work-stealing thread pool.
A pool of threads. The pool contains a fixed number of threads that
wait for work items to come, process these, and loop.
A pool of threads with a worker-stealing scheduler.
The pool contains a fixed number of threads that wait for work
items to come, process these, and loop.
This is good for CPU-intensive tasks that feature a lot of small tasks.
Note that tasks will not always be processed in the order they are
scheduled, so this is not great for workloads where the latency
of individual tasks matter (for that see {!Fifo_pool}).
This implements {!Runner.t} since 0.3.
Expand All @@ -15,18 +21,9 @@

include module type of Runner

type thread_loop_wrapper =
thread:Thread.t -> pool:t -> (unit -> unit) -> unit -> unit
(** A thread wrapper [f] takes the current thread, the current pool,
and the worker function [loop : unit -> unit] which is
the worker's main loop, and returns a new loop function.
By default it just returns the same loop function but it can be used
to install tracing, effect handlers, etc. *)

type ('a, 'b) create_args =
?on_init_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?on_exit_thread:(dom_id:int -> t_id:int -> unit -> unit) ->
?thread_wrappers:thread_loop_wrapper list ->
?on_exn:(exn -> Printexc.raw_backtrace -> unit) ->
?around_task:(t -> 'b) * (t -> 'b -> unit) ->
?min:int ->
Expand All @@ -47,8 +44,6 @@ val create : (unit -> t, _) create_args
If both [min] and [per_domain] are specified, the maximum of both
[min] and [per_domain * num_of_domains] is used.
@param on_exit_thread called at the end of each thread in the pool
@param thread_wrappers a list of {!thread_loop_wrapper} functions
to use for this pool's workers.
@param around_task a pair of [before, after], where [before pool] is called
before a task is processed,
on the worker thread about to run it, and returns [x]; and [after pool x] is called by
Expand Down
6 changes: 3 additions & 3 deletions test/effect-based/t_fib1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ let fib ~on x : int Fut.t =
let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])

let fib_40 : int =
let pool = Pool.create ~min:8 () in
let pool = Ws_pool.create ~min:8 () in
fib ~on:pool 40 |> Fut.wait_block_exn

let () = Printf.printf "fib 40 = %d\n%!" fib_40

let run_test () =
let pool = Pool.create ~min:8 () in
let pool = Ws_pool.create ~min:8 () in

assert (
List.init 10 (fib ~on:pool)
Expand All @@ -42,7 +42,7 @@ let run_test () =
let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in

let res = Fut.join_array fibs |> Fut.wait_block in
Pool.shutdown pool;
Ws_pool.shutdown pool;

assert (res = Ok (Array.make 3 fib_40))

Expand Down
6 changes: 3 additions & 3 deletions test/effect-based/t_fib_fork_join.ml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ let fib ~on x : int Fut.t =
let () = assert (List.init 10 fib_direct = [ 1; 1; 2; 3; 5; 8; 13; 21; 34; 55 ])

let fib_40 : int =
let pool = Pool.create ~min:8 () in
let pool = Ws_pool.create ~min:8 () in
fib ~on:pool 40 |> Fut.wait_block_exn

let () = Printf.printf "fib 40 = %d\n%!" fib_40

let run_test () =
let pool = Pool.create ~min:8 () in
let pool = Ws_pool.create ~min:8 () in

assert (
List.init 10 (fib ~on:pool)
Expand All @@ -43,7 +43,7 @@ let run_test () =
let fibs = Array.init 3 (fun _ -> fib ~on:pool 40) in

let res = Fut.join_array fibs |> Fut.wait_block in
Pool.shutdown pool;
Ws_pool.shutdown pool;

assert (res = Ok (Array.make 3 fib_40))

Expand Down
6 changes: 3 additions & 3 deletions test/effect-based/t_fib_fork_join_all.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ let rec fib x : int =
)

let fib_40 : int =
let@ pool = Pool.with_ ~min:8 () in
let@ pool = Ws_pool.with_ ~min:8 () in
Fut.spawn ~on:pool (fun () -> fib 40) |> Fut.wait_block_exn

let () = Printf.printf "fib 40 = %d\n%!" fib_40

let run_test () =
let@ pool = Pool.with_ ~min:8 () in
let@ pool = Ws_pool.with_ ~min:8 () in

let fut =
Fut.spawn ~on:pool (fun () ->
Expand All @@ -37,7 +37,7 @@ let run_test () =
in

let res = Fut.wait_block_exn fut in
Pool.shutdown pool;
Ws_pool.shutdown pool;

assert (res = (Array.make 3 fib_40 |> Array.to_list))

Expand Down
Loading

0 comments on commit 3e614ec

Please sign in to comment.