cd.mpi

This submodule contains MPI operations. For more information about MPI in Python, see the mpi4py documentation: https://mpi4py.readthedocs.io/en/stable/

Practical Examples

In the example below, a server distributes data to workers. The workers process each data item individually and send the results back to the server. After all data has been processed and sent back, the server holds all results in the correct order.

>>> from celldetection import mpi
... import numpy as np
...
... comm, rank, ranks = mpi.get_comm(return_ranks=True)
... assert ranks >= 2
...
... server = 0
... workers = set(range(ranks)) - {server}
...
... data = range(10)
...
... # Server
... if rank == server:
...     results = mpi.serve(comm, iterator=data, ranks=workers)
...     assert np.allclose(results, np.array(data) ** 2)  # results are in correct order
...     print(results, flush=True)
...
... # Worker
... else:
...     for idx, item in mpi.query(comm, server):
...         result = item ** 2  # process item
...         mpi.send(comm, result, server, tag=idx)  # send result to server

In the example below, a server distributes data to workers. The workers process each data item individually and send the results to a sink. The sink can handle results individually, on demand. In this example, the results are simply collected and sorted.

>>> from celldetection import mpi
... import numpy as np
...
... comm, rank, ranks = mpi.get_comm(return_ranks=True)
... assert ranks >= 3
...
... server = 0
... sink = 1
... workers = set(range(ranks)) - {server, sink}
...
... data = range(10)
...
... # Server
... if rank == server:
...     mpi.serve(comm, iterator=data, ranks=workers)
...
... # Sink
... elif rank == sink:
...     indices, results = [], []
...     for idx, item in mpi.sink(comm, ranks=workers):
...         indices.append(idx), results.append(item)  # handle result
...     results = [x for _, x in sorted(zip(indices, results))]  # sort by index
...     assert np.allclose(results, np.array(data) ** 2)  # results are in correct order
...     print(results, flush=True)
...
... # Worker
... else:
...     for idx, item in mpi.query(comm, server, forward_stop_signal=sink):
...         result = item ** 2  # process item
...         mpi.send(comm, result, sink, tag=idx)  # send result to sink

MPI Operations

all_filter(*a, **k)

All filter.

Filter ranks by condition.

Parameters:
  • comm – MPI Comm.

  • condition – Boolean condition.

Returns:

(ranks that met the condition, ranks that did not meet the condition)

Return type:

Tuple with sets of ranks
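
The partitioning described above can be illustrated without MPI. The following is a minimal sketch, not the library's implementation: it assumes every rank contributes one boolean (with mpi4py, such values could be collected via comm.allgather), and the helper name partition_ranks is hypothetical.

```python
def partition_ranks(conditions):
    """Split rank indices by a per-rank boolean (hypothetical helper).

    `conditions[r]` stands in for the value that rank `r` would contribute,
    e.g. one entry of what `comm.allgather(condition)` returns in mpi4py.
    """
    met = {rank for rank, flag in enumerate(conditions) if flag}
    not_met = set(range(len(conditions))) - met
    return met, not_met


# Simulated gather over 4 ranks: ranks 0 and 2 meet the condition
met, not_met = partition_ranks([True, False, True, False])
print(met, not_met)
```

Both sets together always cover every rank exactly once, which matches the tuple of rank sets described under "Returns".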

get_comm(comm=None, return_ranks=True)
get_hosts(*a, **k)
get_local_comm(*a, **k)

Get local comm.

Split comm by host. The returned comm is an MPI Comm object that connects ranks from the same host.

Parameters:
  • comm – MPI comm.

  • return_ranks – Whether to return local rank and the number of local ranks as well.

  • host – Host name. Will be determined if not provided.

  • node_rank – Node rank. Will be determined if not provided.

Returns:

Local MPI Comm.
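
To make the idea of splitting by host concrete, here is a small sketch in plain Python. It is an illustration under assumptions, not the library's code: the host names are made up, and local_layout is a hypothetical helper. With mpi4py, a comparable grouping could be obtained with comm.Split(color, key), using one color per host.

```python
from collections import defaultdict


def local_layout(hosts):
    """Map each global rank to (host, local_rank, local_size).

    `hosts[r]` stands in for the host name of global rank `r`
    (hypothetical input; a real program would gather host names over MPI).
    """
    by_host = defaultdict(list)
    for rank, host in enumerate(hosts):
        by_host[host].append(rank)  # members stay in ascending rank order
    layout = {}
    for host, members in by_host.items():
        for local_rank, rank in enumerate(members):
            layout[rank] = (host, local_rank, len(members))
    return layout


# Five global ranks spread over two (made-up) hosts
layout = local_layout(["node-a", "node-a", "node-b", "node-a", "node-b"])
print(layout[3])
```

The local rank and local size in this sketch correspond to the extra values that return_ranks requests.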

get_num_nodes(*a, **k)
has_mpi(check_initialized=True)
query(*a, **k)

Query generator.

Query items from source (serving rank) until receiving StopIteration.

Examples

>>> from celldetection import mpi
>>> server_rank = 0
>>> for idx, item in mpi.query(comm, server_rank):
...     result = process(item)  # handle item
...     mpi.send(comm, result, server_rank, tag=idx)  # send result to server

>>> server_rank = 0
>>> sink_rank = 1
>>> for idx, item in mpi.query(comm, server_rank, forward_stop_signal=sink_rank):
...     result = process(item)  # handle item
...     mpi.send(comm, result, sink_rank, tag=idx)  # send result to sink

Parameters:
  • comm – MPI Comm.

  • source – Source rank.

  • forward_stop_signal – Optional ranks that receive a StopIteration signal if query is terminated.
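
The stop-signal behaviour described above can be sketched with standard-library queues instead of MPI. This is a simplified stand-in, not how the library is implemented: the STOP sentinel, query_sketch, and the queues are all hypothetical, and items are pushed into the inbox up front rather than requested from a serving rank.

```python
import queue

STOP = object()  # stand-in for the StopIteration signal


def query_sketch(inbox, forward_to=()):
    """Yield (idx, item) pairs from `inbox` until STOP arrives.

    `inbox` plays the role of the serving rank. Every queue in
    `forward_to` receives STOP once the stream has ended.
    """
    while True:
        message = inbox.get()
        if message is STOP:
            break
        yield message
    for target in forward_to:
        target.put(STOP)  # forward the stop signal, e.g. to a sink


inbox, sink_box = queue.Queue(), queue.Queue()
for idx, item in enumerate(["a", "b"]):
    inbox.put((idx, item))
inbox.put(STOP)

received = list(query_sketch(inbox, forward_to=[sink_box]))
print(received)
```

Forwarding the signal is what lets a downstream consumer know that this worker will not send any further results.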

recv(*a, **k)

Receive.

Receive item from source rank.

Parameters:
  • comm – MPI Comm

  • source – Source rank. Default: any

  • tag – Tag. Default: any

  • status – Optional MPI.Status object.

  • **kwargs – Keyword arguments for comm.recv.

Returns:

Received item and MPI.Status object.

send(*a, **k)

Send.

Send item via MPI Comm comm to destination dest.

Parameters:
  • comm – MPI Comm.

  • item – Item.

  • dest – Destination rank or MPI.Status object.

  • tag – Tag.

  • **kwargs – Keyword arguments for comm.send.

serve(*a, **k)

Serve.

Serves items of iterator to ranks. Once all items have been served, ranks receive StopIteration.

Examples

>>> from celldetection import mpi
... mpi.serve(comm, iterator=range(10), ranks={1, 2, 3})  # serve iterator to ranks

Parameters:
  • comm – MPI Comm.

  • ranks – Client ranks.

  • iterator – Iterator.

  • progress – Whether to show progress.

  • desc – Description, visible in progress report.

  • stats – Dictionary of callbacks: {stat_name: callback}

Returns:

List of results if ranks send results, None otherwise. Results are sorted by received tags.
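
Because workers finish at different times, results generally arrive out of order, and the tag is what restores the original order. The sketch below illustrates that idea only; order_by_tag is a hypothetical helper and the (tag, result) pairs are made-up sample data.

```python
def order_by_tag(tagged_results):
    """Return results ordered by their integer tag (hypothetical helper)."""
    return [result for _, result in sorted(tagged_results, key=lambda pair: pair[0])]


# (tag, result) pairs in an arbitrary arrival order
arrived = [(2, 4), (0, 0), (3, 9), (1, 1)]
ordered = order_by_tag(arrived)
print(ordered)
```

Sorting on the tag alone avoids comparing the results themselves, which matters when results are objects without a defined ordering.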

sink(*a, **k)

Sink generator.

Receive items from ranks until receiving StopIteration.

Examples

>>> from celldetection import mpi
... worker_ranks = {1, 2, 3}
... for idx, item in mpi.sink(comm, ranks=worker_ranks):  # receive items from worker_ranks
...     pass  # handle item

Parameters:
  • comm – MPI Comm.

  • ranks – Source ranks. All sources have to report StopIteration to close sink.
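
The closing rule for the sink (every source has to report the stop signal) can be sketched with threads and a queue in place of MPI ranks. This is an illustration under assumptions, not the library's implementation: STOP, sink_sketch, and worker are hypothetical names, and threads stand in for worker ranks.

```python
import queue
import threading

STOP = object()  # stand-in for the StopIteration signal


def sink_sketch(inbox, num_sources):
    """Yield (idx, item) pairs until every source has sent STOP."""
    remaining = num_sources
    while remaining > 0:
        message = inbox.get()
        if message is STOP:
            remaining -= 1  # one more source has finished
        else:
            yield message


def worker(items, inbox):
    """Square each item, send (idx, result), then report STOP."""
    for idx, item in items:
        inbox.put((idx, item ** 2))
    inbox.put(STOP)


inbox = queue.Queue()
data = list(enumerate(range(6)))
# Two workers, each handling every second item
threads = [
    threading.Thread(target=worker, args=(data[i::2], inbox))
    for i in range(2)
]
for t in threads:
    t.start()

collected = dict(sink_sketch(inbox, num_sources=len(threads)))
for t in threads:
    t.join()
print([collected[i] for i in sorted(collected)])
```

Counting stop signals, rather than stopping at the first one, ensures that results from slower sources are not dropped.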