From 6b417258b7cc6bd4b66edbe1b7f0c91636822a61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 8 May 2024 14:45:18 +0200 Subject: [PATCH 01/12] retrieveIteration: return both Series and Iteration --- include/openPMD/Iteration.hpp | 1 + include/openPMD/Series.hpp | 1 + include/openPMD/backend/Attributable.hpp | 48 ++++++++++++++++- src/backend/Attributable.cpp | 67 +++++++++--------------- src/backend/BaseRecordComponent.cpp | 13 ++++- 5 files changed, 85 insertions(+), 45 deletions(-) diff --git a/include/openPMD/Iteration.hpp b/include/openPMD/Iteration.hpp index e40cce5a28..a8f4d7e43d 100644 --- a/include/openPMD/Iteration.hpp +++ b/include/openPMD/Iteration.hpp @@ -130,6 +130,7 @@ class Iteration : public Attributable friend class Series; friend class WriteIterations; friend class SeriesIterator; + friend class internal::AttributableData; public: Iteration(Iteration const &) = default; diff --git a/include/openPMD/Series.hpp b/include/openPMD/Series.hpp index 4a3417d149..04907eda40 100644 --- a/include/openPMD/Series.hpp +++ b/include/openPMD/Series.hpp @@ -254,6 +254,7 @@ class Series : public Attributable friend class ReadIterations; friend class SeriesIterator; friend class internal::SeriesData; + friend class internal::AttributableData; friend class WriteIterations; public: diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index c7f01491e1..a1daf441cf 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -53,6 +53,9 @@ class Series; namespace internal { + class IterationData; + class SeriesData; + class AttributableData { friend class openPMD::Attributable; @@ -74,6 +77,42 @@ namespace internal */ Writable m_writable; + template + T asInternalCopyOf() + { + auto *self = dynamic_cast(this); + if (!self) + { + if constexpr (std::is_same_v) + { + throw std::runtime_error( + "[Attributable::retrieveSeries] Error when trying to " + "retrieve 
the Series object. Note: An instance of the " + "Series object must still exist when flushing. A " + "common cause for this error is using a flush call on " + "a handle (e.g. `Iteration::seriesFlush()`) when the " + "original Series object has already gone out of " + "scope."); + } + else + { + throw std::runtime_error( + + "[AttributableData::asInternalCopyOf] Error when " + "trying to retrieve a containing object. Note: An " + "instance of the Series object must still exist when " + "flushing. A common cause for this error is using a " + "flush call on a handle (e.g. " + "`Iteration::seriesFlush()`) when the original Series " + "object has already gone out of scope."); + } + } + T res; + res.setData( + std::shared_ptr(self, [](auto const *) {})); + return res; + } + private: /** * The attributes defined by this Attributable. @@ -263,8 +302,13 @@ OPENPMD_protected * Throws an error otherwise, e.g., for Series objects. * @{ */ - Iteration const &containingIteration() const; - Iteration &containingIteration(); + [[nodiscard]] auto containingIteration() const + -> std::pair< + std::optional, + internal::SeriesData const *>; + auto containingIteration() -> std::pair< + std::optional, + internal::SeriesData *>; /** @} */ void seriesFlush(internal::FlushParams const &); diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index c50ca5345d..eb86d129b8 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -127,24 +127,13 @@ Series Attributable::retrieveSeries() const { findSeries = findSeries->parent; } - auto *seriesData = - dynamic_cast(findSeries->attributable); - if (!seriesData) - { - throw std::runtime_error( - "[Attributable::retrieveSeries] Error when trying to retrieve the " - "Series object. Note: An instance of the Series object must still " - "exist when flushing. A common cause for this error is using a " - "flush call on a handle (e.g. 
`Iteration::seriesFlush()`) when the " - "original Series object has already gone out of scope."); - } - Series res; - res.setData( - std::shared_ptr{seriesData, [](auto const *) {}}); - return res; + return findSeries->attributable->asInternalCopyOf(); } -Iteration const &Attributable::containingIteration() const +auto Attributable::containingIteration() const + -> std::pair< + std::optional, + internal::SeriesData const *> { std::vector searchQueue; searchQueue.reserve(7); @@ -157,40 +146,34 @@ Iteration const &Attributable::containingIteration() const } // End of the queue: // Iteration -> Series.iterations -> Series + auto *series = &auxiliary::deref_dynamic_cast( + (*searchQueue.rbegin())->attributable); if (searchQueue.size() < 3) { - throw std::runtime_error( - "containingIteration(): Must be called for an object contained in " - "an iteration."); + return std::make_pair(std::nullopt, series); } - auto end = searchQueue.rbegin(); - internal::AttributableData const *attr = (*(end + 2))->attributable; - if (attr == nullptr) - throw std::runtime_error( - "containingIteration(): attributable must not be a nullptr."); - /* - * We now know the unique instance of Attributable that corresponds with - * the iteration. - * Since the class Iteration itself still follows the old class design, - * we will have to take a detour via Series. 
- */ - auto &series = auxiliary::deref_dynamic_cast( - (*searchQueue.rbegin())->attributable); - for (auto const &pair : series.iterations) + else { - if (&static_cast(pair.second).get() == attr) - { - return pair.second; - } + auto end = searchQueue.rbegin(); + auto *iteration = + &auxiliary::deref_dynamic_cast( + (*(end + 2))->attributable); + return std::make_pair(std::make_optional(iteration), series); } - throw std::runtime_error( - "Containing iteration not found in containing Series."); } -Iteration &Attributable::containingIteration() +auto Attributable::containingIteration() + -> std:: + pair, internal::SeriesData *> { - return const_cast( - static_cast(this)->containingIteration()); + auto const_res = + static_cast(this)->containingIteration(); + return std::make_pair( + const_res.first.has_value() + ? std::make_optional( + const_cast(*const_res.first)) + : std::nullopt, + const_cast(const_res.second)); } std::string Attributable::MyPath::filePath() const diff --git a/src/backend/BaseRecordComponent.cpp b/src/backend/BaseRecordComponent.cpp index 839cdc55a6..3f0f1b35c0 100644 --- a/src/backend/BaseRecordComponent.cpp +++ b/src/backend/BaseRecordComponent.cpp @@ -19,6 +19,7 @@ * If not, see . 
*/ #include "openPMD/backend/BaseRecordComponent.hpp" +#include "openPMD/Error.hpp" #include "openPMD/Iteration.hpp" namespace openPMD @@ -90,7 +91,17 @@ ChunkTable BaseRecordComponent::availableChunks() Offset offset(rc.m_dataset.value().extent.size(), 0); return ChunkTable{{std::move(offset), rc.m_dataset.value().extent}}; } - containingIteration().open(); + if (auto iteration_data = containingIteration().first; + iteration_data.has_value()) + { + (*iteration_data)->asInternalCopyOf().open(); + } + else + { + throw error::Internal( + "Containing Iteration of BaseRecordComponent could not be " + "retrieved."); + } Parameter param; IOTask task(this, param); IOHandler()->enqueue(task); From b56d525fe3ef59b981b5d72178da0c80a897194d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 8 May 2024 14:59:42 +0200 Subject: [PATCH 02/12] Optimize implementation --- src/backend/Attributable.cpp | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/src/backend/Attributable.cpp b/src/backend/Attributable.cpp index eb86d129b8..ed24308950 100644 --- a/src/backend/Attributable.cpp +++ b/src/backend/Attributable.cpp @@ -135,30 +135,40 @@ auto Attributable::containingIteration() const std::optional, internal::SeriesData const *> { - std::vector searchQueue; - searchQueue.reserve(7); + constexpr size_t search_queue_size = 3; + Writable const *search_queue[search_queue_size]{nullptr}; + size_t search_queue_idx = 0; Writable const *findSeries = &writable(); - while (findSeries) + while (true) { - searchQueue.push_back(findSeries); + search_queue[search_queue_idx] = findSeries; // we don't need to push the last Writable since it's the Series anyway findSeries = findSeries->parent; + if (!findSeries) + { + break; + } + else + { + search_queue_idx = (search_queue_idx + 1) % search_queue_size; + } } // End of the queue: // Iteration -> Series.iterations -> Series - auto *series = &auxiliary::deref_dynamic_cast( - 
(*searchQueue.rbegin())->attributable); - if (searchQueue.size() < 3) + auto *series = &auxiliary::deref_dynamic_cast( + search_queue[search_queue_idx]->attributable); + auto maybe_iteration = search_queue + [(search_queue_idx + (search_queue_size - 2)) % search_queue_size]; + if (maybe_iteration) { - return std::make_pair(std::nullopt, series); + auto *iteration = + &auxiliary::deref_dynamic_cast( + maybe_iteration->attributable); + return std::make_pair(std::make_optional(iteration), series); } else { - auto end = searchQueue.rbegin(); - auto *iteration = - &auxiliary::deref_dynamic_cast( - (*(end + 2))->attributable); - return std::make_pair(std::make_optional(iteration), series); + return std::make_pair(std::nullopt, series); } } From 404cd1105564eb7b3b355263acb649f8be2608d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 8 May 2024 15:18:49 +0200 Subject: [PATCH 03/12] seriesFlush(): Mark containing Iteration as dirty --- src/backend/Writable.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/backend/Writable.cpp b/src/backend/Writable.cpp index bd68941345..0e399a3a81 100644 --- a/src/backend/Writable.cpp +++ b/src/backend/Writable.cpp @@ -51,7 +51,14 @@ void Writable::seriesFlush(internal::FlushParams const &flushParams) { Attributable impl; impl.setData({attributable, [](auto const *) {}}); - auto series = impl.retrieveSeries(); + auto [iteration_internal, series_internal] = impl.containingIteration(); + if (iteration_internal) + { + (*iteration_internal) + ->asInternalCopyOf() + .setDirtyRecursive(true); + } + auto series = series_internal->asInternalCopyOf(); series.flush_impl( series.iterations.begin(), series.iterations.end(), flushParams); } From 712388ceba71ef31b2ec98c53f2e599b8ff7e3ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 4 Jun 2024 12:47:50 +0200 Subject: [PATCH 04/12] Add failing test --- test/ParallelIOTest.cpp | 35 +++++++++++++++++++++++++++++++++++ 1 
file changed, 35 insertions(+) diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 32851d8dc3..38f01044b0 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1135,6 +1135,41 @@ TEST_CASE("hipace_like_write", "[parallel]") } #endif +#if openPMD_HAVE_ADIOS2 && openPMD_HAS_ADIOS_2_9 +TEST_CASE("independent_write_with_collective_flush", "[parallel]") +{ + Series write( + "../samples/independent_write_with_collective_flush.bp5", + Access::CREATE, + MPI_COMM_WORLD, + "adios2.engine.preferred_flush_target = \"buffer\""); + int size, rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + auto iteration = write.iterations[0]; + auto E_x = iteration.meshes["E"]["x"]; + E_x.resetDataset({Datatype::DOUBLE, {10}}); + write.flush(); + if (rank == 1) + { + E_x.storeChunk( + std::unique_ptr{new double[10]{4.2}}, {0}, {10}); + } + /* + * Now, the iteration is dirty only on rank 1. But the following flush must + * run collectively anyway. The test has been designed in such a way that + * the PerformDataWrite() call required by the disk flush target will + * conflict with the default buffer target that will run in the destructor, + * unless the flush in the next line really is collective. 
+ */ + std::cout << "ENTER" << std::endl; + MPI_Barrier(MPI_COMM_WORLD); + iteration.seriesFlush("adios2.engine.preferred_flush_target = \"disk\""); + MPI_Barrier(MPI_COMM_WORLD); + std::cout << "LEAVE" << std::endl; +} +#endif + #if openPMD_HAVE_ADIOS2 && openPMD_HAVE_MPI void adios2_streaming(bool variableBasedLayout) From bd1d33e77b3fc6b085617af7ef6ff6d4dfc388e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Tue, 4 Jun 2024 12:47:56 +0200 Subject: [PATCH 05/12] Backend implementation --- docs/source/dev/design.rst | 2 +- include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp | 2 + include/openPMD/IO/AbstractIOHandlerImpl.hpp | 5 +++ include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp | 1 + include/openPMD/IO/IOTask.hpp | 45 ++++++++++++++++--- include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp | 2 + src/IO/ADIOS/ADIOS2IOHandler.cpp | 8 ++++ src/IO/AbstractIOHandlerImpl.cpp | 8 ++++ src/IO/HDF5/HDF5IOHandler.cpp | 5 +++ src/IO/JSON/JSONIOHandlerImpl.cpp | 7 +++ src/Iteration.cpp | 3 ++ 11 files changed, 81 insertions(+), 7 deletions(-) diff --git a/docs/source/dev/design.rst b/docs/source/dev/design.rst index ce43777407..6fb81071ce 100644 --- a/docs/source/dev/design.rst +++ b/docs/source/dev/design.rst @@ -23,7 +23,7 @@ Therefore, enabling users to handle hierarchical, self-describing file formats w .. literalinclude:: IOTask.hpp :language: cpp - :lines: 48-78 + :lines: 50-81 Every task is designed to be a fully self-contained description of one such atomic operation. By describing a required minimal step of work (without any side-effect), these operations are the foundation of the unified handling mechanism across suitable file formats. The actual low-level exchange of data is implemented in ``IOHandlers``, one per file format (possibly two if handlingi MPI-parallel work is possible and requires different behaviour). 
diff --git a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp index efab738a20..bd7c698f37 100644 --- a/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp +++ b/include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp @@ -206,6 +206,8 @@ class ADIOS2IOHandlerImpl void deregister(Writable *, Parameter const &) override; + void touch(Writable *, Parameter const &) override; + /** * @brief The ADIOS2 access type to chose for Engines opened * within this instance. diff --git a/include/openPMD/IO/AbstractIOHandlerImpl.hpp b/include/openPMD/IO/AbstractIOHandlerImpl.hpp index 7fc2e4cca0..81b0a0d816 100644 --- a/include/openPMD/IO/AbstractIOHandlerImpl.hpp +++ b/include/openPMD/IO/AbstractIOHandlerImpl.hpp @@ -395,6 +395,11 @@ class AbstractIOHandlerImpl virtual void deregister(Writable *, Parameter const &param) = 0; + /** Treat this writable's file as open/active/dirty. + */ + virtual void + touch(Writable *, Parameter const &param) = 0; + AbstractIOHandler *m_handler; bool m_verboseIOTasks = false; diff --git a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp index 9fee9978f0..20309e79fc 100644 --- a/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp +++ b/include/openPMD/IO/HDF5/HDF5IOHandlerImpl.hpp @@ -80,6 +80,7 @@ class HDF5IOHandlerImpl : public AbstractIOHandlerImpl void listAttributes(Writable *, Parameter &) override; void deregister(Writable *, Parameter const &) override; + void touch(Writable *, Parameter const &) override; std::unordered_map m_fileNames; std::unordered_map m_fileNamesWithID; diff --git a/include/openPMD/IO/IOTask.hpp b/include/openPMD/IO/IOTask.hpp index 0d64effa72..76839cd35b 100644 --- a/include/openPMD/IO/IOTask.hpp +++ b/include/openPMD/IO/IOTask.hpp @@ -48,20 +48,36 @@ Writable *getWritable(Attributable *); /** Type of IO operation between logical and persistent data.
*/ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){ - CREATE_FILE, CHECK_FILE, OPEN_FILE, CLOSE_FILE, + CREATE_FILE, + CHECK_FILE, + OPEN_FILE, + CLOSE_FILE, DELETE_FILE, - CREATE_PATH, CLOSE_PATH, OPEN_PATH, DELETE_PATH, + CREATE_PATH, + CLOSE_PATH, + OPEN_PATH, + DELETE_PATH, LIST_PATHS, - CREATE_DATASET, EXTEND_DATASET, OPEN_DATASET, DELETE_DATASET, - WRITE_DATASET, READ_DATASET, LIST_DATASETS, GET_BUFFER_VIEW, + CREATE_DATASET, + EXTEND_DATASET, + OPEN_DATASET, + DELETE_DATASET, + WRITE_DATASET, + READ_DATASET, + LIST_DATASETS, + GET_BUFFER_VIEW, - DELETE_ATT, WRITE_ATT, READ_ATT, LIST_ATTS, + DELETE_ATT, + WRITE_ATT, + READ_ATT, + LIST_ATTS, ADVANCE, AVAILABLE_CHUNKS, //!< Query chunks that can be loaded in a dataset - DEREGISTER //!< Inform the backend that an object has been deleted. + DEREGISTER, //!< Inform the backend that an object has been deleted. + TOUCH //!< tell the backend that the file is to be considered active }; // note: if you change the enum members here, please update // docs/source/dev/design.rst @@ -658,6 +674,23 @@ struct OPENPMDAPI_EXPORT Parameter void const *former_parent = nullptr; }; +template <> +struct OPENPMDAPI_EXPORT Parameter : public AbstractParameter +{ + explicit Parameter() = default; + + Parameter(Parameter const &) = default; + Parameter(Parameter &&) = default; + + Parameter &operator=(Parameter const &) = default; + Parameter &operator=(Parameter &&) = default; + + std::unique_ptr to_heap() && override + { + return std::make_unique>(std::move(*this)); + } +}; + /** @brief Self-contained description of a single IO operation. 
* * Contained are diff --git a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp index da7e296d59..b67ac9138a 100644 --- a/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp +++ b/include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp @@ -240,6 +240,8 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl void deregister(Writable *, Parameter const &) override; + void touch(Writable *, Parameter const &) override; + std::future flush(); private: diff --git a/src/IO/ADIOS/ADIOS2IOHandler.cpp b/src/IO/ADIOS/ADIOS2IOHandler.cpp index 52c2b8acfc..0dc128340b 100644 --- a/src/IO/ADIOS/ADIOS2IOHandler.cpp +++ b/src/IO/ADIOS/ADIOS2IOHandler.cpp @@ -895,6 +895,7 @@ void ADIOS2IOHandlerImpl::openFile( // lazy opening is deathly in parallel situations auto &fileData = getFileData(file, IfFileNotOpen::OpenImplicitly); *parameters.out_parsePreference = fileData.parsePreference; + m_dirty.emplace(std::move(file)); } void ADIOS2IOHandlerImpl::closeFile( @@ -1482,6 +1483,13 @@ void ADIOS2IOHandlerImpl::deregister( m_files.erase(writable); } +void ADIOS2IOHandlerImpl::touch( + Writable *writable, Parameter const &) +{ + auto file = refreshFileFromParent(writable, /* preferParentFile = */ false); + m_dirty.emplace(std::move(file)); +} + adios2::Mode ADIOS2IOHandlerImpl::adios2AccessMode(std::string const &fullPath) { switch (m_handler->m_backendAccess) diff --git a/src/IO/AbstractIOHandlerImpl.cpp b/src/IO/AbstractIOHandlerImpl.cpp index 109942df51..fe01fea649 100644 --- a/src/IO/AbstractIOHandlerImpl.cpp +++ b/src/IO/AbstractIOHandlerImpl.cpp @@ -421,6 +421,14 @@ std::future AbstractIOHandlerImpl::flush() deregister(i.writable, parameter); break; } + case O::TOUCH: { + auto &parameter = + deref_dynamic_cast>(i.parameter.get()); + writeToStderr( + "[", i.writable->parent, "->", i.writable, "] TOUCH"); + touch(i.writable, parameter); + break; + } } } catch (...)
diff --git a/src/IO/HDF5/HDF5IOHandler.cpp b/src/IO/HDF5/HDF5IOHandler.cpp index c98349a5b3..a091fd004f 100644 --- a/src/IO/HDF5/HDF5IOHandler.cpp +++ b/src/IO/HDF5/HDF5IOHandler.cpp @@ -2910,6 +2910,11 @@ void HDF5IOHandlerImpl::deregister( m_fileNames.erase(writable); } +void HDF5IOHandlerImpl::touch(Writable *, Parameter const &) +{ + // no-op +} + std::optional HDF5IOHandlerImpl::getFile(Writable *writable) { diff --git a/src/IO/JSON/JSONIOHandlerImpl.cpp b/src/IO/JSON/JSONIOHandlerImpl.cpp index 657c15e5fb..61b732ed0a 100644 --- a/src/IO/JSON/JSONIOHandlerImpl.cpp +++ b/src/IO/JSON/JSONIOHandlerImpl.cpp @@ -1039,6 +1039,13 @@ void JSONIOHandlerImpl::deregister( m_files.erase(writable); } +void JSONIOHandlerImpl::touch( + Writable *writable, Parameter const &) +{ + auto file = refreshFileFromParent(writable); + m_dirty.emplace(std::move(file)); +} + auto JSONIOHandlerImpl::getFilehandle(File const &fileName, Access access) -> std::tuple, std::istream *, std::ostream *> { diff --git a/src/Iteration.cpp b/src/Iteration.cpp index b56fb21096..7268ae7153 100644 --- a/src/Iteration.cpp +++ b/src/Iteration.cpp @@ -22,6 +22,7 @@ #include "openPMD/Dataset.hpp" #include "openPMD/Datatype.hpp" #include "openPMD/IO/AbstractIOHandler.hpp" +#include "openPMD/IO/IOTask.hpp" #include "openPMD/Series.hpp" #include "openPMD/auxiliary/DerefDynamicCast.hpp" #include "openPMD/auxiliary/Filesystem.hpp" @@ -315,6 +316,8 @@ void Iteration::flushVariableBased( void Iteration::flush(internal::FlushParams const &flushParams) { + Parameter touch; + IOHandler()->enqueue(IOTask(&writable(), touch)); if (access::readOnly(IOHandler()->m_frontendAccess)) { for (auto &m : meshes) From a9d408a2184779bda5ff63bfeb688d07aea5ccf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Thu, 6 Jun 2024 15:06:49 +0200 Subject: [PATCH 06/12] Add documentation --- docs/source/details/mpi.rst | 6 ++++++ include/openPMD/backend/Attributable.hpp | 2 ++ 2 files changed, 8 insertions(+) diff 
--git a/docs/source/details/mpi.rst b/docs/source/details/mpi.rst index f59a3b0aa7..38bdc2643d 100644 --- a/docs/source/details/mpi.rst +++ b/docs/source/details/mpi.rst @@ -49,6 +49,12 @@ Functionality Behavior Description .. [4] We usually open iterations delayed on first access. This first access is usually the ``flush()`` call after a ``storeChunk``/``loadChunk`` operation. If the first access is non-collective, an explicit, collective ``Iteration::open()`` can be used to have the files already open. Alternatively, iterations might be accessed for the first time by immediate operations such as ``::availableChunks()``. +.. warning:: + + The openPMD-api will by default flush only those Iterations which are dirty, i.e. have been written to. + This is somewhat unfortunate in parallel setups since only the dirty status of the current MPI rank can be considered. + As a workaround, use ``Attributable::seriesFlush()`` on an Iteration (or an object contained within an Iteration) to force flush that Iteration regardless of its dirty status. + .. tip:: Just because an operation is independent does not mean it is allowed to be inconsistent. diff --git a/include/openPMD/backend/Attributable.hpp b/include/openPMD/backend/Attributable.hpp index a1daf441cf..6dcb9dc918 100644 --- a/include/openPMD/backend/Attributable.hpp +++ b/include/openPMD/backend/Attributable.hpp @@ -246,6 +246,8 @@ class Attributable * of parents. This method will walk up the parent list until it reaches * an object that has no parent, which is the Series object, and flush()-es * it. + * If the Attributable is an Iteration or any object contained in an + * Iteration, that Iteration will be flushed regardless of its dirty status. * * @param backendConfig Further backend-specific instructions on how to * implement this flush call. 
From 81a28e84a0023482e2af45c1b198e8b3a42049d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 3 Jun 2024 12:25:06 +0200 Subject: [PATCH 07/12] Add ADIOS2 v2.10 define and use that for BP5 check --- include/openPMD/IO/ADIOS/macros.hpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/openPMD/IO/ADIOS/macros.hpp b/include/openPMD/IO/ADIOS/macros.hpp index d7e10385ee..4e5f223e44 100644 --- a/include/openPMD/IO/ADIOS/macros.hpp +++ b/include/openPMD/IO/ADIOS/macros.hpp @@ -19,7 +19,10 @@ #define openPMD_HAS_ADIOS_2_9 \ (ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 209) -#if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_9 +#define openPMD_HAS_ADIOS_2_10 \ + (ADIOS2_VERSION_MAJOR * 100 + ADIOS2_VERSION_MINOR >= 210) + +#if defined(ADIOS2_HAVE_BP5) || openPMD_HAS_ADIOS_2_10 // ADIOS2 v2.10 no longer defines this #define openPMD_HAVE_ADIOS2_BP5 1 #else From 186d99e6eecf4649f2281be3e565a9e4e84ce633 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 3 Jun 2024 12:36:51 +0200 Subject: [PATCH 08/12] Ask the engine if it is BP5 for BP5-specific features --- src/IO/ADIOS/ADIOS2File.cpp | 10 +++++++++- test/SerialIOTest.cpp | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/IO/ADIOS/ADIOS2File.cpp b/src/IO/ADIOS/ADIOS2File.cpp index 2ba0fee981..e19698465b 100644 --- a/src/IO/ADIOS/ADIOS2File.cpp +++ b/src/IO/ADIOS/ADIOS2File.cpp @@ -23,6 +23,7 @@ #include "openPMD/Error.hpp" #include "openPMD/IO/ADIOS/ADIOS2IOHandler.hpp" #include "openPMD/auxiliary/Environment.hpp" +#include "openPMD/auxiliary/StringManip.hpp" #if openPMD_USE_VERIFY #define VERIFY(CONDITION, TEXT) \ @@ -1044,7 +1045,14 @@ void ADIOS2File::flush_impl(ADIOS2FlushParams flushParams, bool writeLatePuts) performDataWrite = false; break; } - performDataWrite = performDataWrite && m_engineType == "bp5"; + performDataWrite = performDataWrite && + (m_engineType == "bp5" || + /* this second 
check should be sufficient, but we leave the + first check in as a safeguard against renamings in ADIOS2. + Also do a lowerCase transform since the docstring of + `Engine::Type()` claims that the return value is in + lowercase, but for BP5 this does not seem true. */ + auxiliary::lowerCase(engine.Type()) == "bp5writer"); if (performDataWrite) { diff --git a/test/SerialIOTest.cpp b/test/SerialIOTest.cpp index 4c78d16e16..524b2af6a3 100644 --- a/test/SerialIOTest.cpp +++ b/test/SerialIOTest.cpp @@ -4414,6 +4414,25 @@ BufferChunkSize = 2147483646 # 2^31 - 2 )"; adios2_bp5_flush(cfg5, /* flushDuringStep = */ FlushDuringStep::Always); + +#if openPMD_HAVE_ADIOS2_BP5 + std::string cfg6 = R"( +[adios2] + +[adios2.engine] +preferred_flush_target = "disk" + +[adios2.engine.parameters] +AggregationType = "TwoLevelShm" +MaxShmSize = 3221225472 +NumSubFiles = 1 +NumAggregators = 1 +BufferChunkSize = 2147483646 # 2^31 - 2 +)"; + + adios2_bp5_flush( + cfg6, /* flushDuringStep = */ FlushDuringStep::Default_Yes); +#endif } #endif From 0711777912b6366405be122e45146bcef4ec57ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Mon, 3 Jun 2024 17:47:32 +0200 Subject: [PATCH 09/12] write_test_zero_extent: require flush to buffer Somehow PerformDataWrite() leads to trouble with this pattern. 
--- test/ParallelIOTest.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 38f01044b0..77e6a54a27 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -133,7 +133,8 @@ void write_test_zero_extent( Series o = Series( filePath.append(".").append(file_ending), Access::CREATE, - MPI_COMM_WORLD); + MPI_COMM_WORLD, + "adios2.engine.preferred_flush_target = \"buffer\""); int const max_step = 100; From f592e044ef0a29f8643878632031f5a38b19e006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 5 Jun 2024 13:31:05 +0200 Subject: [PATCH 10/12] Revert "write_test_zero_extent: require flush to buffer" This reverts commit 36597bd27148c82ab69f5fddd58a7af5e3961c93. No longer needed after rebasing on fix-iteration-flush --- test/ParallelIOTest.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 77e6a54a27..38f01044b0 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -133,8 +133,7 @@ void write_test_zero_extent( Series o = Series( filePath.append(".").append(file_ending), Access::CREATE, - MPI_COMM_WORLD, - "adios2.engine.preferred_flush_target = \"buffer\""); + MPI_COMM_WORLD); int const max_step = 100; From d9c54f715a5dd17a75800b2b9a5cb26d52cdec41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 5 Jun 2024 12:13:22 +0200 Subject: [PATCH 11/12] Fix hipace_like_write test It used Series::flush non-collectively --- test/ParallelIOTest.cpp | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/ParallelIOTest.cpp b/test/ParallelIOTest.cpp index 38f01044b0..2ca4248333 100644 --- a/test/ParallelIOTest.cpp +++ b/test/ParallelIOTest.cpp @@ -1031,10 +1031,16 @@ void hipace_like_write(std::string const &file_ending) int const last_step = 100; int const my_first_step = i_mpi_rank * int(local_Nz); int const all_last_step = 
last_step + (i_mpi_size - 1) * int(local_Nz); + + bool participate_in_barrier = true; for (int first_rank_step = 0; first_rank_step < all_last_step; ++first_rank_step) { - MPI_Barrier(MPI_COMM_WORLD); + if (participate_in_barrier) + { + MPI_Barrier(MPI_COMM_WORLD); + } + participate_in_barrier = true; // first_rank_step: this step will "lead" the opening of an output step // step on the local rank @@ -1073,16 +1079,25 @@ void hipace_like_write(std::string const &file_ending) // has this ranks started computations yet? if (step < 0) + { + participate_in_barrier = false; continue; + } // has this ranks stopped computations? if (step > last_step) + { + participate_in_barrier = false; continue; + } // does this rank contribute to with output currently? bool const rank_in_output_step = std::find(iterations.begin(), iterations.end(), step) != iterations.end(); if (!rank_in_output_step) + { + participate_in_barrier = false; continue; + } // now we write (parallel, independent I/O) auto it = series.iterations[step]; From 0a05f10c70a53806443e2bae82a860ec13896e82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Franz=20P=C3=B6schel?= Date: Wed, 5 Jun 2024 13:30:18 +0200 Subject: [PATCH 12/12] Also ensure all ranks flush in group/variable encoding --- src/Series.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Series.cpp b/src/Series.cpp index 9140011c15..fcd382bc96 100644 --- a/src/Series.cpp +++ b/src/Series.cpp @@ -1403,6 +1403,7 @@ void Series::flushGorVBased( bool flushIOHandler) { auto &series = get(); + if (access::readOnly(IOHandler()->m_frontendAccess)) { for (auto it = begin; it != end; ++it) @@ -1432,6 +1433,8 @@ void Series::flushGorVBased( } // Phase 3 + Parameter touch; + IOHandler()->enqueue(IOTask(&writable(), touch)); if (flushIOHandler) { IOHandler()->flush(flushParams); @@ -1510,6 +1513,8 @@ void Series::flushGorVBased( } flushAttributes(flushParams); + Parameter touch; + IOHandler()->enqueue(IOTask(&writable(), touch)); if (flushIOHandler) { 
IOHandler()->flush(flushParams);