Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,4 +420,24 @@ inline T Iteration::dt() const
{
return this->readFloatingpoint<T>("dt");
}

/**
* @brief Subclass of Iteration that knows its own index withing the containing
* Series.
*/
class IndexedIteration : public Iteration
{
friend class SeriesIterator;
friend class WriteIterations;

public:
using index_t = Iteration::IterationIndex_t;
index_t const iterationIndex;

private:
template <typename Iteration_t>
IndexedIteration(Iteration_t &&it, index_t index)
: Iteration(std::forward<Iteration_t>(it)), iterationIndex(index)
{}
};
} // namespace openPMD
20 changes: 0 additions & 20 deletions include/openPMD/ReadIterations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,6 @@

namespace openPMD
{
/**
* @brief Subclass of Iteration that knows its own index withing the containing
* Series.
*/
class IndexedIteration : public Iteration
{
friend class SeriesIterator;

public:
using iterations_t = decltype(internal::SeriesData::iterations);
using index_t = iterations_t::key_type;
index_t const iterationIndex;

private:
template <typename Iteration_t>
IndexedIteration(Iteration_t &&it, index_t index)
: Iteration(std::forward<Iteration_t>(it)), iterationIndex(index)
{}
};

class SeriesIterator
{
using iteration_index_t = IndexedIteration::index_t;
Expand Down
5 changes: 5 additions & 0 deletions include/openPMD/WriteIterations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,10 @@ class WriteIterations
public:
mapped_type &operator[](key_type const &key);
mapped_type &operator[](key_type &&key);

/**
* Return the iteration that is currently being written to, if it exists.
*/
std::optional<IndexedIteration> currentIteration();
};
} // namespace openPMD
34 changes: 29 additions & 5 deletions src/WriteIterations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,17 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key)
"[WriteIterations] Trying to access after closing Series.");
}
auto &s = shared->value();
if (s.currentlyOpen.has_value())
auto lastIteration = currentIteration();
if (lastIteration.has_value())
{
auto lastIterationIndex = s.currentlyOpen.value();
auto &lastIteration = s.iterations.at(lastIterationIndex);
if (lastIterationIndex != key && !lastIteration.closed())
auto lastIteration_v = lastIteration.value();
if (lastIteration_v.iterationIndex == key)
{
lastIteration.close();
return s.iterations.at(std::move(key));
}
else
{
lastIteration_v.close(); // continue below
}
}
s.currentlyOpen = key;
Expand All @@ -87,4 +91,24 @@ WriteIterations::mapped_type &WriteIterations::operator[](key_type &&key)
}
return res;
}

std::optional<IndexedIteration> WriteIterations::currentIteration()
{
if (!shared || !shared->has_value())
{
return std::nullopt;
}
auto &s = shared->value();
if (!s.currentlyOpen.has_value())
{
return std::nullopt;
}
Iteration &currentIteration = s.iterations.at(s.currentlyOpen.value());
if (currentIteration.closed())
{
return std::nullopt;
}
return std::make_optional<IndexedIteration>(
IndexedIteration(currentIteration, s.currentlyOpen.value()));
}
} // namespace openPMD
16 changes: 14 additions & 2 deletions src/binding/python/Iteration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,20 @@ void init_Iteration(py::module &m)
"dt", &Iteration::dt<long double>, &Iteration::setDt<double>)
.def_property(
"time_unit_SI", &Iteration::timeUnitSI, &Iteration::setTimeUnitSI)
.def("open", &Iteration::open)
.def("close", &Iteration::close, py::arg("flush") = true)
.def(
"open",
[](Iteration &it) {
py::gil_scoped_release release;
return it.open();
})
.def(
"close",
/*
* Cannot release the GIL here since Python buffers might be
* accessed in deferred tasks
*/
&Iteration::close,
py::arg("flush") = true)

// TODO remove in future versions (deprecated)
.def("set_time", &Iteration::setTime<double>)
Expand Down
88 changes: 83 additions & 5 deletions src/binding/python/Series.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,103 @@ struct openPMD_PyMPICommObject
using openPMD_PyMPIIntracommObject = openPMD_PyMPICommObject;
#endif

struct SeriesIteratorPythonAdaptor : SeriesIterator
{
SeriesIteratorPythonAdaptor(SeriesIterator it)
: SeriesIterator(std::move(it))
{}

/*
* Python iterators are weird and call `__next__()` already for getting the
* first element.
* In that case, no `operator++()` must be called...
*/
bool first_iteration = true;
};

void init_Series(py::module &m)
{
py::class_<WriteIterations>(m, "WriteIterations")
.def(
"__getitem__",
[](WriteIterations writeIterations, Series::IterationIndex_t key) {
auto lastIteration = writeIterations.currentIteration();
if (lastIteration.has_value() &&
lastIteration.value().iterationIndex != key)
{
// this must happen under the GIL
lastIteration.value().close();
}
py::gil_scoped_release release;
Comment thread
franzpoeschel marked this conversation as resolved.
return writeIterations[key];
},
// copy + keepalive
py::return_value_policy::copy);
py::return_value_policy::copy)
.def(
"current_iteration",
&WriteIterations::currentIteration,
"Return the iteration that is currently being written to, if it "
"exists.");
py::class_<IndexedIteration, Iteration>(m, "IndexedIteration")
.def_readonly("iteration_index", &IndexedIteration::iterationIndex);

py::class_<SeriesIteratorPythonAdaptor>(m, "SeriesIterator")
.def(
"__next__",
[](SeriesIteratorPythonAdaptor &iterator) {
if (iterator == SeriesIterator::end())
{
throw py::stop_iteration();
}
/*
* Closing the iteration must happen under the GIL lock since
* Python buffers might be accessed
*/
if (!iterator.first_iteration)
{
if (!(*iterator).closed())
{
(*iterator).close();
}
py::gil_scoped_release release;
++iterator;
}
iterator.first_iteration = false;
if (iterator == SeriesIterator::end())
{
throw py::stop_iteration();
}
else
{
return *iterator;
}
}

);

py::class_<ReadIterations>(m, "ReadIterations")
.def(
"__iter__",
[](ReadIterations &readIterations) {
return py::make_iterator(
readIterations.begin(), readIterations.end());
// Simple iterator implementation:
// But we need to release the GIL inside
// SeriesIterator::operator++, so manually it is
// return py::make_iterator(
// readIterations.begin(), readIterations.end());
return SeriesIteratorPythonAdaptor(readIterations.begin());
},
// keep handle alive while iterator exists
py::keep_alive<0, 1>());

py::class_<Series, Attributable>(m, "Series")

.def(
py::init<std::string const &, Access, std::string const &>(),
py::init([](std::string const &filepath,
Access at,
std::string const &options) {
py::gil_scoped_release release;
return new Series(filepath, at, options);
}),
py::arg("filepath"),
py::arg("access"),
py::arg("options") = "{}")
Expand Down Expand Up @@ -145,6 +216,7 @@ void init_Series(py::module &m)
"(Mismatched MPI at compile vs. runtime?)");
}

py::gil_scoped_release release;
return new Series(filepath, at, *mpiCommPtr, options);
}),
py::arg("filepath"),
Expand Down Expand Up @@ -232,7 +304,13 @@ this method.
py::return_value_policy::reference,
// garbage collection: return value must be freed before Series
py::keep_alive<1, 0>())
.def("read_iterations", &Series::readIterations, py::keep_alive<0, 1>())
.def(
"read_iterations",
[](Series &s) {
py::gil_scoped_release release;
return s.readIterations();
},
py::keep_alive<0, 1>())
.def(
"write_iterations",
&Series::writeIterations,
Expand Down