# Source code for cf_pandas.accessor

"""
From cf-xarray.
"""

import itertools
from collections import ChainMap
from typing import (
    Any,
    Callable,
    Dict,
    Hashable,
    Iterable,
    List,
    Mapping,
    MutableMapping,
    Sequence,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)

import pandas as pd
from pandas import DataFrame, Series

import cf_pandas as cfp

from .criteria import coordinate_criteria, guess_regex
from .options import OPTIONS
from .utils import (
    _is_datetime_like,
    always_iterable,
    match_criteria_key,
    set_up_criteria,
)
from .vocab import Vocab

#: CF `axis` names accepted as keys by this module (naming follows cf_xarray).
_AXIS_NAMES = ("X", "Y", "Z", "T")

#: CF `coordinate` types accepted as keys by this module (naming follows cf_xarray).
_COORD_NAMES = ("longitude", "latitude", "vertical", "time")

# Type for Mapper functions: called with a DataFrame and a key,
# returning a list of matching names.
Mapper = Callable[[DataFrame, str], List[str]]


try:
    # Delete any ``cf`` attribute already present on DataFrame before the
    # accessor is registered below, to avoid the warning pandas emits when a
    # registration overrides an existing attribute.
    del pd.DataFrame.cf
except AttributeError:
    # Nothing named ``cf`` was present, so there is nothing to remove.
    pass


@pd.api.extensions.register_dataframe_accessor("cf")
class CFAccessor:
    """Dataframe accessor analogous to cf-xarray accessor.

    Registered on ``pandas.DataFrame`` under the name ``cf`` so that columns
    and named index levels can be looked up by axis name, coordinate name or
    custom-criteria key, e.g. ``df.cf["T"]``.
    """

    def __init__(self, pandas_obj):
        # Validation is deliberately not run on construction; call
        # ``_validate`` explicitly when it is needed.
        self._obj = pandas_obj

    def _validate(self):
        """Check that time, longitude and latitude can be identified.

        Raises
        ------
        AttributeError
            If any of "T", "longitude" or "latitude" has no match in the
            DataFrame.
        """
        # Z would also be nice but might be missing. The module-level lookup
        # is used here instead of the accessor itself.
        keys = ["T", "longitude", "latitude"]
        missing_keys = [key for key in keys if not _get_axis_coord(self._obj, key)]
        if missing_keys:
            raise AttributeError(
                f'{"longitude", "latitude", "time"} must be identifiable in DataFrame but {missing_keys} are missing.'
            )

    def __getitem__(self, key: str) -> Union[pd.Series, pd.DataFrame, pd.Index]:
        """Select column or columns by alias.

        If one column matches key, return a Series. If the single match is a
        named index level, return that level's values. If several names
        match, return a DataFrame.

        Parameters
        ----------
        key: str
            key in custom criteria/vocabulary to match with columns of
            DataFrame, or in axes or coordinates.

        Returns
        -------
        Series, Index or DataFrame
            The matching column, index level or columns.

        Raises
        ------
        ValueError
            If nothing matches ``key``, or the single match is neither a
            column nor an index level name.

        Example
        -------
        >>> df.cf[alias]
        """
        if key in self._obj.keys():
            # key is already a column name and does not need interpreting
            col_names = [key]
        elif key in _COORD_NAMES + _AXIS_NAMES:
            # axes and coordinates use their own matching method
            col_names = _get_axis_coord(self._obj, key)
        else:
            col_names = _get_custom_criteria(self._obj, key)

        if len(col_names) == 1:
            (name,) = col_names
            if name in self._obj.columns:
                return self._obj[name]
            if name in self._obj.index.names:
                return self._obj.index.get_level_values(name)
        elif len(col_names) > 1:
            return self._obj[col_names]

        raise ValueError("Some error has occurred.")

    def __setitem__(self, key: str, values: Union[Sequence, Series]):
        """Set column or index level by alias.

        Parameters
        ----------
        key: str
            key in custom criteria/vocabulary to match with columns of
            DataFrame, or in axes or coordinates.
        values : Union[Sequence, pd.Series]
            Values to set into object.

        Raises
        ------
        ValueError
            Can only set one column at once. Also raised (by the lookup) when
            ``key`` matches nothing.
        """
        col = self.__getitem__(key)

        if isinstance(col, Series):
            self._obj[col.name] = values

        elif col is None:
            # NOTE(review): ``__getitem__`` raises ValueError instead of
            # returning None, so this "make new column" branch is currently
            # unreachable. Kept to preserve the original intent.
            self._obj[key] = values

        elif isinstance(col, pd.Index):
            # Rebuild every level of the index, replacing only the level
            # whose name matches the selected one.
            levels = []
            for lev in range(self._obj.index.nlevels):
                level_values = self._obj.index.get_level_values(lev)
                if self._obj.index.names[lev] == col.name:
                    # keep the index type of the level being replaced
                    level_values = type(col)(values)
                    level_values.name = col.name
                levels.append(level_values)

            # Assign the new index on the wrapped DataFrame itself. Rebinding
            # ``self._obj`` to the frame returned by ``set_index`` would
            # leave the caller's DataFrame unchanged.
            self._obj.index = self._obj.set_index(levels).index

        else:
            raise ValueError("Setting item only works if key matches one column only.")

    def __contains__(self, item: str) -> bool:
        """
        Check whether item is a valid key for indexing with .cf
        """
        return item in self.keys()

    def keys(self) -> Set[str]:
        """
        Utility function that returns valid keys for .cf[].

        This is useful for checking whether a key is valid for indexing, i.e.
        that the attributes necessary to allow indexing by that key exist.

        Returns
        -------
        set
            Set of valid key names that can be used with __getitem__ or .cf[key].
        """
        varnames = list(self.axes) + list(self.coordinates)
        try:
            # only report custom keys that actually matched something
            varnames.extend(key for key, val in self.custom_keys.items() if val)
        except ValueError:
            # don't have criteria defined, then no custom keys to report
            pass
        return set(varnames)

    @property
    def axes(self) -> Dict[str, List[str]]:
        """
        Property that returns a dictionary mapping valid Axis standard names for ``.cf[]``
        to variable names.

        This is useful for checking whether a key is valid for indexing, i.e.
        that the attributes necessary to allow indexing by that key exist.
        It will return the Axis names ``("X", "Y", "Z", "T")`` that have a
        match among the column names or index names.

        Returns
        -------
        dict
            Dictionary with keys that can be used with ``__getitem__`` or as ``.cf[key]``.
            Keys will be the appropriate subset of ("X", "Y", "Z", "T").
            Values are sorted lists of variable names that match that particular key.
        """
        vardict = {key: _get_all(self._obj, key) for key in _AXIS_NAMES}
        # drop keys without any match
        return {k: sorted(v) for k, v in vardict.items() if v}

    @property
    def coordinates(self) -> Dict[str, List[str]]:
        """
        Property that returns a dictionary mapping valid Coordinate standard names for ``.cf[]``
        to variable names.

        This is useful for checking whether a key is valid for indexing, i.e.
        that the attributes necessary to allow indexing by that key exist.
        It will return the Coordinate names ``("latitude", "longitude", "vertical", "time")``
        that have a match among the column names or index names.

        Returns
        -------
        dict
            Dictionary of valid Coordinate names that can be used with ``__getitem__`` or ``.cf[key]``.
            Keys will be the appropriate subset of ``("latitude", "longitude", "vertical", "time")``.
            Values are sorted lists of variable names that match that particular key.
        """
        vardict = {key: _get_all(self._obj, key) for key in _COORD_NAMES}
        # drop keys without any match
        return {k: sorted(v) for k, v in vardict.items() if v}

    @property
    def custom_keys(self):
        """
        Returns a dictionary mapping criteria keys to variable names.

        Returns
        -------
        dict
            Dictionary mapping criteria keys to variable names. Keys without
            a match map to an empty list.

        Notes
        -----
        Need to use this with context manager version of providing custom_criteria.
        """
        custom_criteria = set_up_criteria()
        return {
            key: _get_custom_criteria(self._obj, key)
            for key in custom_criteria.keys()
        }

    @property
    def axes_cols(self) -> List[str]:
        """
        Property that returns a list of column names from the axes mapping.

        Returns
        -------
        list
            Variable names that are the column names which represent axes.
        """
        return list(itertools.chain.from_iterable(self.axes.values()))

    @property
    def coordinates_cols(self) -> List[str]:
        """
        Property that returns a list of column names from the coordinates mapping.

        Returns
        -------
        list
            Variable names that are the column names which represent coordinates.
        """
        return list(itertools.chain.from_iterable(self.coordinates.values()))

    @property
    def standard_names(self):
        """
        Returns a dictionary mapping standard_names to variable names, if there is a match.
        Compares with all cf-standard names.

        Returns
        -------
        dict
            Dictionary mapping standard_names to variable names.

        Notes
        -----
        This is not the same as the cf-xarray accessor method of the same name, which
        searches for variables with standard_name attributes and surfaces those values
        to map to the variable name.
        """
        vardict = {}
        for key in cfp.standard_names():
            # one-entry vocabulary whose expression is the standard name
            # followed by an end-of-string anchor
            local_criteria = Vocab().make_entry(key, f"{key}$")
            key_match = _get_custom_criteria(
                self._obj, key, criteria=local_criteria.vocab
            )
            if key_match:
                vardict[key] = key_match
        return vardict
def _get_axis_coord(obj: Union[DataFrame, Series], key: str) -> list:
    """
    Translate from axis or coord name to column or index name.

    Each column name and each named index level is lower-cased and split on
    whitespace (so a header such as ``"lon (degrees_east)"`` is checked word
    by word, with enclosing parentheses stripped) and compared with the values
    in ``coordinate_criteria[key]``. While nothing has matched yet, a name is
    also accepted if ``key`` is "T"/"time" and its data are datetime-like, or
    if it matches ``guess_regex[key]``.

    Parameters
    ----------
    obj : DataFrame
        Object whose column names and index names are searched.
    key : str, ["X", "Y", "Z", "T", "longitude", "latitude", "vertical", "time"]
        key to check for.

    Returns
    -------
    List[str], Name(s) in ``obj`` that match axis or coordinate `key`, without
    duplicates, in the order they were encountered (columns first, then index
    names).

    Raises
    ------
    KeyError
        If ``key`` is not one of the valid axis or coordinate names.

    Notes
    -----
    The guessing step takes its logic from cf-xarray's ``guess_coord_axis``.
    """
    valid_keys = _COORD_NAMES + _AXIS_NAMES
    if key not in valid_keys:
        raise KeyError(
            f"cf_xarray did not understand key {key!r}. Expected one of {valid_keys!r}"
        )

    # column names followed by index names; unnamed index levels are skipped
    candidates = list(obj.columns)
    candidates += [name for name in obj.index.names if name is not None]

    # A dict is used as an insertion-ordered set so the returned order is
    # deterministic (a plain set would give an arbitrary order).
    results: Dict[Any, None] = {}
    is_time_key = key in ("T", "time")

    for col in candidates:
        # Labels need not be strings (e.g. default integer column labels);
        # stringify for matching only, and keep ``col`` itself as the result.
        label = str(col).lower()

        if key in coordinate_criteria:
            for expected in coordinate_criteria[key].values():
                # allow for the column header having a space in it that
                # separates the name from the units, for example
                for token in label.split():
                    bracketed = token.startswith("(") and token.endswith(")")
                    if token in expected or (
                        bracketed and token.strip(")(") in expected
                    ):
                        results[col] = None

        # also use the guess_regex approach by default, but only if no results so far
        if not results:
            if col in obj.columns:
                if is_time_key and _is_datetime_like(obj[col]):
                    results[col] = None
                    continue  # prevent second detection
            elif col in obj.index.names:
                if is_time_key and _is_datetime_like(
                    obj.index.get_level_values(col)
                ):
                    results[col] = None
                    continue  # prevent second detection

            if guess_regex[key].match(label):
                results[col] = None

    return list(results)


def _get_all(obj: DataFrame, key: str) -> List[str]:
    """
    Collect every name in ``obj`` that matches ``key`` under any mapper.

    Both the custom-criteria mapper and the axis/coordinate mapper are tried;
    a mapper that raises KeyError or ValueError for ``key`` contributes nothing.

    Parameters
    ----------
    obj : DataFrame
        Object to search.
    key : str
        One of ('X', 'Y', 'Z', 'T', 'longitude', 'latitude', 'vertical',
        'time') or a custom-criteria key.

    Returns
    -------
    List[str]
        Matching names without duplicates, in first-seen order (custom
        criteria matches first). Empty if nothing matches.
    """
    all_mappers = (
        _get_custom_criteria,
        _get_axis_coord,
    )
    results = apply_mapper(all_mappers, obj, key, error=False, default=None)
    # order-preserving de-duplication
    return list(dict.fromkeys(results))
def apply_mapper(
    mappers: Union[Mapper, Tuple[Mapper, ...]],
    obj: DataFrame,
    key: Hashable,
    error: bool = True,
    default: Any = None,
) -> List[Any]:
    """
    Applies a mapping function; does error handling / returning defaults.

    Expects the mapper function to raise an error if passed a bad key.
    It should return a list in all other cases including when there are no
    results for a good key.

    Parameters
    ----------
    mappers : callable or tuple of callables
        Each is called as ``mapper(obj, key)``.
    obj : DataFrame
        Object handed to every mapper.
    key : Hashable
        Key handed to every mapper.
    error : bool
        If True, re-raise KeyError/ValueError from a mapper and raise KeyError
        when no mapper returns anything. If False, such errors count as
        "no results" and ``default`` is returned when nothing is found.
    default : Any
        Returned (as a list) when there are no results and ``error`` is False,
        or when ``key`` is not hashable. ``None`` means an empty list.

    Returns
    -------
    list
        Concatenated results of all mappers, in mapper order.

    Raises
    ------
    ValueError
        If ``key`` is not hashable and no ``default`` was given.
    KeyError
        If ``error`` is True and no mapper produced a result.
    """
    if not isinstance(key, Hashable):
        if default is None:
            raise ValueError(
                "`default` must be provided when `key` is not a valid DataArray name (of hashable type)."
            )
        return list(always_iterable(default))

    default = [] if default is None else list(always_iterable(default))

    # accept a single mapper as well as a sequence of them
    if not isinstance(mappers, Iterable):
        mappers = (mappers,)

    # apply a sequence of mappers
    # if the mapper fails, it *should* return an empty list
    # if the mapper raises an error, that is processed based on `error`
    results: List[Any] = []
    for mapper in mappers:
        try:
            found = mapper(obj, key)
        except (KeyError, ValueError) as e:
            # the "I expected only one." error is always propagated
            if error or "I expected only one." in repr(e):
                raise
            continue
        results.extend(found)

    if results:
        return results

    if error:
        raise KeyError(
            f"cf-xarray cannot interpret key {key!r}. Perhaps some needed attributes are missing."
        )
    # none of the mappers worked. Return the default
    return default
# ``match_criteria_key`` is already used by other functions and is the more
# generic entry point: it can be used without the accessor.
def _get_custom_criteria(obj: DataFrame, key: str, criteria=None) -> List[str]:
    """
    Match ``key`` against the column names of ``obj`` using custom criteria.

    Thin wrapper that forwards ``obj.columns``, ``key`` and ``criteria`` to
    ``match_criteria_key`` with ``split=True``.

    Parameters
    ----------
    obj : DataFrame
        Object whose column names are searched.
    key : str
        Criteria key to look up.
    criteria : optional
        Passed through unchanged to ``match_criteria_key``.
        NOTE(review): ``None`` presumably selects the globally configured
        criteria; confirm in ``match_criteria_key``.

    Returns
    -------
    List[str]
        Whatever ``match_criteria_key`` returns for these arguments.
    """
    return match_criteria_key(obj.columns, key, criteria, split=True)