Creating custom pipeline stages
Basic tranformers
Basic pipeline stages, that just want to perform the same transformation, on fit_transform and on transform, need only implement the _transform() and _prec() methods, as in this simplified version of the ColRename pipeline stage:
import pdpipe as pdp
class ColRename(pdp.PdPipelineStage):
def __init__(self, rename_mapper: Union[Dict, Callable], **kwargs):
self._rename_mapper = rename_mapper
try:
keys_set = set(self._rename_mapper.keys())
required_labels = list(keys_set)
_tprec = pdp.cond.HasAllColumns(required_labels)
except AttributeError: # rename mapper is a callable
_tprec = pdp.cond.AlwaysTrue()
self._tprec = _tprec
super_kwargs = {
'exmsg': (
"ColRename stage failed because not all expected columns "
"were found in the input dataframe!
),
'desc': f"Rename column with {self.mapper_repr}",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, X):
return self._tprec(X)
def _transform(self, X, verbose):
return X.rename(columns=self._rename_mapper)
What happens to y?
pdpipe has built-in support for X-y transformations for supervised learning, and both pipelines and pipeline stages are adaptive: If only X, and input dataframe, was provided, than the transformed dataframe is returned. If both X and y were returned, the appropriately transformed versions of both of them are returned, as an (X, y) tuple.
However, since most pipeline stages only transform X, the common way to define custom pipeline stages only requires you to implement your transformation of the input dataframe. As long as you only drop and/or rearrange rows, we will make sure y will go through the respective transformation, as pdpipe makes sure X and y has an identical index.
If you want to write pipeline stages that either add rows or change the index, you must explicitly define your transformation for both X and y. In that case, take a look at the last section, detailing how to do that.
Column-based pipeline stages
If you wish to write a stage operating on a specific set of columns, you can extend the ColumnsBasedPipelineStage, which has built-in flexible interpretation abilities of the columns parameter, enabling you to get a single label, a list of labels, or a callable (and thus work with pdpipe's powerful ColumnQualifier objects), and for users to also detail column sets by exclucsion rather than inclusion.
In this case, instead of implementing the _transform() method, you need to implement the unique (to ColumnsBasedPipelineStage) version of it, _transformation() (and you're spared for implementint the precondition method _prec() yourself):
import pdpipe as pdp
class DropDuplicates(pdp.ColumnsBasedPipelineStage):
def __init__(self, **kwargs):
super_kwargs = {
'desc_temp': 'Drop duplicates in columns {}', # (1)
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'all'
super().__init__(**super_kwargs)
def _transformation(self, X, verbose, fit):
columns = self._get_columns(X, fit=fit)
inter_X = X.drop_duplicates(subset=columns)
if verbose:
print(f"{len(X) - len(inter_X)} rows dropped.")
return inter_X
desc_tempis another unique constructor parameter ofColumnsBasedPipelineStage. You can put in a format string with{}as a template for the stage description, and the super-class will fill it with an appropriate string representation of the set of columns the user chose to operate on (e.g. "all", "X except for y and z", and so on).
Getting columns when using ColumnsBasedPipelineStage
If you're extending ColumnsBasedPipelineStage, let its constructor handle all column-related parameters (see the documentation of ColumnsBasedPipelineStage) . Then, inside your implementation of the _transformation() method, call the _get_columns() method, providing it with both the input dataframe X, and the boolean fit context parameter fit. You'll get back the set of columns to operate on and ca take it from there.
Transformers with fit status
If, alternatively, you want to build a pipeline stage that fits on the input dataframe during fit_transform, keeping some parameters that determine specific future transformations done using calls to transform, you also need to implement the _fit_transform method, as in this simplified version of the Schematize pipeline stage:
import pdpipe as pdp
class Schematize(pdp.PdPipelineStage):
"""Enforces a column schema on input dataframes."""
def __init__(
self,
columns: Optional[List[object]],
**kwargs: object,
) -> None:
if columns is None:
self._adaptive = True
self._columns = None
self._columns_str = '<Learnable Schema>'
exmsg = "Learnable schematize failed in precondition unexpectedly!"
else:
self._adaptive = False
self._columns = _interpret_columns_param(columns)
self._columns_str = _list_str(self._columns)
exmsg = (
f"Not all required columns {self._columns_str} "
f"found in input dataframe!"
)
desc = (
f"Transform input dataframes to the following schema: "
f"{self._columns_str}"
)
super_kwargs = {
'exmsg': exmsg,
'desc': desc,
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, X: pandas.DataFrame) -> bool:
if self._adaptive and not self.is_fitted:
return True
return set(self._columns).issubset(X.columns)
def _transform(
self, X: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
return X[self._columns]
def _fit_transform(
self, X: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
if self._adaptive:
self._columns = X.columns
self.is_fitted = True
return X
return X[self._columns]
The is_fitted attribute
Don't forget to set self.is_fitted = True when youre done fit-transforming your data! That's how the pipeline will know to direct future applications of the stage to the _transform method, where you can assume to have any fit-dependent attributes set.
Transforming both X and y
pdpipe has built-in support for X-y transformations for supervised learning, and both pipelines and pipeline stages are adaptive: If only X, and input dataframe, was provided, than the transformed dataframe is returned. If both X and y were returned, the appropriately transformed versions of both of them are returned, as an (X, y) tuple.
However, since most pipeline stages only transform X, the common way to define custom pipeline stages only requires you to implement your transformation of the input dataframe. As long as you only drop and/or rearrange rows, we will make sure y will go through the respective transformation, as pdpipe makes sure X and y has an identical index.
If you want to write pipeline stages that either add rows or change the index, you must explicitly define your transformation for both X and y. This is done by additionally defining the _transform_Xy() method if you're writing a transform-only stage (with no fit/not-fit state), and the _fit_transform_Xy() method if you need your stage to have a fit-dependent state.
Take, for example, a very simplified version of the DropLabelsByValues stage (the actual version supports several ways to detail the by-value dropping logic), as an example for a transform-only X-y tranformer:
class DropLabelsByValues(PdPipelineStage):
def __init__(
self,
in_set: Optional[Iterable[object]] = None,
**kwargs: object,
) -> None:
self.in_set = in_set
super_kwargs = {
'desc': "Drop labels by values",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, X, y): # (1)
return y is not None
def _transform(self, X, verbose): # (2)
raise UnexpectedPipelineMethodCallError( # (3)
"DropLabelsByValues._transform() is not expected to be called!")
def _transform_Xy(self, X, y, verbose): # (4)
post_y = y
if self.in_set is not None:
post_y = post_y.loc[~ post_y.isin(self.in_set)]
elif self.in_ranges is not None:
to_drop = y.copy()
to_drop.loc[:] = False
for in_range in self.in_ranges:
to_drop = to_drop | (y.between(*in_range))
post_y = post_y.loc[~to_drop]
elif self.not_in_set is not None:
post_y = y.isin(self.not_in_set)
elif self.not_in_ranges is not None:
to_keep = y.copy()
to_keep.loc[:] = False
for in_range in self.not_in_ranges:
to_keep = to_keep | (y.between(*in_range))
post_y = post_y.loc[to_keep]
else:
raise PipelineInitializationError(
"DropLabelsByValues: No drop conditions specified.")
return X, post_y # (5)
- We implement a standard precondition for pipeline stages that wish to transform
y, or bothXandy; checking that the inputyparameter isn'tNone. - We have to implement
_transform()as its an abstract method ofPdPipelineStage. - We make sure our benign implementation of
_transform()raise the uniqueUnexpectedPipelineMethodCallErrorexception on each call. This code would never be called (unless someone calls it by hand, or an implementation bug is found in thepdpipelibrary itself. - Unlike
_transform(), the_transform_Xy()recieves bothXandyas parameters, and return both of them. - A nice thing that
PdPipelineStagedoes for us is automatically re-align and re-indexXaccording to the transformedy(and the other way around), so the method just needs to detail the transformation fory. You may, of course, transform both, or manually re-align them usingreturn X.loc[post_y.index], post_y.
Similarly, the EncodeLabel pipeline stage provides a simple example for an X-y tranformer with a fit-state, so one implementing both the _transform_Xy() and the _fit_transform_Xy() methods:
class EncodeLabel(PdPipelineStage):
def __init__(self, **kwargs: object) -> None:
super_kwargs = {
'desc': "Encode label values",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, X, y):
return y is not None
def _transform(self, X, verbose):
raise UnexpectedPipelineMethodCallError(
"EncodeLabel._transform() is not expected to be called!")
def _fit_transform_Xy(self, X, y, verbose):
self.encoder_ = sklearn.preprocessing.LabelEncoder()
post_y = self.encoder_.fit_transform(y)
post_y = pd.Series(data=post_y, index=y.index)
self.is_fitted = True
return X, post_y
def _transform_Xy(self, X, y, verbose):
try:
post_y = self.encoder_.transform(y)
post_y = pd.Series(data=post_y, index=y.index)
return X, post_y
except AttributeError:
raise UnfittedPipelineStageError("EncodeLabel is not fitted!")
Continue to the in-depth guide
A more in-depth guide to subclassing pdpipe.PdPipelineStage, and related classes, can be found in our Develop section:
That's it!
Getting help
Remember you can get help on our Gitter chat or on our GitHub Discussions forum.