
openstb.simulator.cluster.dask_local

Classes:

| Name | Description |
|---|---|
| DaskLocalCluster | A Dask cluster running on the local machine. |

DaskLocalCluster

DaskLocalCluster(workers, worker_memory=None, total_memory=None, security=True, dashboard_address=None)

Bases: DaskCluster

A Dask cluster running on the local machine.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| workers | int, float | Number of workers to add to the cluster. If a float, it is interpreted as a fraction of the available CPUs, and the result is truncated toward zero to an integer. If -1, one worker is started per available CPU. Any other value is used directly as the number of workers. | required |
| worker_memory | int, float | The memory limit for each worker. A float is interpreted as a fraction of the total system memory and an integer as a number of bytes. Exactly one of worker_memory and total_memory must be given; otherwise a ValueError is raised. The limit is enforced on a best-effort basis, so some workers may exceed it. | None |
| total_memory | int, float | The memory limit for the whole cluster, divided equally among the workers using integer division. A float is interpreted as a fraction of the total system memory and an integer as a number of bytes. Exactly one of worker_memory and total_memory must be given; otherwise a ValueError is raised. The limit is enforced on a best-effort basis, so some workers may exceed it. | None |
| security | bool | If True, self-signed temporary credentials are used to secure communications within the cluster. If False, communications are unencrypted. | True |
| dashboard_address | str | Address the Dask diagnostic dashboard server will listen on, e.g., "localhost:8787" or "0.0.0.0:8787". If not given, the dashboard is disabled. The logs may still print a dashboard address when it is disabled, but nothing will be served at that address; this is a known bug in dask.distributed: https://github.com/dask/distributed/issues/7994 | None |
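The worker-count and memory rules in the table can be sketched in plain Python as follows. This is a minimal illustration, not the library's code: the helper names `resolve_workers` and `resolve_worker_memory` are hypothetical, and the `cpu_count` and `memory_limit` arguments stand in for the module constants `CPU_COUNT` and `MEMORY_LIMIT`, whose definitions are not shown in this section. `math.trunc` is used in place of `np.fix`; both round toward zero.

```python
from __future__ import annotations

import math


def resolve_workers(workers: int | float, cpu_count: int) -> int:
    """Hypothetical helper mirroring the documented rules for ``workers``."""
    if isinstance(workers, float):
        # Fraction of the CPUs, truncated toward zero (like np.fix).
        return math.trunc(workers * cpu_count)
    if workers == -1:
        # One worker per available CPU.
        return cpu_count
    # Any other value is used as the worker count unchanged.
    return workers


def resolve_worker_memory(
    n_workers: int,
    memory_limit: int,
    worker_memory: int | float | None = None,
    total_memory: int | float | None = None,
) -> int:
    """Hypothetical helper returning the per-worker memory limit in bytes.

    ``memory_limit`` is an assumed stand-in for the total system memory.
    """
    if worker_memory is not None and total_memory is not None:
        raise ValueError("only one of worker_memory and total_memory can be given")
    if worker_memory is not None:
        if isinstance(worker_memory, float):
            # Fraction of system memory, truncated to whole bytes.
            worker_memory = math.trunc(worker_memory * memory_limit)
        return worker_memory
    if total_memory is not None:
        if isinstance(total_memory, float):
            total_memory = math.trunc(total_memory * memory_limit)
        # Split the total evenly; any remainder bytes are dropped.
        return total_memory // n_workers
    raise ValueError("worker_memory or total_memory must be given")


# Example with assumed values: 8 CPUs and 16 GiB of system memory.
GIB = 2**30
n_workers = resolve_workers(0.5, cpu_count=8)  # 4 workers
per_worker = resolve_worker_memory(n_workers, 16 * GIB, total_memory=0.5)
# per_worker == 2 * GIB: half of 16 GiB shared between 4 workers
```

Because the fraction is truncated, a small float such as `workers=0.1` on an 8-CPU machine yields zero workers, and the `total_memory` branch then fails with `ZeroDivisionError`. The source listing for `__init__` performs the same integer division.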
Source code in openstb/simulator/cluster/dask_local.py
```python
def __init__(
    self,
    workers: int | float,
    worker_memory: int | float | None = None,
    total_memory: int | float | None = None,
    security: bool = True,
    dashboard_address: str | None = None,
):
    """
    Parameters
    ----------
    workers : int, float
        Number of workers to add to the cluster. If a float, this is interpreted as
        a fraction of the available CPUs. If -1, use all available CPUs. Any other
        value is taken to be the number of CPUs to use.
    worker_memory, total_memory : int, float
        The desired memory limit for the cluster. This can be specified per worker
        or for all workers; only one may be given (and one must be given). A float
        indicates a fraction of the total system memory, and an integer the number
        of bytes. Note that this limit is enforced on a best-effort basis and some
        workers may exceed it.
    security : boolean
        If True, self-signed temporary credentials are used to secure communications
        within the cluster. If False, communications are unencrypted.
    dashboard_address : str, optional
        Address the Dask diagnostic dashboard server will listen on, e.g.,
        "localhost:8787" or "0.0.0.0:8787". If not given, the server will be
        disabled. Note that the logs may still print a dashboard address if
        disabled, but there will be nothing at that address. This is a known bug in
        dask.distributed: https://github.com/dask/distributed/issues/7994

    """
    if isinstance(workers, float):
        self.workers = int(np.fix(workers * CPU_COUNT))
    elif workers == -1:
        self.workers = CPU_COUNT
    else:
        self.workers = workers

    # The distributed LocalCluster takes memory per worker as an integer number of
    # bytes. Convert our inputs.
    if worker_memory is not None and total_memory is not None:
        raise ValueError(
            _("only one of worker_memory and total_memory can be given")
        )
    elif worker_memory is not None:
        if isinstance(worker_memory, float):
            worker_memory = int(np.fix(worker_memory * MEMORY_LIMIT))
        self.memory = worker_memory
    elif total_memory is not None:
        if isinstance(total_memory, float):
            total_memory = int(np.fix(total_memory * MEMORY_LIMIT))
        self.memory = total_memory // self.workers
    else:
        raise ValueError(_("worker_memory or total_memory must be given"))

    self.security = security
    self.dashboard_address = dashboard_address

    self._cluster: distributed.LocalCluster | None = None
    self._client: distributed.Client | None = None
```