diff --git a/include/parallel/communicator.h b/include/parallel/communicator.h index 99003ba9363..f3f4b27c527 100644 --- a/include/parallel/communicator.h +++ b/include/parallel/communicator.h @@ -143,13 +143,22 @@ class Communicator const communicator & get() const { return _communicator; } /** - * Get a tag that is unique to this Communicator. + * Get a tag that is unique to this Communicator. A requested tag + * value may be provided. If no request is made then an automatic + * unique tag value will be generated; such usage of + * get_unique_tag() must be done on every processor in a consistent + * order. * * \note If people are also using magic numbers or copying - * communicators around then we can't guarantee the tag is unique to - * this MPI_Comm. + * raw communicators around then we can't guarantee the tag is + * unique to this MPI_Comm. + * + * \note Leaving \p tagvalue unspecified is recommended in most + * cases. Manually selecting tag values is dangerous, as tag values may be + * freed and reselected earlier than expected in asynchronous + * communication algorithms. */ - MessageTag get_unique_tag(int tagvalue) const; + MessageTag get_unique_tag(int tagvalue = MessageTag::invalid_tag) const; /** * Reference an already-acquired tag, so that we know it will @@ -191,10 +200,15 @@ class Communicator processor_id_type _rank, _size; SendMode _send_mode; - // mutable used_tag_values - not thread-safe, but then Parallel:: - // isn't thread-safe in general. + // mutable used_tag_values and _next_tag - not thread-safe, but then + // Parallel:: isn't thread-safe in general. 
mutable std::map used_tag_values; - bool _I_duped_it; + mutable int _next_tag; + + int _max_tag; + + // Keep track of duplicate/split operations so we know when to free + bool _I_duped_it; // Communication operations: public: @@ -214,6 +228,11 @@ class Communicator */ void barrier () const; + /** + * Start a barrier that doesn't block + */ + void nonblocking_barrier (Request & req) const; + /** * Verify that a local variable has the same value on all processors. * Containers must have the same value in every entry. @@ -413,6 +432,36 @@ class Communicator Request & req, const MessageTag & tag=any_tag) const; + /** + * Nonblocking-receive from one processor with user-defined type. + * + * Checks to see if a message can be received from the + * src_processor_id . If so, it starts a non-blocking + * receive using the passed in request and returns true + * + * Otherwise - if there is no message to receive it returns false + * + * Note: The buf does NOT need to be properly sized before this call + * this will resize the buffer automatically + * + * If \p T is a container, container-of-containers, etc., then + * \p type should be the DataType of the underlying fixed-size + * entries in the container(s). + * + * @param src_processor_id The pid to receive from or "any". + * will be set to the actual src being received from + * @param buf The buffer to receive into + * @param type The intrinsic datatype to receive + * @param req The request to use + * @param tag The tag to use + */ + template + bool possibly_receive (unsigned int & src_processor_id, + std::vector & buf, + const DataType & type, + Request & req, + const MessageTag & tag) const; + /** * Blocking-send range-of-pointers to one processor. 
This * function does not send the raw pointers, but rather constructs diff --git a/include/parallel/parallel_implementation.h b/include/parallel/parallel_implementation.h index fd148ed2140..b6cca898de7 100644 --- a/include/parallel/parallel_implementation.h +++ b/include/parallel/parallel_implementation.h @@ -178,6 +178,9 @@ inline status Communicator::probe (const unsigned int src_processor_id, status stat; + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + libmesh_call_mpi (MPI_Probe (src_processor_id, tag.value(), this->get(), &stat)); @@ -197,6 +200,9 @@ inline Status Communicator::packed_range_probe (const unsigned int src_processor int int_flag; + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + libmesh_call_mpi(MPI_Iprobe(src_processor_id, tag.value(), this->get(), @@ -218,6 +224,8 @@ inline void Communicator::send (const unsigned int dest_processor_id, T * dataptr = buf.empty() ? nullptr : const_cast(buf.data()); + libmesh_assert_less(dest_processor_id, this->size()); + libmesh_call_mpi (((this->send_mode() == SYNCHRONOUS) ? MPI_Ssend : MPI_Send) (dataptr, @@ -240,6 +248,8 @@ inline void Communicator::send (const unsigned int dest_processor_id, T * dataptr = buf.empty() ? nullptr : const_cast(buf.data()); + libmesh_assert_less(dest_processor_id, this->size()); + libmesh_call_mpi (((this->send_mode() == SYNCHRONOUS) ? MPI_Issend : MPI_Isend) (dataptr, @@ -266,6 +276,8 @@ inline void Communicator::send (const unsigned int dest_processor_id, T * dataptr = const_cast (&buf); + libmesh_assert_less(dest_processor_id, this->size()); + libmesh_call_mpi (((this->send_mode() == SYNCHRONOUS) ? MPI_Ssend : MPI_Send) (dataptr, @@ -288,6 +300,8 @@ inline void Communicator::send (const unsigned int dest_processor_id, T * dataptr = const_cast(&buf); + libmesh_assert_less(dest_processor_id, this->size()); + libmesh_call_mpi (((this->send_mode() == SYNCHRONOUS) ? 
MPI_Issend : MPI_Isend) (dataptr, @@ -417,6 +431,8 @@ inline void Communicator::send (const unsigned int dest_processor_id, { LOG_SCOPE("send()", "Parallel"); + libmesh_assert_less(dest_processor_id, this->size()); + libmesh_call_mpi (((this->send_mode() == SYNCHRONOUS) ? MPI_Issend : MPI_Isend) (buf.empty() ? nullptr : const_cast(buf.data()), @@ -778,6 +794,9 @@ inline Status Communicator::receive (const unsigned int src_processor_id, // datatype so we can later query the size Status stat(this->probe(src_processor_id, tag), StandardType(&buf)); + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + libmesh_call_mpi (MPI_Recv (&buf, 1, StandardType(&buf), src_processor_id, tag.value(), this->get(), stat.get())); @@ -795,6 +814,9 @@ inline void Communicator::receive (const unsigned int src_processor_id, { LOG_SCOPE("receive()", "Parallel"); + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + libmesh_call_mpi (MPI_Irecv (&buf, 1, StandardType(&buf), src_processor_id, tag.value(), this->get(), req.get())); @@ -929,6 +951,9 @@ inline Status Communicator::receive (const unsigned int src_processor_id, buf.resize(stat.size()); + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + // Use stat.source() and stat.tag() in the receive - if // src_processor_id is or tag is "any" then we want to be sure we // try to receive the same message we just probed. @@ -953,6 +978,9 @@ inline void Communicator::receive (const unsigned int src_processor_id, { LOG_SCOPE("receive()", "Parallel"); + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + libmesh_call_mpi (MPI_Irecv (buf.empty() ? 
nullptr : buf.data(), cast_int(buf.size()), type, src_processor_id, @@ -1191,6 +1219,10 @@ inline void Communicator::send_receive(const unsigned int dest_processor_id, return; } + libmesh_assert_less(dest_processor_id, this->size()); + libmesh_assert(source_processor_id < this->size() || + source_processor_id == any_source); + // MPI_STATUS_IGNORE is from MPI-2; using it with some versions of // MPICH may cause a crash: // https://bugzilla.mcs.anl.gov/globus/show_bug.cgi?id=1798 @@ -1448,6 +1480,8 @@ inline void Communicator::broadcast (bool & data, const unsigned int root_id) co // MPI::BOOL available char char_data = data; + libmesh_assert_less(root_id, this->size()); + // Spread data to remote processors. libmesh_call_mpi (MPI_Bcast (&char_data, 1, StandardType(&char_data), @@ -1515,6 +1549,8 @@ inline void Communicator::broadcast (std::vector & data, // Pass nullptr if our vector is empty. T * data_ptr = data.empty() ? nullptr : data.data(); + libmesh_assert_less(root_id, this->size()); + libmesh_call_mpi (MPI_Bcast (data_ptr, cast_int(data.size()), StandardType(data_ptr), root_id, this->get())); @@ -2618,6 +2654,8 @@ inline void Communicator::gather(const unsigned int root_id, StandardType send_type(&sendval); + libmesh_assert_less(root_id, this->size()); + libmesh_call_mpi (MPI_Gather(const_cast(&sendval), 1, send_type, recv.empty() ? nullptr : recv.data(), 1, send_type, @@ -2672,6 +2710,8 @@ inline void Communicator::gather(const unsigned int root_id, if (root_id == this->rank()) r.resize(globalsize); + libmesh_assert_less(root_id, this->size()); + // and get the data from the remote processors libmesh_call_mpi (MPI_Gatherv (r_src.empty() ? nullptr : r_src.data(), mysize, @@ -2723,6 +2763,8 @@ inline void Communicator::gather(const unsigned int root_id, if (this->rank() == root_id) r.resize(globalsize, 0); + libmesh_assert_less(root_id, this->size()); + // and get the data from the remote processors. 
libmesh_call_mpi (MPI_Gatherv (const_cast(sendval.data()), @@ -2942,6 +2984,8 @@ void Communicator::scatter(const std::vector & data, T * data_ptr = const_cast(data.empty() ? nullptr : data.data()); libmesh_ignore(data_ptr); // unused ifndef LIBMESH_HAVE_MPI + libmesh_assert_less(root_id, this->size()); + libmesh_call_mpi (MPI_Scatter (data_ptr, 1, StandardType(data_ptr), &recv, 1, StandardType(&recv), root_id, this->get())); @@ -2980,6 +3024,8 @@ void Communicator::scatter(const std::vector & data, T * recv_ptr = recv.empty() ? nullptr : recv.data(); libmesh_ignore(data_ptr, recv_ptr); // unused ifndef LIBMESH_HAVE_MPI + libmesh_assert_less(root_id, this->size()); + libmesh_call_mpi (MPI_Scatter (data_ptr, recv_buffer_size, StandardType(data_ptr), recv_ptr, recv_buffer_size, StandardType(recv_ptr), root_id, this->get())); @@ -3032,6 +3078,8 @@ void Communicator::scatter(const std::vector & data, T * recv_ptr = recv.empty() ? nullptr : recv.data(); libmesh_ignore(data_ptr, count_ptr, recv_ptr); // unused ifndef LIBMESH_HAVE_MPI + libmesh_assert_less(root_id, this->size()); + // Scatter the non-uniform chunks libmesh_call_mpi (MPI_Scatterv (data_ptr, count_ptr, displacements.data(), StandardType(data_ptr), @@ -3206,6 +3254,53 @@ inline void Communicator::allgather_packed_range(Context * context, } + +template +inline bool Communicator::possibly_receive (unsigned int & src_processor_id, + std::vector & buf, + const DataType & type, + Request & req, + const MessageTag & tag) const +{ + LOG_SCOPE("possibly_receive()", "Parallel"); + + Status stat(type); + + int int_flag = 0; + + libmesh_assert(src_processor_id < this->size() || + src_processor_id == any_source); + + libmesh_call_mpi(MPI_Iprobe(src_processor_id, + tag.value(), + this->get(), + &int_flag, + stat.get())); + + if (int_flag) + { + buf.resize(stat.size()); + + src_processor_id = stat.source(); + + libmesh_call_mpi + (MPI_Irecv (buf.data(), + cast_int(buf.size()), + type, + src_processor_id, + tag.value(), + 
this->get(), + req.get())); + + // The MessageTag should stay registered for the Request lifetime + req.add_post_wait_work + (new Parallel::PostWaitDereferenceTag(tag)); + } + + return int_flag; +} + + } // namespace Parallel } // namespace libMesh diff --git a/include/parallel/parallel_sync.h b/include/parallel/parallel_sync.h index 451aae88205..8cb0d50366d 100644 --- a/include/parallel/parallel_sync.h +++ b/include/parallel/parallel_sync.h @@ -27,6 +27,7 @@ #include #include #include +#include namespace libMesh @@ -53,42 +54,14 @@ namespace Parallel { * All receives and actions are completed before this function * returns. * - * Not all sends may have yet completed. The supplied container of - * Request objects, \p req, has more requests inserted, one for each - * of the data sends. These requests must be waited on before the \p - * data map is deleted. + * Note: it is very important that the message tag be completely + * unique to each invocation */ template void push_parallel_vector_data(const Communicator & comm, const MapToVectors & data, - RequestContainer & reqs, - ActionFunctor & act_on_data); - - - -/** - * Send and receive and act on vectors of data. - * - * The \p data map is indexed by processor ids as keys, and for each - * processor id in the map there should be a vector of data to send. - * - * Data which is received from other processors will be operated on by - * act_on_data(processor_id_type pid, const std::vector & data); - * - * No guarantee about operation ordering is made - this function will - * attempt to act on data in the order in which it is received. - * - * All communication and actions are complete when this function - * returns. 
- */ -template -void push_parallel_vector_data(const Communicator & comm, - const MapToVectors & data, - ActionFunctor & act_on_data); - + const ActionFunctor & act_on_data); /** * Send query vectors, receive and answer them with vectors of data, @@ -116,50 +89,6 @@ void push_parallel_vector_data(const Communicator & comm, * * All receives and actions are completed before this function * returns. - * - * Not all sends may have yet completed. The supplied container of - * Request objects, \p req, has more requests inserted, one for each - * of the data sends. These requests must be waited on before the \p - * data map is deleted. - */ -template -void pull_parallel_vector_data(const Communicator & comm, - const MapToVectors & queries, - RequestContainer & reqs, - GatherFunctor & gather_data, - ActionFunctor & act_on_data, - const datum * example); - -/** - * Send query vectors, receive and answer them with vectors of data, - * then act on those answers. - * - * The \p data map is indexed by processor ids as keys, and for each - * processor id in the map there should be a vector of query ids to send. - * - * Query data which is received from other processors will be operated - * on by - * gather_data(processor_id_type pid, const std::vector & ids, - * std::vector & data) - * - * Answer data which is received from other processors will be operated on by - * act_on_data(processor_id_type pid, const std::vector & ids, - * const std::vector & data); - * - * The example pointer may be null; it merely needs to be of the - * correct type. It's just here because function overloading in C++ - * is easy, whereas SFINAE is hard and partial template specialization - * of functions is impossible. - * - * No guarantee about operation ordering is made - this function will - * attempt to act on data in the order in which it is received. - * - * All communication and actions are complete when this function - * returns. 
*/ template class MapType, - typename KeyType, - typename ValueType, - typename A1, - typename A2, - typename ... ExtraTypes, - typename RequestContainer, - typename ActionFunctor> -void push_parallel_vector_data(const Communicator & comm, - const MapType,A2>, ExtraTypes...> & data, - RequestContainer & reqs, - ActionFunctor & act_on_data); - - - /* * A specialization for types that are harder to non-blocking receive. */ @@ -205,7 +116,7 @@ template