#!/usr/bin/env python
# Copyright 2020 Scott Wales
# author: Scott Wales <scott.wales@unimelb.edu.au>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NCI Specific functions and utilities
"""
import dask.distributed
import os
import tempfile
# Dask client shared by the whole module: OODClient() / GadiClient() create it
# the first time they run and hand back the same object on later calls.
_dask_client = None
# tempfile.TemporaryDirectory created by GadiClient() and used as the workers'
# local_directory; kept at module level so the object stays referenced.
_tmpdir = None
def Client(threads=1, malloc_trim_threshold=None):
    """Start a Dask client at NCI

    An appropriate client will be started for the current system: when the
    host name starts with ``ood`` the work is delegated to ``OODClient``,
    otherwise to ``GadiClient``. Both arguments are forwarded unchanged.

    Args:
        threads: Number of threads per worker process. The total number of
            workers will be ncpus/threads, so that each thread gets its own
            CPU
        malloc_trim_threshold: Threshold for automatic memory trimming. Can be
            either a string e.g. '64kib' or a number of bytes e.g. 65536.
            Smaller values may reduce out of memory errors at the cost of
            running slower
            https://distributed.dask.org/en/latest/worker.html?highlight=worker#automatically-trim-memory

    Returns:
        The value returned by ``OODClient`` or ``GadiClient``.
    """
    # HOSTNAME is not guaranteed to be present in the environment; fall back
    # to the node name reported by the OS instead of failing with KeyError.
    hostname = os.environ.get("HOSTNAME") or os.uname().nodename

    if hostname.startswith("ood"):
        return OODClient(threads, malloc_trim_threshold)
    return GadiClient(threads, malloc_trim_threshold)
def OODClient(threads=1, malloc_trim_threshold=None):
    """Return the module's Dask client on OOD, starting it on first use

    This function mostly exists to be consistent with the Gadi version. Before
    the client is created the dashboard link is configured so that it goes
    through the OOD proxy.

    Args:
        threads: Number of threads per worker process (passed to Dask as
            ``threads_per_worker``)
        malloc_trim_threshold: Threshold for automatic memory trimming. Can be
            either a string e.g. '64kib' or a number of bytes e.g. 65536.
            It is handed to the workers through the ``MALLOC_TRIM_THRESHOLD_``
            environment variable. Smaller values may reduce out of memory
            errors at the cost of running slower
            https://distributed.dask.org/en/latest/worker.html?highlight=worker#automatically-trim-memory

    Returns:
        The shared ``dask.distributed.Client``.
    """
    global _dask_client

    # The threshold is parsed up front, so an invalid value is reported even
    # when a client already exists.
    worker_env = (
        {}
        if malloc_trim_threshold is None
        else {
            "MALLOC_TRIM_THRESHOLD_": str(
                dask.utils.parse_bytes(malloc_trim_threshold)
            )
        }
    )

    if _dask_client is not None:
        return _dask_client

    try:
        # Works in sidebar and can follow the link
        dashboard_link = f'/node/{os.environ["host"]}/{os.environ["port"]}/proxy/{{port}}/status'
    except KeyError:
        # Works in sidebar, but can't follow the link
        dashboard_link = "/proxy/{port}/status"
    dask.config.set({"distributed.dashboard.link": dashboard_link})

    _dask_client = dask.distributed.Client(
        threads_per_worker=threads, env=worker_env
    )
    return _dask_client
def GadiClient(threads=1, malloc_trim_threshold=None):
    """Start a Dask client on Gadi

    If run on a compute node it will check the PBS resources to know how many
    CPUs and the amount of memory that is available.

    If run on a login node it will ask for 2 workers each with a 1GB memory
    limit

    The client is created once and stored at module level; later calls return
    the same client and their arguments do not reconfigure it.

    Args:
        threads: Number of threads per worker process. The total number of
            workers will be $PBS_NCPUS/threads, so that each thread gets its own
            CPU. At least one worker is always started, even when ``threads``
            is larger than $PBS_NCPUS
        malloc_trim_threshold: Threshold for automatic memory trimming. Can be
            either a string e.g. '64kib' or a number of bytes e.g. 65536.
            Smaller values may reduce out of memory errors at the cost of
            running slower
            https://distributed.dask.org/en/latest/worker.html?highlight=worker#automatically-trim-memory

    Returns:
        The shared ``dask.distributed.Client``.

    Raises:
        KeyError: When not on a login node and $PBS_NCPUS or $PBS_VMEM is
            missing from the environment.
    """
    global _dask_client, _tmpdir

    env = {}
    if malloc_trim_threshold is not None:
        env["MALLOC_TRIM_THRESHOLD_"] = str(
            dask.utils.parse_bytes(malloc_trim_threshold)
        )

    if _dask_client is not None:
        return _dask_client

    # HOSTNAME is not guaranteed to be present in the environment; fall back
    # to the node name reported by the OS instead of failing with KeyError.
    hostname = os.environ.get("HOSTNAME") or os.uname().nodename

    # Resolve the cluster size before touching the filesystem, so a missing
    # PBS variable does not leave a stray temporary directory behind.
    if hostname.startswith("gadi-login"):
        workers = 2
        memory_limit = "1000mb"
    else:
        # Clamp to one worker: with threads > $PBS_NCPUS the floor division
        # gives 0, which would request no workers and divide by zero below.
        workers = max(1, int(os.environ["PBS_NCPUS"]) // threads)
        memory_limit = int(os.environ["PBS_VMEM"]) / workers

    # The positional argument of TemporaryDirectory is the name *suffix*.
    tmpdir = tempfile.TemporaryDirectory("dask-worker-space")
    try:
        client = dask.distributed.Client(
            n_workers=workers,
            threads_per_worker=threads,
            memory_limit=memory_limit,
            local_directory=tmpdir.name,
            env=env,
        )
    except Exception:
        # Start-up failed: remove the directory now and leave the module
        # state untouched so a later call can retry cleanly.
        tmpdir.cleanup()
        raise

    _tmpdir = tmpdir
    _dask_client = client
    return _dask_client