Skip to content

Commit

Permalink
Set nanny environment variables in config (#5098)
Browse files Browse the repository at this point in the history
The nanny is able to set environment variables in the Dask worker processes it starts.
This commit makes those variables configurable through the dask.config system.

Somewhat more controversially, this also sets a few common defaults like
MKL_NUM_THREADS, OMP_NUM_THREADS, and MALLOC_TRIM_THRESHOLD_.

These *may* have surprising effects; however, I suspect that setting
these defaults will help far more people than it will harm.
  • Loading branch information
mrocklin authored Jul 26, 2021
1 parent 145e8aa commit b37ac9d
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 2 deletions.
5 changes: 5 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,11 @@ properties:
See https://docs.dask.org/en/latest/setup/custom-startup.html for more information
environ:
type: object
description: |
Environment variables to set on all worker processes started by nannies
client:
type: object
description: |
Expand Down
4 changes: 4 additions & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ distributed:
nanny:
preload: [] # Run custom modules with Nanny
preload-argv: [] # See https://docs.dask.org/en/latest/setup/custom-startup.html
environ:
MALLOC_TRIM_THRESHOLD_: 65536
OMP_NUM_THREADS: 1
MKL_NUM_THREADS: 1

client:
heartbeat: 5s # Interval between client heartbeats
Expand Down
8 changes: 7 additions & 1 deletion distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ def __init__(
self.death_timeout = parse_timedelta(death_timeout)

self.Worker = Worker if worker_class is None else worker_class
self.env = env or {}
self.env = dask.config.get("distributed.nanny.environ")
for k in self.env:
if k in os.environ:
self.env[k] = os.environ[k]
if env:
self.env.update(env)
self.env = {k: str(v) for k, v in self.env.items()}
self.config = config or dask.config.config
worker_kwargs.update(
{
Expand Down
2 changes: 1 addition & 1 deletion distributed/tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def test_schema_is_complete():
with open(schema_fn) as f:
schema = yaml.safe_load(f)

skip = {"default-task-durations", "bokeh-application"}
skip = {"default-task-durations", "bokeh-application", "environ"}

def test_matches(c, s):
if set(c) != set(s["properties"]):
Expand Down
15 changes: 15 additions & 0 deletions distributed/tests/test_nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,21 @@ async def test_environment_variable(c, s):
await asyncio.gather(a.close(), b.close())


@gen_cluster(
    nthreads=[],
    client=True,
    config={"distributed.nanny.environ": {"A": 1, "B": 2, "D": 4}},
)
async def test_environment_variable_config(c, s, monkeypatch):
    """Check which source wins for each worker environment variable.

    Three sources are set up: the ``distributed.nanny.environ`` config
    (A, B, D), the ``env=`` argument passed to ``Nanny`` (B, C), and the
    current process environment (D, set through ``monkeypatch``).

    The worker is expected to report:

    * ``A == "1"``   -- only in the config; compared as a string.
    * ``B == "3"``   -- the ``env=`` argument wins over the config value.
    * ``C == "4"``   -- only in the ``env=`` argument.
    * ``D == "123"`` -- the process environment wins over the config value.
    """
    expected = {"A": "1", "B": "3", "C": "4", "D": "123"}

    # Must be in place before the Nanny is constructed.
    monkeypatch.setenv("D", "123")

    async with Nanny(s.address, env={"B": 3, "C": 4}) as n:
        # Fetch os.environ through the client; the result is indexed by
        # worker address below.
        results = await c.run(lambda: os.environ)
        worker_env = results[n.worker_address]
        observed = {name: worker_env[name] for name in expected}
        assert observed == expected


@gen_cluster(nthreads=[], client=True)
async def test_data_types(c, s):
w = await Nanny(s.address, data=dict)
Expand Down

0 comments on commit b37ac9d

Please sign in to comment.