cd.mpi
This submodule contains MPI operations. For more information about MPI visit: https://mpi4py.readthedocs.io/en/stable/
Practical Examples
In the example below, a server distributes data to workers. The workers process each data item individually and send results back to the server. After all data has been processed and sent back, the server has all results in the correct order.
>>> from celldetection import mpi
... import numpy as np
...
... comm, rank, ranks = mpi.get_comm(return_ranks=True)
... assert ranks >= 2
...
... server = 0
... workers = set(range(ranks)) - {server}
...
... data = range(10)
...
... # Server
... if rank == server:
...     results = mpi.serve(comm, iterator=data, ranks=workers)
...     assert np.allclose(results, np.array(data) ** 2)  # results are in correct order
...     print(results, flush=True)
...
... # Worker
... else:
...     for idx, item in mpi.query(comm, server):
...         result = item ** 2  # process item
...         mpi.send(comm, result, server, tag=idx)  # send result to server
In the example below, a server distributes data to workers. The workers process each data item individually and send results to a sink. The sink can handle results individually on demand. In this example, results are simply collected and sorted.
>>> from celldetection import mpi
... import numpy as np
...
... comm, rank, ranks = mpi.get_comm(return_ranks=True)
... assert ranks >= 3
...
... server = 0
... sink = 1
... workers = set(range(ranks)) - {server, sink}
...
... data = range(10)
...
... # Server
... if rank == server:
...     mpi.serve(comm, iterator=data, ranks=workers)
...
... # Sink
... elif rank == sink:
...     indices, results = [], []
...     for idx, item in mpi.sink(comm, ranks=workers):
...         indices.append(idx)  # remember the item's index (tag)
...         results.append(item)  # handle result
...     results = [x for _, x in sorted(zip(indices, results))]  # sort by index
...     assert np.allclose(results, np.array(data) ** 2)  # results are in correct order
...     print(results, flush=True)
...
... # Worker
... else:
...     for idx, item in mpi.query(comm, server, forward_stop_signal=sink):
...         result = item ** 2  # process item
...         mpi.send(comm, result, sink, tag=idx)  # send result to sink
MPI Operations
- all_filter(*a, **k)
All filter.
Filter ranks by condition.
- Parameters:
comm – MPI Comm.
condition – Boolean condition.
- Returns:
(ranks that met the condition, ranks that did not meet the condition)
- Return type:
Tuple with sets of ranks
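The shape of the return value can be illustrated without MPI. The sketch below is not the library's implementation: the helper `partition_ranks` is hypothetical, and it assumes the per-rank booleans are already available as a list indexed by rank (for instance after a gather step).

```python
def partition_ranks(conditions):
    """Split rank indices into (met, not_met) sets.

    `conditions` is a hypothetical list holding one boolean per rank,
    indexed by rank.
    """
    met = {rank for rank, ok in enumerate(conditions) if ok}
    not_met = set(range(len(conditions))) - met
    return met, not_met


# Four ranks; ranks 0 and 2 meet the condition.
met, not_met = partition_ranks([True, False, True, False])
```

Every rank appears in exactly one of the two sets, which matches the tuple of rank sets described above.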
- get_comm(comm=None, return_ranks=True)
- get_hosts(*a, **k)
- get_local_comm(*a, **k)
Get local comm.
Split comm by host. The returned comm is an MPI Comm object that connects ranks from the same host.
- Parameters:
comm – MPI comm.
return_ranks – Whether to return local rank and the number of local ranks as well.
host – Host name. Will be determined if not provided.
node_rank – Node rank. Will be determined if not provided.
- Returns:
Local MPI Comm.
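To make "split by host" concrete, here is an MPI-free sketch of the grouping idea. The helpers `group_ranks_by_host` and `local_rank` are hypothetical, as is the assumption that one host name per global rank is known and that local ranks follow global rank order; the actual function returns an MPI Comm rather than plain lists.

```python
from collections import defaultdict


def group_ranks_by_host(hosts):
    """Map each host name to the list of global ranks running on it.

    `hosts` is a hypothetical list with one host name per global rank.
    """
    groups = defaultdict(list)
    for rank, host in enumerate(hosts):
        groups[host].append(rank)
    return dict(groups)


def local_rank(hosts, rank):
    """Position of `rank` among the ranks that share its host."""
    return group_ranks_by_host(hosts)[hosts[rank]].index(rank)


hosts = ["node-a", "node-a", "node-b", "node-b"]  # made-up host names
groups = group_ranks_by_host(hosts)
```

In this sketch, global rank 3 is the second rank on its host, so its local rank is 1 and its host has 2 local ranks.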
- get_num_nodes(*a, **k)
- has_mpi(check_initialized=True)
- query(*a, **k)
Query generator.
Query items from source (serving rank) until receiving StopIteration.
Examples
>>> from celldetection import mpi
>>> server_rank = 0
>>> for idx, item in mpi.query(comm, server_rank):
...     result = process(item)  # handle item
...     mpi.send(comm, result, server_rank, tag=idx)  # send result to server

>>> server_rank = 0
>>> sink_rank = 1
>>> for idx, item in mpi.query(comm, server_rank, sink_rank):
...     result = process(item)  # handle item
...     mpi.send(comm, result, sink_rank, tag=idx)  # send result to sink
- Parameters:
comm – MPI Comm.
source – Source rank.
forward_stop_signal – Optional ranks that receive a StopIteration signal if query is terminated.
- recv(*a, **k)
Receive.
Receive item from source rank.
- Parameters:
comm – MPI Comm.
source – Source rank. Default: any
tag – Tag. Default: any
status – Optional MPI.Status object.
**kwargs – Keyword arguments for comm.recv.
- Returns:
Received item and MPI.Status object.
- send(*a, **k)
Send.
Send item via MPI Comm comm to destination dest.
- Parameters:
comm – MPI Comm.
item – Item.
dest – Destination rank or MPI.Status object.
tag – Tag.
**kwargs – Keyword arguments for comm.send.
- serve(*a, **k)
Serve.
Serves items of iterator to ranks. Once all items have been served, ranks receive StopIteration.
Examples
>>> from celldetection import mpi
... mpi.serve(comm, iterator=range(10), ranks={1, 2, 3})  # serve iterator to ranks
- Parameters:
comm – MPI Comm.
ranks – Client ranks.
iterator – Iterator.
progress – Whether to show progress.
desc – Description, visible in progress report.
stats – Dictionary of callbacks: {stat_name: callback}
- Returns:
List of results if ranks send results, None otherwise. Results are sorted by received tags.
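The effect of sorting by tag can be shown with a small MPI-free sketch. It assumes, as in the practical examples, that each worker tags its result with the index of the item it processed; the helper `order_by_tag` and the sample arrival order are hypothetical and only illustrate the ordering, not how the library collects results.

```python
def order_by_tag(received):
    """Return payloads ordered by their integer tag.

    `received` is a hypothetical list of (tag, payload) pairs in
    arrival order, which may differ from the order items were served.
    """
    return [payload for _, payload in sorted(received, key=lambda pair: pair[0])]


# Squared items arriving out of order from different workers.
arrivals = [(2, 4), (0, 0), (1, 1), (3, 9)]
ordered = order_by_tag(arrivals)
```

Because the tag carries the item index, the ordered list lines up with the original iterator regardless of which worker finished first.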
- sink(*a, **k)
Sink generator.
Receive items from ranks until receiving StopIteration.
Examples
>>> from celldetection import mpi
... worker_ranks = {1, 2, 3}
... for idx, item in mpi.sink(comm, ranks=worker_ranks):  # receive items from worker_ranks
...     pass  # handle item
- Parameters:
comm – MPI Comm.
ranks – Source ranks. All sources have to report StopIteration to close sink.
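The closing rule (every source must report StopIteration) can be sketched without MPI. Everything here is an assumption made for illustration: the `STOP` sentinel stands in for the stop signal, `simulated_sink` is a hypothetical generator, and messages are modeled as `(source_rank, tag, item)` triples rather than real MPI messages.

```python
STOP = object()  # hypothetical stand-in for the StopIteration signal


def simulated_sink(messages, ranks):
    """Yield (tag, item) pairs until every rank in `ranks` has sent STOP.

    `messages` is a hypothetical iterable of (source_rank, tag, item)
    triples in arrival order.
    """
    pending = set(ranks)  # ranks that have not reported STOP yet
    for source, tag, item in messages:
        if not pending:
            break  # all sources are done; the sink closes
        if item is STOP:
            pending.discard(source)
            continue
        yield tag, item


messages = [
    (1, 0, "a"),
    (2, 1, "b"),
    (1, None, STOP),  # rank 1 is done, rank 2 is still active
    (2, 2, "c"),
    (2, None, STOP),  # last source reports stop
    (1, 3, "late"),   # never consumed: the sink has already closed
]
collected = list(simulated_sink(messages, ranks={1, 2}))
```

The sink keeps yielding after rank 1 stops, because rank 2 is still pending, and closes only once both ranks have reported.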