diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index 1c62e8fc4..0009ed492 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -897,10 +897,12 @@ def from_dask_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
     return from_graph(graph, ddf._meta, ddf.divisions, ddf._name)
 
 
-def read_csv(*args, **kwargs):
+def read_csv(path, *args, **kwargs):
     from dask_expr.io.csv import ReadCSV
 
-    return new_collection(ReadCSV(*args, **kwargs))
+    if not isinstance(path, str):
+        path = stringify_path(path)
+    return new_collection(ReadCSV(path, *args, **kwargs))
 
 
 def read_parquet(
@@ -923,7 +925,7 @@ def read_parquet(
 ):
     from dask_expr.io.parquet import ReadParquet
 
-    if hasattr(path, "name"):
+    if not isinstance(path, str):
         path = stringify_path(path)
 
     kwargs["dtype_backend"] = dtype_backend
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index bdc710d0d..b32f4487b 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -10,7 +10,7 @@
 import dask
 import pandas as pd
 import toolz
-from dask.base import normalize_token, tokenize
+from dask.base import normalize_token
 from dask.core import flatten, ishashable
 from dask.dataframe import methods
 from dask.dataframe.core import (
@@ -27,6 +27,8 @@
 from dask.utils import M, apply, funcname, import_required, is_arraylike
 from tlz import merge_sorted, unique
 
+from dask_expr._util import _tokenize_deterministic
+
 replacement_rules = []
 
 no_default = "__no_default__"
@@ -530,7 +532,9 @@ def npartitions(self):
 
     @functools.cached_property
     def _name(self):
-        return funcname(type(self)).lower() + "-" + tokenize(*self.operands)
+        return (
+            funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)
+        )
 
     @property
     def columns(self) -> list:
@@ -836,7 +840,7 @@ def _name(self):
             head = funcname(self.operation)
         else:
             head = funcname(type(self)).lower()
-        return head + "-" + tokenize(*self.operands)
+        return head + "-" + _tokenize_deterministic(*self.operands)
 
     def _blockwise_arg(self, arg, i):
         """Return a Blockwise-task argument"""
@@ -2025,7 +2029,7 @@ def __str__(self):
 
     @functools.cached_property
     def _name(self):
-        return f"{str(self)}-{tokenize(self.exprs)}"
+        return f"{str(self)}-{_tokenize_deterministic(self.exprs)}"
 
     def _divisions(self):
         return self.exprs[0]._divisions()
diff --git a/dask_expr/_util.py b/dask_expr/_util.py
index a206fa9ad..6275e3769 100644
--- a/dask_expr/_util.py
+++ b/dask_expr/_util.py
@@ -1,5 +1,10 @@
 from __future__ import annotations
 
+from types import LambdaType
+
+from dask import config
+from dask.base import normalize_token, tokenize
+
 
 def _convert_to_list(column) -> list | None:
     if column is None or isinstance(column, list):
@@ -11,3 +16,14 @@
     else:
         column = [column]
     return column
+
+
+@normalize_token.register(LambdaType)
+def _normalize_lambda(func):
+    return str(func)
+
+
+def _tokenize_deterministic(*args, **kwargs):
+    # Utility to be strict about deterministic tokens
+    with config.set({"tokenize.ensure-deterministic": True}):
+        return tokenize(*args, **kwargs)
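
For reference (not part of the patch): a minimal sketch of how the new _tokenize_deterministic helper and the LambdaType normalizer are intended to behave. The sample arguments below are illustrative only.

# Illustrative sketch only: exercising the helper added in dask_expr/_util.py.
# The inputs are made up for the example.
from dask_expr._util import _tokenize_deterministic

# Identical inputs yield identical tokens, so expression names such as
# "readcsv-<token>" stay stable across repeated graph construction.
assert _tokenize_deterministic([1, 2, 3], blocksize="64MB") == _tokenize_deterministic(
    [1, 2, 3], blocksize="64MB"
)

# The helper runs dask.base.tokenize with tokenize.ensure-deterministic=True,
# so objects dask cannot hash deterministically raise instead of silently
# producing a random token. Lambdas are normalized via str() by the newly
# registered LambdaType handler, so tokenizing the same lambda object twice
# agrees as well.
func = lambda part: part + 1
assert _tokenize_deterministic(func) == _tokenize_deterministic(func)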