Creating additional stages
Extending PdPipelineStage
To use other stages than the built-in ones (see Types of Pipeline Stages) you can extend the class. The constructor must pass the PdPipelineStage constructor the exmsg, appmsg and desc keyword arguments to set the exception message, application message and description for the pipeline stage, respectively. Additionally, the _prec and _transform abstract methods must be implemented to define the precondition and the effect of the new pipeline stage, respectively.
Here is an example with a simple - non-fitable - version of the Schematize pipeline stage:
class Schematize(PdPipelineStage):
def __init__(self, columns: List[object],**kwargs: object) -> None:
self._columns = columns
self._columns_str = _list_str(self._columns)
exmsg = (
f"Not all required columns {self._columns_str} "
f"found in input dataframe!"
)
desc = (
f"Transform input dataframes to the following schema: "
f"{self._columns_str}"
)
super_kwargs = {
'exmsg': exmsg,
'desc': desc,
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df: pandas.DataFrame) -> bool:
return set(self._columns).issubset(df.columns)
def _transform(
self, df: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
return df[self._columns]
Fittable custom pipeline stages should implement, additionally to the method, the _fit_transform method, which should both fit pipeline stage by the input dataframe and transform transform the dataframe, while also setting self.is_fitted = True.
Here is the the Schematize stage, this time with an adaptive capability
(activated when the parameter columns=None) that makes it a fittable pipeline
stage:
class Schematize(PdPipelineStage):
def __init__(
self,
columns: Optional[List[object]],
**kwargs: object,
) -> None:
if columns is None:
self._adaptive = True
self._columns = None
self._columns_str = '<Learnable Schema>'
exmsg = "Learnable schematize failed in precondition unexpectedly!"
else:
self._adaptive = False
self._columns = columns
self._columns_str = _list_str(self._columns)
exmsg = (
f"Not all required columns {self._columns_str} "
f"found in input dataframe!"
)
desc = (
f"Transform input dataframes to the following schema: "
f"{self._columns_str}"
)
super_kwargs = {
'exmsg': exmsg,
'desc': desc,
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, df: pandas.DataFrame) -> bool:
if self._adaptive and not self.is_fitted:
return True
return set(self._columns).issubset(df.columns)
def _transform(
self, df: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
return df[self._columns]
def _fit_transform(
self, df: pandas.DataFrame, verbose=None) -> pandas.DataFrame:
if self._adaptive:
self._columns = df.columns
self.is_fitted = True
return df
return df[self._columns]
Creating pipeline stages that operate on column subsets
Many pipeline stages in pdpipe operate on a subset of columns, allowing the
caller to determine this subset by either providing a fixed set of column
labels or by providing a callable that determines the column subset dynamically
from input dataframes. The pdpipe.cq module addresses a unique but important
use case of fittable column qualifier, which is to dynamically extract a column
subset on stage fit time, but keep it fixed for future transformations.
As a general rule, every pipeline stage in pdpipe that supports the columns
parameter should inherently support fittable column qualifier, and generally
the correct interpretation of both single and multiple labels as arguments. To
unify the implementation of such functionality, and to ease the creation of new
pipeline stages, such columns should be created by extending the
ColumnsBasedPipelineStage base class, found in the pdpipe.core module.
Extending the ColumnsBasedPipelineStage class
The main way sub-classes of ColumnBasedPipelineStage should interact with it
is through the columns, exclude_columns and none_columns constructor
arguments, and the "private" _get_columns(df, fit) method.
Any extending subclass should accept the columns constructor parameter
and forward it, without transforming it, to the constructor of
ColumnsBasedPipelineStage. E.g.
python super().__init__(columns=columns, **kwargs). See the
implementation of any such extending class for a more complete example.
class ColDrop(ColumnsBasedPipelineStage):
def __init__(
self,
columns: ColumnsParamType,
**kwargs: object,
) -> None:
self._post_cond = cond.HasNoColumn(columns) # (1)
super().__init__(columns=columns, **kwargs)
- Unrelated to this specific use case, this is a good example of a post-condition that makes sure the output dataframe the stage returns indeed does not include the columns meant to be removed.
The exclude_columns parameter
Extending subclasses can decide if they want to expose the
exclude_columns parameter or not. Note that most of its functionality
can anyway be gained by providing the columns parameter with a column
qualifier object that is a difference between two column qualifiers; e.g.
columns=cq.OfDtype(np.number) - cq.OfDtype(np.int64) is equivalent to
providing columns=cq.OfDtype(np.number),
exclude_columns=cq.OfDtype(np.int64). However, exposing the
exclude_columns parameter can allow for specific unique behaviours; for
example, if the none_columns parameter - which configures the behavior
when columns is provided with None - is set with
a cq.OfDtypes('category') column qualifier, which means that all
categorical columns are selected when columns=None, then exposing
exclude_columns allows for easy specification of the "all categorical
columns except X" by just giving a column qualifier capturing X to
exclude_columns, instead of having to reconstruct the default column
qualifier by hand and substract from it the one representing X.
Getting the columns to operate on
When wishing to get the subset of columns to operate on, in
fit_transform or transform time, it is attained by calling
self._get_columns(df, fit=True) (or with fit=False if just
transforming), providing it with the input dataframe.
Description and application message
Additionally, to get a description and application message with a nice
string representation of the list of columns to operate on, the
desc_temp constructor parameter of ColumnsBasedPipelineStage can be
provided with a format string with a place holder where the column list
should go. E.g. "Drop columns {}" for the DropCol pipeline stage.
Wrapping it all up we get the following example for the constructor of a columns-based pipeline
Code-example
class ColDrop(ColumnsBasedPipelineStage):
def __init__(
self,
columns: ColumnsParamType,
errors: Optional[str] = None,
**kwargs: object,
) -> None:
self._errors = errors
self._post_cond = cond.HasNoColumn(columns)
super_kwargs = {
'columns': columns,
'desc_temp': 'Drop columns {}',
}
super_kwargs.update(**kwargs)
super_kwargs['none_columns'] = 'error'
super().__init__(**super_kwargs)
Fittable vs unfittable ColumnBasedPipelineStage
There are two correct ways to extend it, depending on whether the pipeline stage you're creating is inherently fittable or not:
-
If the stage is NOT inherently fittable, then the ability to accept fittable column qualifier objects makes it so. However, to enable extending subclasses to implement their transformation using a single method, they can simply implement the abstract method
_transformation(self, df, verbose, fit). It should treat thedfandverboseparameters normally, but forward thefitparameter to the_get_columnsmethod when calling it. This is enough to get a pipeline stage with the desired behavior, with the super-class handling all the fit/transform functionality. -
If the stage IS inherently fittable, then do not use the
_transformationabstract method (it has to be implemented, so just have it raise aNotImplementedError). Instead, simply override the_fit_transformand_transformmethod of ColumnsBasedPipelineStage, calling thefitparameter of the_get_columnsmethod with the correct arguement:Truewhen fit-transforming andFalsewhen transforming.
Again, taking a look at the VERY concise implementation of simple columns-based
stages, like ColDrop or ValDrop in pdpipe.basic_stages, will probably make
things clearer, and you can use those implementations as a template for yours.
Transforming both X and y
pdpipe has built-in support for X-y transformations for supervised learning, and both pipelines and pipeline stages are adaptive: If only X, and input dataframe, was provided, than the transformed dataframe is returned. If both X and y were returned, the appropriately transformed versions of both of them are returned, as an (X, y) tuple.
However, since most pipeline stages only transform X, the common way to define custom pipeline stages only requires you to implement your transformation of the input dataframe. As long as you only drop and/or rearrange rows, we will make sure y will go through the respective transformation, as pdpipe makes sure X and y has an identical index.
If you want to write pipeline stages that either add rows or change the index, you must explicitly define your transformation for both X and y. This is done by additionally defining the _transform_Xy() method if you're writing a transform-only stage (with no fit/not-fit state), and the _fit_transform_Xy() method if you need your stage to have a fit-dependent state.
Take, for example, a very simplified version of the DropLabelsByValues stage (the actual version supports several ways to detail the by-value dropping logic), as an example for a transform-only X-y tranformer:
class DropLabelsByValues(PdPipelineStage):
def __init__(
self,
in_set: Optional[Iterable[object]] = None,
**kwargs: object,
) -> None:
self.in_set = in_set
super_kwargs = {
'desc': "Drop labels by values",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, X, y): # (1)
return y is not None
def _transform(self, X, verbose): # (2)
raise UnexpectedPipelineMethodCallError( # (3)
"DropLabelsByValues._transform() is not expected to be called!")
def _transform_Xy(self, X, y, verbose): # (4)
post_y = y
if self.in_set is not None:
post_y = post_y.loc[~ post_y.isin(self.in_set)]
elif self.in_ranges is not None:
to_drop = y.copy()
to_drop.loc[:] = False
for in_range in self.in_ranges:
to_drop = to_drop | (y.between(*in_range))
post_y = post_y.loc[~to_drop]
elif self.not_in_set is not None:
post_y = y.isin(self.not_in_set)
elif self.not_in_ranges is not None:
to_keep = y.copy()
to_keep.loc[:] = False
for in_range in self.not_in_ranges:
to_keep = to_keep | (y.between(*in_range))
post_y = post_y.loc[to_keep]
else:
raise PipelineInitializationError(
"DropLabelsByValues: No drop conditions specified.")
return X, post_y # (5)
- We implement a standard precondition for pipeline stages that wish to transform
y, or bothXandy; checking that the inputyparameter isn'tNone. - We have to implement
_transform()as its an abstract method ofPdPipelineStage. - We make sure our benign implementation of
_transform()raise the uniqueUnexpectedPipelineMethodCallErrorexception on each call. This code would never be called (unless someone calls it by hand, or an implementation bug is found in thepdpipelibrary itself. - Unlike
_transform(), the_transform_Xy()recieves bothXandyas parameters, and return both of them. - A nice thing that
PdPipelineStagedoes for us is automatically re-align and re-indexXaccording to the transformedy(and the other way around), so the method just needs to detail the transformation fory. You may, of course, transform both, or manually re-align them usingreturn X.loc[post_y.index], post_y.
Similarly, the EncodeLabel pipeline stage provides a simple example for an X-y tranformer with a fit-state, so one implementing both the _transform_Xy() and the _fit_transform_Xy() methods:
class EncodeLabel(PdPipelineStage):
def __init__(self, **kwargs: object) -> None:
super_kwargs = {
'desc': "Encode label values",
}
super_kwargs.update(**kwargs)
super().__init__(**super_kwargs)
def _prec(self, X, y):
return y is not None
def _transform(self, X, verbose):
raise UnexpectedPipelineMethodCallError(
"EncodeLabel._transform() is not expected to be called!")
def _fit_transform_Xy(self, X, y, verbose):
self.encoder_ = sklearn.preprocessing.LabelEncoder()
post_y = self.encoder_.fit_transform(y)
post_y = pd.Series(data=post_y, index=y.index)
self.is_fitted = True
return X, post_y
def _transform_Xy(self, X, y, verbose):
try:
post_y = self.encoder_.transform(y)
post_y = pd.Series(data=post_y, index=y.index)
return X, post_y
except AttributeError:
raise UnfittedPipelineStageError("EncodeLabel is not fitted!")
That's it!
Getting help
Remember you can get help on our Gitter chat or on our GitHub Discussions forum.