Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions dask_expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,10 +897,12 @@ def from_dask_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
return from_graph(graph, ddf._meta, ddf.divisions, ddf._name)


def read_csv(path, *args, **kwargs):
    """Create a dask-expr collection by reading one or more CSV files.

    Parameters
    ----------
    path : str or path-like
        File path, glob pattern, or path-like object; non-``str`` inputs
        (e.g. ``pathlib.Path``) are converted via ``stringify_path`` so
        the expression layer only ever sees plain strings.
    *args, **kwargs
        Forwarded unchanged to ``dask_expr.io.csv.ReadCSV``.

    Returns
    -------
    A new dask-expr collection wrapping a ``ReadCSV`` expression.
    """
    # Local import keeps the io subpackage out of module import time.
    from dask_expr.io.csv import ReadCSV

    if not isinstance(path, str):
        path = stringify_path(path)
    return new_collection(ReadCSV(path, *args, **kwargs))


def read_parquet(
Expand All @@ -923,7 +925,7 @@ def read_parquet(
):
from dask_expr.io.parquet import ReadParquet

if hasattr(path, "name"):
if not isinstance(path, str):
path = stringify_path(path)

kwargs["dtype_backend"] = dtype_backend
Expand Down
12 changes: 8 additions & 4 deletions dask_expr/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import dask
import pandas as pd
import toolz
from dask.base import normalize_token, tokenize
from dask.base import normalize_token
from dask.core import flatten, ishashable
from dask.dataframe import methods
from dask.dataframe.core import (
Expand All @@ -27,6 +27,8 @@
from dask.utils import M, apply, funcname, import_required, is_arraylike
from tlz import merge_sorted, unique

from dask_expr._util import _tokenize_deterministic

replacement_rules = []

no_default = "__no_default__"
Expand Down Expand Up @@ -530,7 +532,9 @@ def npartitions(self):

@functools.cached_property
def _name(self):
    """Deterministic task-name prefix for this expression.

    Combines the lowercased class name with a strict token of the
    operands, so two equivalent expressions get the same name across
    processes (``_tokenize_deterministic`` raises rather than fall back
    to a random token).
    """
    return (
        funcname(type(self)).lower() + "-" + _tokenize_deterministic(*self.operands)
    )

@property
def columns(self) -> list:
Expand Down Expand Up @@ -836,7 +840,7 @@ def _name(self):
head = funcname(self.operation)
else:
head = funcname(type(self)).lower()
return head + "-" + tokenize(*self.operands)
return head + "-" + _tokenize_deterministic(*self.operands)

def _blockwise_arg(self, arg, i):
"""Return a Blockwise-task argument"""
Expand Down Expand Up @@ -2025,7 +2029,7 @@ def __str__(self):

@functools.cached_property
def _name(self):
    # str(self) supplies a human-readable head; the strict token makes
    # the name deterministic across interpreter runs.
    return f"{str(self)}-{_tokenize_deterministic(self.exprs)}"

def _divisions(self):
return self.exprs[0]._divisions()
Expand Down
16 changes: 16 additions & 0 deletions dask_expr/_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from __future__ import annotations

from types import LambdaType

from dask import config
from dask.base import normalize_token, tokenize


def _convert_to_list(column) -> list | None:
if column is None or isinstance(column, list):
Expand All @@ -11,3 +16,14 @@ def _convert_to_list(column) -> list | None:
else:
column = [column]
return column


@normalize_token.register(LambdaType)
def _normalize_lambda(func):
    """Normalize a lambda for tokenization.

    ``str(func)`` renders as ``<function <lambda> at 0x7f...>`` — it
    embeds the object's memory address, which changes on every
    interpreter run and would make tokens (and hence task names)
    nondeterministic across processes. Normalize from the compiled code
    object and defaults instead, which are stable for equivalent lambda
    definitions.
    """
    code = func.__code__
    return (
        code.co_code,
        str(code.co_consts),
        str(func.__defaults__),
    )


def _tokenize_deterministic(*args, **kwargs):
    """Tokenize the arguments, insisting on a deterministic result.

    Temporarily enables dask's ``tokenize.ensure-deterministic`` option
    so that tokenization raises instead of silently producing a random
    (non-reproducible) token.
    """
    strict = {"tokenize.ensure-deterministic": True}
    with config.set(strict):
        token = tokenize(*args, **kwargs)
    return token