diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index ac328348e..f10e188af 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -1595,9 +1595,7 @@ def dot(self, other, meta=no_default):
         if isinstance(other, DataFrame):
             s = self.map_partitions(M.dot, other, meta=meta)
-            return s.groupby(by=s.index).apply(
-                lambda x: x.sum(skipna=False), meta=s._meta_nonempty
-            )
+            return s.groupby(by=s.index).apply(_sum_skipna_false, meta=s._meta_nonempty)

         return self.map_partitions(_dot_series, other, meta=meta).sum(skipna=False)
@@ -4052,3 +4050,7 @@ def _compute_partition_stats(
         return (mins, maxes, lens)
     else:
         return (non_empty_mins, non_empty_maxes, lens)
+
+
+def _sum_skipna_false(part):
+    return part.sum(skipna=False)
diff --git a/dask_expr/io/_delayed.py b/dask_expr/io/_delayed.py
index cc2aa4ed3..067d15605 100644
--- a/dask_expr/io/_delayed.py
+++ b/dask_expr/io/_delayed.py
@@ -7,6 +7,7 @@
 from dask.dataframe.dispatch import make_meta
 from dask.dataframe.utils import check_meta
 from dask.delayed import Delayed, delayed
+from toolz import identity

 from dask_expr import new_collection
 from dask_expr._expr import Expr, PartitionsFiltered
@@ -83,10 +84,6 @@ def _filtered_task(self, index: int):
         return identity, (key, 0)


-def identity(x):
-    return x
-
-
 def from_delayed(
     dfs: Delayed | distributed.Future | Iterable[Delayed | distributed.Future],
     meta=None,
diff --git a/dask_expr/tests/_util.py b/dask_expr/tests/_util.py
index 1f24bfda1..eda5935fe 100644
--- a/dask_expr/tests/_util.py
+++ b/dask_expr/tests/_util.py
@@ -20,7 +20,7 @@ def xfail_gpu(reason=None):
     return pytest.mark.xfail(condition, reason=reason)


-def assert_eq(a, b, *args, serialize_graph=True, **kwargs):
+def assert_eq(a, b, *args, serialize_graph=True, allow_cloudpickle=False, **kwargs):
     if serialize_graph:
         # Check that no `Expr` instances are found in
         # the graph generated by `Expr.dask`
@@ -30,6 +30,10 @@
         try:
             pickle.dumps(obj.dask)
         except AttributeError:
+            # Cloudpickle is slower so we should try to avoid it
+            # Ordinary expressions should all be pickleable
+            if not allow_cloudpickle:
+                raise
             try:
                 import cloudpickle as cp
diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py
index d1d80e7c7..99cde3af6 100644
--- a/dask_expr/tests/test_collection.py
+++ b/dask_expr/tests/test_collection.py
@@ -14,6 +14,7 @@
 from dask.dataframe._compat import PANDAS_GE_210
 from dask.dataframe.utils import UNKNOWN_CATEGORIES
 from dask.utils import M
+from toolz import identity

 from dask_expr import (
     DataFrame,
@@ -605,6 +606,18 @@ def test_to_timestamp(pdf, how):
     assert_eq(df.x.to_timestamp(how=how), pdf.x.to_timestamp(how=how))


+def inc(x):
+    return x + 1
+
+
+def inca(x):
+    return x.a + 1
+
+
+def mod2is0(x):
+    return x % 2 == 0
+
+
 @pytest.mark.parametrize(
     "func",
     [
@@ -613,10 +626,10 @@
         lambda df: df.x.clip(lower=10, upper=50),
         lambda df: df.clip(lower=3, upper=7, axis=1),
         lambda df: df.x.between(left=10, right=50),
-        lambda df: df.x.map(lambda x: x + 1),
+        lambda df: df.x.map(inc),
         lambda df: df[df.x > 5],
         lambda df: df.assign(a=df.x + df.y, b=df.x - df.y),
-        lambda df: df.assign(a=df.x + df.y, b=lambda x: x.a + 1),
+        lambda df: df.assign(a=df.x + df.y, b=inca),
         lambda df: df.replace(to_replace=1, value=1000),
         lambda df: df.x.replace(to_replace=1, value=1000),
         lambda df: df.isna(),
@@ -627,17 +640,17 @@
         lambda df: df.x.isnull(),
         lambda df: df.mask(df.x == 10, 42),
         lambda df: df.mask(df.x == 10),
-        lambda df: df.mask(lambda df: df.x % 2 == 0, 42),
+        lambda df: df.mask(mod2is0, 42),
         lambda df: df.mask(df.x == 10, df + 2),
-        lambda df: df.mask(df.x == 10, lambda df: df + 2),
+        lambda df: df.mask(df.x == 10, inc),
         lambda df: df.x.mask(df.x == 10, 42),
         lambda df: df.abs(),
         lambda df: df.x.abs(),
         lambda df: df.where(df.x == 10, 42),
         lambda df: df.where(df.x == 10),
-        lambda df: df.where(lambda df: df.x % 2 == 0, 42),
+        lambda df: df.where(mod2is0, 42),
         lambda df: df.where(df.x == 10, df + 2),
-        lambda df: df.where(df.x == 10, lambda df: df + 2),
+        lambda df: df.where(df.x == 10, inc),
         lambda df: df.x.where(df.x == 10, 42),
         lambda df: df.rename(columns={"x": "xx"}),
         lambda df: df.rename(columns={"x": "xx"}).xx,
@@ -687,17 +700,17 @@ def test_rename(pdf, df):
     assert q.divisions[0] is None
     assert_eq(q, pdf.x.rename({1: 2}))

-    q = df.x.rename(lambda x: x)
+    q = df.x.rename(identity)
     assert q.divisions[0] is None
-    assert_eq(q, pdf.x.rename(lambda x: x))
+    assert_eq(q, pdf.x.rename(identity))

     q = df.x.rename({1: 2}, sorted_index=True)
     assert q.divisions[0] is not None
     assert_eq(q, pdf.x.rename({1: 2}))

-    q = df.x.rename(lambda x: x, sorted_index=True)
+    q = df.x.rename(identity, sorted_index=True)
     assert q.divisions[0] is not None
-    assert_eq(q, pdf.x.rename(lambda x: x))
+    assert_eq(q, pdf.x.rename(identity))

     with pytest.raises(ValueError, match="non-monotonic"):
         df.x.rename({0: 200}, sorted_index=True).divisions
@@ -794,14 +807,18 @@ def test_drop_not_implemented(pdf, df):
         df.drop(axis=0, labels=[0])


+def _apply_func(row, x, y=10):
+    return row * x + y
+
+
 @xfail_gpu("func not supported by cudf")
 @pytest.mark.parametrize(
     "func",
     [
-        lambda df: df.apply(lambda row, x, y=10: row * x + y, x=2, axis=1),
-        lambda df: df.index.map(lambda x: x + 1),
+        lambda df: df.apply(_apply_func, x=2, axis=1),
+        lambda df: df.index.map(inc),
         pytest.param(
-            lambda df: df.map(lambda x: x + 1),
+            lambda df: df.map(inc),
             marks=pytest.mark.skipif(
                 not PANDAS_GE_210, reason="Only available from 2.1"
             ),
@@ -815,8 +832,8 @@ def test_blockwise_pandas_only(func, pdf, df):


 def test_map_meta(pdf, df):
-    expected = pdf.x.map(lambda x: x + 1)
-    result = df.x.map(lambda x: x + 1, meta=expected.iloc[:0])
+    expected = pdf.x.map(inc)
+    result = df.x.map(inc, meta=expected.iloc[:0])
     assert_eq(result, expected)

     result = df.x.map(df.x + 1, meta=expected.iloc[:0])
@@ -830,7 +847,7 @@
     pdf.index.name = "a"
     df = from_pandas(pdf, npartitions=10)

-    result = df.x.map(lambda x: x + 1, meta=("x", "int64"))
+    result = df.x.map(inc, meta=("x", "int64"))
     assert_eq(result, expected)
@@ -1347,13 +1364,14 @@ def test_enforce_runtime_divisions():
     assert_eq(pdf, ddf.enforce_runtime_divisions())


+def return_df(x):
+    return pd.Series([x.sum(), x.mean()], index=["sum", "mean"])
+
+
 def test_apply_infer_columns():
     df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]})
     ddf = from_pandas(df, npartitions=2)

-    def return_df(x):
-        return pd.Series([x.sum(), x.mean()], index=["sum", "mean"])
-
     result = ddf.apply(return_df, axis=1)
     assert_eq(result.columns, pd.Index(["sum", "mean"]))
     assert_eq(result, df.apply(return_df, axis=1))
@@ -2076,6 +2094,15 @@ def test_quantile_frame(df, pdf):
     assert_eq(df.quantile(axis=1), pdf.quantile(axis=1))


+def first(a, b):
+    return a
+
+
+def str_add(a, b):
+    # You can add series with strings and nans but you can't add scalars 'a' + np.nan
+    return a + b if a is not np.nan else a
+
+
 def test_combine():
     df1 = pd.DataFrame(
         {
@@ -2093,11 +2120,6 @@ def test_combine():
     ddf1 = from_pandas(df1, 4)
     ddf2 = from_pandas(df2, 5)

-    first = lambda a, b: a
-
-    # You can add series with strings and nans but you can't add scalars 'a' + np.nan
-    str_add = lambda a, b: a + b if a is not np.nan else a
-
     # DataFrame
     for dda, ddb, a, b, runs in [
         # (ddf1, ddf2, df1, df2, [(add, None), (first, None)]),
diff --git a/dask_expr/tests/test_groupby.py b/dask_expr/tests/test_groupby.py
index 810fcd265..67a07b26a 100644
--- a/dask_expr/tests/test_groupby.py
+++ b/dask_expr/tests/test_groupby.py
@@ -4,6 +4,7 @@
 import dask
 import numpy as np
 import pytest
+from toolz import identity

 from dask_expr import from_pandas
 from dask_expr._groupby import GroupByUDFBlockwise
@@ -112,13 +113,13 @@ def test_groupby_reduction_optimize(pdf, df):
     assert ops[0].columns == ["x", "y"]

     df2 = df[["y"]]
-    agg = df2.groupby(df.x).y.apply(lambda x: x)
+    agg = df2.groupby(df.x).y.apply(identity)
     ops = [
         op for op in agg.expr.optimize(fuse=False).walk() if isinstance(op, FromPandas)
     ]
     assert len(ops) == 1
     assert ops[0].columns == ["x", "y"]
-    assert_eq(agg, pdf.replace(1, 5).groupby(pdf.replace(1, 5).x).y.apply(lambda x: x))
+    assert_eq(agg, pdf.replace(1, 5).groupby(pdf.replace(1, 5).x).y.apply(identity))


 @pytest.mark.parametrize(
@@ -204,18 +205,21 @@ def test_groupby_series(pdf, df):
         df.groupby(df2.a)


+def copy(x):
+    return x.copy()
+
+
 @pytest.mark.parametrize("group_keys", [True, False, None])
 def test_groupby_group_keys(group_keys, pdf):
     pdf = pdf.set_index("x")
     df = from_pandas(pdf, npartitions=10)

-    func = lambda g: g.copy()
-    expected = pdf.groupby("x").apply(func)
-    assert_eq(expected, df.groupby("x").apply(func, meta=expected))
+    expected = pdf.groupby("x").apply(copy)
+    assert_eq(expected, df.groupby("x").apply(copy, meta=expected))

-    expected = pdf.groupby("x", group_keys=group_keys).apply(func)
+    expected = pdf.groupby("x", group_keys=group_keys).apply(copy)
     assert_eq(
-        expected, df.groupby("x", group_keys=group_keys).apply(func, meta=expected)
+        expected, df.groupby("x", group_keys=group_keys).apply(copy, meta=expected)
     )
@@ -361,52 +365,54 @@ def test_groupby_repartition_to_one(pdf, df):
     assert_eq(result, expected)


-def test_groupby_apply(df, pdf):
-    def test(x):
-        x["new"] = x.sum().sum()
-        return x
+def func(x):
+    x["new"] = x.sum().sum()
+    return x

-    assert_eq(df.groupby(df.x).apply(test), pdf.groupby(pdf.x).apply(test))
+
+def test_groupby_apply(df, pdf):
+    assert_eq(df.groupby(df.x).apply(func), pdf.groupby(pdf.x).apply(func))
     assert_eq(
-        df.groupby(df.x, group_keys=False).apply(test),
-        pdf.groupby(pdf.x, group_keys=False).apply(test),
+        df.groupby(df.x, group_keys=False).apply(func),
+        pdf.groupby(pdf.x, group_keys=False).apply(func),
     )
-    assert_eq(df.groupby("x").apply(test), pdf.groupby("x").apply(test))
+    assert_eq(df.groupby("x").apply(func), pdf.groupby("x").apply(func))
     assert_eq(
-        df.groupby("x").apply(test, meta=pdf.groupby("x").apply(test).head(0)),
-        pdf.groupby("x").apply(test),
+        df.groupby("x").apply(func, meta=pdf.groupby("x").apply(func).head(0)),
+        pdf.groupby("x").apply(func),
     )
-    assert_eq(df.groupby(["x", "y"]).apply(test), pdf.groupby(["x", "y"]).apply(test))
+    assert_eq(df.groupby(["x", "y"]).apply(func), pdf.groupby(["x", "y"]).apply(func))

-    query = df.groupby("x").apply(test).optimize(fuse=False)
+    query = df.groupby("x").apply(func).optimize(fuse=False)
     assert query.expr.find_operations(Shuffle)
     assert query.expr.find_operations(GroupByUDFBlockwise)

-    query = df.groupby("x")[["y"]].apply(test).simplify()
-    expected = df[["x", "y"]].groupby("x")[["y"]].apply(test).simplify()
+    query = df.groupby("x")[["y"]].apply(func).simplify()
+    expected = df[["x", "y"]].groupby("x")[["y"]].apply(func).simplify()
     assert query._name == expected._name
-    assert_eq(query, pdf.groupby("x")[["y"]].apply(test))
+    assert_eq(query, pdf.groupby("x")[["y"]].apply(func))


 def test_groupby_transform(df, pdf):
-    def test(x):
-        return x
-
-    assert_eq(df.groupby(df.x).transform(test), pdf.groupby(pdf.x).transform(test))
-    assert_eq(df.groupby("x").transform(test), pdf.groupby("x").transform(test))
     assert_eq(
-        df.groupby("x").transform(test, meta=pdf.groupby("x").transform(test).head(0)),
-        pdf.groupby("x").transform(test),
+        df.groupby(df.x).transform(identity), pdf.groupby(pdf.x).transform(identity)
+    )
+    assert_eq(df.groupby("x").transform(identity), pdf.groupby("x").transform(identity))
+    assert_eq(
+        df.groupby("x").transform(
+            identity, meta=pdf.groupby("x").transform(identity).head(0)
+        ),
+        pdf.groupby("x").transform(identity),
     )

-    query = df.groupby("x").transform(test).optimize(fuse=False)
+    query = df.groupby("x").transform(identity).optimize(fuse=False)
     assert query.expr.find_operations(Shuffle)
     assert query.expr.find_operations(GroupByUDFBlockwise)

-    query = df.groupby("x")[["y"]].transform(test).simplify()
-    expected = df[["x", "y"]].groupby("x")[["y"]].transform(test).simplify()
+    query = df.groupby("x")[["y"]].transform(identity).simplify()
+    expected = df[["x", "y"]].groupby("x")[["y"]].transform(identity).simplify()
     assert query._name == expected._name
-    assert_eq(query, pdf.groupby("x")[["y"]].transform(test))
+    assert_eq(query, pdf.groupby("x")[["y"]].transform(identity))


 def test_groupby_shift(df, pdf):
@@ -433,10 +439,14 @@ def test_size(pdf, df):
     assert_eq(df.groupby("x").agg("size"), pdf.groupby("x").agg("size"))


+def div2(x):
+    return x // 2
+
+
 def test_groupby_numeric_only_lambda_caller(df, pdf):
     assert_eq(
-        df.groupby(lambda x: x // 2).mean(numeric_only=False),
-        pdf.groupby(lambda x: x // 2).mean(numeric_only=False),
+        df.groupby(div2).mean(numeric_only=False),
+        pdf.groupby(div2).mean(numeric_only=False),
     )
@@ -463,11 +473,15 @@ def test_groupby_single_agg_split_out(pdf, df, api, sort, split_out):
     assert_eq(agg, expect, sort_results=not sort)


+def dfsum(df):
+    return df.sum()
+
+
 @pytest.mark.parametrize(
     "func",
     [
-        lambda grouped: grouped.apply(lambda x: x.sum()),
-        lambda grouped: grouped.transform(lambda x: x.sum()),
+        lambda grouped: grouped.apply(dfsum),
+        lambda grouped: grouped.transform(dfsum),
     ],
 )
 def test_apply_or_transform_shuffle_multilevel(pdf, df, func):
@@ -533,9 +547,9 @@ def test_numeric_column_names():
     ddf = from_pandas(df, npartitions=2)
     assert_eq(ddf.groupby(0).sum(), df.groupby(0).sum())
     assert_eq(ddf.groupby([0, 2]).sum(), df.groupby([0, 2]).sum())
-    expected = df.groupby(0).apply(lambda x: x)
+    expected = df.groupby(0).apply(identity)
     assert_eq(
-        ddf.groupby(0).apply(lambda x: x, meta=expected),
+        ddf.groupby(0).apply(identity, meta=expected),
         expected,
     )
@@ -543,9 +557,9 @@ def test_apply_divisions(pdf):
     pdf = pdf.set_index("x")
     df = from_pandas(pdf, npartitions=10)
-    result = df.groupby(["x", "y"]).apply(lambda x: x)
+    result = df.groupby(["x", "y"]).apply(identity)
     assert df.divisions == result.divisions
-    assert_eq(result, pdf.groupby(["x", "y"]).apply(lambda x: x))
+    assert_eq(result, pdf.groupby(["x", "y"]).apply(identity))


 def test_groupby_co_aligned_grouper(df, pdf):
@@ -580,10 +594,14 @@ def test_groupby_median(df, pdf):
     assert_eq(df.groupby("x").median()["y"], pdf.groupby("x").median()["y"])


+def _func(x, y):
+    return x + y
+
+
 def test_groupby_apply_args(df, pdf):
     assert_eq(
-        df.groupby("x").apply(lambda x, y: x + y, 1),
-        pdf.groupby("x").apply(lambda x, y: x + y, 1),
+        df.groupby("x").apply(_func, 1),
+        pdf.groupby("x").apply(_func, 1),
     )
@@ -723,17 +741,18 @@ def test_groupby_dir(df):
     assert "y" in dir(df.groupby("x"))


-def test_groupby_udf_user_warning(df, pdf):
-    def func(df):
-        return df + 1
+def inc(df):
+    return df + 1

-    expected = pdf.groupby("x").apply(func)
+
+def test_groupby_udf_user_warning(df, pdf):
+    expected = pdf.groupby("x").apply(inc)
     with pytest.warns(UserWarning, match="`meta` is not specified"):
-        assert_eq(expected, df.groupby("x").apply(func))
+        assert_eq(expected, df.groupby("x").apply(inc))

-    expected = pdf.groupby("x").transform(func)
+    expected = pdf.groupby("x").transform(inc)
     with pytest.warns(UserWarning, match="`meta` is not specified"):
-        assert_eq(expected, df.groupby("x").transform(func))
+        assert_eq(expected, df.groupby("x").transform(inc))


 def test_groupby_index_array(pdf):
diff --git a/dask_expr/tests/test_indexing.py b/dask_expr/tests/test_indexing.py
index 6cfea63b3..15f679970 100644
--- a/dask_expr/tests/test_indexing.py
+++ b/dask_expr/tests/test_indexing.py
@@ -134,10 +134,11 @@ def test_loc_with_array(df, pdf):
     assert_eq(df.loc[(df.x % 2 == 0).values], pdf.loc[(pdf.x % 2 == 0).values])


+def _col_loc_fun(_df):
+    return _df.columns.str.contains("y")
+
+
 def test_loc_with_function(df, pdf):
     assert_eq(df.loc[lambda df: df["x"] > 3, :], pdf.loc[lambda df: df["x"] > 3, :])

-    def _col_loc_fun(_df):
-        return _df.columns.str.contains("y")
-
     assert_eq(df.loc[:, _col_loc_fun], pdf.loc[:, _col_loc_fun])
diff --git a/dask_expr/tests/test_map_partitions_overlap.py b/dask_expr/tests/test_map_partitions_overlap.py
index 8ddc48f6f..0c1409169 100644
--- a/dask_expr/tests/test_map_partitions_overlap.py
+++ b/dask_expr/tests/test_map_partitions_overlap.py
@@ -23,11 +23,12 @@ def df(pdf):
     yield from_pandas(pdf, npartitions=10)


-def test_map_partitions(df):
-    def combine_x_y(x, y, foo=None):
-        assert foo == "bar"
-        return x + y
+def combine_x_y(x, y, foo=None):
+    assert foo == "bar"
+    return x + y

+
+def test_map_partitions(df):
     df2 = df.map_partitions(combine_x_y, df + 1, foo="bar")
     assert_eq(df2, df + (df + 1))
@@ -35,16 +36,20 @@ def combine_x_y(x, y, foo=None):
     assert_eq(df2, df + (df + 1))


-def test_map_partitions_broadcast(df):
-    def combine_x_y(x, y, val, foo=None):
-        assert foo == "bar"
-        return x + y + val
+def combine_x_y_val(x, y, val, foo=None):
+    assert foo == "bar"
+    return x + y + val

-    df2 = df.map_partitions(combine_x_y, df["x"].sum(), 123, foo="bar")
+
+def test_map_partitions_broadcast(df):
+    df2 = df.map_partitions(combine_x_y_val, df["x"].sum(), 123, foo="bar")
     assert_eq(df2, df + df["x"].sum() + 123)
     assert_eq(df2.optimize(), df + df["x"].sum() + 123)


+from pandas import merge
+
+
 @pytest.mark.parametrize("opt", [True, False])
 def test_map_partitions_merge(opt):
     # Make simple left & right dfs
@@ -55,7 +60,7 @@ def test_map_partitions_merge(opt):

     # Partition-wise merge with map_partitions
     df3 = df1.map_partitions(
-        lambda l, r: l.merge(r, on="x"),
+        merge,
         df2,
         enforce_metadata=False,
         clear_divisions=True,
@@ -69,11 +74,12 @@ def test_map_partitions_merge(opt):
     assert_eq(df3, expect, check_index=False)


-def test_map_overlap():
-    def func(x):
-        x = x + x.sum()
-        return x
+def func(x):
+    x = x + x.sum()
+    return x

+
+def test_map_overlap():
     idx = pd.date_range("2020-01-01", periods=5, freq="D")
     pdf = pd.DataFrame(1, index=idx, columns=["a"])
     df = from_pandas(pdf, npartitions=2)
@@ -97,11 +103,12 @@ def func(x):
     assert_eq(result, expected, check_index=False)


-def test_map_overlap_raises():
-    def func(x):
-        x = x + x.sum()
-        return x
+def func(x):
+    x = x + x.sum()
+    return x

+
+def test_map_overlap_raises():
     idx = pd.date_range("2020-01-01", periods=5, freq="D")
     pdf = pd.DataFrame(1, index=idx, columns=["a"])
     df = from_pandas(pdf, npartitions=2)
@@ -122,13 +129,14 @@ def func(x):
         df.map_overlap(func, before=1, after=-5).compute()


+def shifted_sum(df, before, after, c=0):
+    a = df.shift(before)
+    b = df.shift(-after)
+    return df + a + b + c
+
+
 @pytest.mark.parametrize("npartitions", [1, 4])
 def test_map_overlap(npartitions, pdf, df):
-    def shifted_sum(df, before, after, c=0):
-        a = df.shift(before)
-        b = df.shift(-after)
-        return df + a + b + c
-
     for before, after in [(0, 3), (3, 0), (3, 3), (0, 0)]:
         # DataFrame
         res = df.map_overlap(shifted_sum, before, after, before, after, c=2)
@@ -172,6 +180,10 @@ def f(x, partition_info=None):
     assert type(result) == pd.DataFrame


+def _rolling_2_sum(df):
+    return df.rolling(2).sum()
+
+
 def test_map_overlap_provide_meta():
     df = pd.DataFrame(
         {"x": [1, 2, 4, 7, 11], "y": [1.0, 2.0, 3.0, 4.0, 5.0]}
@@ -179,15 +191,11 @@ def test_map_overlap_provide_meta():
     ddf = from_pandas(df, npartitions=2)

     # Provide meta spec, but not full metadata
-    res = ddf.map_overlap(
-        lambda df: df.rolling(2).sum(), 2, 0, meta={"x": "i8", "y": "i8"}
-    )
+    res = ddf.map_overlap(_rolling_2_sum, 2, 0, meta={"x": "i8", "y": "i8"})
     sol = df.rolling(2).sum()
     assert_eq(res, sol)

-    res = map_overlap(
-        lambda df: df.rolling(2).sum(), ddf, 2, 0, meta={"x": "i8", "y": "i8"}
-    )
+    res = map_overlap(_rolling_2_sum, ddf, 2, 0, meta={"x": "i8", "y": "i8"})
     sol = df.rolling(2).sum()
     assert_eq(res, sol)
@@ -320,6 +328,14 @@ def get_shifted_sum_arg(overlap):
     assert_eq(res, sol)


+def _assign(df):
+    return df.assign(C=df.A + df.B)
+
+
+def _rename_axis(df):
+    return df.rename_axis("newindex")
+
+
 def test_map_partitions_propagates_index_metadata():
     index = pd.Series(list("abcde"), name="myindex")
     df = pd.DataFrame(
@@ -328,12 +344,12 @@ def test_map_partitions_propagates_index_metadata():
     )
     ddf = from_pandas(df, npartitions=2)
     res = ddf.map_partitions(
-        lambda df: df.assign(C=df.A + df.B),
+        _assign,
         meta=[("A", "i4"), ("B", "i4"), ("C", "i4")],
     )
-    sol = df.assign(C=df.A + df.B)
+    sol = _assign(df)
     assert_eq(res, sol)

-    res = ddf.map_partitions(lambda df: df.rename_axis("newindex"))
-    sol = df.rename_axis("newindex")
+    res = ddf.map_partitions(_rename_axis)
+    sol = _rename_axis(df)
     assert_eq(res, sol)
diff --git a/dask_expr/tests/test_resample.py b/dask_expr/tests/test_resample.py
index 4276c22d8..5cbe33d24 100644
--- a/dask_expr/tests/test_resample.py
+++ b/dask_expr/tests/test_resample.py
@@ -100,10 +100,11 @@ def test_series_resample(obj, method, npartitions, freq, closed, label):
     assert expected.index[-1] == divisions[-1]


-def test_resample_agg(df, pdf):
-    def my_sum(vals, foo=None, *, bar=None):
-        return vals.sum()
+def my_sum(vals, foo=None, *, bar=None):
+    return vals.sum()

+
+def test_resample_agg(df, pdf):
     result = df.resample("2T").agg(my_sum, "foo", bar="bar")
     expected = pdf.resample("2T").agg(my_sum, "foo", bar="bar")
     assert_eq(result, expected)
diff --git a/dask_expr/tests/test_rolling.py b/dask_expr/tests/test_rolling.py
index fe4bcc503..ec5b77f1b 100644
--- a/dask_expr/tests/test_rolling.py
+++ b/dask_expr/tests/test_rolling.py
@@ -59,12 +59,13 @@ def test_rolling_apis(df, pdf, window, api, how_args, min_periods, center):
     assert q._name == eq._name


+def my_sum(vals, foo=None, *, bar=None):
+    return vals.sum()
+
+
 @pytest.mark.parametrize("window", (1, 2))
 @pytest.mark.parametrize("df", (1, 2), indirect=True)
 def test_rolling_agg(df, pdf, window):
-    def my_sum(vals, foo=None, *, bar=None):
-        return vals.sum()
-
     result = df.rolling(window).agg(my_sum, "foo", bar="bar")
     expected = pdf.rolling(window).agg(my_sum, "foo", bar="bar")
     assert_eq(result, expected)
@@ -98,11 +99,11 @@ def my_sum(vals, foo_=None, *, bar_=None):
     result = df.rolling(window).apply(my_sum, **kwargs)
     expected = pdf.rolling(window).apply(my_sum, **kwargs)
-    assert_eq(result, expected)
+    assert_eq(result, expected, allow_cloudpickle=True)

     result = df.rolling(window).apply(my_sum, **kwargs)["foo"]
     expected = pdf.rolling(window).apply(my_sum, **kwargs)["foo"]
-    assert_eq(result, expected)
+    assert_eq(result, expected, allow_cloudpickle=True)

     # simplify up disabled for `apply`, function may access other columns
     q = df.rolling(window).apply(my_sum, **kwargs)["foo"].simplify()
@@ -135,10 +136,14 @@ def test_time_rolling_large_window_variable_chunks(window):
     assert_eq(ddf.rolling(window).mean(), df.rolling(window).mean())


+def _rolling_1s_count(df):
+    return df.rolling("1s").count()
+
+
 def test_rolling_one_element_window_empty_after(df, pdf):
     pdf.index = pd.date_range("2000-01-01", periods=12, freq="2s")
     df = from_pandas(pdf, npartitions=3)
-    result = df.map_overlap(lambda x: x.rolling("1s").count(), before="1s", after="1s")
+    result = df.map_overlap(_rolling_1s_count, before="1s", after="1s")
     expected = pdf.rolling("1s").count()
     assert_eq(result, expected)
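Note on the motivation (illustrative, not part of the patch): the stdlib `pickle` serializes a module-level function by reference to its qualified name, but it cannot pickle lambdas or functions defined inside other functions. That is why the graph-serialization check in `assert_eq` above must fall back to cloudpickle for such graphs, and why this patch hoists the test UDFs to module scope. A minimal sketch of that behavior (the helper names here are hypothetical):

    import pickle

    def module_level(x):  # module-level: pickled by reference, cheap and safe
        return x + 1

    pickle.loads(pickle.dumps(module_level))  # round-trips fine

    def make_local():
        def local(x):  # defined inside another function: not picklable
            return x + 1
        return local

    for bad in (lambda x: x + 1, make_local()):
        try:
            pickle.dumps(bad)
        except (pickle.PicklingError, AttributeError) as err:
            # stdlib pickle refuses; cloudpickle would serialize these by value,
            # which is slower and produces larger graphs
            print(type(err).__name__, err)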