cd.mpi
This submodule contains MPI operations. For more information about MPI visit: https://mpi4py.readthedocs.io/en/stable/
Practical Examples
In the example below, a server distributes data to workers. The workers process each data item individually and send results back to the server. After all data has been processed and sent back, the server holds all results in the correct order.
>>> from celldetection import mpi
... import numpy as np
...
... comm, rank, ranks = mpi.get_comm(return_ranks=True)
... assert ranks >= 2
...
... server = 0
... workers = set(range(ranks)) - {server}
...
... data = range(10)
...
... # Server
... if rank == server:
...     results = mpi.serve(comm, iterator=data, ranks=workers)
...     assert np.allclose(results, np.array(data) ** 2)  # results are in correct order
...     print(results, flush=True)
...
... # Worker
... else:
...     for idx, item in mpi.query(comm, server):
...         result = item ** 2  # process item
...         mpi.send(comm, result, server, tag=idx)  # send result to server
In the example below, a server distributes data to workers. The workers process each data item individually and send results to a sink. The sink can handle results individually on demand. In this example, results are simply collected and sorted.
>>> from celldetection import mpi
... import numpy as np
...
... comm, rank, ranks = mpi.get_comm(return_ranks=True)
... assert ranks >= 3
...
... server = 0
... sink = 1
... workers = set(range(ranks)) - {server, sink}
...
... data = range(10)
...
... # Server
... if rank == server:
...     mpi.serve(comm, iterator=data, ranks=workers)
...
... # Sink
... elif rank == sink:
...     indices, results = [], []
...     for idx, item in mpi.sink(comm, ranks=workers):
...         indices.append(idx), results.append(item)  # handle result
...     results = [x for _, x in sorted(zip(indices, results))]  # sort by index
...     assert np.allclose(results, np.array(data) ** 2)  # results are in correct order
...     print(results, flush=True)
...
... # Worker
... else:
...     for idx, item in mpi.query(comm, server, forward_stop_signal=sink):
...         result = item ** 2  # process item
...         mpi.send(comm, result, sink, tag=idx)  # send result to sink
MPI Operations
- all_filter(*a, **k)
All filter.
Filter ranks by condition.
- Parameters
comm – MPI Comm.
condition – Boolean condition.
- Returns
(ranks that met the condition, ranks that did not meet the condition)
- Return type
Tuple with sets of ranks
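The filtering described above can be pictured as gathering one boolean per rank and then partitioning the rank indices. The following is a minimal sketch under that assumption, not the library's implementation; `partition_ranks` is a hypothetical helper, and in a real MPI program the input list would come from something like mpi4py's `comm.allgather(condition)`.

```python
def partition_ranks(conditions):
    """Split rank indices into (met, not_met) sets.

    `conditions[i]` is the boolean contributed by rank i, e.g. the list
    returned by mpi4py's `comm.allgather(condition)` (assumption).
    """
    met = {rank for rank, ok in enumerate(conditions) if ok}
    not_met = set(range(len(conditions))) - met
    return met, not_met


# Hypothetical gathered values for four ranks.
met, not_met = partition_ranks([True, False, True, False])
print(met, not_met)
```

Because every rank sees the same gathered list, every rank would arrive at the same pair of sets.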
- get_comm(*a, **k)
- get_hosts(*a, **k)
- get_local_comm(*a, **k)
Get local comm.
Split comm by host. The returned comm is an MPI Comm object that connects ranks from the same host.
- Parameters
comm – MPI comm.
return_ranks – Whether to return local rank and the number of local ranks as well.
host – Host name. Will be determined if not provided.
node_rank – Node rank. Will be determined if not provided.
- Returns
Local MPI Comm.
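To illustrate what splitting by host means, the sketch below derives a node rank, a local rank, and a local size for each global rank from a list of host names. It is a pure-Python illustration under stated assumptions, not the library's code: `local_layout` and the host names are hypothetical, and in an MPI program the host list would typically be gathered from `MPI.Get_processor_name()` on each rank before calling something like `comm.Split(color, key)`.

```python
def local_layout(hosts):
    """Return one (node_rank, local_rank, local_size) tuple per global rank.

    `hosts[i]` is the host name reported by global rank i. Hosts are
    numbered in order of first appearance.
    """
    node_ids = {}   # host name -> node rank
    members = {}    # host name -> global ranks on that host
    for rank, host in enumerate(hosts):
        node_ids.setdefault(host, len(node_ids))
        members.setdefault(host, []).append(rank)
    layout = []
    for rank, host in enumerate(hosts):
        group = members[host]
        layout.append((node_ids[host], group.index(rank), len(group)))
    return layout


# Hypothetical job with five ranks spread over two hosts.
hosts = ["node-a", "node-a", "node-b", "node-a", "node-b"]
layout = local_layout(hosts)
for rank, (node_rank, local_rank, local_size) in enumerate(layout):
    print(rank, node_rank, local_rank, local_size)
```

Ranks that share a node rank would end up in the same local communicator.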
- query(*a, **k)
Query generator.
Query items from source (serving rank) until receiving StopIteration.
Examples
>>> from celldetection import mpi
>>> server_rank = 0
>>> for idx, item in mpi.query(comm, server_rank):
...     result = process(item)  # handle item
...     mpi.send(comm, result, server_rank, tag=idx)  # send result to server
>>> server_rank = 0
>>> sink_rank = 1
>>> for idx, item in mpi.query(comm, server_rank, sink_rank):
...     result = process(item)  # handle item
...     mpi.send(comm, result, sink_rank, tag=idx)  # send result to sink
- Parameters
comm – MPI Comm.
source – Source rank.
forward_stop_signal – Optional ranks that receive a StopIteration signal if query is terminated.
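The generator behavior described above, including forwarding of the stop signal, can be sketched without MPI by using `queue.Queue` objects as stand-ins for message channels. This is only an illustration under stated assumptions: `query_sim`, `STOP`, and the queues are hypothetical, and the sketch does not model the request a real worker presumably sends to the serving rank.

```python
import queue

STOP = object()  # stand-in for the StopIteration signal


def query_sim(inbox, forward_to=()):
    """Yield (idx, item) pairs from `inbox` until STOP arrives.

    Once the stream ends, STOP is forwarded to every queue in `forward_to`.
    """
    while True:
        message = inbox.get()
        if message is STOP:
            break
        yield message
    for target in forward_to:
        target.put(STOP)


# The "server" side: three indexed items followed by the stop signal.
inbox, sink_inbox = queue.Queue(), queue.Queue()
for idx, item in enumerate([3, 4, 5]):
    inbox.put((idx, item))
inbox.put(STOP)

# The "worker" side: process each item and pass the result to the sink.
for idx, item in query_sim(inbox, forward_to=[sink_inbox]):
    sink_inbox.put((idx, item ** 2))

# Drain what the sink would have received.
received = []
while not sink_inbox.empty():
    received.append(sink_inbox.get())
```

The sink's queue ends with the forwarded stop signal, which is how it would learn that this worker is done.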
- recv(*a, **k)
Receive.
Receive item from source rank.
- Parameters
comm – MPI Comm.
source – Source rank. Default: any
tag – Tag. Default: any
status – Optional MPI.Status object.
**kwargs – Keyword arguments for comm.recv.
- Returns
Received item and MPI.Status object.
- send(*a, **k)
Send.
Send item via MPI Comm comm to destination dest.
- Parameters
comm – MPI Comm.
item – Item.
dest – Destination rank or MPI.Status object.
tag – Tag.
**kwargs – Keyword arguments for comm.send.
- serve(*a, **k)
Serve.
Serves items of iterator to ranks. Once all items have been served, ranks receive StopIteration.
Examples
>>> from celldetection import mpi
... mpi.serve(comm, iterator=range(10), ranks={1, 2, 3})  # serve iterator to ranks
- Parameters
comm – MPI Comm.
ranks – Client ranks.
iterator – Iterator.
progress – Whether to show progress.
desc – Description, visible in progress report.
stats – Dictionary of callbacks: {stat_name: callback}
- Returns
List of results if ranks send results, None otherwise. Results are sorted by received tags.
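The serving pattern described above (hand out items on request, send a stop signal once the iterator is exhausted, return results sorted by tag) can be simulated with threads and queues. This is a rough sketch under stated assumptions, not the library's implementation: `serve_sim`, `worker_sim`, `STOP`, and the message tuples are all hypothetical.

```python
import queue
import threading

STOP = object()  # stand-in for the StopIteration signal


def serve_sim(items, requests, inboxes):
    """Hand out items on request, then return results ordered by tag."""
    results = {}
    pending = set(inboxes)         # workers that have not received STOP yet
    iterator = enumerate(items)    # the index doubles as the tag
    while pending:
        kind, worker, tag, payload = requests.get()
        if kind == "result":
            results[tag] = payload
            continue
        # Otherwise the message is a request for the next item.
        nxt = next(iterator, None)
        if nxt is None:
            inboxes[worker].put(STOP)
            pending.discard(worker)
        else:
            inboxes[worker].put(nxt)
    return [results[tag] for tag in sorted(results)]


def worker_sim(name, requests, inbox):
    """Request items until STOP, sending back the square of each item."""
    while True:
        requests.put(("request", name, None, None))
        message = inbox.get()
        if message is STOP:
            return
        idx, item = message
        requests.put(("result", name, idx, item ** 2))


requests = queue.Queue()
inboxes = {name: queue.Queue() for name in (1, 2, 3)}
threads = [threading.Thread(target=worker_sim, args=(name, requests, inbox))
           for name, inbox in inboxes.items()]
for thread in threads:
    thread.start()
results = serve_sim(range(10), requests, inboxes)
for thread in threads:
    thread.join()
print(results)
```

Each worker places its result in the shared queue before its next request, so by the time the server has answered every final request with the stop signal, all results have already been collected.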
- sink(*a, **k)
Sink generator.
Receive items from ranks until receiving StopIteration.
Examples
>>> from celldetection import mpi
... worker_ranks = {1, 2, 3}
... for idx, item in mpi.sink(comm, ranks=worker_ranks):  # receive items from worker_ranks
...     pass  # handle item
- Parameters
comm – MPI Comm.
ranks – Source ranks. All sources have to report StopIteration to close sink.
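The closing rule for the sink (it stays open until every source has reported the stop signal) can be illustrated with a small queue-based simulation. This is a sketch under stated assumptions rather than the library's code: `sink_sim`, `STOP`, and the `(source, tag, payload)` message layout are hypothetical.

```python
import queue

STOP = object()  # stand-in for the StopIteration signal


def sink_sim(inbox, sources):
    """Yield (tag, item) pairs until every rank in `sources` has sent STOP."""
    open_sources = set(sources)
    while open_sources:
        source, tag, payload = inbox.get()
        if payload is STOP:
            open_sources.discard(source)
        else:
            yield tag, payload


# Messages from two hypothetical workers, arriving out of order.
inbox = queue.Queue()
for message in [(2, 1, 1), (1, 0, 0), (1, None, STOP), (2, 2, 4), (2, None, STOP)]:
    inbox.put(message)

collected = dict(sink_sim(inbox, sources={1, 2}))
ordered = [collected[tag] for tag in sorted(collected)]
print(ordered)
```

Note that the sink keeps yielding items from worker 2 after worker 1 has already reported the stop signal; it closes only once both have done so.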