Basic Pipeline Stages

Basic pdpipe PdPipelineStages.

Classes

ColDrop

Bases: ColumnsBasedPipelineStage

A pipeline stage that drops columns by name.

Parameters:

columns : single label, list-like or callable
    The label, or an iterable of labels, of columns to drop. Alternatively,
    this parameter can be assigned a callable returning an iterable of labels
    from an input pandas.DataFrame (see pdpipe.cq).
errors : {'ignore', 'raise'}, default 'raise'
    If 'ignore', errors are suppressed and only existing labels are dropped.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColDrop('num').apply(df)
  char
1    a
2    b
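
Since the errors argument is forwarded to pandas.DataFrame.drop (and the stage's column precondition is skipped when errors='ignore'), missing labels should simply be skipped in that mode. A minimal sketch of this usage; 'no_such_col' is a hypothetical, non-existent label, and the expected output is inferred from pandas semantics rather than taken from the original docs:

>>> pdp.ColDrop(['num', 'no_such_col'], errors='ignore').apply(df)
  char
1    a
2    b
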
Source code in pdpipe/basic_stages.py
class ColDrop(ColumnsBasedPipelineStage):
    """
    A pipeline stage that drops columns by name.

    Parameters
    ----------
    columns : single label, list-like or callable
        The label, or an iterable of labels, of columns to drop. Alternatively,
        this parameter can be assigned a callable returning an iterable of
        labels from an input pandas.DataFrame (see `pdpipe.cq`).
    errors : {'ignore', 'raise'}, default 'raise'
        If 'ignore', errors are suppressed and only existing labels are dropped.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
    >>> pdp.ColDrop('num').apply(df)
      char
    1    a
    2    b
    """

    def __init__(
        self,
        columns: ColumnsParamType,
        errors: Optional[str] = None,
        **kwargs: object,
    ) -> None:
        self._errors = errors
        self._post_cond = cond.HasNoColumn(columns)
        super_kwargs = {
            "columns": columns,
            "desc_temp": "Drop columns {}",
        }
        super_kwargs.update(**kwargs)
        super_kwargs["none_columns"] = "error"
        super().__init__(**super_kwargs)

    def _prec(self, X: pandas.DataFrame) -> bool:
        if self._errors != "ignore":
            return super()._prec(X)
        return True

    def _post(self, X: pandas.DataFrame) -> bool:
        return self._post_cond(X)

    def _transformation(
        self,
        X: pandas.DataFrame,
        verbose: bool,
        fit: bool,
    ) -> pandas.DataFrame:
        to_drop = self._get_columns(X, fit=fit)
        if verbose:
            print(f"Dropping columns {_list_str(to_drop)}")
        return X.drop(to_drop, axis=1, errors=self._errors)

ValDrop

Bases: ColumnsBasedPipelineStage

A pipeline stage that drops rows by value.

Parameters:

values : list-like
    A list of the values to drop.
columns : single label, list-like or callable, default None
    The label, or an iterable of labels, of columns to check for the given
    values. Alternatively, this parameter can be assigned a callable returning
    an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If
    set to None, all columns are checked.
exclude_columns : label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the
    columns parameter. Alternatively, this parameter can be assigned a
    callable returning a labels iterable from an input pandas.DataFrame. See
    pdpipe.cq. By default no columns are excluded.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
>>> pdp.ValDrop([4], 'a').apply(df)
    a   b
1   1   4
3  18  11
>>> pdp.ValDrop([4]).apply(df)
    a   b
3  18  11
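
The exclude_columns parameter narrows whatever columns selects; checking every column except 'b' should therefore be equivalent to the ValDrop([4], 'a') call above for this dataframe. A sketch, assuming the exclusion semantics described in the parameter list:

>>> pdp.ValDrop([4], exclude_columns='b').apply(df)
    a   b
1   1   4
3  18  11
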
Source code in pdpipe/basic_stages.py
class ValDrop(ColumnsBasedPipelineStage):
    """
    A pipeline stage that drops rows by value.

    Parameters
    ----------
    values : list-like
        A list of the values to drop.
    columns : single label, list-like or callable, default None
        The label, or an iterable of labels, of columns to check for the given
        values. Alternatively, this parameter can be assigned a callable
        returning an iterable of labels from an input pandas.DataFrame. See
        `pdpipe.cq`. If set to None, all columns are checked.
    exclude_columns : label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,5],[18,11]], [1,2,3], ['a','b'])
    >>> pdp.ValDrop([4], 'a').apply(df)
        a   b
    1   1   4
    3  18  11
    >>> pdp.ValDrop([4]).apply(df)
        a   b
    3  18  11
    """

    def __init__(
        self,
        values: List[object],
        columns: ColumnsParamType = None,
        exclude_columns: ColumnsParamType = None,
        **kwargs: object,
    ) -> None:
        self._values = values
        self._values_str = _list_str(self._values)
        super_kwargs = {
            "columns": columns,
            "exclude_columns": exclude_columns,
            "desc_temp": f"Drop values {self._values_str} in columns {{}}",
        }
        super_kwargs.update(**kwargs)
        super_kwargs["none_columns"] = "all"
        super().__init__(**super_kwargs)

    def _transformation(
        self,
        X: pandas.DataFrame,
        verbose: bool,
        fit: bool,
    ) -> pandas.DataFrame:
        inter_X = X
        before_count = len(inter_X)
        columns_to_check = self._get_columns(X, fit=fit)
        for col in columns_to_check:
            inter_X = inter_X[~inter_X[col].isin(self._values)]
        if verbose:
            print(f"{before_count - len(inter_X)} rows dropped.")
        return inter_X

ValKeep

Bases: ColumnsBasedPipelineStage

A pipeline stage that keeps rows by value.

Parameters:

values : list-like
    A list of the values to keep.
columns : single label, list-like or callable, default None
    The label, or an iterable of labels, of columns to check for the given
    values. Alternatively, this parameter can be assigned a callable returning
    an iterable of labels from an input pandas.DataFrame. See pdpipe.cq. If
    set to None, all columns are checked.
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the
    columns parameter. Alternatively, this parameter can be assigned a
    callable returning a labels iterable from an input pandas.DataFrame. See
    pdpipe.cq. By default no columns are excluded.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.ValKeep([4, 5], 'a').apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.ValKeep([4, 5]).apply(df)
   a  b
2  4  5
Source code in pdpipe/basic_stages.py
class ValKeep(ColumnsBasedPipelineStage):
    """
    A pipeline stage that keeps rows by value.

    Parameters
    ----------
    values : list-like
        A list of the values to keep.
    columns : single label, list-like or callable, default None
        The label, or an iterable of labels, of columns to check for the given
        values. Alternatively, this parameter can be assigned a callable
        returning an iterable of labels from an input pandas.DataFrame. See
        `pdpipe.cq`. If set to None, all columns are checked.
    exclude_columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
    >>> pdp.ValKeep([4, 5], 'a').apply(df)
       a   b
    2  4   5
    3  5  11
    >>> pdp.ValKeep([4, 5]).apply(df)
       a  b
    2  4  5
    """

    def __init__(
        self,
        values: List[object],
        columns: ColumnsParamType = None,
        exclude_columns: ColumnsParamType = None,
        **kwargs: object,
    ):
        self._values = values
        self._values_str = _list_str(self._values)
        super_kwargs = {
            "columns": columns,
            "exclude_columns": exclude_columns,
            "desc_temp": f"Keep values {self._values_str} in columns {{}}",
        }
        super_kwargs.update(**kwargs)
        super_kwargs["none_columns"] = "all"
        super().__init__(**super_kwargs)

    def _transformation(self, X, verbose, fit):
        inter_X = X
        before_count = len(inter_X)
        columns_to_check = self._get_columns(X, fit=fit)
        for col in columns_to_check:
            inter_X = inter_X[inter_X[col].isin(self._values)]
        if verbose:
            print(f"{before_count - len(inter_X)} rows dropped.")
        return inter_X

ColRename

Bases: PdPipelineStage

A pipeline stage that renames a column or columns.

Parameters:

rename_mapper : dict-like or callable
    Maps old column names to new ones.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
>>> pdp.ColRename({'num': 'len', 'char': 'initial'}).apply(df)
   len initial
1    8       a
2    5       b
>>> def renamer(lbl: str):
...    if lbl.startswith('n'):
...       return 'foo'
...    return lbl
>>> pdp.ColRename(renamer).apply(df)
   foo char
1    8    a
2    5    b
Source code in pdpipe/basic_stages.py
class ColRename(PdPipelineStage):
    """
    A pipeline stage that renames a column or columns.

    Parameters
    ----------
    rename_mapper : dict-like or callable
        Maps old column names to new ones.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'char'])
    >>> pdp.ColRename({'num': 'len', 'char': 'initial'}).apply(df)
       len initial
    1    8       a
    2    5       b

    >>> def renamer(lbl: str):
    ...    if lbl.startswith('n'):
    ...       return 'foo'
    ...    return lbl
    >>> pdp.ColRename(renamer).apply(df)
       foo char
    1    8    a
    2    5    b
    """

    _DEF_COLDRENAME_EXC_MSG = (
        "ColRename stage failed because not all columns"
        " {} were found in input dataframe."
    )

    def __init__(self, rename_mapper: Union[Dict, Callable], **kwargs):
        self._rename_mapper = rename_mapper
        try:
            columns_str = _list_str(list(rename_mapper.keys()))
            mapper_repr = str(rename_mapper)
            keys_set = set(self._rename_mapper.keys())
            required_labels = list(keys_set)
            _tprec = cond.HasAllColumns(required_labels)
        except AttributeError:  # rename mapper is a callable
            mapper_repr = rename_mapper.__name__
            doc = rename_mapper.__doc__
            if doc is None:
                columns_str = f"by func {rename_mapper.__name__}"
            else:
                columns_str = (
                    f"by func {rename_mapper.__name__} with "
                    f"doc: {rename_mapper.__doc__}"
                )
            _tprec = cond.AlwaysTrue()
        try:
            suffix = "s" if len(rename_mapper) > 1 else ""
        except TypeError:
            suffix = "s"
        self._tprec = _tprec
        super_kwargs = {
            "exmsg": ColRename._DEF_COLDRENAME_EXC_MSG.format(columns_str),
            "desc": f"Rename column{suffix} with {mapper_repr}",
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return self._tprec(X)

    def _transform(self, X, verbose):
        return X.rename(columns=self._rename_mapper)

DropNa

Bases: PdPipelineStage

A pipeline stage that drops null values.

Supports all parameters of the pandas.DataFrame.dropna method.

Parameters:

**kwargs : object
    All PdPipelineStage constructor parameters are supported, as are all
    parameters of the pandas.DataFrame.dropna method.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.DropNa().apply(df)
   a     b
1  1   4.0
3  1  11.0
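
Because the axis, how, thresh and subset keyword arguments are forwarded to pandas.DataFrame.dropna, the same stage can, for instance, drop null-containing columns instead of rows. A sketch under standard pandas semantics (not from the original docs):

>>> pdp.DropNa(axis=1).apply(df)
   a
1  1
2  4
3  1
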
Source code in pdpipe/basic_stages.py
class DropNa(PdPipelineStage):
    """
    A pipeline stage that drops null values.

    Supports all parameters of the pandas.DataFrame.dropna method.

    Parameters
    ----------
    **kwargs : object
        All PdPipelineStage constructor parameters are supported, as are all
        parameters of the pandas.dropna function.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
    >>> pdp.DropNa().apply(df)
       a     b
    1  1   4.0
    3  1  11.0
    """

    _DEF_DROPNA_EXC_MSG = "DropNa stage failed."
    _DROPNA_KWARGS = ["axis", "how", "thresh", "subset", "inplace"]

    def __init__(self, **kwargs):
        common = set(kwargs.keys()).intersection(DropNa._DROPNA_KWARGS)
        self._dropna_kwargs = {key: kwargs.pop(key) for key in common}
        super_kwargs = {
            "exmsg": DropNa._DEF_DROPNA_EXC_MSG,
            "desc": "Drops null values.",
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return True

    def _transform(self, X, verbose):
        before_count = len(X)
        ncols_before = len(X.columns)
        inter_X = X.dropna(**self._dropna_kwargs)
        if verbose:
            print(
                f"{before_count - len(inter_X)} rows, "
                f"{ncols_before - len(inter_X.columns)} columns dropeed"
            )
        return inter_X

SetIndex

Bases: PdPipelineStage

A pipeline stage that sets existing columns as the index.

Supports all parameters of the pandas.DataFrame.set_index method except for inplace.

Parameters:

**kwargs : object
    All PdPipelineStage constructor parameters are supported, as are all
    parameters of the pandas.DataFrame.set_index method, except for 'inplace'.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
>>> pdp.SetIndex('a').apply(df)
    b
a
1   4
3  11
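
Other pandas.DataFrame.set_index keyword arguments (drop, append, verify_integrity) are forwarded as well; a hedged sketch, output omitted, that keeps 'a' as a regular column while also using it as the index:

pdp.SetIndex('a', drop=False).apply(df)  # 'a' also remains as a regular column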

Source code in pdpipe/basic_stages.py
class SetIndex(PdPipelineStage):
    """
    A pipeline stage that sets existing columns as the index.

    Supports all parameters of the pandas.DataFrame.set_index method except for
    `inplace`.

    Parameters
    ----------
    **kwargs : object
        All PdPipelineStage constructor parameters are supported, as are all
        parameters of the pandas.set_index function, except for 'inplace'.

    Examples
    --------
    >> import pandas as pd; import pdpipe as pdp;
    >> df = pd.DataFrame([[1,4],[3, 11]], [1,2], ['a','b'])
    >> pdp.SetIndex('a').apply(df)
        b
    a
    1   4
    3  11
    """

    _DEF_SETIDX_EXC_MSG = "SetIndex stage failed."
    _DEF_SETIDX_APP_MSG = "Setting indexes..."
    _SETINDEX_KWARGS = ["drop", "append", "verify_integrity"]

    def __init__(self, keys, **kwargs):
        common = set(kwargs.keys()).intersection(SetIndex._SETINDEX_KWARGS)
        self._setindex_kwargs = {key: kwargs.pop(key) for key in common}
        self._keys = keys
        if hasattr(keys, "__iter__") and not isinstance(keys, str):
            _tprec = cond.HasAllColumns(list(keys))
        else:
            _tprec = cond.HasAllColumns([keys])
        self._tprec = _tprec
        super_kwargs = {"exmsg": SetIndex._DEF_SETIDX_EXC_MSG, "desc": "Set indexes."}
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return self._tprec(X)

    def _transform(self, X, verbose):
        return X.set_index(keys=self._keys, **self._setindex_kwargs)

FreqDrop

Bases: PdPipelineStage

A pipeline stage that drops rows by value frequency.

Parameters:

threshold : int
    The minimum frequency required for a value to be kept.
column : str
    The name of the column to check for the given value frequency.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[1,11]], [1,2,3], ['a','b'])
>>> pdp.FreqDrop(2, 'a').apply(df)
   a   b
1  1   4
3  1  11
Source code in pdpipe/basic_stages.py
class FreqDrop(PdPipelineStage):
    """
    A pipeline stage that drops rows by value frequency.

    Parameters
    ----------
    threshold : int
        The minimum frequency required for a value to be kept.
    column : str
        The name of the column to check for the given value frequency.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,5],[1,11]], [1,2,3], ['a','b'])
    >>> pdp.FreqDrop(2, 'a').apply(df)
       a   b
    1  1   4
    3  1  11
    """

    _DEF_FREQDROP_EXC_MSG = (
        "FreqDrop stage failed because column {} was not" " found in input dataframe."
    )
    _DEF_FREQDROP_DESC = "Drop values with frequency < {} in column {}."

    def __init__(self, threshold: int, column: str, **kwargs):
        self._threshold = threshold
        self._column = column
        super_kwargs = {
            "exmsg": FreqDrop._DEF_FREQDROP_EXC_MSG.format(self._column),
            "desc": FreqDrop._DEF_FREQDROP_DESC.format(self._threshold, self._column),
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return self._column in X.columns

    def _transform(self, X, verbose):
        inter_X = X
        before_count = len(inter_X)
        valcount = X[self._column].value_counts()
        to_drop = valcount[valcount < self._threshold].index
        inter_X = inter_X[~inter_X[self._column].isin(to_drop)]
        if verbose:
            print(f"{before_count - len(inter_X)} rows dropped.")
        return inter_X

ColReorder

Bases: PdPipelineStage

A pipeline stage that reorders columns.

Parameters:

positions : dict
    A mapping of column names to their desired positions after reordering.
    Columns not included in the mapping will maintain their relative positions
    over the non-mapped columns.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,4,3,7]], columns=['a', 'b', 'c', 'd'])
>>> pdp.ColReorder({'b': 0, 'c': 3}).apply(df)
   b  a  d  c
0  4  8  7  3
Source code in pdpipe/basic_stages.py
class ColReorder(PdPipelineStage):
    """
    A pipeline stage that reorders columns.

    Parameters
    ----------
    positions : dict
        A mapping of column names to their desired positions after reordering.
        Columns not included in the mapping will maintain their relative
        positions over the non-mapped columns.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[8,4,3,7]], columns=['a', 'b', 'c', 'd'])
    >>> pdp.ColReorder({'b': 0, 'c': 3}).apply(df)
       b  a  d  c
    0  4  8  7  3
    """

    _DEF_ORD_EXC_MSG = (
        "ColReorder stage failed because not all columns in {}"
        " were found in input dataframe."
    )

    def __init__(self, positions, **kwargs):
        self._col_to_pos = positions
        self._pos_to_col = reverse_dict_partial(positions)
        super_kwargs = {
            "exmsg": ColReorder._DEF_ORD_EXC_MSG.format(self._col_to_pos),
            "desc": f"Reorder columns by {self._col_to_pos}",
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return set(self._col_to_pos.keys()).issubset(X.columns)

    def _transform(self, X, verbose):
        cols = X.columns
        map_cols = list(self._col_to_pos.keys())
        non_map_cols = deque(x for x in cols if x not in map_cols)
        new_columns = []
        try:
            for pos in range(len(cols)):
                if pos in self._pos_to_col:
                    new_columns.append(self._pos_to_col[pos])
                else:
                    new_columns.append(non_map_cols.popleft())
            return X[new_columns]
        except IndexError:
            raise ValueError(f"Bad positions mapping given: {new_columns}")

RowDrop

Bases: ColumnsBasedPipelineStage

A pipeline stage that drops rows by callable conditions.

Parameters:

conditions : list-like or dict
    The list of conditions that make a row eligible to be dropped. Each
    condition must be a callable that takes a cell value and returns a bool.
    If a list of callables is given, the conditions are checked for each column
    value of each row. If a dict mapping column labels to callables is given,
    then each condition is only checked for the column values of the designated
    column.
reduce : 'any', 'all' or 'xor', default 'any'
    Determines how row conditions are reduced. If set to 'all', a row must
    satisfy all given conditions to be dropped. If set to 'any', rows
    satisfying at least one of the conditions are dropped. If set to 'xor',
    rows satisfying exactly one of the conditions are dropped.
columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns. Alternatively, this
    parameter can be assigned a callable returning an iterable of labels from
    an input pandas.DataFrame. See pdpipe.cq. If given, input conditions will
    be applied to the sub-dataframe made up of these columns to determine
    which rows to drop. Ignored if conditions is provided with a dict object.
    If conditions is a list and this parameter is not provided, all columns
    are checked (unless exclude_columns is additionally provided).
exclude_columns : single label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the
    columns parameter. Alternatively, this parameter can be assigned a
    callable returning a labels iterable from an input pandas.DataFrame. See
    pdpipe.cq. By default no columns are excluded.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
>>> pdp.RowDrop([lambda x: x < 2]).apply(df)
   a   b
2  4   5
3  5  11
>>> pdp.RowDrop({'a': lambda x: x == 4}).apply(df)
   a   b
1  1   4
3  5  11
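
With reduce='all', a row is dropped only when every given condition holds for it. A sketch on the same dataframe, assuming the reduction semantics described above (only rows 2 and 3 satisfy both conditions, so only they are dropped):

>>> pdp.RowDrop({'a': lambda x: x >= 4, 'b': lambda x: x >= 5}, reduce='all').apply(df)
   a  b
1  1  4
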
Source code in pdpipe/basic_stages.py
class RowDrop(ColumnsBasedPipelineStage):
    """
    A pipeline stage that drops rows by callable conditions.

    Parameters
    ----------
    conditions : list-like or dict
        The list of conditions that make a row eligible to be dropped. Each
        condition must be a callable that takes a cell value and returns a bool
        value. If a list of callables is given, the conditions are checked for
        each column value of each row. If a dict mapping column labels to
        callables is given, then each condition is only checked for the column
        values of the designated column.
    reduce : 'any', 'all' or 'xor', default 'any'
        Determines how row conditions are reduced. If set to 'all', a row must
        satisfy all given conditions to be dropped. If set to 'any', rows
        satisfying at least one of the conditions are dropped. If set to 'xor',
        rows satisfying exactly one of the conditions will be dropped. Set to
        'any' by default.
    columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns. Alternatively,
        this parameter can be assigned a callable returning an iterable of
        labels from an input pandas.DataFrame. See `pdpipe.cq`. If given,
        input conditions will be applied to the sub-dataframe made up of
        these columns to determine which rows to drop. Ignored if `conditions`
        is provided with a dict object. If `conditions` is a list and this
        parameter is not provided, all columns are checked (unless
        `exclude_columns` is additionally provided)
    exclude_columns : single label, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,5],[5,11]], [1,2,3], ['a','b'])
    >>> pdp.RowDrop([lambda x: x < 2]).apply(df)
       a   b
    2  4   5
    3  5  11
    >>> pdp.RowDrop({'a': lambda x: x == 4}).apply(df)
       a   b
    1  1   4
    3  5  11
    """

    _REDUCERS = {"all": all, "any": any, "xor": lambda x: sum(x) == 1}

    class _DictRowCond(object):
        """Filter rows by a dict of conditions."""

        def __init__(self, conditions, reducer):
            self.conditions = conditions
            self.reducer = reducer

        def __call__(self, row):
            res = [cond(row[lbl]) for lbl, cond in self.conditions.items()]
            return self.reducer(res)

    class _ListRowCond(object):
        """Filter rows by a list of conditions."""

        def __init__(self, conditions, reducer):
            self.conditions = conditions
            self.reducer = reducer

        def __call__(self, row):
            res = [self.reducer(row.apply(cond)) for cond in self.conditions]
            return self.reducer(res)

    def _row_condition_builder(self, conditions, reduce):
        reducer = RowDrop._REDUCERS[reduce]
        if self._cond_is_dict:
            row_cond = RowDrop._DictRowCond(conditions=conditions, reducer=reducer)
        else:
            row_cond = RowDrop._ListRowCond(conditions=conditions, reducer=reducer)
        return row_cond

    def __init__(
        self,
        conditions: Union[List[object], Dict[object, object]],
        reduce: str = None,
        columns: ColumnsParamType = None,
        exclude_columns: ColumnsParamType = None,
        **kwargs: object,
    ):
        self._conditions = conditions
        if reduce is None:
            reduce = "any"
        self._reduce = reduce
        if reduce not in RowDrop._REDUCERS.keys():
            raise ValueError(
                (
                    "{} is an unsupported argument for the 'reduce' parameter of "
                    "the RowDrop constructor!"
                ).format(reduce)
            )
        self._cond_is_dict = isinstance(conditions, dict)
        if self._cond_is_dict:
            valid = all([callable(cond) for cond in conditions.values()])
            if not valid:
                raise ValueError(
                    "Condition dicts given to RowDrop must map to callables!"
                )
            columns = list(conditions.keys())
        else:
            valid = all([callable(cond) for cond in conditions])
            if not valid:
                raise ValueError("RowDrop condition lists can contain only callables!")
        self._row_cond = self._row_condition_builder(conditions, reduce)
        super_kwargs = {
            "columns": columns,
            "exclude_columns": exclude_columns,
            "desc_temp": "Drop rows in columns {} by conditions",
        }
        super_kwargs.update(**kwargs)
        super_kwargs["none_columns"] = "all"
        super().__init__(**super_kwargs)

    def _transformation(self, X, verbose, fit):
        before_count = len(X)
        columns = self._get_columns(X, fit=fit)
        subX = X[columns]
        drop_index = ~subX.apply(self._row_cond, axis=1)
        inter_X = X[drop_index]
        if verbose:
            print(f"{before_count - len(inter_X)} rows dropped.")
        return inter_X

Schematize

Bases: PdPipelineStage

Enforces a column schema on input dataframes.

Parameters:

columns : sequence of labels, optional (default None)
    The dataframe schema to enforce on input dataframes. If set to None, the
    schema is learned at fit time and applied in subsequent transforms.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
>>> pdp.Schematize(['a', 'c']).apply(df)
   a  c
1  2  8
2  3  9
>>> pdp.Schematize(['c', 'b']).apply(df)
   c  b
1  8  4
2  9  6
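
When columns is left as None, the schema is learned from the dataframe seen at fit time and enforced on subsequent inputs. A minimal sketch of that flow; other_df stands in for a hypothetical, later dataframe with the same columns in a different order:

>>> schematize = pdp.Schematize()             # no explicit schema
>>> fitted = schematize.fit_transform(df)     # learns the schema ['a', 'b', 'c']
>>> result = schematize.transform(other_df)   # other_df is coerced to the learned schema
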
Source code in pdpipe/basic_stages.py
class Schematize(PdPipelineStage):
    """
    Enforces a column schema on input dataframes.

    Parameters
    ----------
    columns: sequence of label, optional
        The dataframe schema to enforce on input dataframes. If set to None,
        the schema is learned in fit time and applied in subsequent transforms.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[2, 4, 8],[3, 6, 9]], [1, 2], ['a', 'b', 'c'])
    >>> pdp.Schematize(['a', 'c']).apply(df)
       a  c
    1  2  8
    2  3  9
    >>> pdp.Schematize(['c', 'b']).apply(df)
       c  b
    1  8  4
    2  9  6
    """

    def __init__(
        self,
        columns: Optional[List[object]] = None,
        **kwargs: object,
    ) -> None:
        if columns is None:
            self._adaptive = True
            self._columns = None
            self._columns_str = "<Learnable Schema>"
            exmsg = "Learnable schematize failed in precondition unexpectedly!"
        else:
            self._adaptive = False
            self._columns = _interpret_columns_param(columns)
            self._columns_str = _list_str(self._columns)
            exmsg = (
                f"Not all required columns {self._columns_str} "
                f"found in input dataframe!"
            )
        desc = (
            f"Transform input dataframes to the following schema: "
            f"{self._columns_str}"
        )
        super_kwargs = {
            "exmsg": exmsg,
            "desc": desc,
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X: pandas.DataFrame) -> bool:
        if self._adaptive and self._is_being_fitted:
            return True
        return set(self._columns).issubset(X.columns)

    def _transform(self, X: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
        return X[self._columns]

    def _fit_transform(self, X: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
        if self._adaptive:
            self._columns = X.columns
            self.is_fitted = True
            return X
        return X[self._columns]

DropDuplicates

Bases: ColumnsBasedPipelineStage

Drop duplicates in the given columns.

Parameters:

columns : column label or sequence of labels, optional
    The labels of the columns to consider for duplication drop. If not
    populated, duplicates are dropped from all columns.
exclude_columns : label, iterable or callable, optional
    The label, or an iterable of labels, of columns to exclude, given the
    columns parameter. Alternatively, this parameter can be assigned a
    callable returning a labels iterable from an input pandas.DataFrame. See
    pdpipe.cq. By default no columns are excluded.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8, 1],[8, 2], [9, 2]], [1,2,3], ['a', 'b'])
>>> pdp.DropDuplicates('a').apply(df)
   a  b
1  8  1
3  9  2
Source code in pdpipe/basic_stages.py
class DropDuplicates(ColumnsBasedPipelineStage):
    """
    Drop duplicates in the given columns.

    Parameters
    ----------
    columns: column label or sequence of labels, optional
        The labels of the columns to consider for duplication drop. If not
        populated, duplicates are dropped from all columns.
    exclude_columns : object, iterable or callable, optional
        The label, or an iterable of labels, of columns to exclude, given the
        `columns` parameter. Alternatively, this parameter can be assigned a
        callable returning a labels iterable from an input pandas.DataFrame.
        See `pdpipe.cq`. Optional. By default no columns are excluded.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
        >>> import pandas as pd; import pdpipe as pdp;
        >>> df = pd.DataFrame([[8, 1],[8, 2], [9, 2]], [1,2,3], ['a', 'b'])
        >>> pdp.DropDuplicates('a').apply(df)
           a  b
        1  8  1
        3  9  2
    """

    def __init__(
        self,
        columns: ColumnsParamType = None,
        exclude_columns: ColumnsParamType = None,
        **kwargs: object,
    ) -> None:
        super_kwargs = {
            "columns": columns,
            "exclude_columns": exclude_columns,
            "desc_temp": "Drop duplicates in columns {}",
        }
        super_kwargs.update(**kwargs)
        super_kwargs["none_columns"] = "all"
        super().__init__(**super_kwargs)

    def _transformation(self, X, verbose, fit):
        columns = self._get_columns(X, fit=fit)
        inter_X = X.drop_duplicates(subset=columns)
        if verbose:
            print(f"{len(X) - len(inter_X)} rows dropped.")
        return inter_X

ColumnDtypeEnforcer

Bases: PdPipelineStage

A pipeline stage enforcing column dtypes.

Parameters:

column_to_dtype : dict of labels / ColumnQualifiers to dtypes
    Use {col: dtype, ...}, where col is a column label and dtype is a
    numpy.dtype or Python type, to cast one or more of the DataFrame's columns
    to column-specific types. Alternatively, you can provide ColumnQualifier
    objects as keys. If at least one such key is present, the lbl-to-dtype
    dict is dynamically inferred each time the pipeline stage is applied (note
    that ColumnQualifier objects are fittable by default, so to have column
    labels re-inferred after the first stage application you'll have to set
    fittable=False for the ColumnQualifier you use; see pdpipe.cq).
errors : {'raise', 'ignore'}, default 'raise'
    Control raising of exceptions on invalid data for the provided dtype.
    - 'raise': allow exceptions to be raised.
    - 'ignore': suppress exceptions; on error, return the original object.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'initial'])
>>> pdp.ColumnDtypeEnforcer({'num': float}).apply(df)
   num initial
1  8.0       a
2  5.0       b
>>> pdp.ColumnDtypeEnforcer({pdp.cq.StartsWith('n'): float}).apply(df)
   num initial
1  8.0       a
2  5.0       b
Source code in pdpipe/basic_stages.py
class ColumnDtypeEnforcer(PdPipelineStage):
    """
    A pipeline stage enforcing column dtypes.

    Parameters
    ----------
    column_to_dtype: dict of labels / ColumnQualifiers to dtypes
        Use {col: dtype, ...}, where col is a column label and dtype is a
        numpy.dtype or Python type to cast one or more of the DataFrame's
        columns to column-specific types. Alternatively, you can provide
        `ColumnQualifier` objects as keys. If at least one such key is present,
        the lbl-to-dtype dict is dynamically inferred each time the pipeline
        stage is applied (note that `ColumnQualifier` objects are fittable by
        default, so to have column labels re-inferred after the first stage
        application you'll have to set `fittable=False` for the
        `ColumnQualifier` you use, see `pdpipe.cq`).
    errors: {'raise', 'ignore'}, default 'raise'
        Control raising of exceptions on invalid data for provided dtype.
        - raise : allow exceptions to be raised
        - ignore : suppress exceptions. On error return original object.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[8,'a'],[5,'b']], [1,2], ['num', 'initial'])
    >>> pdp.ColumnDtypeEnforcer({'num': float}).apply(df)
       num initial
    1  8.0       a
    2  5.0       b

    >>> pdp.ColumnDtypeEnforcer({pdp.cq.StartsWith('n'): float}).apply(df)
       num initial
    1  8.0       a
    2  5.0       b
    """

    _DEF_COL_DTYPE_ENF_EXC_MSG = (
        "ColumnDtypeEnforcer stage failed because not all columns"
        " {} were found in input dataframe."
    )

    def __init__(
        self,
        column_to_dtype: Dict,
        errors: Optional[str] = "raise",
        **kwargs: object,
    ) -> None:
        # if none of the keys in column_to_dtype is a ColumnQualifier
        if not any(isinstance(x, ColumnQualifier) for x in column_to_dtype.keys()):
            # its a static map; use it as is
            self._column_to_dtype = column_to_dtype
            keys_set = set(column_to_dtype.keys())
            _tprec = cond.HasAllColumns(list(keys_set))
        else:
            # else, it's at least partly dynamic, and will have to infer it
            # on run time
            self._dynamic_column_to_dtype = column_to_dtype
            _tprec = cond.AlwaysTrue()
        self._tprec = _tprec
        self._errors = errors
        columns_str = _list_str(list(column_to_dtype.keys()))
        suffix = "s" if len(column_to_dtype) > 1 else ""
        super_kwargs = {
            "exmsg": ColumnDtypeEnforcer._DEF_COL_DTYPE_ENF_EXC_MSG.format(columns_str),
            "desc": f"Enforce column{suffix} dtype with {column_to_dtype}",
        }
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _col_to_dtype_from_X(self, X: pandas.DataFrame) -> Dict:
        try:
            return self._column_to_dtype
        except AttributeError:
            column_to_dtype = {}
            for k, dtype in self._dynamic_column_to_dtype.items():
                try:
                    columns = []
                    try:
                        if k._fittable and self._is_being_fitted:
                            columns = k.fit_transform(X)
                    except AttributeError:
                        # k is not a ColumnQualifier
                        columns = k(X)
                    column_to_dtype.update({lbl: dtype for lbl in columns})
                except TypeError:  # k is not a callable
                    column_to_dtype[k] = dtype
            return column_to_dtype

    def _prec(self, X: pandas.DataFrame) -> bool:
        if self._is_being_fitted:
            return self._tprec.fit_transform(X)
        return self._tprec(X)

    def _transform(
        self,
        X: pandas.DataFrame,
        verbose: bool,
    ) -> pandas.DataFrame:
        lbl_to_dtype = self._col_to_dtype_from_X(X)
        return X.astype(
            dtype=lbl_to_dtype,
            copy=True,
            errors=self._errors,
        )

ConditionValidator

Bases: PdPipelineStage

A pipeline stage that validates boolean conditions on dataframes.

The stage does not change the input dataframe in any way.

The constructor expects either a single callable or a list-like of callable objects, and checks that all these callables return True - meaning all defined conditions hold - for input dataframes.

Naturally, pdpipe Condition objects from the pdpipe.cond module can be used.

Parameters:

conditions : callable or list-like of callable
    The conditions to check for input dataframes. Naturally, pdpipe Condition
    objects from the pdpipe.cond module can be used.
reducer : callable, optional
    The callable that reduces the list of boolean results to a single result.
    By default the built-in all function is used, so all conditions must hold
    for this pipeline stage to validate an input dataframe. The built-in any
    function may be used to validate that at least one condition holds, and of
    course custom reducing functions can be used.
errors : str, default 'raise'
    If set to 'raise', the default, then if the reduced boolean result is
    False a FailedConditionError is raised on stage application. If set to
    'ignore', then conditions are checked, the results are printed if the
    application was called with verbose=True, and pipeline application
    continues. Any other value is interpreted as 'raise'.
**kwargs : object
    All PdPipelineStage constructor parameters are supported.

Examples:

>>> import pandas as pd; import pdpipe as pdp;
>>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
>>> pdp.ConditionValidator(lambda df: len(df.columns) == 5).apply(df)
Traceback (most recent call last):
   ...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
>>> pdp.ConditionValidator(pdp.cond.HasNoMissingValues()).apply(df)
Traceback (most recent call last):
   ...
pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
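
Passing reducer=any relaxes the validation so that a single holding condition is enough; similarly, errors='ignore' would let the dataframe pass through, with failures only reported under verbose application. A sketch assuming those semantics (pdp.cond.HasAllColumns is assumed to be exposed the same way as the HasNoMissingValues condition above); here the first condition holds, so the dataframe passes through unchanged:

>>> pdp.ConditionValidator(
...     [pdp.cond.HasAllColumns(['a', 'b']), pdp.cond.HasNoMissingValues()],
...     reducer=any,
... ).apply(df)
   a     b
1  1   4.0
2  4   NaN
3  1  11.0
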
Source code in pdpipe/basic_stages.py
class ConditionValidator(PdPipelineStage):
    """
    A pipeline stage that validates boolean conditions on dataframes.

    The stage does not change the input dataframe in any way.

    The constructor expects either a single callable or a list-like of callable
    objects, and checks that all these callables return True - meaning all
    defined conditions hold - for input dataframes.

    Naturally, pdpipe `Condition` objects from the `pdpipe.cond` module can be used.

    Parameters
    ----------
    conditions : callable or list-like of callable
        The conditions to check for input dataframes. Naturally, pdpipe
        `Condition` objects from the `pdpipe.cond` module can be used.
    reducer : callable, optional
        The callable that reduces the list of boolean results to a single
        result. By default the built-in `all` function is used, so all
        conditions must hold for this pipeline stage to validate an input
        dataframe. The built-in `any` function may be used to validate at least
        one condition holds, and of course custom reducing functions can be
        used.
    errors : str, default 'raise'
        If set to 'raise', the default, then if the reduced boolean result is
        False a FailedConditionError is raised on stage application. If set to
        'ignore', then conditions are checked, the results are printed if the
        application was called with `verbose=True`, and pipeline application
        continues. Any other value is interpreted as 'raise'.
    **kwargs : object
        All PdPipelineStage constructor parameters are supported.

    Examples
    --------
    >>> import pandas as pd; import pdpipe as pdp;
    >>> df = pd.DataFrame([[1,4],[4,None],[1,11]], [1,2,3], ['a','b'])
    >>> pdp.ConditionValidator(lambda df: len(df.columns) == 5).apply(df)
    Traceback (most recent call last):
       ...
    pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!

    >>> pdp.ConditionValidator(pdp.cond.HasNoMissingValues()).apply(df)
    Traceback (most recent call last):
       ...
    pdpipe.exceptions.FailedConditionError: ConditionValidator stage failed; some conditions did not hold for the input dataframe!
    """  # noqa: E501

    def __init__(
        self,
        conditions: Union[Callable, List[Callable]],
        reducer: Optional[Callable] = all,
        errors: Optional[str] = "raise",
        **kwargs: object,
    ):
        if callable(conditions):
            conditions = [conditions]
        self._conditions = conditions
        self._reducer = reducer
        self._errors = "raise"
        self._raise = True
        if errors == "ignore":
            self._errors = "ignore"
            self._raise = False
        super_kwargs = {"desc": "Validates conditions"}
        super_kwargs.update(**kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return True

    def _transform(self, X, verbose):
        results = []
        for cond_obj in self._conditions:
            try:
                res = cond_obj(X)
            except Exception as e:
                raise ValueError(
                    f"Supplied condition raised a {e} exception when applied "
                    "to input dataframe!"
                ) from e
            if verbose and not res:
                cond_repr = cond_obj.__doc__
                if cond_repr is None:
                    cond_repr = str(cond_obj)
                print(f"  + Condition failed for input dataframe: {cond_repr}")
            results.append(res)
        reduced_result = self._reducer(results)
        if self._raise and not reduced_result:
            raise FailedConditionError(
                "ConditionValidator stage failed; some conditions did not hold"
                " for the input dataframe!"
            )
        return X

ApplicationContextEnricher

Bases: PdPipelineStage

A pipeline stage that enriches the pipeline's application context.

Keyword arguments can be either PdPipelineStage constructor arguments, in which case they are passed to the stage constructor, or they can be mappings to be added to the application context. If a key maps to a callable, then the callable is called with the input dataframe as the first argument, and the application context is passed as a keyword argument if the callable expects it (otherwise only the input dataframe is passed), and the result is stored in the application context mapped by the key. If the key maps to a non-callable object, the mapping is simply stored in the application context.

Mappings are evaluated in the order they are passed to the constructor.

For example, ApplicationContextEnricher(suma=lambda df: df['a'].sum()) will add the sum of the 'a' column to the application context under the key 'suma'. Later stages can then access the value of 'suma' with self.application_context['suma'].

Similarly, ApplicationContextEnricher(b=5) will add the {'b': 5} mapping to the application context.

Gradual mappings that use earlier mappings should be given in order, e.g.:

ApplicationContextEnricher(
    asum=lambda df: df['a'].sum(),
    bsum=lambda df: df['a'].mean(),
    absumdiff=lambda df, application_context: application_context['asum'] - application_context['bsum'],
)
If all three quantities are required by later stages, and computation time for the intermediate quantities is substantial, then this approach can save redundant computation.
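
As a minimal sketch, an enricher typically sits at the head of a pipeline so that every later stage can read the computed values from self.application_context; the pipeline below is illustrative only, and the downstream stages are left as a comment:

pipeline = pdp.PdPipeline([
    pdp.ApplicationContextEnricher(
        asum=lambda df: df['a'].sum(),   # callable: evaluated against the input dataframe
        threshold=10,                    # non-callable: stored in the context as-is
    ),
    # ... later stages can read self.application_context['asum'] and
    # self.application_context['threshold'] during their own transforms
])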

Parameters:

**kwargs : str to object mapping
    The mappings to be added to the application context. Also supports all
    PdPipelineStage constructor parameters.
Source code in pdpipe/basic_stages.py
class ApplicationContextEnricher(PdPipelineStage):
    """
    A pipeline stage that enriches the pipeline's application context.

    Keyword arguments can be either PdPipelineStage constructor arguments, in
    which case they are passed to the stage constructor, or they can be
    mappings to be added to the application context. If a key maps to a
    callable, then the callable is called with the input dataframe as the first
    argument, and the application context is passed as a keyword argument if
    the callable expects it (otherwise only the input dataframe is passed),
    and the result is stored in the application context mapped by the key. If
    the key maps to a non-callable object, the mapping is simply stored in the
    application context.

    Mappings are evaluated in the order they are passed to the constructor.

    For example, `ApplicationContextEnricher(suma=lambda df: df['a'].sum())`
    will add the sum of the 'a' column to the application context under the
    key 'suma'. Later stages can then access the value of 'suma' with
    `self.application_context['suma']`.

    Similarly, `ApplicationContextEnricher(b=5)` will add the {'b': 5} mapping
    to the application context.

    Gradual mappings that use earlier mappings should be given in order, e.g.:
    ```
    ApplicationContextEnricher(
        asum=lambda df: df['a'].sum(),
        bsum=lambda df: df['a'].mean(),
        absumdiff=lambda df, application_context: application_context['asum'] - application_context['bsum'],
    )
    ```
    If all three quantities are required by later stages, and computation time
    for the intermediate quantities is substantial, then this approach can save
    redundant computation.

    Parameters
    ----------
    **kwargs : str to object mapping
        The mappings to be added to the application context. Also supports
        all PdPipelineStage constructor parameters.
    """  # noqa: E501

    def __init__(
        self,
        **kwargs: object,
    ):
        init_kwargs, enrichments = self._split_kwargs(kwargs)
        self._enrichments = enrichments
        super_kwargs = {"desc": "Enrich application context"}
        super_kwargs.update(**init_kwargs)
        super().__init__(**super_kwargs)

    def _prec(self, X):
        return True

    def _transform(self, X, verbose):
        for k, v in self._enrichments.items():
            if callable(v):
                try:
                    self.application_context[k] = v(
                        X, application_context=self.application_context
                    )
                except TypeError:
                    try:
                        self.application_context[k] = v(X)
                    except Exception as e:
                        raise ValueError(
                            (
                                f"Supplied enrichment function raised a {e} "
                                "exception when applied to input dataframe!"
                            )
                        ) from e
            else:
                self.application_context[k] = v
        return X
