8 changes: 5 additions & 3 deletions dask_expr/_collection.py
@@ -1595,9 +1595,7 @@ def dot(self, other, meta=no_default):

if isinstance(other, DataFrame):
s = self.map_partitions(M.dot, other, meta=meta)
return s.groupby(by=s.index).apply(
lambda x: x.sum(skipna=False), meta=s._meta_nonempty
)
return s.groupby(by=s.index).apply(_sum_skipna_false, meta=s._meta_nonempty)

return self.map_partitions(_dot_series, other, meta=meta).sum(skipna=False)

@@ -4052,3 +4050,7 @@ def _compute_partition_stats(
return (mins, maxes, lens)
else:
return (non_empty_mins, non_empty_maxes, lens)


def _sum_skipna_false(part):
return part.sum(skipna=False)
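
Context for this hunk (illustrative, not part of the diff): the lambda previously passed to `groupby(...).apply` is hoisted to the module-level `_sum_skipna_false` so the task graph stays serializable with the stdlib `pickle`. A minimal sketch of the difference, assuming a plain Python session:

```python
import pickle


def _sum_skipna_false(part):
    return part.sum(skipna=False)


# A module-level function pickles by reference (module name + qualified name).
pickle.dumps(_sum_skipna_false)

# A lambda has no importable name, so plain pickle refuses it and a
# cloudpickle fallback (which serializes the code object) would be needed.
try:
    pickle.dumps(lambda part: part.sum(skipna=False))
except (pickle.PicklingError, AttributeError) as exc:
    print(f"not picklable without cloudpickle: {exc}")
```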
5 changes: 1 addition & 4 deletions dask_expr/io/_delayed.py
@@ -7,6 +7,7 @@
from dask.dataframe.dispatch import make_meta
from dask.dataframe.utils import check_meta
from dask.delayed import Delayed, delayed
from toolz import identity

from dask_expr import new_collection
from dask_expr._expr import Expr, PartitionsFiltered
@@ -83,10 +84,6 @@ def _filtered_task(self, index: int):
return identity, (key, 0)


def identity(x):
return x


def from_delayed(
dfs: Delayed | distributed.Future | Iterable[Delayed | distributed.Future],
meta=None,
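
Aside (not part of the diff): `toolz.identity` is an ordinary importable function, so swapping it in for the locally defined `identity` keeps the graph picklable by reference while removing a duplicate helper. A small check under that assumption:

```python
import pickle

from toolz import identity

# Hypothetical task tuple; it only mirrors the shape returned by
# _filtered_task above, i.e. (callable, (key, 0)).
task = (identity, ("from-delayed-key", 0))
restored = pickle.loads(pickle.dumps(task))
assert restored[0]("x") == "x"
```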
6 changes: 5 additions & 1 deletion dask_expr/tests/_util.py
@@ -20,7 +20,7 @@ def xfail_gpu(reason=None):
return pytest.mark.xfail(condition, reason=reason)


def assert_eq(a, b, *args, serialize_graph=True, **kwargs):
def assert_eq(a, b, *args, serialize_graph=True, allow_cloudpickle=False, **kwargs):
if serialize_graph:
# Check that no `Expr` instances are found in
# the graph generated by `Expr.dask`
@@ -30,6 +30,10 @@ def assert_eq(a, b, *args, serialize_graph=True, **kwargs):
try:
pickle.dumps(obj.dask)
except AttributeError:
# Cloudpickle is slower so we should try to avoid it
# Ordinary expressions should all be pickleable
if not allow_cloudpickle:
raise
try:
import cloudpickle as cp

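
For reference (a simplified sketch, not the real helper): the pattern `assert_eq` follows above is pickle-first with an opt-in cloudpickle fallback, since cloudpickle is slower and ordinary expressions should already round-trip with plain pickle.

```python
import pickle


def check_graph_serializable(graph, allow_cloudpickle=False):
    # Hypothetical standalone version of the check inside assert_eq.
    try:
        pickle.dumps(graph)
    except AttributeError:
        # Graphs that capture locally defined functions fail plain pickle
        # with AttributeError; only fall back when the test opts in.
        if not allow_cloudpickle:
            raise
        import cloudpickle as cp

        cp.dumps(graph)
```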
70 changes: 46 additions & 24 deletions dask_expr/tests/test_collection.py
@@ -14,6 +14,7 @@
from dask.dataframe._compat import PANDAS_GE_210
from dask.dataframe.utils import UNKNOWN_CATEGORIES
from dask.utils import M
from toolz import identity

from dask_expr import (
DataFrame,
@@ -605,6 +606,18 @@ def test_to_timestamp(pdf, how):
assert_eq(df.x.to_timestamp(how=how), pdf.x.to_timestamp(how=how))


def inc(x):
return x + 1


def inca(x):
return x.a + 1


def mod2is0(x):
return x % 2 == 0


@pytest.mark.parametrize(
"func",
[
@@ -613,10 +626,10 @@ def test_to_timestamp(pdf, how):
lambda df: df.x.clip(lower=10, upper=50),
lambda df: df.clip(lower=3, upper=7, axis=1),
lambda df: df.x.between(left=10, right=50),
lambda df: df.x.map(lambda x: x + 1),
lambda df: df.x.map(inc),
lambda df: df[df.x > 5],
lambda df: df.assign(a=df.x + df.y, b=df.x - df.y),
lambda df: df.assign(a=df.x + df.y, b=lambda x: x.a + 1),
lambda df: df.assign(a=df.x + df.y, b=inca),
lambda df: df.replace(to_replace=1, value=1000),
lambda df: df.x.replace(to_replace=1, value=1000),
lambda df: df.isna(),
@@ -627,17 +640,17 @@ def test_to_timestamp(pdf, how):
lambda df: df.x.isnull(),
lambda df: df.mask(df.x == 10, 42),
lambda df: df.mask(df.x == 10),
lambda df: df.mask(lambda df: df.x % 2 == 0, 42),
lambda df: df.mask(mod2is0, 42),
lambda df: df.mask(df.x == 10, df + 2),
lambda df: df.mask(df.x == 10, lambda df: df + 2),
lambda df: df.mask(df.x == 10, inc),
lambda df: df.x.mask(df.x == 10, 42),
lambda df: df.abs(),
lambda df: df.x.abs(),
lambda df: df.where(df.x == 10, 42),
lambda df: df.where(df.x == 10),
lambda df: df.where(lambda df: df.x % 2 == 0, 42),
lambda df: df.where(mod2is0, 42),
lambda df: df.where(df.x == 10, df + 2),
lambda df: df.where(df.x == 10, lambda df: df + 2),
lambda df: df.where(df.x == 10, inc),
lambda df: df.x.where(df.x == 10, 42),
lambda df: df.rename(columns={"x": "xx"}),
lambda df: df.rename(columns={"x": "xx"}).xx,
@@ -687,17 +700,17 @@ def test_rename(pdf, df):
assert q.divisions[0] is None
assert_eq(q, pdf.x.rename({1: 2}))

q = df.x.rename(lambda x: x)
q = df.x.rename(identity)
assert q.divisions[0] is None
assert_eq(q, pdf.x.rename(lambda x: x))
assert_eq(q, pdf.x.rename(identity))

q = df.x.rename({1: 2}, sorted_index=True)
assert q.divisions[0] is not None
assert_eq(q, pdf.x.rename({1: 2}))

q = df.x.rename(lambda x: x, sorted_index=True)
q = df.x.rename(identity, sorted_index=True)
assert q.divisions[0] is not None
assert_eq(q, pdf.x.rename(lambda x: x))
assert_eq(q, pdf.x.rename(identity))

with pytest.raises(ValueError, match="non-monotonic"):
df.x.rename({0: 200}, sorted_index=True).divisions
@@ -794,14 +807,18 @@ def test_drop_not_implemented(pdf, df):
df.drop(axis=0, labels=[0])


def _apply_func(row, x, y=10):
return row * x + y


@xfail_gpu("func not supported by cudf")
@pytest.mark.parametrize(
"func",
[
lambda df: df.apply(lambda row, x, y=10: row * x + y, x=2, axis=1),
lambda df: df.index.map(lambda x: x + 1),
lambda df: df.apply(_apply_func, x=2, axis=1),
lambda df: df.index.map(inc),
pytest.param(
lambda df: df.map(lambda x: x + 1),
lambda df: df.map(inc),
marks=pytest.mark.skipif(
not PANDAS_GE_210, reason="Only available from 2.1"
),
@@ -815,8 +832,8 @@ def test_blockwise_pandas_only(func, pdf, df):


def test_map_meta(pdf, df):
expected = pdf.x.map(lambda x: x + 1)
result = df.x.map(lambda x: x + 1, meta=expected.iloc[:0])
expected = pdf.x.map(inc)
result = df.x.map(inc, meta=expected.iloc[:0])
assert_eq(result, expected)

result = df.x.map(df.x + 1, meta=expected.iloc[:0])
@@ -830,7 +847,7 @@ def test_map_meta(pdf, df):

pdf.index.name = "a"
df = from_pandas(pdf, npartitions=10)
result = df.x.map(lambda x: x + 1, meta=("x", "int64"))
result = df.x.map(inc, meta=("x", "int64"))
assert_eq(result, expected)


@@ -1347,13 +1364,14 @@ def test_enforce_runtime_divisions():
assert_eq(pdf, ddf.enforce_runtime_divisions())


def return_df(x):
return pd.Series([x.sum(), x.mean()], index=["sum", "mean"])


def test_apply_infer_columns():
df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
ddf = from_pandas(df, npartitions=2)

def return_df(x):
return pd.Series([x.sum(), x.mean()], index=["sum", "mean"])

result = ddf.apply(return_df, axis=1)
assert_eq(result.columns, pd.Index(["sum", "mean"]))
assert_eq(result, df.apply(return_df, axis=1))
@@ -2076,6 +2094,15 @@ def test_quantile_frame(df, pdf):
assert_eq(df.quantile(axis=1), pdf.quantile(axis=1))


def first(a, b):
return a


def str_add(a, b):
# You can add series with strings and nans but you can't add scalars 'a' + np.nan
return a + b if a is not np.nan else a


def test_combine():
df1 = pd.DataFrame(
{
@@ -2093,11 +2120,6 @@ def test_combine():
ddf1 = from_pandas(df1, 4)
ddf2 = from_pandas(df2, 5)

first = lambda a, b: a

# You can add series with strings and nans but you can't add scalars 'a' + np.nan
str_add = lambda a, b: a + b if a is not np.nan else a

# DataFrame
for dda, ddb, a, b, runs in [
# (ddf1, ddf2, df1, df2, [(add, None), (first, None)]),