diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index aeaba07cd..005f2500d 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -1088,9 +1088,10 @@ def read_parquet(
     aggregate_files=None,
     parquet_file_extension=(".parq", ".parquet", ".pq"),
     filesystem="fsspec",
+    engine=None,
     **kwargs,
 ):
-    from dask_expr.io.parquet import ReadParquet
+    from dask_expr.io.parquet import ReadParquet, _set_parquet_engine

     if not isinstance(path, str):
         path = stringify_path(path)
@@ -1113,6 +1114,7 @@ def read_parquet(
             aggregate_files=aggregate_files,
             parquet_file_extension=parquet_file_extension,
             filesystem=filesystem,
+            engine=_set_parquet_engine(engine),
             kwargs=kwargs,
         )
     )
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index d7b6c55da..6390450bc 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -54,6 +54,18 @@ def __init__(self, *args, **kwargs):
                 operands.append(type(self)._defaults[parameter])
         assert not kwargs
         self.operands = operands
+        if self._required_attribute:
+            dep = next(iter(self.dependencies()))._meta
+            if not hasattr(dep, self._required_attribute):
+                # Raise a ValueError instead of AttributeError to
+                # avoid infinite recursion
+                raise ValueError(f"{dep} has no attribute {self._required_attribute}")
+
+    @property
+    def _required_attribute(self) -> str:
+        # Specify if the first `dependency` must support
+        # a specific attribute for valid behavior.
+        return None

     @functools.cached_property
     def ndim(self):
@@ -941,6 +953,12 @@ class Blockwise(Expr):
     _keyword_only = []
     _projection_passthrough = False

+    @property
+    def _required_attribute(self):
+        if isinstance(self.operation, type(M.method_caller)):
+            return self.operation.method
+        return None
+
     @functools.cached_property
     def _meta(self):
         args = [op._meta if isinstance(op, Expr) else op for op in self._args]
@@ -1027,7 +1045,13 @@ def _combine_similar(self, root: Expr):
         # Push projections back up through `_projection_passthrough`
         # operations if it reduces the number of unique expression nodes.
         if self._projection_passthrough and isinstance(self.frame, Projection):
-            common = type(self)(self.frame.frame, *self.operands[1:])
+            try:
+                common = type(self)(self.frame.frame, *self.operands[1:])
+            except ValueError:
+                # May have encountered a problem with `_required_attribute`.
+                # (There is no guarantee that the same method will exist for
+                # both a Series and a DataFrame.)
+                return None
             projection = self.frame.operand("columns")
             push_up_projection = False
             for op in self._find_similar_operations(root, ignore=self._parameters):
diff --git a/dask_expr/_reductions.py b/dask_expr/_reductions.py
index c63e10379..2b679c493 100644
--- a/dask_expr/_reductions.py
+++ b/dask_expr/_reductions.py
@@ -488,6 +488,7 @@ class NBytes(Reduction):
     # Only supported for Series objects
     reduction_chunk = lambda ser: ser.nbytes
     reduction_aggregate = sum
+    _required_attribute = "nbytes"


 class Var(Reduction):
@@ -519,7 +520,7 @@ def aggregate_kwargs(self):
     @classmethod
     def reduction_chunk(cls, x, skipna=True, numeric_only=False):
         kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
-        if skipna:
+        if skipna or numeric_only:
             n = x.count(**kwargs)
             kwargs["skipna"] = skipna
             avg = x.mean(**kwargs)
@@ -529,6 +530,11 @@ def reduction_chunk(cls, x, skipna=True, numeric_only=False):
             n = len(x)
             kwargs["skipna"] = skipna
             avg = x.sum(**kwargs) / n
+            if numeric_only:
+                # Workaround for cudf bug
+                # (see: https://github.com/rapidsai/cudf/issues/13731)
+                x = x.select_dtypes("number")
+                n = n.loc[x.columns]
         m2 = ((x - avg) ** 2).sum(**kwargs)
         return n, avg, m2

diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py
index 04cd2250e..8f9927f07 100644
--- a/dask_expr/io/parquet.py
+++ b/dask_expr/io/parquet.py
@@ -24,7 +24,7 @@
 from dask.dataframe.io.parquet.utils import _split_user_options
 from dask.dataframe.io.utils import _is_local_fs
 from dask.delayed import delayed
-from dask.utils import apply, natural_sort_key
+from dask.utils import apply, natural_sort_key, typename
 from fsspec.utils import stringify_path

 from dask_expr._expr import (
@@ -157,7 +157,6 @@ def _layer(self):
 def to_parquet(
     df,
     path,
-    engine="pyarrow",
     compression="snappy",
     write_index=True,
     append=False,
@@ -177,6 +176,7 @@ def to_parquet(
     from dask_expr._collection import new_collection
     from dask_expr.io.parquet import NONE_LABEL, ToParquet

+    engine = _set_parquet_engine(meta=df._meta)
     compute_kwargs = compute_kwargs or {}

     partition_on = partition_on or []
@@ -391,6 +391,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
         "aggregate_files",
         "parquet_file_extension",
         "filesystem",
+        "engine",
         "kwargs",
         "_partitions",
         "_series",
@@ -409,6 +410,7 @@ class ReadParquet(PartitionsFiltered, BlockwiseIO):
         "aggregate_files": None,
         "parquet_file_extension": (".parq", ".parquet", ".pq"),
         "filesystem": "fsspec",
+        "engine": "pyarrow",
         "kwargs": None,
         "_partitions": None,
         "_series": False,
@@ -417,7 +419,10 @@

     @property
     def engine(self):
-        return get_engine("pyarrow")
+        _engine = self.operand("engine")
+        if isinstance(_engine, str):
+            return get_engine(_engine)
+        return _engine

     @property
     def columns(self):
@@ -681,6 +686,20 @@ def _update_length_statistics(self):
 #


+def _set_parquet_engine(engine=None, meta=None):
+    # Use `engine` or `meta` input to set the parquet engine
+    if engine is None:
+        if (
+            meta is not None and typename(meta).split(".")[0] == "cudf"
+        ) or dask.config.get("dataframe.backend", "pandas") == "cudf":
+            from dask_cudf.io.parquet import CudfEngine
+
+            engine = CudfEngine
+        else:
+            engine = "pyarrow"
+    return engine
+
+
 def _align_statistics(parts, statistics):
     # Make sure parts and statistics are aligned
     # (if statistics is not empty)
diff --git a/dask_expr/io/tests/test_io.py b/dask_expr/io/tests/test_io.py
index 8abbba3f0..68c2d7363 100644
--- a/dask_expr/io/tests/test_io.py
+++ b/dask_expr/io/tests/test_io.py
@@ -1,8 +1,9 @@
+import importlib
 import os

 import dask.dataframe as dd
-import pandas as pd
 import pytest
+from dask import config
 from dask.dataframe.utils import assert_eq

 from dask_expr import from_dask_dataframe, from_pandas, optimize, read_csv, read_parquet
@@ -10,11 +11,15 @@
 from dask_expr._reductions import Len
 from dask_expr.io import ReadParquet

+# Import backend DataFrame library to test
+BACKEND = config.get("dataframe.backend", "pandas")
+lib = importlib.import_module(BACKEND)
+

 def _make_file(dir, format="parquet", df=None):
     fn = os.path.join(str(dir), f"myfile.{format}")
     if df is None:
-        df = pd.DataFrame({c: range(10) for c in "abcde"})
+        df = lib.DataFrame({c: range(10) for c in "abcde"})
     if format == "csv":
         df.to_csv(fn)
     elif format == "parquet":
@@ -83,7 +88,7 @@ def test_io_fusion(tmpdir, fmt):


 def test_predicate_pushdown(tmpdir):
-    original = pd.DataFrame(
+    original = lib.DataFrame(
         {
             "a": [1, 2, 3, 4, 5] * 10,
             "b": [0, 1, 2, 3, 4] * 10,
@@ -106,11 +111,11 @@ def test_predicate_pushdown(tmpdir):
     y_result = y.compute()
     assert y_result.name == "b"
     assert len(y_result) == 6
-    assert all(y_result == 4)
+    assert (y_result == 4).all()


 def test_predicate_pushdown_compound(tmpdir):
-    pdf = pd.DataFrame(
+    pdf = lib.DataFrame(
         {
             "a": [1, 2, 3, 4, 5] * 10,
             "b": [0, 1, 2, 3, 4] * 10,
@@ -134,15 +139,18 @@ def test_predicate_pushdown_compound(tmpdir):
     )

     # Test OR
-    x = df[(df.a == 5) | (df.c > 20)][df.b != 0]["b"]
+    x = df[(df.a == 5) | (df.c > 20)]
+    x = x[x.b != 0]["b"]
     y = optimize(x, fuse=False)
     assert isinstance(y.expr, ReadParquet)
     filters = [set(y.filters[0]), set(y.filters[1])]
     assert {("c", ">", 20), ("b", "!=", 0)} in filters
     assert {("a", "==", 5), ("b", "!=", 0)} in filters
+    expect = pdf[(pdf.a == 5) | (pdf.c > 20)]
+    expect = expect[expect.b != 0]["b"]
     assert_eq(
         y,
-        pdf[(pdf.a == 5) | (pdf.c > 20)][pdf.b != 0]["b"],
+        expect,
         check_index=False,
     )

@@ -158,7 +166,7 @@ def test_predicate_pushdown_compound(tmpdir):

 @pytest.mark.parametrize("fmt", ["parquet", "csv", "pandas"])
 def test_io_culling(tmpdir, fmt):
-    pdf = pd.DataFrame({c: range(10) for c in "abcde"})
+    pdf = lib.DataFrame({c: range(10) for c in "abcde"})
     if fmt == "parquet":
         dd.from_pandas(pdf, 2).to_parquet(tmpdir)
         df = read_parquet(tmpdir)
@@ -191,7 +199,7 @@ def _check_culling(expr, partitions):

 @pytest.mark.parametrize("sort", [True, False])
 def test_from_pandas(sort):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
+    pdf = lib.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
     df = from_pandas(pdf, npartitions=2, sort=sort)

     assert df.divisions == (0, 3, 5) if sort else (None,) * 3
@@ -199,7 +207,7 @@ def test_from_pandas(sort):


 def test_from_pandas_immutable():
-    pdf = pd.DataFrame({"x": [1, 2, 3, 4]})
+    pdf = lib.DataFrame({"x": [1, 2, 3, 4]})
     expected = pdf.copy()
     df = from_pandas(pdf)
     pdf["z"] = 100
@@ -207,7 +215,8 @@ def test_from_pandas_immutable():


 def test_parquet_complex_filters(tmpdir):
-    df = read_parquet(_make_file(tmpdir))
+    with config.set({"dataframe.backend": BACKEND}):
+        df = read_parquet(_make_file(tmpdir))
     pdf = df.compute()
     got = df["a"][df["b"] > df["b"].mean()]
     expect = pdf["a"][pdf["b"] > pdf["b"].mean()]
@@ -247,7 +256,7 @@ def test_from_dask_dataframe(optimize):

 @pytest.mark.parametrize("optimize", [True, False])
 def test_to_dask_dataframe(optimize):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
+    pdf = lib.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
     df = from_pandas(pdf, npartitions=2)
     ddf = df.to_dask_dataframe(optimize=optimize)
     assert isinstance(ddf, dd.DataFrame)
@@ -256,7 +265,7 @@ def test_to_dask_dataframe(optimize):

 @pytest.mark.parametrize("write_metadata_file", [True, False])
 def test_to_parquet(tmpdir, write_metadata_file):
-    pdf = pd.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
+    pdf = lib.DataFrame({"x": [1, 4, 3, 2, 0, 5]})
     df = from_pandas(pdf, npartitions=2)

     # Check basic parquet round trip
@@ -277,7 +286,7 @@ def test_to_parquet(tmpdir, write_metadata_file):


 def test_combine_similar(tmpdir):
-    pdf = pd.DataFrame(
+    pdf = lib.DataFrame(
         {"x": [0, 1, 2, 3] * 4, "y": range(16), "z": [None, 1, 2, 3] * 4}
     )
     fn = _make_file(tmpdir, format="parquet", df=pdf)
diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py
index 80772ab54..f231dbbbf 100644
--- a/dask_expr/tests/test_collection.py
+++ b/dask_expr/tests/test_collection.py
@@ -1,10 +1,11 @@
+from __future__ import annotations
+
+import importlib
 import operator
 import pickle
-import re

 import dask
 import numpy as np
-import pandas as pd
 import pytest
 from dask.dataframe._compat import PANDAS_GE_210
 from dask.dataframe.utils import UNKNOWN_CATEGORIES, assert_eq
@@ -15,10 +16,15 @@
 from dask_expr._reductions import Len
 from dask_expr.datasets import timeseries

+# Import backend DataFrame library to test
+BACKEND = dask.config.get("dataframe.backend", "pandas")
+CUDF_BACKEND = BACKEND == "cudf"
+lib = importlib.import_module(BACKEND)
+

 @pytest.fixture
 def pdf():
-    pdf = pd.DataFrame({"x": range(100)})
+    pdf = lib.DataFrame({"x": range(100)})
     pdf["y"] = pdf.x * 10.0
     yield pdf

@@ -47,13 +53,15 @@ def test_setitem(pdf, df):
     assert_eq(df, pdf)


+@pytest.mark.xfail(CUDF_BACKEND, reason="https://github.com/rapidsai/cudf/issues/10271")
 def test_explode():
-    pdf = pd.DataFrame({"a": [[1, 2], [3, 4]]})
+    pdf = lib.DataFrame({"a": [[1, 2], [3, 4]]})
     df = from_pandas(pdf)
     assert_eq(pdf.explode(column="a"), df.explode(column="a"))
     assert_eq(pdf.a.explode(), df.a.explode())


+@pytest.mark.xfail(CUDF_BACKEND, reason="https://github.com/rapidsai/cudf/issues/10271")
 def test_explode_simplify(pdf):
     pdf["z"] = 1
     df = from_pandas(pdf)
@@ -64,7 +72,7 @@ def test_explode_simplify(pdf):


 def test_meta_divisions_name():
-    a = pd.DataFrame({"x": [1, 2, 3, 4], "y": [1.0, 2.0, 3.0, 4.0]})
+    a = lib.DataFrame({"x": [1, 2, 3, 4], "y": [1.0, 2.0, 3.0, 4.0]})
     df = 2 * from_pandas(a, npartitions=2)
     assert list(df.columns) == list(a.columns)
     assert df.npartitions == 2
@@ -77,8 +85,8 @@ def test_meta_divisions_name():


 def test_meta_blockwise():
-    a = pd.DataFrame({"x": [1, 2, 3, 4], "y": [1.0, 2.0, 3.0, 4.0]})
-    b = pd.DataFrame({"z": [1, 2, 3, 4], "y": [1.0, 2.0, 3.0, 4.0]})
+    a = lib.DataFrame({"x": [1, 2, 3, 4], "y": [1.0, 2.0, 3.0, 4.0]})
+    b = lib.DataFrame({"z": [1, 2, 3, 4], "y": [1.0, 2.0, 3.0, 4.0]})

     aa = from_pandas(a, npartitions=2)
     bb = from_pandas(b, npartitions=2)
@@ -116,6 +124,8 @@ def test_dask(pdf, df):
     ],
 )
 def test_reductions(func, pdf, df):
+    if CUDF_BACKEND and func in [M.idxmin, M.idxmax]:
+        pytest.xfail(reason="https://github.com/rapidsai/cudf/issues/9602")
     result = func(df)
     assert result.known_divisions
     assert_eq(result, func(pdf))
@@ -131,7 +141,9 @@ def test_reductions(func, pdf, df):
 @pytest.mark.parametrize("skipna", [True, False])
 @pytest.mark.parametrize("ddof", [1, 2])
 def test_std_kwargs(axis, skipna, ddof):
-    pdf = pd.DataFrame(
+    if CUDF_BACKEND and skipna is False:
+        pytest.xfail(reason="cudf requires skipna=True when nulls are present.")
+    pdf = lib.DataFrame(
* 10, "z": ["dog", "cat"] * 15} ) df = from_pandas(pdf, npartitions=3) @@ -141,6 +153,7 @@ def test_std_kwargs(axis, skipna, ddof): ) +@pytest.mark.xfail(CUDF_BACKEND, reason="nbytes not supported by cudf") def test_nbytes(pdf, df): with pytest.raises(NotImplementedError, match="nbytes is not implemented"): df.nbytes @@ -148,7 +161,7 @@ def test_nbytes(pdf, df): def test_mode(): - pdf = pd.DataFrame({"x": [1, 2, 3, 1, 2]}) + pdf = lib.DataFrame({"x": [1, 2, 3, 1, 2]}) df = from_pandas(pdf, npartitions=3) assert_eq(df.x.mode(), pdf.x.mode(), check_names=False) @@ -159,7 +172,7 @@ def test_value_counts(df, pdf): AttributeError, match="'DataFrame' object has no attribute 'value_counts'" ): df.value_counts() - assert_eq(df.x.value_counts(), pdf.x.value_counts()) + assert_eq(df.x.value_counts(), pdf.x.value_counts().astype("int64")) def test_dropna(pdf): @@ -171,7 +184,7 @@ def test_dropna(pdf): def test_fillna(): - pdf = pd.DataFrame({"x": [1, 2, None, None, 5, 6]}) + pdf = lib.DataFrame({"x": [1, 2, None, None, 5, 6]}) df = from_pandas(pdf, npartitions=2) actual = df.fillna(value=100) expected = pdf.fillna(value=100) @@ -230,7 +243,7 @@ def test_conditionals(func, pdf, df): ], ) def test_boolean_operators(func): - pdf = pd.DataFrame( + pdf = lib.DataFrame( {"x": [True, False, True, False], "y": [True, False, False, False]} ) df = from_pandas(pdf) @@ -249,7 +262,7 @@ def test_boolean_operators(func): ], ) def test_unary_operators(func): - pdf = pd.DataFrame( + pdf = lib.DataFrame( {"x": [True, False, True, False], "y": [True, False, False, False], "z": 1} ) df = from_pandas(pdf) @@ -267,9 +280,10 @@ def test_and_or(func, pdf, df): assert_eq(func(pdf), func(df), check_names=False) +@pytest.mark.xfail(CUDF_BACKEND, reason="period_range not supported by cudf") @pytest.mark.parametrize("how", ["start", "end"]) def test_to_timestamp(pdf, how): - pdf.index = pd.period_range("2019-12-31", freq="D", periods=len(pdf)) + pdf.index = lib.period_range("2019-12-31", freq="D", periods=len(pdf)) df = from_pandas(pdf) assert_eq(df.to_timestamp(how=how), pdf.to_timestamp(how=how)) assert_eq(df.x.to_timestamp(how=how), pdf.x.to_timestamp(how=how)) @@ -279,18 +293,10 @@ def test_to_timestamp(pdf, how): "func", [ lambda df: df.astype(int), - lambda df: df.apply(lambda row, x, y=10: row * x + y, x=2), - pytest.param( - lambda df: df.map(lambda x: x + 1), - marks=pytest.mark.skipif( - not PANDAS_GE_210, reason="Only available from 2.1" - ), - ), lambda df: df.clip(lower=10, upper=50), lambda df: df.x.clip(lower=10, upper=50), lambda df: df.x.between(left=10, right=50), lambda df: df.x.map(lambda x: x + 1), - lambda df: df.index.map(lambda x: x + 1), lambda df: df[df.x > 5], lambda df: df.assign(a=df.x + df.y, b=df.x - df.y), lambda df: df.replace(to_replace=1, value=1000), @@ -302,8 +308,6 @@ def test_to_timestamp(pdf, how): lambda df: df.rename(columns={"x": "xx"}), lambda df: df.rename(columns={"x": "xx"}).xx, lambda df: df.rename(columns={"x": "xx"})[["xx"]], - lambda df: df.combine_first(df), - lambda df: df.x.combine_first(df.y), lambda df: df.x.to_frame(), lambda df: df.drop(columns="x"), lambda df: df.x.index.to_frame(), @@ -317,6 +321,26 @@ def test_blockwise(func, pdf, df): assert_eq(func(pdf), func(df)) +@pytest.mark.xfail(CUDF_BACKEND, reason="func not supported by cudf") +@pytest.mark.parametrize( + "func", + [ + lambda df: df.apply(lambda row, x, y=10: row * x + y, x=2), + lambda df: df.index.map(lambda x: x + 1), + pytest.param( + lambda df: df.map(lambda x: x + 1), + marks=pytest.mark.skipif( + not 
PANDAS_GE_210, reason="Only available from 2.1" + ), + ), + lambda df: df.combine_first(df), + lambda df: df.x.combine_first(df.y), + ], +) +def test_blockwise_pandas_only(func, pdf, df): + assert_eq(func(pdf), func(df)) + + def test_simplify_add_suffix_add_prefix(df, pdf): result = df.add_prefix("2_")["2_x"].simplify() expected = df[["x"]].add_prefix("2_")["2_x"] @@ -329,6 +353,7 @@ def test_simplify_add_suffix_add_prefix(df, pdf): assert_eq(result, pdf.add_suffix("_2")["x_2"]) +@pytest.mark.xfail(CUDF_BACKEND, reason="rename_axis not supported by cudf") def test_rename_axis(pdf): pdf.index.name = "a" pdf.columns.name = "b" @@ -361,6 +386,7 @@ def test_repr(df): assert "sum(skipna=False)" in s +@pytest.mark.xfail(CUDF_BACKEND, reason="combine_first not supported by cudf") def test_combine_first_simplify(pdf): df = from_pandas(pdf) pdf2 = pdf.rename(columns={"y": "z"}) @@ -379,7 +405,7 @@ def test_rename_traverse_filter(df): assert str(result) == str(expected) -def test_columns_traverse_filters(pdf, df): +def test_columns_traverse_filters(df): result = df[df.x > 5].y.simplify() expected = df.y[df.x > 5] @@ -538,8 +564,8 @@ def test_remove_unnecessary_projections(df): assert optimized._name == expected._name -def test_substitute(df): - pdf = pd.DataFrame( +def test_substitute(): + pdf = lib.DataFrame( { "a": range(100), "b": range(100), @@ -571,7 +597,7 @@ def test_substitute(df): def test_substitute_parameters(df): - pdf = pd.DataFrame( + pdf = lib.DataFrame( { "a": range(100), "b": range(100), @@ -601,7 +627,7 @@ def test_from_pandas(pdf): assert "pandas" in df._name -def test_copy(pdf, df): +def test_copy(df): original = df.copy() columns = tuple(original.columns) @@ -655,6 +681,7 @@ def test_serialization(pdf, df): assert_eq(pickle.loads(before), pickle.loads(after)) +@pytest.mark.xfail(CUDF_BACKEND, reason="Cannot apply lambda function in cudf") def test_size_optimized(df): expr = (df.x + 1).apply(lambda x: x).size out = optimize(expr) @@ -668,9 +695,12 @@ def test_size_optimized(df): @pytest.mark.parametrize("fuse", [True, False]) -def test_tree_repr(df, fuse): - s = df.expr.tree_repr() - assert "" in s +def test_tree_repr(fuse): + s = from_pandas(lib.Series(range(10))).expr.tree_repr() + if BACKEND == "pandas": + assert "" in s + else: + assert "" in s df = timeseries() expr = ((df.x + 1).sum(skipna=False) + df.y.mean()).expr @@ -730,9 +760,9 @@ def combine_x_y(x, y, val, foo=None): @pytest.mark.parametrize("opt", [True, False]) def test_map_partitions_merge(opt): # Make simple left & right dfs - pdf1 = pd.DataFrame({"x": range(20), "y": range(20)}) + pdf1 = lib.DataFrame({"x": range(20), "y": range(20)}) df1 = from_pandas(pdf1, 2) - pdf2 = pd.DataFrame({"x": range(0, 20, 2), "z": range(10)}) + pdf2 = lib.DataFrame({"x": range(0, 20, 2), "z": range(10)}) df2 = from_pandas(pdf2, 1) # Partition-wise merge with map_partitions @@ -832,7 +862,7 @@ def test_drop_duplicates(df, pdf): assert_eq(df.drop_duplicates(subset=["x"]), pdf.drop_duplicates(subset=["x"])) assert_eq(df.x.drop_duplicates(), pdf.x.drop_duplicates()) - with pytest.raises(KeyError, match=re.escape("Index(['a'], dtype='object')")): + with pytest.raises(KeyError, match="'a'"): df.drop_duplicates(subset=["a"]) with pytest.raises(TypeError, match="got an unexpected keyword argument"): @@ -846,8 +876,8 @@ def test_unique(df, pdf): df.unique() # pandas returns a numpy array while we return a Series/Index - assert_eq(df.x.unique(), pd.Series(pdf.x.unique(), name="x")) - assert_eq(df.index.unique(), pd.Index(pdf.index.unique())) + 
+    assert_eq(df.x.unique(), lib.Series(pdf.x.unique(), name="x"))
+    assert_eq(df.index.unique(), lib.Index(pdf.index.unique()))


 def test_walk(df):
@@ -933,6 +963,7 @@ def test_sample(df):
     assert_eq(result, expected)


+@pytest.mark.xfail(CUDF_BACKEND, reason="align not supported by cudf")
 def test_align(df, pdf):
     result_1, result_2 = df.align(df)
     pdf_result_1, pdf_result_2 = pdf.align(pdf)
@@ -945,10 +976,11 @@ def test_align(df, pdf):
     assert_eq(result_2, pdf_result_2)


+@pytest.mark.xfail(CUDF_BACKEND, reason="align not supported by cudf")
 def test_align_different_partitions():
-    pdf = pd.DataFrame({"a": [11, 12, 31, 1, 2, 3], "b": [1, 2, 3, 4, 5, 6]})
+    pdf = lib.DataFrame({"a": [11, 12, 31, 1, 2, 3], "b": [1, 2, 3, 4, 5, 6]})
     df = from_pandas(pdf, npartitions=2)
-    pdf2 = pd.DataFrame(
+    pdf2 = lib.DataFrame(
         {"a": [11, 12, 31, 1, 2, 3], "b": [1, 2, 3, 4, 5, 6]},
         index=[-2, -1, 0, 1, 2, 3],
     )
@@ -959,8 +991,9 @@ def test_align_different_partitions():
     assert_eq(result_2, pdf_result_2)


+@pytest.mark.xfail(CUDF_BACKEND, reason="align not supported by cudf")
 def test_align_unknown_partitions_same_root():
-    pdf = pd.DataFrame({"a": 1}, index=[3, 2, 1])
+    pdf = lib.DataFrame({"a": 1}, index=[3, 2, 1])
     df = from_pandas(pdf, npartitions=2, sort=False)
     result_1, result_2 = df.align(df)
     pdf_result_1, pdf_result_2 = pdf.align(pdf)
@@ -968,15 +1001,17 @@ def test_align_unknown_partitions_same_root():
     assert_eq(result_2, pdf_result_2)


+@pytest.mark.skipif(CUDF_BACKEND, reason="align not supported by cudf")
 def test_unknown_partitions_different_root():
-    pdf = pd.DataFrame({"a": 1}, index=[3, 2, 1])
+    pdf = lib.DataFrame({"a": 1}, index=[3, 2, 1])
     df = from_pandas(pdf, npartitions=2, sort=False)
-    pdf2 = pd.DataFrame({"a": 1}, index=[4, 3, 2, 1])
+    pdf2 = lib.DataFrame({"a": 1}, index=[4, 3, 2, 1])
     df2 = from_pandas(pdf2, npartitions=2, sort=False)
     with pytest.raises(ValueError, match="Not all divisions"):
         df.align(df2)


+@pytest.mark.xfail(CUDF_BACKEND, reason="compute_hll_array doesn't work for cudf")
 def test_nunique_approx(df):
     result = df.nunique_approx().compute()
     assert 99 < result < 101
@@ -1015,6 +1050,7 @@ def test_assign_simplify_series(pdf):
     assert result._name == expected._name


+@pytest.mark.xfail(CUDF_BACKEND, reason="assign function not supported by cudf")
 def test_assign_non_series_inputs(df, pdf):
     assert_eq(df.assign(a=lambda x: x.x * 2), pdf.assign(a=lambda x: x.x * 2))
     assert_eq(df.assign(a=2), pdf.assign(a=2))
@@ -1045,13 +1081,14 @@ def test_are_co_aligned(pdf, df):
     assert not are_co_aligned(merged_first.expr, df.expr)


+@pytest.mark.xfail(CUDF_BACKEND, reason="TODO")
 def test_astype_categories(df):
     result = df.astype("category")
-    assert_eq(result.x._meta.cat.categories, pd.Index([UNKNOWN_CATEGORIES]))
-    assert_eq(result.y._meta.cat.categories, pd.Index([UNKNOWN_CATEGORIES]))
+    assert_eq(result.x._meta.cat.categories, lib.Index([UNKNOWN_CATEGORIES]))
+    assert_eq(result.y._meta.cat.categories, lib.Index([UNKNOWN_CATEGORIES]))


-def test_drop_simplify(pdf, df):
+def test_drop_simplify(df):
     q = df.drop(columns=["x"])[["y"]]
     result = q.simplify()
     expected = df[["y"]]
@@ -1059,10 +1096,10 @@ def test_drop_simplify(df):


 def test_op_align():
-    pdf = pd.DataFrame({"x": [1, 2, 3], "y": 1})
+    pdf = lib.DataFrame({"x": [1, 2, 3], "y": 1})
     df = from_pandas(pdf, npartitions=2)

-    pdf2 = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": 1})
+    pdf2 = lib.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": 1})
     df2 = from_pandas(pdf2, npartitions=2)

     assert_eq(df - df2, pdf - pdf2)
@@ -1082,10 +1119,10 @@ def test_can_co_align(df, pdf):
 def test_avoid_alignment():
     from dask_expr._align import AlignPartitions

-    a = pd.DataFrame({"x": range(100)})
+    a = lib.DataFrame({"x": range(100)})
     da = from_pandas(a, npartitions=4)

-    b = pd.DataFrame({"y": range(100)})
+    b = lib.DataFrame({"y": range(100)})
     b["z"] = b.y * 2
     db = from_pandas(b, npartitions=3)
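
Note on the `_required_attribute` hook added in `_expr.py`: the intent is to fail fast at expression-construction time when the first dependency's `_meta` lacks an attribute the operation needs (e.g. `Series.nbytes`, which has no DataFrame counterpart), raising `ValueError` rather than `AttributeError` so that attribute probing via `__getattr__` fallbacks cannot recurse. A minimal standalone sketch of the pattern with stand-in classes (these are illustrative, not the actual dask_expr types):

    import pandas as pd

    class Expr:
        # Subclasses set this to an attribute name the dependency's
        # meta must expose; None disables the check.
        _required_attribute = None

        def __init__(self, meta):
            self._meta = meta
            if self._required_attribute and not hasattr(meta, self._required_attribute):
                # ValueError, not AttributeError: an AttributeError raised
                # here could be swallowed and re-triggered by __getattr__
                # fallbacks, producing infinite recursion.
                raise ValueError(
                    f"{type(meta).__name__} has no attribute {self._required_attribute!r}"
                )

    class NBytes(Expr):
        _required_attribute = "nbytes"

    NBytes(pd.Series([1, 2, 3]))  # ok: Series exposes .nbytes
    try:
        NBytes(pd.DataFrame({"a": [1]}))  # pandas DataFrame has no .nbytes
    except ValueError as exc:
        print(exc)

This is also why `Blockwise._combine_similar` now catches `ValueError` when rebuilding an operation on the unprojected frame: a method that exists on a Series meta may not exist on the DataFrame meta.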
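Note on the engine dispatch in `io/parquet.py`: `_set_parquet_engine` consults two signals, the fully qualified type name of the `meta` object (via `dask.utils.typename`) and the `dataframe.backend` config value, and falls back to `"pyarrow"` unless either says `"cudf"`. A small sketch of those two probes, assuming only `dask` and `pandas` are installed (the cudf/dask_cudf branch is not exercised):

    import dask
    import pandas as pd
    from dask.utils import typename

    meta = pd.DataFrame({"a": [1]})

    # typename() yields the dotted type name, e.g.
    # "pandas.core.frame.DataFrame"; the first component identifies
    # the backend library that produced the object.
    assert typename(meta).split(".")[0] == "pandas"

    # The second signal is dask's global config; with the default
    # backend, _set_parquet_engine would return "pyarrow" here.
    with dask.config.set({"dataframe.backend": "pandas"}):
        assert dask.config.get("dataframe.backend", "pandas") == "pandas"

`read_parquet` resolves the engine from its explicit `engine=` argument, while `to_parquet` infers it from `df._meta`, so both paths converge on the same helper.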