Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
6f9b850
Re-implement parallel sync using the NBX algorithm.
friedmud Aug 19, 2018
54b6b9a
Add get_unique_tag autoselection mechanism
roystgnr Dec 4, 2018
b2d3819
Finish implementation of NBX and use it for both push and pull
friedmud Nov 21, 2018
806c539
Use automatic get_unique_tag where it makes sense
roystgnr Dec 4, 2018
e569581
Verify and comment on automatic get_unique_tag()
roystgnr Dec 4, 2018
824ad1c
Move possibly_receive() to MPI-independent section
roystgnr Dec 5, 2018
046cbb3
Add header to get inline Comm::max(int)
roystgnr Dec 5, 2018
df1cccb
Unit tests for MessageTag get_unique_tag()
roystgnr Dec 6, 2018
8dc93c9
Run MessageTag unit tests
roystgnr Dec 6, 2018
6453fe1
Start adding unit tests for parallel_sync.h
roystgnr Jan 10, 2019
cdb0744
Build parallel_sync_test.C
roystgnr Jan 10, 2019
877da9b
Re-bootstrap
roystgnr Jan 24, 2019
a1f0524
Assert processor ids are in-range
roystgnr Jan 10, 2019
37ac473
Add unit test for M->M pull syncs
roystgnr Jan 10, 2019
f13e778
Add push-vectors-of-vectors unit test
roystgnr Jan 10, 2019
666a099
Add pull-vector-of-vectors parallel_sync test
roystgnr Jan 10, 2019
2a96106
Allow parallel_sync push to handle M > N
roystgnr Jan 10, 2019
d94412d
parallel_sync unit tests for oversized data
roystgnr Jan 10, 2019
ce8d5a4
Support for push_parallel(multimap<vector>)
roystgnr Jan 11, 2019
dece6fb
Unit tests for push_parallel(multimap<vector>)
roystgnr Jan 11, 2019
fc87a15
Unit tests for push(multimap<vec<vec>>)
roystgnr Jan 24, 2019
f988717
multimap use/support in pull_parallel_foo
roystgnr Jan 29, 2019
3569dff
Oversized parallel vec<scalar> pulls work now
roystgnr Jan 31, 2019
2593b79
Fix PullVecVec test, remove unused variables
roystgnr Feb 1, 2019
b895125
Enable PullVecVec test
roystgnr Feb 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 56 additions & 7 deletions include/parallel/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,22 @@ class Communicator
const communicator & get() const { return _communicator; }

/**
* Get a tag that is unique to this Communicator.
* Get a tag that is unique to this Communicator. A requested tag
* value may be provided. If no request is made then an automatic
* unique tag value will be generated; such usage of
* get_unique_tag() must be done on every processor in a consistent
* order.
*
* \note If people are also using magic numbers or copying
* communicators around then we can't guarantee the tag is unique to
* this MPI_Comm.
* raw communicators around then we can't guarantee the tag is
* unique to this MPI_Comm.
*
* \note Leaving \p tagvalue unspecified is recommended in most
* cases. Manually selecting tag values is dangerous, as tag values may be
* freed and reselected earlier than expected in asynchronous
* communication algorithms.
*/
MessageTag get_unique_tag(int tagvalue) const;
MessageTag get_unique_tag(int tagvalue = MessageTag::invalid_tag) const;

/**
* Reference an already-acquired tag, so that we know it will
Expand Down Expand Up @@ -191,10 +200,15 @@ class Communicator
processor_id_type _rank, _size;
SendMode _send_mode;

// mutable used_tag_values - not thread-safe, but then Parallel::
// isn't thread-safe in general.
// mutable used_tag_values and tag_queue - not thread-safe, but then
// Parallel:: isn't thread-safe in general.
mutable std::map<int, unsigned int> used_tag_values;
bool _I_duped_it;
mutable int _next_tag;

int _max_tag;

// Keep track of duplicate/split operations so we know when to free
bool _I_duped_it;

// Communication operations:
public:
Expand All @@ -214,6 +228,11 @@ class Communicator
*/
void barrier () const;

/**
* Start a barrier that doesn't block.
*
* @param req The request associated with this barrier.
* NOTE(review): presumably the caller tests or waits on \p req to
* find out when all processors have entered the barrier - confirm
* against the implementation, which is not visible here.
*/
void nonblocking_barrier (Request & req) const;

/**
* Verify that a local variable has the same value on all processors.
* Containers must have the same value in every entry.
Expand Down Expand Up @@ -413,6 +432,36 @@ class Communicator
Request & req,
const MessageTag & tag=any_tag) const;

/**
* Nonblocking-receive from one processor with user-defined type.
*
* Checks to see if a message can be received from
* \p src_processor_id. If so, it starts a non-blocking receive
* using the passed-in request and returns true.
*
* Otherwise - if there is no message to receive - it returns false.
*
* \note \p buf does NOT need to be properly sized before this call;
* it is resized automatically to fit the incoming message.
*
* If \p T is a container, container-of-containers, etc., then
* \p type should be the DataType of the underlying fixed-size
* entries in the container(s).
*
* @param src_processor_id The pid to receive from, or "any". If a
* receive is started, this is set to the actual source being
* received from.
* @param buf The buffer to receive into
* @param type The intrinsic datatype to receive
* @param req The request to use
* @param tag The tag to use
* @return true if a message was available and a non-blocking
* receive was started, false otherwise
*/
template <typename T, typename A>
bool possibly_receive (unsigned int & src_processor_id,
std::vector<T,A> & buf,
const DataType & type,
Request & req,
const MessageTag & tag) const;

/**
* Blocking-send range-of-pointers to one processor. This
* function does not send the raw pointers, but rather constructs
Expand Down
95 changes: 95 additions & 0 deletions include/parallel/parallel_implementation.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ inline status Communicator::probe (const unsigned int src_processor_id,

status stat;

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

libmesh_call_mpi
(MPI_Probe (src_processor_id, tag.value(), this->get(), &stat));

Expand All @@ -197,6 +200,9 @@ inline Status Communicator::packed_range_probe (const unsigned int src_processor

int int_flag;

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

libmesh_call_mpi(MPI_Iprobe(src_processor_id,
tag.value(),
this->get(),
Expand All @@ -218,6 +224,8 @@ inline void Communicator::send (const unsigned int dest_processor_id,

T * dataptr = buf.empty() ? nullptr : const_cast<T *>(buf.data());

libmesh_assert_less(dest_processor_id, this->size());

libmesh_call_mpi
(((this->send_mode() == SYNCHRONOUS) ?
MPI_Ssend : MPI_Send) (dataptr,
Expand All @@ -240,6 +248,8 @@ inline void Communicator::send (const unsigned int dest_processor_id,

T * dataptr = buf.empty() ? nullptr : const_cast<T *>(buf.data());

libmesh_assert_less(dest_processor_id, this->size());

libmesh_call_mpi
(((this->send_mode() == SYNCHRONOUS) ?
MPI_Issend : MPI_Isend) (dataptr,
Expand All @@ -266,6 +276,8 @@ inline void Communicator::send (const unsigned int dest_processor_id,

T * dataptr = const_cast<T*> (&buf);

libmesh_assert_less(dest_processor_id, this->size());

libmesh_call_mpi
(((this->send_mode() == SYNCHRONOUS) ?
MPI_Ssend : MPI_Send) (dataptr,
Expand All @@ -288,6 +300,8 @@ inline void Communicator::send (const unsigned int dest_processor_id,

T * dataptr = const_cast<T*>(&buf);

libmesh_assert_less(dest_processor_id, this->size());

libmesh_call_mpi
(((this->send_mode() == SYNCHRONOUS) ?
MPI_Issend : MPI_Isend) (dataptr,
Expand Down Expand Up @@ -417,6 +431,8 @@ inline void Communicator::send (const unsigned int dest_processor_id,
{
LOG_SCOPE("send()", "Parallel");

libmesh_assert_less(dest_processor_id, this->size());

libmesh_call_mpi
(((this->send_mode() == SYNCHRONOUS) ?
MPI_Issend : MPI_Isend) (buf.empty() ? nullptr : const_cast<T*>(buf.data()),
Expand Down Expand Up @@ -778,6 +794,9 @@ inline Status Communicator::receive (const unsigned int src_processor_id,
// datatype so we can later query the size
Status stat(this->probe(src_processor_id, tag), StandardType<T>(&buf));

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

libmesh_call_mpi
(MPI_Recv (&buf, 1, StandardType<T>(&buf), src_processor_id,
tag.value(), this->get(), stat.get()));
Expand All @@ -795,6 +814,9 @@ inline void Communicator::receive (const unsigned int src_processor_id,
{
LOG_SCOPE("receive()", "Parallel");

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

libmesh_call_mpi
(MPI_Irecv (&buf, 1, StandardType<T>(&buf), src_processor_id,
tag.value(), this->get(), req.get()));
Expand Down Expand Up @@ -929,6 +951,9 @@ inline Status Communicator::receive (const unsigned int src_processor_id,

buf.resize(stat.size());

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

// Use stat.source() and stat.tag() in the receive - if
// src_processor_id or tag is "any" then we want to be sure we
// try to receive the same message we just probed.
Expand All @@ -953,6 +978,9 @@ inline void Communicator::receive (const unsigned int src_processor_id,
{
LOG_SCOPE("receive()", "Parallel");

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

libmesh_call_mpi
(MPI_Irecv (buf.empty() ? nullptr : buf.data(),
cast_int<int>(buf.size()), type, src_processor_id,
Expand Down Expand Up @@ -1191,6 +1219,10 @@ inline void Communicator::send_receive(const unsigned int dest_processor_id,
return;
}

libmesh_assert_less(dest_processor_id, this->size());
libmesh_assert(source_processor_id < this->size() ||
source_processor_id == any_source);

// MPI_STATUS_IGNORE is from MPI-2; using it with some versions of
// MPICH may cause a crash:
// https://bugzilla.mcs.anl.gov/globus/show_bug.cgi?id=1798
Expand Down Expand Up @@ -1448,6 +1480,8 @@ inline void Communicator::broadcast (bool & data, const unsigned int root_id) co
// MPI::BOOL available
char char_data = data;

libmesh_assert_less(root_id, this->size());

// Spread data to remote processors.
libmesh_call_mpi
(MPI_Bcast (&char_data, 1, StandardType<char>(&char_data),
Expand Down Expand Up @@ -1515,6 +1549,8 @@ inline void Communicator::broadcast (std::vector<T,A> & data,
// Pass nullptr if our vector is empty.
T * data_ptr = data.empty() ? nullptr : data.data();

libmesh_assert_less(root_id, this->size());

libmesh_call_mpi
(MPI_Bcast (data_ptr, cast_int<int>(data.size()),
StandardType<T>(data_ptr), root_id, this->get()));
Expand Down Expand Up @@ -2618,6 +2654,8 @@ inline void Communicator::gather(const unsigned int root_id,

StandardType<T> send_type(&sendval);

libmesh_assert_less(root_id, this->size());

libmesh_call_mpi
(MPI_Gather(const_cast<T*>(&sendval), 1, send_type,
recv.empty() ? nullptr : recv.data(), 1, send_type,
Expand Down Expand Up @@ -2672,6 +2710,8 @@ inline void Communicator::gather(const unsigned int root_id,
if (root_id == this->rank())
r.resize(globalsize);

libmesh_assert_less(root_id, this->size());

// and get the data from the remote processors
libmesh_call_mpi
(MPI_Gatherv (r_src.empty() ? nullptr : r_src.data(), mysize,
Expand Down Expand Up @@ -2723,6 +2763,8 @@ inline void Communicator::gather(const unsigned int root_id,
if (this->rank() == root_id)
r.resize(globalsize, 0);

libmesh_assert_less(root_id, this->size());

// and get the data from the remote processors.
libmesh_call_mpi
(MPI_Gatherv (const_cast<T*>(sendval.data()),
Expand Down Expand Up @@ -2942,6 +2984,8 @@ void Communicator::scatter(const std::vector<T,A> & data,
T * data_ptr = const_cast<T*>(data.empty() ? nullptr : data.data());
libmesh_ignore(data_ptr); // unused ifndef LIBMESH_HAVE_MPI

libmesh_assert_less(root_id, this->size());

libmesh_call_mpi
(MPI_Scatter (data_ptr, 1, StandardType<T>(data_ptr),
&recv, 1, StandardType<T>(&recv), root_id, this->get()));
Expand Down Expand Up @@ -2980,6 +3024,8 @@ void Communicator::scatter(const std::vector<T,A> & data,
T * recv_ptr = recv.empty() ? nullptr : recv.data();
libmesh_ignore(data_ptr, recv_ptr); // unused ifndef LIBMESH_HAVE_MPI

libmesh_assert_less(root_id, this->size());

libmesh_call_mpi
(MPI_Scatter (data_ptr, recv_buffer_size, StandardType<T>(data_ptr),
recv_ptr, recv_buffer_size, StandardType<T>(recv_ptr), root_id, this->get()));
Expand Down Expand Up @@ -3032,6 +3078,8 @@ void Communicator::scatter(const std::vector<T,A1> & data,
T * recv_ptr = recv.empty() ? nullptr : recv.data();
libmesh_ignore(data_ptr, count_ptr, recv_ptr); // unused ifndef LIBMESH_HAVE_MPI

libmesh_assert_less(root_id, this->size());

// Scatter the non-uniform chunks
libmesh_call_mpi
(MPI_Scatterv (data_ptr, count_ptr, displacements.data(), StandardType<T>(data_ptr),
Expand Down Expand Up @@ -3206,6 +3254,53 @@ inline void Communicator::allgather_packed_range(Context * context,
}



template <typename T, typename A>
inline bool Communicator::possibly_receive (unsigned int & src_processor_id,
std::vector<T,A> & buf,
const DataType & type,
Request & req,
const MessageTag & tag) const
{
LOG_SCOPE("possibly_receive()", "Parallel");

Status stat(type);

int int_flag = 0;

libmesh_assert(src_processor_id < this->size() ||
src_processor_id == any_source);

libmesh_call_mpi(MPI_Iprobe(src_processor_id,
tag.value(),
this->get(),
&int_flag,
stat.get()));

if (int_flag)
{
buf.resize(stat.size());

src_processor_id = stat.source();

libmesh_call_mpi
(MPI_Irecv (buf.data(),
cast_int<int>(buf.size()),
type,
src_processor_id,
tag.value(),
this->get(),
req.get()));

// The MessageTag should stay registered for the Request lifetime
req.add_post_wait_work
(new Parallel::PostWaitDereferenceTag(tag));
}

return int_flag;
}


} // namespace Parallel

} // namespace libMesh
Expand Down
Loading