Merged

Changes from all commits (27)
9875b27
basic cudf backend support
rjzamora Jul 5, 2023
3afa3b9
formatting
rjzamora Jul 5, 2023
bead5cb
partial revision
rjzamora Jul 6, 2023
4381738
configure with backend fixture
rjzamora Jul 6, 2023
3d229a4
revert xdf name back to df
rjzamora Jul 6, 2023
c947cfa
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 6, 2023
f3c466c
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 12, 2023
55d4a9b
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 13, 2023
489f6ef
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 13, 2023
06f3bf8
fix pdf in test
rjzamora Jul 13, 2023
7d7a400
fix predicate-pushdown test
rjzamora Jul 14, 2023
934bd03
rely on DASK_DATAFRAME__BACKEND=cudf for now
rjzamora Jul 14, 2023
65252c4
add _set_engine utility for parquet
rjzamora Jul 17, 2023
b2ebdf9
remove unnecessary engine arg
rjzamora Jul 17, 2023
d907084
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 17, 2023
8573040
fix test
rjzamora Jul 17, 2023
5c4c019
revert pdf renaming
rjzamora Jul 18, 2023
07934a2
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 21, 2023
6d308d7
update test
rjzamora Jul 21, 2023
b9624da
address problems with cudf var/std behavior
rjzamora Jul 21, 2023
1f203f9
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 24, 2023
f502717
use decorators
rjzamora Jul 24, 2023
fc88402
rename _set_engine to _set_parquet_engine
rjzamora Jul 24, 2023
5d927c1
introduce _required_attribute
rjzamora Jul 24, 2023
c766ed8
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 24, 2023
7f87c8f
Update dask_expr/_reductions.py
rjzamora Jul 24, 2023
b8b3896
Merge remote-tracking branch 'upstream/main' into cudf-backend
rjzamora Jul 25, 2023
4 changes: 3 additions & 1 deletion dask_expr/_collection.py
@@ -1088,9 +1088,10 @@ def read_parquet(
aggregate_files=None,
parquet_file_extension=(".parq", ".parquet", ".pq"),
filesystem="fsspec",
engine=None,
**kwargs,
):
from dask_expr.io.parquet import ReadParquet
from dask_expr.io.parquet import ReadParquet, _set_parquet_engine

if not isinstance(path, str):
path = stringify_path(path)
@@ -1113,6 +1114,7 @@
aggregate_files=aggregate_files,
parquet_file_extension=parquet_file_extension,
filesystem=filesystem,
engine=_set_parquet_engine(engine),
kwargs=kwargs,
)
)
26 changes: 25 additions & 1 deletion dask_expr/_expr.py
@@ -54,6 +54,18 @@ def __init__(self, *args, **kwargs):
operands.append(type(self)._defaults[parameter])
assert not kwargs
self.operands = operands
if self._required_attribute:
dep = next(iter(self.dependencies()))._meta
if not hasattr(dep, self._required_attribute):
# Raise a ValueError instead of AttributeError to
# avoid infinite recursion
raise ValueError(f"{dep} has no attribute {self._required_attribute}")

@property
def _required_attribute(self) -> str:
# Specify if the first `dependency` must support
# a specific attribute for valid behavior.
return None
Comment on lines +57 to +68

rjzamora (Member, Author):

@phofl - I ran into quite a few cases where the cudf backend was hanging (rather than failing quickly) because a cudf.DataFrame/Series did not support the same attribute as pd.DataFrame/Series (e.g. nbytes, align, etc.).

What do you think about doing something like this so that we can more quickly detect that the dependency is missing a necessary attribute?


Collaborator:

Oh yes I like this.
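
A minimal, self-contained sketch of the fail-fast pattern discussed above (the class names here are illustrative, not the actual dask_expr types):

```python
import pandas as pd


class Expr:
    # Subclasses may name an attribute that the dependency's meta
    # object must expose; None disables the check.
    _required_attribute = None

    def __init__(self, meta):
        if self._required_attribute and not hasattr(meta, self._required_attribute):
            # Raise ValueError (not AttributeError) so attribute-lookup
            # machinery does not recurse while handling the failure.
            raise ValueError(f"{meta} has no attribute {self._required_attribute}")
        self._meta = meta


class NBytesLike(Expr):
    _required_attribute = "nbytes"


NBytesLike(pd.Series([1, 2, 3]))  # fine: a pandas Series exposes .nbytes
try:
    NBytesLike(object())  # a backend object without .nbytes fails fast here
except ValueError as err:
    print(err)
```

In the Blockwise case below, the required attribute is derived from the methodcaller's method name, so an expression wrapping a `df.align(...)` call requires `.align` on the backend's meta object.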


@functools.cached_property
def ndim(self):
@@ -941,6 +953,12 @@ class Blockwise(Expr):
_keyword_only = []
_projection_passthrough = False

@property
def _required_attribute(self):
if isinstance(self.operation, type(M.method_caller)):
return self.operation.method
return None

@functools.cached_property
def _meta(self):
args = [op._meta if isinstance(op, Expr) else op for op in self._args]
@@ -1027,7 +1045,13 @@ def _combine_similar(self, root: Expr):
# Push projections back up through `_projection_passthrough`
# operations if it reduces the number of unique expression nodes.
if self._projection_passthrough and isinstance(self.frame, Projection):
common = type(self)(self.frame.frame, *self.operands[1:])
try:
common = type(self)(self.frame.frame, *self.operands[1:])
except ValueError:
# May have encountered a problem with `_required_attribute`.
# (There is no guarantee that the same method will exist for
# both a Series and DataFrame)
return None
projection = self.frame.operand("columns")
push_up_projection = False
for op in self._find_similar_operations(root, ignore=self._parameters):
8 changes: 7 additions & 1 deletion dask_expr/_reductions.py
@@ -488,6 +488,7 @@ class NBytes(Reduction):
# Only supported for Series objects
reduction_chunk = lambda ser: ser.nbytes
reduction_aggregate = sum
_required_attribute = "nbytes"


class Var(Reduction):
@@ -519,7 +520,7 @@ def aggregate_kwargs(self):
@classmethod
def reduction_chunk(cls, x, skipna=True, numeric_only=False):
kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
if skipna:
if skipna or numeric_only:
n = x.count(**kwargs)
kwargs["skipna"] = skipna
avg = x.mean(**kwargs)
@@ -529,6 +530,11 @@ def reduction_chunk(cls, x, skipna=True, numeric_only=False):
n = len(x)
kwargs["skipna"] = skipna
avg = x.sum(**kwargs) / n
if numeric_only:
# Workaround for cudf bug
# (see: https://github.com/rapidsai/cudf/issues/13731)
x = x.select_dtypes("number")
n = n.loc[x.columns]
m2 = ((x - avg) ** 2).sum(**kwargs)
return n, avg, m2

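Aside: a small pandas-side sketch of why the `numeric_only` branch above narrows `x` before the second-moment step (a mixed-dtype frame is assumed; the bug being worked around is on the cudf side, per the linked issue):

```python
import pandas as pd

x = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": ["u", "v", "w"]})

# count()/mean() with numeric_only=True already drop column "b" ...
n = x.count(numeric_only=True)   # index: ["a"]
avg = x.mean(numeric_only=True)  # index: ["a"]

# ... so x is narrowed to the same columns before (x - avg) ** 2,
# and n is reindexed to match, mirroring the workaround above.
x = x.select_dtypes("number")
n = n.loc[x.columns]
m2 = ((x - avg) ** 2).sum()
print(n["a"], avg["a"], m2["a"])  # 3, 2.0, 2.0
```
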
25 changes: 22 additions & 3 deletions dask_expr/io/parquet.py
@@ -24,7 +24,7 @@
from dask.dataframe.io.parquet.utils import _split_user_options
from dask.dataframe.io.utils import _is_local_fs
from dask.delayed import delayed
from dask.utils import apply, natural_sort_key
from dask.utils import apply, natural_sort_key, typename
from fsspec.utils import stringify_path

from dask_expr._expr import (
@@ -157,7 +157,6 @@ def _layer(self):
def to_parquet(
df,
path,
engine="pyarrow",
compression="snappy",
write_index=True,
append=False,
@@ -177,6 +176,7 @@
from dask_expr._collection import new_collection
from dask_expr.io.parquet import NONE_LABEL, ToParquet

engine = _set_parquet_engine(meta=df._meta)
compute_kwargs = compute_kwargs or {}

partition_on = partition_on or []
@@ -391,6 +391,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
"aggregate_files",
"parquet_file_extension",
"filesystem",
"engine",
"kwargs",
"_partitions",
"_series",
@@ -409,6 +410,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
"aggregate_files": None,
"parquet_file_extension": (".parq", ".parquet", ".pq"),
"filesystem": "fsspec",
"engine": "pyarrow",
"kwargs": None,
"_partitions": None,
"_series": False,
@@ -417,7 +419,10 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):

@property
def engine(self):
return get_engine("pyarrow")
_engine = self.operand("engine")
if isinstance(_engine, str):
return get_engine(_engine)
return _engine

@property
def columns(self):
@@ -681,6 +686,20 @@ def _update_length_statistics(self):
#


def _set_parquet_engine(engine=None, meta=None):
# Use `engine` or `meta` input to set the parquet engine
if engine is None:
if (
meta is not None and typename(meta).split(".")[0] == "cudf"
) or dask.config.get("dataframe.backend", "pandas") == "cudf":
from dask_cudf.io.parquet import CudfEngine

engine = CudfEngine
else:
engine = "pyarrow"
return engine


def _align_statistics(parts, statistics):
# Make sure parts and statistics are aligned
# (if statistics is not empty)
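A quick sketch of the resolution order implemented by `_set_parquet_engine` (hypothetical usage; the cudf branch imports dask_cudf, so it only works where dask_cudf is installed):

```python
import dask
import pandas as pd

from dask_expr.io.parquet import _set_parquet_engine

# 1. An explicit engine argument is returned unchanged.
assert _set_parquet_engine(engine="pyarrow") == "pyarrow"

# 2. A pandas meta object (with the default backend) falls back to "pyarrow".
assert _set_parquet_engine(meta=pd.DataFrame()) == "pyarrow"

# 3. A cudf meta object, or dataframe.backend == "cudf", selects CudfEngine.
with dask.config.set({"dataframe.backend": "cudf"}):
    engine = _set_parquet_engine()  # dask_cudf.io.parquet.CudfEngine
```
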
37 changes: 23 additions & 14 deletions dask_expr/io/tests/test_io.py
@@ -1,20 +1,25 @@
import importlib
import os

import dask.dataframe as dd
import pandas as pd
import pytest
from dask import config
from dask.dataframe.utils import assert_eq

from dask_expr import from_dask_dataframe, from_pandas, optimize, read_csv, read_parquet
from dask_expr._expr import Expr, Lengths, Literal, Replace
from dask_expr._reductions import Len
from dask_expr.io import ReadParquet

# Import backend DataFrame library to test
BACKEND = config.get("dataframe.backend", "pandas")
lib = importlib.import_module(BACKEND)
Comment on lines +14 to +16

rjzamora (Member, Author):

@phofl - Any opinions on using this approach as a "switch" to test the cudf backend?


rjzamora (Member, Author):

Certainly interested in your thoughts as well @mrocklin


Collaborator:

I think this is ok, although it will take me some time to get used to it 😂

Traveling today, will take a closer look at the whole PR when I am back.
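
For context, a minimal sketch of how this switch would be driven (assuming the standard dask config mechanism; the DASK_DATAFRAME__BACKEND environment variable maps onto the same key):

```python
import importlib

from dask import config

# With DASK_DATAFRAME__BACKEND unset this resolves to pandas; exporting
# DASK_DATAFRAME__BACKEND=cudf before running pytest makes the module
# build cudf objects instead.
BACKEND = config.get("dataframe.backend", "pandas")
lib = importlib.import_module(BACKEND)

df = lib.DataFrame({"x": range(5)})  # pandas.DataFrame or cudf.DataFrame
print(type(df))
```

Running `DASK_DATAFRAME__BACKEND=cudf pytest dask_expr/io/tests/test_io.py` would then exercise the cudf path, matching the "rely on DASK_DATAFRAME__BACKEND=cudf for now" commit above.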



def _make_file(dir, format="parquet", df=None):
fn = os.path.join(str(dir), f"myfile.{format}")
if df is None:
df = pd.DataFrame({c: range(10) for c in "abcde"})
df = lib.DataFrame({c: range(10) for c in "abcde"})
if format == "csv":
df.to_csv(fn)
elif format == "parquet":
@@ -83,7 +88,7 @@ def test_io_fusion(tmpdir, fmt):


def test_predicate_pushdown(tmpdir):
original = pd.DataFrame(
original = lib.DataFrame(
{
"a": [1, 2, 3, 4, 5] * 10,
"b": [0, 1, 2, 3, 4] * 10,
@@ -106,11 +111,11 @@
y_result = y.compute()
assert y_result.name == "b"
assert len(y_result) == 6
assert all(y_result == 4)
assert (y_result == 4).all()


def test_predicate_pushdown_compound(tmpdir):
pdf = pd.DataFrame(
pdf = lib.DataFrame(
{
"a": [1, 2, 3, 4, 5] * 10,
"b": [0, 1, 2, 3, 4] * 10,
@@ -134,15 +139,18 @@
)

# Test OR
x = df[(df.a == 5) | (df.c > 20)][df.b != 0]["b"]
x = df[(df.a == 5) | (df.c > 20)]
x = x[x.b != 0]["b"]
y = optimize(x, fuse=False)
assert isinstance(y.expr, ReadParquet)
filters = [set(y.filters[0]), set(y.filters[1])]
assert {("c", ">", 20), ("b", "!=", 0)} in filters
assert {("a", "==", 5), ("b", "!=", 0)} in filters
expect = pdf[(pdf.a == 5) | (pdf.c > 20)]
expect = expect[expect.b != 0]["b"]
assert_eq(
y,
pdf[(pdf.a == 5) | (pdf.c > 20)][pdf.b != 0]["b"],
expect,
check_index=False,
)

@@ -158,7 +166,7 @@ def test_predicate_pushdown_compound(tmpdir):

@pytest.mark.parametrize("fmt", ["parquet", "csv", "pandas"])
def test_io_culling(tmpdir, fmt):
pdf = pd.DataFrame({c: range(10) for c in "abcde"})
pdf = lib.DataFrame({c: range(10) for c in "abcde"})
if fmt == "parquet":
dd.from_pandas(pdf, 2).to_parquet(tmpdir)
df = read_parquet(tmpdir)
@@ -191,23 +199,24 @@ def _check_culling(expr, partitions):

@pytest.mark.parametrize("sort", [True, False])
def test_from_pandas(sort):
pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
pdf = lib.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
df = from_pandas(pdf, npartitions=2, sort=sort)

assert df.divisions == (0, 3, 5) if sort else (None,) * 3
assert_eq(df, pdf)


def test_from_pandas_immutable():
pdf = pd.DataFrame({"x": [1, 2, 3, 4]})
pdf = lib.DataFrame({"x": [1, 2, 3, 4]})
expected = pdf.copy()
df = from_pandas(pdf)
pdf["z"] = 100
assert_eq(df, expected)


def test_parquet_complex_filters(tmpdir):
df = read_parquet(_make_file(tmpdir))
with config.set({"dataframe.backend": BACKEND}):
df = read_parquet(_make_file(tmpdir))
pdf = df.compute()
got = df["a"][df["b"] > df["b"].mean()]
expect = pdf["a"][pdf["b"] > pdf["b"].mean()]
@@ -247,7 +256,7 @@ def test_from_dask_dataframe(optimize):

@pytest.mark.parametrize("optimize", [True, False])
def test_to_dask_dataframe(optimize):
pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
pdf = lib.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
df = from_pandas(pdf, npartitions=2)
ddf = df.to_dask_dataframe(optimize=optimize)
assert isinstance(ddf, dd.DataFrame)
@@ -256,7 +265,7 @@ def test_to_dask_dataframe(optimize):

@pytest.mark.parametrize("write_metadata_file", [True, False])
def test_to_parquet(tmpdir, write_metadata_file):
pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
pdf = lib.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
df = from_pandas(pdf, npartitions=2)

# Check basic parquet round trip
@@ -277,7 +286,7 @@ def test_to_parquet(tmpdir, write_metadata_file):


def test_combine_similar(tmpdir):
pdf = pd.DataFrame(
pdf = lib.DataFrame(
{"x": [0, 1, 2, 3] * 4, "y": range(16), "z": [None, 1, 2, 3] * 4}
)
fn = _make_file(tmpdir, format="parquet", df=pdf)