diff --git a/cf/constants.py b/cf/constants.py index c6c2ea4f5b..b89eff9e99 100644 --- a/cf/constants.py +++ b/cf/constants.py @@ -63,6 +63,7 @@ "LOG_LEVEL": logging.getLevelName(logging.getLogger().level), "BOUNDS_COMBINATION_MODE": "AND", "CHUNKSIZE": parse_bytes(_CHUNKSIZE), + "ACTIVE_STORAGE": True, } masked = np.ma.masked diff --git a/cf/data/array/mixin/__init__.py b/cf/data/array/mixin/__init__.py index 15764dd8e3..0dba941f5d 100644 --- a/cf/data/array/mixin/__init__.py +++ b/cf/data/array/mixin/__init__.py @@ -1,3 +1,4 @@ +from .activestoragemixin import ActiveStorageMixin from .arraymixin import ArrayMixin from .compressedarraymixin import CompressedArrayMixin from .filearraymixin import FileArrayMixin diff --git a/cf/data/array/mixin/activestoragemixin.py b/cf/data/array/mixin/activestoragemixin.py new file mode 100644 index 0000000000..2c7f6b5d3c --- /dev/null +++ b/cf/data/array/mixin/activestoragemixin.py @@ -0,0 +1,154 @@ +try: + from activestorage import Active +except ModuleNotFoundError: + Active = None + + +class ActiveStorageMixin: + """TODOACTIVEDOCS. + + .. versionadded:: ACTIVEVERSION + + """ + + def __getitem__(self, indices): + """Returns a subspace of the array as a numpy array. + + x.__getitem__(indices) <==> x[indices] + + The indices that define the subspace must be either `Ellipsis` or + a sequence that contains an index for each dimension. In the + latter case, each dimension's index must either be a `slice` + object or a sequence of two or more integers. + + Indexing is similar to numpy indexing. The only difference to + numpy indexing (given the restrictions on the type of indices + allowed) is: + + * When two or more dimension's indices are sequences of integers + then these indices work independently along each dimension + (similar to the way vector subscripts work in Fortran). + + .. versionadded:: ACTIVEVERSION + + """ + method = self.get_active_method() + if method is None: + # Normal read by local client. Returns a numpy array. 
+ return super().__getitem__(indices) + + # Active storage reduction. Returns a dictionary. + try: + missing_values = self.get_missing_values() + except AttributeError: + missing_values = {} + else: + if missing_values is None: + missing_values = {} + + active = Active( + self.get_filename(), self.get_ncvar(), **missing_values + ) + active.method = method + active.components = True + try: + active.lock = self._dask_lock + except AttributeError: + pass + + return active[indices] + + def actify(self, method, axis=None): + """Return a new actified `{{class}}` instance. + + The new instance is a deep copy of the original, with the + additional setting of the active storage method and axis. + + .. versionadded:: ACTIVEVER + + .. seealso:: `set_active_axis`, `set_active_method` + + :Parameters: + + method: `str` + TODOACTIVEDOCS + + axis: `None` or (sequence of) `int`, optional + TODOACTIVEDOCS + + :Returns: + + `{{class}}` + TODOACTIVEDOCS + + """ + a = self.copy() + a.set_active_method(method) + a.set_active_axis(axis) + return a + + def get_active_axis(self): + """TODOACTIVEDOCS. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `set_active_axis` + + :Returns: + + TODOACTIVEDOCS + + """ + return self._custom.get("active_axis") + + def get_active_method(self): + """TODOACTIVEDOCS. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `set_active_method` + + :Returns: + + `str` or `None` + The name of the active reduction method, or `None` if + one hasn't been set. + + """ + return self._custom.get("active_method") + + def set_active_axis(self, value): + """TODOACTIVEDOCS. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `get_active_axis` + + :Parameters: + + TODOACTIVEDOCS + + :Returns: + + `None` + + """ + self._custom["active_axis"] = value + + def set_active_method(self, value): + """TODOACTIVEDOCS. + + .. versionadded:: ACTIVEVERSION + + .. 
seealso:: `get_active_method` + + :Parameters: + + TODOACTIVEDOCS + + :Returns: + + `None` + + """ + self._custom["active_method"] = value diff --git a/cf/data/array/netcdfarray.py b/cf/data/array/netcdfarray.py index d6042909f9..7dece7d1a3 100644 --- a/cf/data/array/netcdfarray.py +++ b/cf/data/array/netcdfarray.py @@ -1,15 +1,19 @@ import cfdm -from dask.utils import SerializableLock from ...mixin_container import Container -from .mixin import FileArrayMixin # Global lock for netCDF file access -_lock = SerializableLock() +from ..utils import netcdf_lock +from .mixin import ActiveStorageMixin, FileArrayMixin -class NetCDFArray(FileArrayMixin, Container, cfdm.NetCDFArray): - """An array stored in a netCDF file.""" +class NetCDFArray( + ActiveStorageMixin, FileArrayMixin, Container, cfdm.NetCDFArray +): + """An array stored in a netCDF file. + + TODOACTIVEDOCS + """ def __repr__(self): """Called by the `repr` built-in function. @@ -37,4 +41,4 @@ def _dask_lock(self): if filename is None: return False - return _lock + return netcdf_lock diff --git a/cf/data/collapse/__init__.py b/cf/data/collapse/__init__.py index 0de12360ea..47bbd037ce 100644 --- a/cf/data/collapse/__init__.py +++ b/cf/data/collapse/__init__.py @@ -1 +1,2 @@ from .collapse import Collapse +from .collapse_active import actify diff --git a/cf/data/collapse/collapse.py b/cf/data/collapse/collapse.py index 0220f3e17a..fc1ccc07cf 100644 --- a/cf/data/collapse/collapse.py +++ b/cf/data/collapse/collapse.py @@ -5,12 +5,31 @@ from dask.array.reductions import reduction from ...docstring import _docstring_substitution_definitions +from .collapse_active import active_storage from .collapse_utils import check_input_dtype, double_precision_dtype class Collapse(metaclass=DocstringRewriteMeta): """Container for functions that collapse dask arrays. + **Active storage reductions** + + A collapse method (such as `max`, `var`, etc.) 
will attempt to + make use of active storage reductions if: + + * The collapse method's *active_storage* parameter is set to True. + + * The method has a corresponding active chunk function defined in + the `collapse_active.active_chunk_functions` dictionary. + + These conditions alone are not sufficient active storage + reductions to occur. In addition, the graph of the `dask` array is + inspected to confirm that making use of active storage is + possible, and if so the graph is modified to expect the per-chunk + reductions to be carried out externally. + + See `cf.data.collapse.actify` for details. + .. versionadded:: 3.14.0 """ @@ -47,6 +66,7 @@ def __docstring_package_depth__(self): """ return 0 + @active_storage("max") def max( self, a, @@ -55,6 +75,7 @@ def max( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return maximum values of an array. @@ -82,6 +103,10 @@ def max( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -108,14 +133,16 @@ def max( meta=np.array((), dtype=dtype), ) + @active_storage("max_abs") def max_abs( self, a, axis=None, keepdims=False, - mtol=None, + mtol=1, split_every=None, chunk_function=None, + active_storage=False, ): """Return maximum absolute values of an array. @@ -143,6 +170,10 @@ def max_abs( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -155,8 +186,10 @@ def max_abs( keepdims=keepdims, mtol=mtol, split_every=split_every, + active_storage=False, ) + @active_storage("mean") def mean( self, a, @@ -166,6 +199,7 @@ def mean( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return mean values of an array. @@ -195,6 +229,10 @@ def mean( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. 
versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -222,6 +260,7 @@ def mean( weights=weights, ) + @active_storage("mean_abs") def mean_abs( self, a, @@ -231,6 +270,7 @@ def mean_abs( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return mean absolute values of an array. @@ -260,6 +300,10 @@ def mean_abs( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -273,8 +317,10 @@ def mean_abs( keepdims=keepdims, mtol=mtol, split_every=split_every, + active_storage=False, ) + @active_storage("mid_range") def mid_range( self, a, @@ -284,6 +330,7 @@ def mid_range( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return mid-range values of an array. @@ -311,6 +358,10 @@ def mid_range( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -341,6 +392,7 @@ def mid_range( meta=np.array((), dtype=dtype), ) + @active_storage("min") def min( self, a, @@ -349,6 +401,7 @@ def min( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return minimum values of an array. @@ -376,6 +429,10 @@ def min( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -402,6 +459,7 @@ def min( meta=np.array((), dtype=dtype), ) + @active_storage("min_abs") def min_abs( self, a, @@ -410,6 +468,7 @@ def min_abs( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return minimum absolute values of an array. @@ -437,6 +496,10 @@ def min_abs( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. 
versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -449,8 +512,10 @@ def min_abs( keepdims=keepdims, mtol=mtol, split_every=split_every, + active_storage=False, ) + @active_storage("range") def range( self, a, @@ -459,6 +524,7 @@ def range( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return range values of an array. @@ -486,6 +552,10 @@ def range( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -516,6 +586,7 @@ def range( meta=np.array((), dtype=dtype), ) + @active_storage("rms") def rms( self, a, @@ -525,6 +596,7 @@ def rms( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return root mean square (RMS) values of an array. @@ -554,6 +626,10 @@ def rms( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -581,6 +657,7 @@ def rms( weights=weights, ) + @active_storage("sample_size") def sample_size( self, a, @@ -589,6 +666,7 @@ def sample_size( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return sample size values of an array. @@ -616,6 +694,10 @@ def sample_size( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -646,6 +728,7 @@ def sample_size( meta=np.array((), dtype=dtype), ) + @active_storage("sum") def sum( self, a, @@ -655,6 +738,7 @@ def sum( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return sum values of an array. @@ -684,6 +768,10 @@ def sum( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. 
versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -714,6 +802,7 @@ def sum( weights=weights, ) + @active_storage("sum_of_weights") def sum_of_weights( self, a, @@ -723,6 +812,7 @@ def sum_of_weights( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return sum of weights values for an array. @@ -752,6 +842,10 @@ def sum_of_weights( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -783,6 +877,7 @@ def sum_of_weights( weights=weights, ) + @active_storage("sum_of_weights2") def sum_of_weights2( self, a, @@ -792,6 +887,7 @@ def sum_of_weights2( mtol=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return sum of squares of weights values for an array. @@ -821,6 +917,10 @@ def sum_of_weights2( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -852,7 +952,10 @@ def sum_of_weights2( weights=weights, ) - def unique(self, a, split_every=None, chunk_function=None): + @active_storage("unique") + def unique( + self, a, split_every=None, chunk_function=None, active_storage=False + ): """Return unique elements of the data. .. versionadded:: 3.14.0 @@ -866,6 +969,10 @@ def unique(self, a, split_every=None, chunk_function=None): {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` @@ -899,6 +1006,7 @@ def unique(self, a, split_every=None, chunk_function=None): meta=np.array((), dtype=dtype), ) + @active_storage("var") def var( self, a, @@ -909,6 +1017,7 @@ def var( ddof=None, split_every=None, chunk_function=None, + active_storage=False, ): """Return variances of an array. @@ -940,6 +1049,10 @@ def var( {{chunk_function: callable, optional}} + {{active_storage: `bool`, optional}} + + .. 
versionadded:: ACTIVEVERSION + :Returns: `dask.array.Array` diff --git a/cf/data/collapse/collapse_active.py b/cf/data/collapse/collapse_active.py new file mode 100644 index 0000000000..5684927cef --- /dev/null +++ b/cf/data/collapse/collapse_active.py @@ -0,0 +1,329 @@ +from functools import wraps + + +# -------------------------------------------------------------------- +# Define the active functions +# -------------------------------------------------------------------- +def active_min(a, **kwargs): + """Chunk function for minimum values computed by active storage. + + Converts active storage reduction components to the components + expected by the reduction combine and aggregate functions. + + This function is intended to be passed to `dask.array.reduction` + as the ``chunk`` parameter. Its returned value must be the same as + the non-active chunk function that it is replacing. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `actify` + + :Parameters: + + a: `dict` + The components output from the active storage + reduction. For instance: + + >>> print(a) + {'min': array([[[49.5]]], dtype=float32), 'n': 1015808} + + :Returns: + + `dict` + Dictionary with the keys: + + * N: The sample size. + * min: The minimum. + + """ + return {"N": a["n"], "min": a["min"]} + + +def active_max(a, **kwargs): + """Chunk function for maximum values computed by active storage. + + Converts active storage reduction components to the components + expected by the reduction combine and aggregate functions. + + This function is intended to be passed to `dask.array.reduction` + as the ``chunk`` parameter. Its returned value must be the same as + the non-active chunk function that it is replacing. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `actify` + + :Parameters: + + a: `dict` + The components output from the active storage + reduction. 
For instance: + + >>> print(a) + {'max': array([[[2930.4856]]], dtype=float32), 'n': 1015808} + + :Returns: + + `dict` + Dictionary with the keys: + + * N: The sample size. + * max: The maximum. + + """ + return {"N": a["n"], "max": a["max"]} + + +def active_mean(a, **kwargs): + """Chunk function for mean values computed by active storage. + + Converts active storage reduction components to the components + expected by the reduction combine and aggregate functions. + + This function is intended to be passed to `dask.array.reduction` + as the ``chunk`` parameter. Its returned value must be the same as + the non-active chunk function that it is replacing. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `actify` + + :Parameters: + + a: `dict` + The components output from the active storage + reduction. For instance: + + >>> print(a) + {'sum': array([[[1.5131907e+09]]], dtype=float32), 'n': 1015808} + + :Returns: + + `dict` + Dictionary with the keys: + + * N: The sample size. + * V1: The sum of ``weights``. Always equal to ``N`` + because weights have not been set. + * sum: The un-weighted sum. + * weighted: True if weights have been set. Always + False. + + """ + return {"N": a["n"], "V1": a["n"], "sum": a["sum"], "weighted": False} + + +def active_sum(a, **kwargs): + """Chunk function for sum values computed by active storage. + + Converts active storage reduction components to the components + expected by the reduction combine and aggregate functions. + + This function is intended to be passed to `dask.array.reduction` + as the ``chunk`` parameter. Its returned value must be the same as + the non-active chunk function that it is replacing. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `actify` + + :Parameters: + + a: `dict` + The components output from the active storage + reduction. 
For instance: + + >>> print(a) + {'sum': array([[[1.5131907e+09]]], dtype=float32), 'n': 1015808} + + :Returns: + + `dict` + Dictionary with the keys: + + * N: The sample size. + * sum: The un-weighted sum. + + """ + return {"N": a["n"], "sum": a["sum"]} + + +# -------------------------------------------------------------------- +# Create a map of reduction methods to their corresponding active +# functions +# -------------------------------------------------------------------- +active_chunk_functions = { + "min": active_min, + "max": active_max, + "mean": active_mean, + "sum": active_sum, +} + + +def actify(a, method, axis=None): + """Modify a dask array to use active storage reductions. + + The dask graph is inspected to ensure that active storage + reductions are possible, and if not then the dask array is + returned unchanged. + + It is assumed that: + + * The method has a corresponding active function defined in the + `active_chunk_functions` dictionary. If this is not the case + then an error will occur at definition time. + + * The `!active_storage` attribute of the `Data` object that + provided the dask array *a* is `True`. If this is not the case + then an error at compute time is likely. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `active_storage` + + :Parameters: + + a: `dask.array.Array` + The array to be collapsed. + + method: `str` + The name of the reduction method. Must be a key of the + `active_chunk_functions` dictionary. + + axis: (sequence of) `int`, optional + Axis or axes along which to operate. By default, + flattened input is used. + + :Returns: + + (`dask.array.Array`, function) or (`dask.array.Array`, `None`) + If active storage operations are possible then return the + modified dask array and the new chunk reduction + function. Otherwise return the unaltered input array and + `None`. 
+ + """ + try: + from activestorage import Active # noqa: F401 + except ModuleNotFoundError: + # The active storage class dependency is not met, so using + # active storage is not possible. + return a, None + + from numbers import Integral + + import dask.array as da + from dask.array.utils import validate_axis + from dask.base import collections_to_dsk + + # Parse axis + if axis is None: + axis = tuple(range(a.ndim)) + else: + if isinstance(axis, Integral): + axis = (axis,) + + if len(axis) != a.ndim: + # Can't (yet) use active storage to collapse a subset of + # the axes, so return the input data unchanged. + return a, None + + axis = validate_axis(axis, a.ndim) + + # Loop round the nodes of the dask graph, looking for data + # definitions that point to files and which support active storage + # operations, and modify the dask grpah when we find them. + # + # The elements are traversed in reverse order so that the data + # defintions come out first, allowing for the potential of a + # faster short circuit when using active storage is not possible. + ok_to_actify = True + dsk = collections_to_dsk((a,), optimize_graph=True) + for key, value in reversed(dsk.items()): + try: + filename = value.get_filename() + except AttributeError: + # This dask chunk is not a data definition + continue + + if not filename: + # This data definition doesn't have any files, so can't + # support active storage reductions. + ok_to_actify = False + break + + # Still here? Then this chunk is a data definition that points + # to files, so try to insert an actified copy into the dask + # graph. + try: + dsk[key] = value.actify(method, axis) + except AttributeError: + # This data definition doesn't support active storage + # reductions + ok_to_actify = False + break + + if not ok_to_actify: + # It turns out that the dask graph is not suitable for active + # storage reductions, so return the input data unchanged. + return a, None + + # Still here? 
Then all data definitions in the dask graph support + # active storage reductions => redefine the dask array from the + # actified dask graph, and set the active storage reduction chunk + # function. + return ( + da.Array(dsk, a.name, a.chunks, a.dtype, a._meta), + active_chunk_functions[method], + ) + + +def active_storage(method): + """A decorator that enables active storage reductions. + + This decorator is intended for `Collapse` methods. Active storage + operations are only carried out when the conditions are right. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `actify`, `cf.data.collapse.Collapse` + + :Parameters: + + method: `str` + The name of the reduction method. If it is not one of the + keys of the `active_chunk_functions` dictionary then + active storage reductions will not occur. + + """ + + def decorator(collapse_method): + @wraps(collapse_method) + def wrapper(self, *args, **kwargs): + if ( + kwargs.get("active_storage") + and method in active_chunk_functions + and kwargs.get("weights") is None + and kwargs.get("chunk_function") is None + ): + # Attempt to actify the dask array and provide a new + # chunk function + a, chunk_function = actify( + args[0], + method=method, + axis=kwargs.get("axis"), + ) + args = list(args) + args[0] = a + + if chunk_function is not None: + # The dask array has been actified, so update the + # chunk function. + kwargs["chunk_function"] = chunk_function + + # Create the collapse + return collapse_method(self, *args, **kwargs) + + return wrapper + + return decorator diff --git a/cf/data/data.py b/cf/data/data.py index 5880d91503..fc85a5d8a6 100644 --- a/cf/data/data.py +++ b/cf/data/data.py @@ -29,6 +29,7 @@ from ..functions import ( _DEPRECATION_ERROR_KWARGS, _section, + active_storage, atol, default_netCDF_fillvals, free_memory, @@ -90,10 +91,11 @@ # Contstants used to specify which `Data` components should be cleared # when a new dask array is set. See `Data._clear_after_dask_update` # for details. 
-_NONE = 0 # = 0b0000 -_ARRAY = 1 # = 0b0001 -_CACHE = 2 # = 0b0010 -_ALL = 15 # = 0b1111 +_NONE = 0 # = 0b0000 +_ARRAY = 1 # = 0b0001 +_CACHE = 2 # = 0b0010 +_ACTIVE = 8 # = 0b1000 +_ALL = 15 # = 0b1111 class Data(DataClassDeprecationsMixin, Container, cfdm.Data): @@ -423,19 +425,21 @@ def __init__( "for compressed input arrays" ) - # Bring the compressed data into memory without - # decompressing it - if to_memory: - try: - array = array.to_memory() - except AttributeError: - pass + # Bring the compressed data into memory without + # decompressing it + if to_memory: + try: + array = array.to_memory() + except AttributeError: + pass if self._is_abstract_Array_subclass(array): # Save the input array in case it's useful later. For - # compressed input arrays this will contain extra information, - # such as a count or index variable. + # compressed input arrays this will contain extra + # information, such as a count or index variable. self._set_Array(array) + # Data files are candidates for active storage reductions + self._set_active_storage(True) # Cast the input data as a dask array kwargs = init_options.get("from_array", {}) @@ -621,20 +625,6 @@ def _rtol(self): """Return the current value of the `cf.rtol` function.""" return rtol().value - def _is_abstract_Array_subclass(self, array): - """Whether or not an array is a type of abstract Array. - - :Parameters: - - array: - - :Returns: - - `bool` - - """ - return isinstance(array, cfdm.Array) - def __data__(self): """Returns a new reference to self.""" return self @@ -1274,6 +1264,9 @@ def _clear_after_dask_update(self, clear=_ALL): * If ``clear & _CACHE`` is non-zero then cached element values are deleted. + * If ``clear & _ACTIVE`` is non-zero then set the + active storage status to `False`. + By default *clear* is the ``_ALL`` integer-valued constant, which results in all components being removed. 
@@ -1305,6 +1298,10 @@ def _clear_after_dask_update(self, clear=_ALL): # Delete cached element values self._del_cached_elements() + if clear & _ACTIVE: + # Set active storage to False + self._del_active_storage() + def _set_dask(self, array, copy=False, clear=_ALL): """Set the dask array. @@ -1411,6 +1408,32 @@ def _del_dask(self, default=ValueError(), clear=_ALL): self._clear_after_dask_update(clear) return out + def _del_active_storage(self): + """Set the active storage reduction status to False. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `active_storage`, `_set_active_storage` + + :Returns: + + `None` + + **Examples** + + >>> d = cf.Data([9]) + >>> d.active_storage() + False + >>> d._set_active_storage(True) + >>> d.active_storage() + True + >>> d._del_active_storage() + >>> d.active_storage() + False + + """ + self._custom.pop("active_storage", False) + def _del_cached_elements(self): """Delete any cached element values. @@ -1456,6 +1479,46 @@ def _get_cached_elements(self): return cache.copy() + def _is_abstract_Array_subclass(self, array): + """Whether or not an array is a type of Array. + + :Parameters: + + array: + + :Returns: + + `bool` + + """ + return isinstance(array, cfdm.Array) + + def _set_active_storage(self, value): + """Set the active storage reduction status. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `active_storage`, `_del_active_storage` + + :Returns: + + `None` + + **Examples** + + >>> d = cf.Data([9]) + >>> d.active_storage() + False + >>> d._set_active_storage(True) + >>> d.active_storage() + True + >>> d._del_active_storage() + >>> d.active_storage() + False + + """ + self._custom["active_storage"] = bool(value) + def _set_cached_elements(self, elements): """Cache selected element values. 
@@ -3734,8 +3797,17 @@ def concatenate(cls, data, axis=0, cull_graph=True, relaxed_units=False):
         dxs = [d.to_dask_array() for d in processed_data]
         dx = da.concatenate(dxs, axis=axis)
 
+        # Set the active storage status
+        active = _ACTIVE
+        for d in processed_data:
+            if not d.active_storage:
+                # Set the output active storage status to False when any
+                # input data instance has False status
+                active = _NONE
+                break
+
         # Set the new dask array, retaining the cached elements ...
-        data0._set_dask(dx, clear=_ALL)
+        data0._set_dask(dx, clear=_ALL ^ active)
 
         # Set the appropriate cached elements
         cached_elements = {}
@@ -4311,7 +4383,7 @@ def _axes(self, value):
     # ----------------------------------------------------------------
     @property
     def chunks(self):
-        """The chunk sizes for each dimension.
+        """The `dask` chunk sizes for each dimension.
 
         .. versionadded:: 3.14.0
 
@@ -4333,6 +4405,27 @@ def chunks(self):
     # ----------------------------------------------------------------
     # Attributes
     # ----------------------------------------------------------------
+    @property
+    def active_storage(self):
+        """Whether or not active storage reductions are possible.
+
+        If the `active_storage` attribute is `True` then reductions
+        (such as calculating the minimum value of the data) will
+        attempt to use active storage capabilities, falling back on
+        the usual (non-active) techniques if an active storage
+        operation fails for any reason.
+
+        .. versionadded:: ACTIVEVERSION
+
+        **Examples**
+
+        >>> d = cf.Data([9])
+        >>> d.active_storage
+        False
+
+        """
+        return self._custom.get("active_storage", False)
+
     @property
     def Units(self):
         """The `cf.Units` object containing the units of the data array.
@@ -7364,7 +7457,11 @@ def unique(self, split_every=None): d.soften_mask() dx = d.to_dask_array() - dx = Collapse().unique(dx, split_every=split_every) + dx = Collapse().unique( + dx, + split_every=split_every, + active_storage=d.active_storage and active_storage(), + ) d._set_dask(dx) @@ -12278,243 +12375,3 @@ def _size_of_index(index, size=None): else: # Index is a list of integers return len(index) - - -# def _collapse( -# func, -# d, -# axis=None, -# weights=None, -# keepdims=True, -# mtol=1, -# ddof=None, -# split_every=None, -# ): -# """Collapse data in-place using a given funcion. -# -# .. versionadded:: 3.14.0 -# -# .. seealso:: `_parse_weights` -# -# :Parameters: -# -# func: callable -# The function that collapses the underlying `dask` array of -# *d*. Must have the minimum signature (parameters and -# default values) ``func(dx, axis=None, keepdims=False, -# mtol=None, split_every=None)`` (optionally including -# ``weights=None`` or ``ddof=None``), where ``dx`` is a the -# dask array contained in *d*. -# -# d: `Data` -# The data to be collapsed. -# -# axis: (sequence of) int, optional -# The axes to be collapsed. By default all axes are -# collapsed, resulting in output with size 1. Each axis is -# identified by its integer position. If *axes* is an empty -# sequence then the collapse is applied to each scalar -# element and the reuslt has the same shape as the input -# data. -# -# weights: data_like, `dict`, or `None`, optional -# Weights associated with values of the data. By default -# *weights* is `None`, meaning that all non-missing elements -# of the data have a weight of 1 and all missing elements -# have a weight of 0. -# -# If *weights* is a data_like object then it must be -# broadcastable to the array. -# -# If *weights* is a dictionary then each key specifies axes -# of the data (an `int` or `tuple` of `int`), with a -# corresponding value of data_like weights for those -# axes. 
The dimensions of a weights value must correspond to -# its key axes in the same order. Not all of the axes need -# weights assigned to them. The weights that will be used -# will be an outer product of the dictionary's values. -# -# However they are specified, the weights are internally -# broadcast to the shape of the data, and those weights that -# are missing data, or that correspond to the missing -# elements of the data, are assigned a weight of 0. -# -# For collapse functions that do not have a ``weights`` -# parameter, *weights* must be `None`. -# -# keepdims: `bool`, optional -# By default, the axes which are collapsed are left in the -# result as dimensions with size one, so that the result -# will broadcast correctly against the input array. If set -# to False then collapsed axes are removed from the data. -# -# mtol: number, optional -# The sample size threshold below which collapsed values are -# set to missing data. It is defined as a fraction (between -# 0 and 1 inclusive) of the contributing input data values. -# -# The default of *mtol* is 1, meaning that a missing datum -# in the output array occurs whenever all of its -# contributing input array elements are missing data. -# -# For other values, a missing datum in the output array -# occurs whenever more than ``100*mtol%`` of its -# contributing input array elements are missing data. -# -# ddof: number, optional -# The delta degrees of freedom. The number of degrees of -# freedom used in the calculation is (N-*ddof*) where N -# represents the number of non-missing elements. -# -# For collapse functions that do not have a ``ddof`` -# parameter, *ddof* must be `None`. -# -# split_every: `int` or `dict`, optional -# Determines the depth of the recursive aggregation. See -# `dask.array.reduction` for details. -# -# :Returns: -# -# (`Data`, formatted weights) -# The collapsed data and the output of ``_parse_weights(d, -# weights, axis)``. 
-# -# """ -# kwargs = { -# "axis": axis, -# "keepdims": keepdims, -# "split_every": split_every, -# "mtol": mtol, -# } -# -# weights = _parse_weights(d, weights, axis) -# if weights is not None: -# kwargs["weights"] = weights -# -# if ddof is not None: -# kwargs["ddof"] = ddof -# -# dx = d.to_dask_array() -# dx = func(dx, **kwargs) -# d._set_dask(dx) -# -# return d, weights -# -# -# def _parse_weights(d, weights, axis=None): -# """Parse the weights input to `_collapse`. -# -# .. versionadded:: 3.14.0 -# -# .. seealso:: `_collapse` -# -# :Parameters: -# -# d: `Data` -# The data to be collapsed. -# -# weights: data_like or `dict` -# See `_collapse` for details. -# -# axis: (sequence of) `int`, optional -# See `_collapse` for details. -# -# :Returns: -# -# `Data` or `None` -# * If *weights* is a data_like object then they are -# returned unchanged as a `Data` object. It is up to the -# downstream functions to check if the weights can be -# broadcast to the data. -# -# * If *weights* is a dictionary then the dictionary -# values', i.e. the weights components, outer product is -# returned in `Data` object that is broadcastable to the -# data. -# -# If the dictionary is empty, or none of the axes defined -# by the keys correspond to collapse axes defined by -# *axis*, then then the collapse is unweighted and `None` -# is returned. -# -# Note that, in all cases, the returned weights are *not* -# modified to account for missing values in the data. 
-# -# **Examples** -# -# >>> d = cf.Data(np.arange(12)).reshape(4, 3) -# -# >>> _parse_weights(d, [1, 2, 1], (0, 1)) -# -# -# >>> _parse_weights(d, [[1, 2, 1]], (0, 1)) -# -# -# >>> _parse_weights(d, {1: [1, 2, 1]}, (0, 1)) -# -# -# >>> print(_parse_weights(d, {0: [1, 2, 3, 4], 1: [1, 2, 1]}, (0, 1))) -# [[1 2 1] -# [2 4 2] -# [3 6 3] -# [4 8 4]] -# -# >>> print(cf.data.data._parse_weights(d, {}, (0, 1))) -# None -# -# >>> print(cf.data.data._parse_weights(d, {1: [1, 2, 1]}, 0)) -# None -# -# """ -# if weights is None: -# # No weights -# return -# -# if not isinstance(weights, dict): -# # Weights is data_like. Don't check broadcastability to d, -# # leave that to whatever uses the weights. -# return Data.asdata(weights) -# -# if not weights: -# # No weights (empty dictionary) -# return -# -# if axis is None: -# axis = tuple(range(d.ndim)) -# else: -# axis = d._parse_axes(axis) -# -# weights = weights.copy() -# weights_axes = set() -# for key, value in tuple(weights.items()): -# del weights[key] -# key = d._parse_axes(key) -# if weights_axes.intersection(key): -# raise ValueError("Duplicate weights axis") -# -# weights[tuple(key)] = value -# weights_axes.update(key) -# -# if not weights_axes.intersection(axis): -# # No weights span collapse axes -# return -# -# # For each component, add missing dimensions as size 1. 
-# w = [] -# shape = d.shape -# axes = d._axes -# for key, value in weights.items(): -# value = Data.asdata(value) -# -# # Make sure axes are in ascending order -# if key != tuple(sorted(key)): -# key1 = [axes[i] for i in key] -# new_order = [key1.index(axis) for axis in axes if axis in key1] -# value = value.transpose(new_order) -# -# new_shape = [n if i in key else 1 for i, n in enumerate(shape)] -# w.append(value.reshape(new_shape)) -# -# # Return the product of the weights components, which will be -# # broadcastable to d -# return reduce(mul, w) diff --git a/cf/data/fragment/netcdffragmentarray.py b/cf/data/fragment/netcdffragmentarray.py index 4bf56408ac..58f359a1e7 100644 --- a/cf/data/fragment/netcdffragmentarray.py +++ b/cf/data/fragment/netcdffragmentarray.py @@ -1,8 +1,9 @@ +from ..array.mixin import ActiveStorageMixin from ..array.netcdfarray import NetCDFArray from .abstract import FragmentArray -class NetCDFFragmentArray(FragmentArray): +class NetCDFFragmentArray(ActiveStorageMixin, FragmentArray): """A CFA fragment array stored in a netCDF file. .. versionadded:: 3.14.0 @@ -115,7 +116,7 @@ def __getitem__(self, indices): differences: * A dimension's index can't be rank-reducing, i.e. it can't - be an integer, nor a scalar `numpy` or `dask` array. + be an integer, a scalar `numpy`, nor a `dask` array. 
* When two or more dimension's indices are sequences of integers then these indices work independently along each diff --git a/cf/data/utils.py b/cf/data/utils.py index 3b501fb538..360f6312af 100644 --- a/cf/data/utils.py +++ b/cf/data/utils.py @@ -5,6 +5,7 @@ import dask.array as da import numpy as np +from dask.utils import SerializableLock from ..cfdatetime import ( canonical_calendar, @@ -14,11 +15,17 @@ rt2dt, st2rt, ) +from ..functions import active_storage from ..units import Units from .dask_utils import cf_YMDhms _units_None = Units(None) +# -------------------------------------------------------------------- +# Global lock for netCDF file access +# -------------------------------------------------------------------- +netcdf_lock = SerializableLock() + def is_numeric_dtype(array): """True if the given array is of a numeric or boolean data type. @@ -820,6 +827,7 @@ def collapse( "keepdims": keepdims, "split_every": split_every, "mtol": mtol, + "active_storage": d.active_storage and active_storage(), } weights = parse_weights(d, weights, axis) @@ -942,8 +950,9 @@ def parse_weights(d, weights, axis=None): w = [] shape = d.shape axes = d._axes + Data = type(d) for key, value in weights.items(): - value = type(d).asdata(value) + value = Data.asdata(value) # Make sure axes are in ascending order if key != tuple(sorted(key)): diff --git a/cf/docstring/docstring.py b/cf/docstring/docstring.py index c07262f3fc..2555e6f59e 100644 --- a/cf/docstring/docstring.py +++ b/cf/docstring/docstring.py @@ -314,6 +314,13 @@ value. A default can also be set globally with the ``split_every`` key in `dask.config`. See `dask.array.reduction` for details.""", + # active_storage + "{{active_storage: `bool`, optional}}": """{{active_storage: `bool`, optional}} + If True then attempt to perform the collapse using + active storage reductions. 
However, if other necessary + conditions are not met (see `cf.data.collapse.actify` + for details) then the operation will be executed + without active storage.""", # Collapse chunk_function "{{chunk_function: callable, optional}}": """{{chunk_function: callable, optional}} Provides the ``chunk`` parameter to diff --git a/cf/functions.py b/cf/functions.py index d9a987bf8f..61ce74b547 100644 --- a/cf/functions.py +++ b/cf/functions.py @@ -173,6 +173,7 @@ def configuration( regrid_logging=None, relaxed_identities=None, bounds_combination_mode=None, + active_storage=None, of_fraction=None, collapse_parallel_mode=None, free_memory_factor=None, @@ -261,6 +262,14 @@ def configuration( construct identity. The default is to not change the current value. + active_storage: `bool` or `Constant`, optional + The new value (either True to enable active storage + reductions or False to disable them). The default is to + not change the current behaviour. + + .. versionadded:: ACTIVEVERSION + + of_fraction: `float` or `Constant`, optional Deprecated at version 3.14.0 and is no longer available. @@ -376,6 +385,7 @@ def configuration( new_regrid_logging=regrid_logging, new_relaxed_identities=relaxed_identities, bounds_combination_mode=bounds_combination_mode, + active_storage=active_storage, ) @@ -425,6 +435,7 @@ def _configuration(_Configuration, **kwargs): "new_regrid_logging": regrid_logging, "new_relaxed_identities": relaxed_identities, "bounds_combination_mode": bounds_combination_mode, + "active_storage": active_storage, } old_values = {} @@ -1136,6 +1147,60 @@ def _parse(cls, arg): return arg +class active_storage(ConstantAccess): + """Whether or not to allow active storage reductions. + + .. versionadded:: ACTIVEVERSION + + .. seealso:: `configuration` + + :Parameters: + + arg: `bool` or `Constant`, optional + Provide a value that will apply to all subsequent + operations. 
+ + :Returns: + + `Constant` + The value prior to the change, or the current value if no + new value was specified. + + **Examples** + + >>> cf.active_storage() + True + >>> cf.active_storage(False) + True + >>> cf.active_storage() + False + + """ + + _name = "ACTIVE_STORAGE" + + def _parse(cls, arg): + """Parse a new constant value. + + .. versionadded:: ACTIVEVERSION + + :Parameters: + + cls: + This class. + + arg: + The given new constant value. + + :Returns: + + A version of the new constant value suitable for + insertion into the `CONSTANTS` dictionary. + + """ + return bool(arg) + + def CF(): """The version of the CF conventions. diff --git a/cf/read_write/netcdf/netcdfread.py b/cf/read_write/netcdf/netcdfread.py index eaca6abdd3..367936a9d4 100644 --- a/cf/read_write/netcdf/netcdfread.py +++ b/cf/read_write/netcdf/netcdfread.py @@ -1,25 +1,6 @@ import cfdm import numpy as np -""" -TODOCFA: remove aggregation_* properties from constructs - -TODOCFA: Create auxiliary coordinates from non-standardised terms - -TODOCFA: Reference instruction variables (and/or set as - "do_not_create_field") - -TODOCFA: Create auxiliary coordinates from non-standardised terms - -TODOCFA: Consider scanning for cfa variables to the top (e.g. where - scanning for geometry varables is). This will probably need a - change in cfdm so that a customizable hook can be overlaoded - (like `_customize_read_vars` does). - -TODOCFA: What about groups/netcdf_flattener? - -""" - class NetCDFRead(cfdm.read_write.netcdf.NetCDFRead): """A container for instantiating Fields from a netCDF dataset. 
diff --git a/cf/test/test_Data.py b/cf/test/test_Data.py index 55814cae50..f60abe97a7 100644 --- a/cf/test/test_Data.py +++ b/cf/test/test_Data.py @@ -4471,6 +4471,46 @@ def test_Data__str__(self): for element in elements0: self.assertNotIn(element, d._get_cached_elements()) + def test_Data_active_storage(self): + """Test `Data.active_storage`.""" + d = cf.Data([[9, 8]]) + self.assertFalse(d.active_storage) + + d._set_active_storage(True) + self.assertTrue(d.active_storage) + d._del_active_storage() + self.assertFalse(d.active_storage) + + # Check that operations correctly set active_storage to False, + # in particular those that do not invoke `Data._set_dask`. + d._set_active_storage(True) + d.transpose(inplace=True) + self.assertFalse(d.active_storage) + + d._set_active_storage(True) + d[...] = -1 + self.assertFalse(d.active_storage) + + d._set_active_storage(True) + d.persist(inplace=True) + self.assertFalse(d.active_storage) + + d._set_active_storage(True) + d.rechunk(1, inplace=True) + self.assertFalse(d.active_storage) + + # Test with data on disk + n = cf.NetCDFArray( + "test_file.nc", + "eastward_wind", + shape=(1, 9, 10), + dtype=np.dtype(float), + ) + d = cf.Data(n) + self.assertTrue(d.active_storage) + d = cf.Data(n, to_memory=True) + self.assertFalse(d.active_storage) + def test_Data_cull_graph(self): """Test `Data.cull`""" d = cf.Data([1, 2, 3, 4, 5], chunks=3) diff --git a/cf/test/test_functions.py b/cf/test/test_functions.py index 7a32b0d859..bab173dfea 100644 --- a/cf/test/test_functions.py +++ b/cf/test/test_functions.py @@ -54,7 +54,7 @@ def test_configuration(self): self.assertIsInstance(org, dict) # Check all keys that should be there are, with correct value type: - self.assertEqual(len(org), 8) # update expected len if add new key(s) + self.assertEqual(len(org), 9) # update expected len if add new key(s) # Types expected: self.assertIsInstance(org["atol"], float) @@ -64,6 +64,7 @@ def test_configuration(self): 
self.assertIsInstance(org["bounds_combination_mode"], str) self.assertIsInstance(org["regrid_logging"], bool) self.assertIsInstance(org["tempdir"], str) + self.assertIsInstance(org["active_storage"], bool) # Log level may be input as an int but always given as # equiv. string self.assertIsInstance(org["log_level"], str) @@ -83,6 +84,7 @@ def test_configuration(self): "bounds_combination_mode": "XOR", "log_level": "INFO", "chunksize": 8e9, + "active_storage": True, } # Test the setting of each lone item.