ModelPipeline

Putting it all together.

Overview

The functionality below uses the NumerFrame, PreProcessor, Model and PostProcessor objects to easily propagate data, generate predictions and postprocess them in one go.

Specifically, this section introduces two objects:

1. ModelPipeline: Runs all the preprocessing, models and postprocessing that you define and returns a NumerFrame.
2. ModelPipelineCollection: Manages and runs multiple ModelPipeline objects.

1. ModelPipeline

ModelPipeline handles all preprocessing, model prediction and postprocessing. It returns a NumerFrame with the preprocessed data, metadata and postprocessed prediction columns.
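
The flow described above can be sketched without numerblox. This is a minimal illustration under stated assumptions, not the library's implementation: `run_pipeline`, the list-of-dicts rows and the three step factories are hypothetical stand-ins for ModelPipeline, NumerFrame and the processor and model objects.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]
Step = Callable[[List[Row]], List[Row]]


def run_pipeline(rows: List[Row], preprocessors: List[Step],
                 models: List[Step], postprocessors: List[Step]) -> List[Row]:
    """Apply preprocessors, then models, then postprocessors, in that order."""
    for step in [*preprocessors, *models, *postprocessors]:
        rows = step(rows)
    return rows


def select_columns(cols: List[str]) -> Step:
    """Hypothetical preprocessor: keep only the listed columns."""
    return lambda rows: [{c: row[c] for c in cols} for row in rows]


def constant_model(name: str, constant: float) -> Step:
    """Hypothetical model: add a constant 'prediction_<name>' column."""
    return lambda rows: [{**row, f"prediction_{name}": constant} for row in rows]


def clip_predictions(col: str, low: float, high: float) -> Step:
    """Hypothetical postprocessor: clip one prediction column to [low, high]."""
    return lambda rows: [{**row, col: min(max(row[col], low), high)} for row in rows]


data = [{"feature_a": 0.25, "feature_b": 0.75, "target": 0.5}]
result = run_pipeline(
    data,
    preprocessors=[select_columns(["feature_a", "target"])],
    models=[constant_model("high", 1.2)],
    postprocessors=[clip_predictions("prediction_high", 0.0, 1.0)],
)
print(result)
```

Each step in this sketch returns new rows rather than modifying its input, so `data` is unchanged after the call.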


ModelPipeline

 ModelPipeline (models:List[numerblox.model.BaseModel],
                preprocessors:List[numerblox.preprocessing.BaseProcessor]=[],
                postprocessors:List[numerblox.preprocessing.BaseProcessor]=[],
                copy_first=True, standardize=True,
                pipeline_name:str=None)

Execute all preprocessing, prediction and postprocessing for a given setup.

:param models: Initialized numerai-blocks Models (objects inheriting from BaseModel).

:param preprocessors: List of initialized Preprocessors.

:param postprocessors: List of initialized Postprocessors.

:param copy_first: Whether to copy the NumerFrame as a first preprocessing step. Highly recommended, because it prevents surprising behaviour caused by modifying the original dataset.

:param pipeline_name: Unique name for pipeline. Only used for display purposes.
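
The reason for copying first can be shown with a small sketch. It uses plain Python lists of dicts rather than a NumerFrame, and `add_rank_in_place` is a hypothetical preprocessing step that modifies its input; nothing here is numerblox code.

```python
from copy import deepcopy


def add_rank_in_place(rows):
    """Hypothetical preprocessing step that mutates the rows it receives."""
    for position, row in enumerate(rows):
        row["rank"] = position
    return rows


# Without a copy, the caller's data gains the new column as a side effect.
unprotected = [{"id": "a"}, {"id": "b"}]
add_rank_in_place(unprotected)

# With a copy as the first step, the caller's data is left untouched.
protected = [{"id": "a"}, {"id": "b"}]
processed = add_rank_in_place(deepcopy(protected))

print(unprotected[0], protected[0], processed[0])
```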

Example using several preprocessors, dummy models and postprocessors

model_names = ["test_0.5", "test_0.8"]

dataf = create_numerframe("test_assets/mini_numerai_version_2_data.parquet")
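# GroupStatsPreProcessor creates the per-group mean/std features selected in the next step.
preprocessors = [GroupStatsPreProcessor(),
                 FeatureSelectionPreProcessor(feature_cols=['feature_intelligence_mean', 'feature_intelligence_std'])]
models = [ConstantModel(constant=0.5, model_name=model_names[0]), ConstantModel(constant=0.8, model_name=model_names[1])]
# Ensemble both prediction columns, then neutralize the ensemble against the selected features.
postprocessors = [MeanEnsembler(cols=[f"prediction_{name}" for name in model_names], final_col_name='prediction_ensembled'),
                  FeatureNeutralizer(feature_names=['feature_intelligence_mean', 'feature_intelligence_std'],
                                     pred_name='prediction_ensembled', proportion=0.8)]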
test_pipeline = ModelPipeline(preprocessors=preprocessors, models=models,
                              postprocessors=postprocessors, pipeline_name="test_pipeline",
                              standardize=False)
processed_dataf = test_pipeline(dataf)
✅ Finished step CopyPreProcessor. Output shape=(10, 314). Time taken for step: 0:00:00.001927. ✅
🚧 Applying preprocessing: 'GroupStatsPreProcessor' 🚧
✅ Finished step GroupStatsPreProcessor. Output shape=(10, 332). Time taken for step: 0:00:00.045558. ✅
🚧 Applying preprocessing: 'FeatureSelectionPreProcessor' 🚧
✅ Finished step FeatureSelectionPreProcessor. Output shape=(10, 6). Time taken for step: 0:00:00.000785. ✅
🤖 Generating model predictions with 'ConstantModel'. 🤖
🤖 Generating model predictions with 'ConstantModel'. 🤖
🚧 Applying postprocessing: 'MeanEnsembler' 🚧
🍲 Ensembled '['prediction_test_0.5', 'prediction_test_0.8']' with simple mean and saved in 'prediction_ensembled' 🍲
✅ Finished step MeanEnsembler. Output shape=(10, 9). Time taken for step: 0:00:00.003600. ✅
🚧 Applying postprocessing: 'FeatureNeutralizer' 🚧
🤖 Neutralized 'prediction_ensembled' with proportion '0.8' 🤖
New neutralized column = 'prediction_ensembled_neutralized_0.8'.
✅ Finished step FeatureNeutralizer. Output shape=(10, 10). Time taken for step: 0:00:00.015465. ✅
🏁 Finished pipeline: 'test_pipeline'! 🏁
assert processed_dataf.meta == dataf.meta
assert isinstance(processed_dataf, NumerFrame)
processed_dataf.head(2)
   feature_intelligence_mean  feature_intelligence_std  target                id   era  data_type  prediction_test_0.5  prediction_test_0.8  prediction_ensembled  prediction_ensembled_neutralized_0.8
0                   0.333333                  0.246183    0.50  n000315175b67977  era1      train                  0.5                  0.8                  0.65                               0.00000
1                   0.208333                  0.234359    0.25  n0014af834a96cdd  era1      train                  0.5                  0.8                  0.65                               0.36088
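
The two postprocessing ideas used in this example can be sketched in plain Python. `mean_ensemble` reproduces the simple mean that gives 0.65 from 0.5 and 0.8. `neutralize` is a simplified, single-feature illustration of removing a proportion of a prediction's linear exposure to a feature. Both functions are hypothetical and written for this sketch; the library's own MeanEnsembler and FeatureNeutralizer may differ, for example by handling several features, grouping by era or rescaling the result.

```python
from typing import List


def mean_ensemble(columns: List[List[float]]) -> List[float]:
    """Row-wise simple mean across prediction columns."""
    return [sum(values) / len(values) for values in zip(*columns)]


def neutralize(preds: List[float], feature: List[float], proportion: float) -> List[float]:
    """Subtract `proportion` of the least-squares projection of preds onto one feature."""
    coef = sum(f * p for f, p in zip(feature, preds)) / sum(f * f for f in feature)
    return [p - proportion * coef * f for p, f in zip(preds, feature)]


def exposure(preds: List[float], feature: List[float]) -> float:
    """Dot product of predictions and feature, used here as a simple exposure measure."""
    return sum(f * p for f, p in zip(feature, preds))


# Two constant prediction columns, as in the example above.
ensembled = mean_ensemble([[0.5, 0.5], [0.8, 0.8]])
print(ensembled)

# Made-up numbers, chosen only to illustrate the projection.
feature = [1.0, -1.0, 2.0]
preds = [0.9, 0.1, 0.7]
before = exposure(preds, feature)
after_full = exposure(neutralize(preds, feature, proportion=1.0), feature)
after_partial = exposure(neutralize(preds, feature, proportion=0.8), feature)
print(before, after_full, after_partial)
```

With `proportion=1.0` the remaining exposure is zero up to floating-point error; with `proportion=0.8` one fifth of the original exposure remains.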

2. ModelPipelineCollection

ModelPipelineCollection can be used to manage and run multiple ModelPipeline objects.

ModelPipelineCollection simply takes a list of ModelPipeline objects as input.


ModelPipelineCollection

 ModelPipelineCollection (pipelines:List[__main__.ModelPipeline])

Execute multiple initialized ModelPipelines in a sequence.

:param pipelines: List of initialized ModelPipelines.
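
The idea of a collection can be sketched as a mapping from names to callables. `PipelineCollectionSketch` and the two toy pipelines below are hypothetical and exist only for illustration; this is not how ModelPipelineCollection is implemented.

```python
from typing import Callable, Dict, List, Tuple

Data = List[dict]
Pipeline = Callable[[Data], Data]


class PipelineCollectionSketch:
    """Hypothetical stand-in for a collection of named pipelines."""

    def __init__(self, pipelines: List[Tuple[str, Pipeline]]):
        # Dicts preserve insertion order, so pipelines run in the order given.
        self.pipelines: Dict[str, Pipeline] = dict(pipelines)

    def get_pipeline(self, name: str) -> Pipeline:
        """Look up a single pipeline by name."""
        return self.pipelines[name]

    def __call__(self, data: Data) -> Dict[str, Data]:
        """Run every pipeline on the same input and return results keyed by name."""
        return {name: pipeline(data) for name, pipeline in self.pipelines.items()}


def constant_pipeline(rows: Data) -> Data:
    """Toy pipeline: add a constant prediction column."""
    return [{**row, "prediction": 0.5} for row in rows]


def id_only_pipeline(rows: Data) -> Data:
    """Toy pipeline: keep only the 'id' column."""
    return [{"id": row["id"]} for row in rows]


collection_sketch = PipelineCollectionSketch(
    [("constant", constant_pipeline), ("id_only", id_only_pipeline)]
)
results = collection_sketch([{"id": "a", "feature": 0.25}])
print(list(results))
```

Both toy pipelines receive the same input, and the results dictionary keeps one output per pipeline name.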

We introduce a second pipeline with no user-defined preprocessing or postprocessing, only a RandomModel.

test_pipeline2 = ModelPipeline(models=[RandomModel()], pipeline_name="test_pipeline2")

We process two ModelPipelines with different characteristics on the same data.

collection = ModelPipelineCollection([test_pipeline, test_pipeline2])
assert collection.get_pipeline("test_pipeline2").pipeline_name == 'test_pipeline2'
result_datasets = collection(dataf=dataf)
👷 Processing model pipeline: 'test_pipeline' 👷
✅ Finished step CopyPreProcessor. Output shape=(10, 314). Time taken for step: 0:00:00.002404. ✅
🚧 Applying preprocessing: 'GroupStatsPreProcessor' 🚧
✅ Finished step GroupStatsPreProcessor. Output shape=(10, 332). Time taken for step: 0:00:00.026447. ✅
🚧 Applying preprocessing: 'FeatureSelectionPreProcessor' 🚧
✅ Finished step FeatureSelectionPreProcessor. Output shape=(10, 6). Time taken for step: 0:00:00.000721. ✅
🤖 Generating model predictions with 'ConstantModel'. 🤖
🤖 Generating model predictions with 'ConstantModel'. 🤖
🚧 Applying postprocessing: 'MeanEnsembler' 🚧
🍲 Ensembled '['prediction_test_0.5', 'prediction_test_0.8']' with simple mean and saved in 'prediction_ensembled' 🍲
✅ Finished step MeanEnsembler. Output shape=(10, 9). Time taken for step: 0:00:00.003288. ✅
🚧 Applying postprocessing: 'FeatureNeutralizer' 🚧
🤖 Neutralized 'prediction_ensembled' with proportion '0.8' 🤖
New neutralized column = 'prediction_ensembled_neutralized_0.8'.
✅ Finished step FeatureNeutralizer. Output shape=(10, 10). Time taken for step: 0:00:00.012334. ✅
🏁 Finished pipeline: 'test_pipeline'! 🏁
👷 Processing model pipeline: 'test_pipeline2' 👷
✅ Finished step CopyPreProcessor. Output shape=(10, 314). Time taken for step: 0:00:00.002063. ✅
🤖 Generating model predictions with 'RandomModel'. 🤖
✅ Finished step Standardizer. Output shape=(10, 315). Time taken for step: 0:00:00.004046. ✅
🏁 Finished pipeline: 'test_pipeline2'! 🏁

ModelPipelineCollection returns a dictionary mapping pipeline names to NumerFrame objects, each retaining all metadata and the prediction columns added by its pipeline. Note that in this example the first pipeline included a feature selection step, so its NumerFrame did not retain all columns. The second NumerFrame retained all feature columns because its pipeline had no user-defined preprocessing.

result_datasets.keys()
dict_keys(['test_pipeline', 'test_pipeline2'])
result_datasets['test_pipeline'].head(2)
   feature_intelligence_mean  feature_intelligence_std  target                id   era  data_type  prediction_test_0.5  prediction_test_0.8  prediction_ensembled  prediction_ensembled_neutralized_0.8
0                   0.333333                  0.246183    0.50  n000315175b67977  era1      train                  0.5                  0.8                  0.65                               0.00000
1                   0.208333                  0.234359    0.25  n0014af834a96cdd  era1      train                  0.5                  0.8                  0.65                               0.36088
result_datasets['test_pipeline2'].head(2)
                 id   era  data_type  feature_intelligence1  feature_intelligence2  feature_intelligence3  feature_intelligence4  feature_intelligence5  feature_intelligence6  feature_intelligence7  ...  feature_wisdom39  feature_wisdom40  feature_wisdom41  feature_wisdom42  feature_wisdom43  feature_wisdom44  feature_wisdom45  feature_wisdom46  target  prediction_random
0  n000315175b67977  era1      train                    0.0                    0.5                   0.25                   0.00                    0.5                   0.25                   0.25  ...               1.0              0.75               0.5              0.75              0.50               1.0              0.50              0.75    0.50                0.5
1  n0014af834a96cdd  era1      train                    0.0                    0.0                   0.00                   0.25                    0.5                   0.00                   0.00  ...               1.0              0.00               0.0              0.75              0.25               0.0              0.25              1.00    0.25                0.1

2 rows × 315 columns