Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .rodare.json
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
"name": "Ganyushin, Dmitry",
"orcid": "0000-0001-7337-2161",
"type": "Other"
},
{
"affiliation": "NVIDIA",
"name": "Kirkham, John",
"type": "Other"
}
],
"title": "C++ & Python API for Scientific I/O with openPMD",
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,8 @@ Further thanks go to improvements and contributions from:
report on NVCC warnings
* [Dmitry Ganyushin (ORNL)](https://github.com/dmitry-ganyushin):
Dask dataframe support
* [John Kirkham (NVIDIA)](https://github.com/jakirkham):
Dask guidance & reviews

### Grants

Expand Down
13 changes: 7 additions & 6 deletions src/binding/python/openpmd_api/DaskDataFrame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
This file is part of the openPMD-api.

Copyright 2021 openPMD contributors
Authors: Axel Huebl, Dmitry Ganyushin
Authors: Axel Huebl, Dmitry Ganyushin, John Kirkham
License: LGPLv3+
"""
import numpy as np
Expand All @@ -19,6 +19,11 @@
found_pandas = False


def read_chunk_to_df(species, chunk):
stride = np.s_[chunk.offset[0]:chunk.extent[0]]
return species.to_df(stride)


def particles_to_daskdataframe(particle_species):
"""
Load all records of a particle species into a Dask DataFrame.
Expand Down Expand Up @@ -74,13 +79,9 @@ def particles_to_daskdataframe(particle_species):
if chunks:
break

def read_chunk(species, chunk):
stride = np.s_[chunk.offset[0]:chunk.extent[0]]
return species.to_df(stride)

# merge DataFrames
dfs = [
delayed(read_chunk)(particle_species, chunk) for chunk in chunks
delayed(read_chunk_to_df)(particle_species, chunk) for chunk in chunks
]
df = dd.from_delayed(dfs)

Expand Down