Feature Transformers

centimators.feature_transformers.ranking

Ranking transformers for cross-sectional normalization.

RankTransformer

Bases: _BaseFeatureTransformer

RankTransformer transforms features into their normalized rank within groups defined by a date series.

Parameters:

- feature_names (list of str, optional): Names of columns to transform. If None, all columns of X are used. Default: None.

Examples:

>>> import pandas as pd
>>> from centimators.feature_transformers import RankTransformer
>>> df = pd.DataFrame({
...     'date': ['2021-01-01', '2021-01-01', '2021-01-02'],
...     'feature1': [3, 1, 2],
...     'feature2': [30, 20, 10]
... })
>>> transformer = RankTransformer(feature_names=['feature1', 'feature2'])
>>> result = transformer.fit_transform(df[['feature1', 'feature2']], date_series=df['date'])
>>> print(result)
   feature1_rank  feature2_rank
0            1.0            1.0
1            0.5            0.5
2            1.0            1.0
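
Because transform is implemented with narwhals, the same call works unchanged on other backends; a minimal sketch with polars (assuming polars is installed):

>>> import polars as pl
>>> df_pl = pl.DataFrame({
...     'date': ['2021-01-01', '2021-01-01', '2021-01-02'],
...     'feature1': [3, 1, 2],
...     'feature2': [30, 20, 10]
... })
>>> result_pl = transformer.fit_transform(df_pl[['feature1', 'feature2']], date_series=df_pl['date'])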
Source code in src/centimators/feature_transformers/ranking.py
class RankTransformer(_BaseFeatureTransformer):
    """
    RankTransformer transforms features into their normalized rank within groups defined by a date series.

    Args:
        feature_names (list of str, optional): Names of columns to transform.
            If None, all columns of X are used.

    Examples:
        >>> import pandas as pd
        >>> from centimators.feature_transformers import RankTransformer
        >>> df = pd.DataFrame({
        ...     'date': ['2021-01-01', '2021-01-01', '2021-01-02'],
        ...     'feature1': [3, 1, 2],
        ...     'feature2': [30, 20, 10]
        ... })
        >>> transformer = RankTransformer(feature_names=['feature1', 'feature2'])
        >>> result = transformer.fit_transform(df[['feature1', 'feature2']], date_series=df['date'])
        >>> print(result)
           feature1_rank  feature2_rank
        0            1.0            1.0
        1            0.5            0.5
        2            1.0            1.0
    """

    def __init__(self, feature_names=None):
        super().__init__(feature_names)

    @nw.narwhalify(allow_series=True)
    def transform(self, X: FrameT, y=None, date_series: IntoSeries = None) -> FrameT:
        """Transforms features to their normalized rank.

        Args:
            X (FrameT): Input data frame.
            y (Any, optional): Ignored. Kept for compatibility.
            date_series (IntoSeries, optional): Series defining groups for ranking (e.g., dates).

        Returns:
            FrameT: Transformed data frame with ranked features.
        """
        X, date_col_name = _attach_group(X, date_series, "date")

        # compute absolute rank for each feature
        rank_columns: list[nw.Expr] = [
            nw.col(feature_name)
            .rank()
            .over(date_col_name)
            .alias(f"{feature_name}_rank_temp")
            for feature_name in self.feature_names
        ]

        # compute count for each feature
        count_columns: list[nw.Expr] = [
            nw.col(feature_name)
            .count()
            .over(date_col_name)
            .alias(f"{feature_name}_count")
            for feature_name in self.feature_names
        ]

        X = X.select([*rank_columns, *count_columns])

        # compute normalized rank for each feature
        final_columns: list[nw.Expr] = [
            (
                nw.col(f"{feature_name}_rank_temp") / nw.col(f"{feature_name}_count")
            ).alias(f"{feature_name}_rank")
            for feature_name in self.feature_names
        ]

        X = X.select(final_columns)

        return X

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Returns the output feature names.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of transformed feature names.
        """
        return [f"{feature_name}_rank" for feature_name in self.feature_names]

transform(X, y=None, date_series=None)

Transforms features to their normalized rank.

Parameters:

- X (FrameT, required): Input data frame.
- y (Any, optional): Ignored. Kept for compatibility. Default: None.
- date_series (IntoSeries, optional): Series defining groups for ranking (e.g., dates). Default: None.

Returns:

- FrameT: Transformed data frame with ranked features.

Source code in src/centimators/feature_transformers/ranking.py
@nw.narwhalify(allow_series=True)
def transform(self, X: FrameT, y=None, date_series: IntoSeries = None) -> FrameT:
    """Transforms features to their normalized rank.

    Args:
        X (FrameT): Input data frame.
        y (Any, optional): Ignored. Kept for compatibility.
        date_series (IntoSeries, optional): Series defining groups for ranking (e.g., dates).

    Returns:
        FrameT: Transformed data frame with ranked features.
    """
    X, date_col_name = _attach_group(X, date_series, "date")

    # compute absolute rank for each feature
    rank_columns: list[nw.Expr] = [
        nw.col(feature_name)
        .rank()
        .over(date_col_name)
        .alias(f"{feature_name}_rank_temp")
        for feature_name in self.feature_names
    ]

    # compute count for each feature
    count_columns: list[nw.Expr] = [
        nw.col(feature_name)
        .count()
        .over(date_col_name)
        .alias(f"{feature_name}_count")
        for feature_name in self.feature_names
    ]

    X = X.select([*rank_columns, *count_columns])

    # compute normalized rank for each feature
    final_columns: list[nw.Expr] = [
        (
            nw.col(f"{feature_name}_rank_temp") / nw.col(f"{feature_name}_count")
        ).alias(f"{feature_name}_rank")
        for feature_name in self.feature_names
    ]

    X = X.select(final_columns)

    return X

get_feature_names_out(input_features=None)

Returns the output feature names.

Parameters:

- input_features (list[str], optional): Ignored. Kept for compatibility. Default: None.

Returns:

- list[str]: List of transformed feature names.

Source code in src/centimators/feature_transformers/ranking.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Returns the output feature names.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of transformed feature names.
    """
    return [f"{feature_name}_rank" for feature_name in self.feature_names]

centimators.feature_transformers.time_series

Time-series feature transformers for grouped temporal operations.

LagTransformer

Bases: _BaseFeatureTransformer

LagTransformer shifts features by specified lag windows within groups defined by a ticker series.

Parameters:

- windows (iterable of int, required): Lag periods to compute. Each feature will have shifted versions for each lag.
- feature_names (list of str, optional): Names of columns to transform. If None, all columns of X are used. Default: None.

Examples:

>>> import pandas as pd
>>> from centimators.feature_transformers import LagTransformer
>>> df = pd.DataFrame({
...     'ticker': ['A', 'A', 'A', 'B', 'B'],
...     'price': [10, 11, 12, 20, 21]
... })
>>> transformer = LagTransformer(windows=[1, 2], feature_names=['price'])
>>> result = transformer.fit_transform(df[['price']], ticker_series=df['ticker'])
>>> print(result)
   price_lag2  price_lag1
0         NaN         NaN
1         NaN        10.0
2        10.0        11.0
3         NaN         NaN
4         NaN        20.0
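
The lag-major ordering (windows are stored sorted descending) is also what get_feature_names_out reports:

>>> transformer.get_feature_names_out()
['price_lag2', 'price_lag1']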
Source code in src/centimators/feature_transformers/time_series.py
class LagTransformer(_BaseFeatureTransformer):
    """
    LagTransformer shifts features by specified lag windows within groups defined by a ticker series.

    Args:
        windows (iterable of int): Lag periods to compute. Each feature will have
            shifted versions for each lag.
        feature_names (list of str, optional): Names of columns to transform.
            If None, all columns of X are used.

    Examples:
        >>> import pandas as pd
        >>> from centimators.feature_transformers import LagTransformer
        >>> df = pd.DataFrame({
        ...     'ticker': ['A', 'A', 'A', 'B', 'B'],
        ...     'price': [10, 11, 12, 20, 21]
        ... })
        >>> transformer = LagTransformer(windows=[1, 2], feature_names=['price'])
        >>> result = transformer.fit_transform(df[['price']], ticker_series=df['ticker'])
        >>> print(result)
           price_lag2  price_lag1
        0         NaN         NaN
        1         NaN        10.0
        2        10.0        11.0
        3         NaN         NaN
        4         NaN        20.0
    """

    def __init__(self, windows, feature_names=None):
        self.windows = sorted(windows, reverse=True)
        super().__init__(feature_names)

    @nw.narwhalify(allow_series=True)
    def transform(
        self,
        X: FrameT,
        y=None,
        ticker_series: IntoSeries = None,
    ) -> FrameT:
        """Applies lag transformation to the features.

        Args:
            X (FrameT): Input data frame.
            y (Any, optional): Ignored. Kept for compatibility.
            ticker_series (IntoSeries, optional): Series defining groups for lagging (e.g., tickers).

        Returns:
            FrameT: Transformed data frame with lagged features. Columns are ordered
                by lag (as in `self.windows`), then by feature (as in `self.feature_names`).
                For example, with `windows=[2,1]` and `feature_names=['A','B']`,
                the output columns will be `A_lag2, B_lag2, A_lag1, B_lag1`.
        """
        X, ticker_col_name = _attach_group(X, ticker_series, "ticker")

        lag_columns = [
            nw.col(feature_name)
            .shift(lag)
            .alias(f"{feature_name}_lag{lag}")
            .over(ticker_col_name)
            for lag in self.windows  # Iterate over lags first
            for feature_name in self.feature_names  # Then over feature names
        ]

        X = X.select(lag_columns)

        return X

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Returns the output feature names.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of transformed feature names, ordered by lag, then by feature.
        """
        return [
            f"{feature_name}_lag{lag}"
            for lag in self.windows  # Iterate over lags first
            for feature_name in self.feature_names  # Then over feature names
        ]

transform(X, y=None, ticker_series=None)

Applies lag transformation to the features.

Parameters:

- X (FrameT, required): Input data frame.
- y (Any, optional): Ignored. Kept for compatibility. Default: None.
- ticker_series (IntoSeries, optional): Series defining groups for lagging (e.g., tickers). Default: None.

Returns:

- FrameT: Transformed data frame with lagged features. Columns are ordered by lag (as in self.windows), then by feature (as in self.feature_names). For example, with windows=[2, 1] and feature_names=['A', 'B'], the output columns will be A_lag2, B_lag2, A_lag1, B_lag1.

Source code in src/centimators/feature_transformers/time_series.py
@nw.narwhalify(allow_series=True)
def transform(
    self,
    X: FrameT,
    y=None,
    ticker_series: IntoSeries = None,
) -> FrameT:
    """Applies lag transformation to the features.

    Args:
        X (FrameT): Input data frame.
        y (Any, optional): Ignored. Kept for compatibility.
        ticker_series (IntoSeries, optional): Series defining groups for lagging (e.g., tickers).

    Returns:
        FrameT: Transformed data frame with lagged features. Columns are ordered
            by lag (as in `self.windows`), then by feature (as in `self.feature_names`).
            For example, with `windows=[2,1]` and `feature_names=['A','B']`,
            the output columns will be `A_lag2, B_lag2, A_lag1, B_lag1`.
    """
    X, ticker_col_name = _attach_group(X, ticker_series, "ticker")

    lag_columns = [
        nw.col(feature_name)
        .shift(lag)
        .alias(f"{feature_name}_lag{lag}")
        .over(ticker_col_name)
        for lag in self.windows  # Iterate over lags first
        for feature_name in self.feature_names  # Then over feature names
    ]

    X = X.select(lag_columns)

    return X

get_feature_names_out(input_features=None)

Returns the output feature names.

Parameters:

- input_features (list[str], optional): Ignored. Kept for compatibility. Default: None.

Returns:

- list[str]: List of transformed feature names, ordered by lag, then by feature.

Source code in src/centimators/feature_transformers/time_series.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Returns the output feature names.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of transformed feature names, ordered by lag, then by feature.
    """
    return [
        f"{feature_name}_lag{lag}"
        for lag in self.windows  # Iterate over lags first
        for feature_name in self.feature_names  # Then over feature names
    ]

MovingAverageTransformer

Bases: _BaseFeatureTransformer

MovingAverageTransformer computes the moving average of each feature over the specified windows, within groups defined by a ticker series.

Parameters:

- windows (list of int, required): The windows over which to compute the moving average.
- feature_names (list of str, optional): The names of the features to compute the moving average for. Default: None.
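
The class docstring ships no example, so here is a usage sketch in the style of the other transformers (the NaN rows assume the rolling mean emits nulls until the window is full, which is narwhals' default behaviour):

>>> import pandas as pd
>>> from centimators.feature_transformers import MovingAverageTransformer
>>> df = pd.DataFrame({
...     'ticker': ['A', 'A', 'A', 'B', 'B'],
...     'price': [10, 11, 12, 20, 21]
... })
>>> transformer = MovingAverageTransformer(windows=[2], feature_names=['price'])
>>> result = transformer.fit_transform(df[['price']], ticker_series=df['ticker'])
>>> print(result)
   price_ma2
0        NaN
1       10.5
2       11.5
3        NaN
4       20.5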
Source code in src/centimators/feature_transformers/time_series.py
class MovingAverageTransformer(_BaseFeatureTransformer):
    """
    MovingAverageTransformer computes the moving average of each feature over the
    specified windows, within groups defined by a ticker series.

    Args:
        windows (list of int): The windows over which to compute the moving average.
        feature_names (list of str, optional): The names of the features to compute
            the moving average for.
    """

    def __init__(self, windows, feature_names=None):
        self.windows = windows
        super().__init__(feature_names)

    @nw.narwhalify(allow_series=True)
    def transform(self, X: FrameT, y=None, ticker_series: IntoSeries = None) -> FrameT:
        """Applies moving average transformation to the features.

        Args:
            X (FrameT): Input data frame.
            y (Any, optional): Ignored. Kept for compatibility.
            ticker_series (IntoSeries, optional): Series defining groups for moving average (e.g., tickers).

        Returns:
            FrameT: Transformed data frame with moving average features.
        """
        X, ticker_col_name = _attach_group(X, ticker_series, "ticker")

        ma_columns = [
            nw.col(feature_name)
            .rolling_mean(window_size=window)
            .over(ticker_col_name)
            .alias(f"{feature_name}_ma{window}")
            for feature_name in self.feature_names
            for window in self.windows
        ]

        X = X.select(ma_columns)

        return X

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Returns the output feature names.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of transformed feature names.
        """
        return [
            f"{feature_name}_ma{window}"
            for feature_name in self.feature_names
            for window in self.windows
        ]

transform(X, y=None, ticker_series=None)

Applies moving average transformation to the features.

Parameters:

- X (FrameT, required): Input data frame.
- y (Any, optional): Ignored. Kept for compatibility. Default: None.
- ticker_series (IntoSeries, optional): Series defining groups for the moving average (e.g., tickers). Default: None.

Returns:

- FrameT: Transformed data frame with moving average features.

Source code in src/centimators/feature_transformers/time_series.py
@nw.narwhalify(allow_series=True)
def transform(self, X: FrameT, y=None, ticker_series: IntoSeries = None) -> FrameT:
    """Applies moving average transformation to the features.

    Args:
        X (FrameT): Input data frame.
        y (Any, optional): Ignored. Kept for compatibility.
        ticker_series (IntoSeries, optional): Series defining groups for moving average (e.g., tickers).

    Returns:
        FrameT: Transformed data frame with moving average features.
    """
    X, ticker_col_name = _attach_group(X, ticker_series, "ticker")

    ma_columns = [
        nw.col(feature_name)
        .rolling_mean(window_size=window)
        .over(ticker_col_name)
        .alias(f"{feature_name}_ma{window}")
        for feature_name in self.feature_names
        for window in self.windows
    ]

    X = X.select(ma_columns)

    return X

get_feature_names_out(input_features=None)

Returns the output feature names.

Parameters:

- input_features (list[str], optional): Ignored. Kept for compatibility. Default: None.

Returns:

- list[str]: List of transformed feature names.

Source code in src/centimators/feature_transformers/time_series.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Returns the output feature names.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of transformed feature names.
    """
    return [
        f"{feature_name}_ma{window}"
        for feature_name in self.feature_names
        for window in self.windows
    ]

LogReturnTransformer

Bases: _BaseFeatureTransformer

LogReturnTransformer computes the log return (the first difference of log-transformed values) of each feature, within groups defined by a ticker series.

Parameters:

- feature_names (list of str, optional): Names of columns to transform. If None, all columns of X are used. Default: None.
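
A usage sketch (the log return is the within-ticker first difference of log prices, so the first row of each group is NaN; values rounded):

>>> import pandas as pd
>>> from centimators.feature_transformers import LogReturnTransformer
>>> df = pd.DataFrame({
...     'ticker': ['A', 'A', 'A', 'B', 'B'],
...     'price': [10, 11, 12, 20, 21]
... })
>>> transformer = LogReturnTransformer(feature_names=['price'])
>>> result = transformer.fit_transform(df[['price']], ticker_series=df['ticker'])
>>> print(result)
   price_logreturn
0              NaN
1         0.095310
2         0.087011
3              NaN
4         0.048790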
Source code in src/centimators/feature_transformers/time_series.py
class LogReturnTransformer(_BaseFeatureTransformer):
    """
    LogReturnTransformer computes the log return (the first difference of
    log-transformed values) of each feature, within groups defined by a ticker series.

    Args:
        feature_names (list of str, optional): Names of columns to transform.
            If None, all columns of X are used.
    """

    def __init__(self, feature_names=None):
        super().__init__(feature_names)

    @nw.narwhalify(allow_series=True)
    def transform(self, X: FrameT, y=None, ticker_series: IntoSeries = None) -> FrameT:
        """Applies log return transformation to the features.

        Args:
            X (FrameT): Input data frame.
            y (Any, optional): Ignored. Kept for compatibility.
            ticker_series (IntoSeries, optional): Series defining groups for log return (e.g., tickers).

        Returns:
            FrameT: Transformed data frame with log return features.
        """
        X, ticker_col_name = _attach_group(X, ticker_series, "ticker")

        log_return_columns = [
            nw.col(feature_name)
            .log()
            .diff()
            .over(ticker_col_name)
            .alias(f"{feature_name}_logreturn")
            for feature_name in self.feature_names
        ]

        X = X.select(log_return_columns)

        return X

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Returns the output feature names.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of transformed feature names.
        """
        return [f"{feature_name}_logreturn" for feature_name in self.feature_names]

transform(X, y=None, ticker_series=None)

Applies log return transformation to the features.

Parameters:

- X (FrameT, required): Input data frame.
- y (Any, optional): Ignored. Kept for compatibility. Default: None.
- ticker_series (IntoSeries, optional): Series defining groups for the log return (e.g., tickers). Default: None.

Returns:

- FrameT: Transformed data frame with log return features.

Source code in src/centimators/feature_transformers/time_series.py
@nw.narwhalify(allow_series=True)
def transform(self, X: FrameT, y=None, ticker_series: IntoSeries = None) -> FrameT:
    """Applies log return transformation to the features.

    Args:
        X (FrameT): Input data frame.
        y (Any, optional): Ignored. Kept for compatibility.
        ticker_series (IntoSeries, optional): Series defining groups for log return (e.g., tickers).

    Returns:
        FrameT: Transformed data frame with log return features.
    """
    X, ticker_col_name = _attach_group(X, ticker_series, "ticker")

    log_return_columns = [
        nw.col(feature_name)
        .log()
        .diff()
        .over(ticker_col_name)
        .alias(f"{feature_name}_logreturn")
        for feature_name in self.feature_names
    ]

    X = X.select(log_return_columns)

    return X

get_feature_names_out(input_features=None)

Returns the output feature names.

Parameters:

- input_features (list[str], optional): Ignored. Kept for compatibility. Default: None.

Returns:

- list[str]: List of transformed feature names.

Source code in src/centimators/feature_transformers/time_series.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Returns the output feature names.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of transformed feature names.
    """
    return [f"{feature_name}_logreturn" for feature_name in self.feature_names]

centimators.feature_transformers.stats

Statistical transformers for horizontal aggregations.

GroupStatsTransformer

Bases: _BaseFeatureTransformer

GroupStatsTransformer calculates statistical measures for defined feature groups.

This transformer computes row-wise statistics (mean, standard deviation, skewness, kurtosis, range, and coefficient of variation) for each group of features specified in the feature_group_mapping.

Parameters:

- feature_group_mapping (dict, required): Dictionary mapping group names to lists of feature columns. Example: {'group1': ['feature1', 'feature2'], 'group2': ['feature3', 'feature4']}.
- stats (list of str, optional): List of statistics to compute for each group. Valid options are 'mean', 'std', 'skew', 'kurt', 'range', and 'cv'. Default: all of them, i.e. ['mean', 'std', 'skew', 'kurt', 'range', 'cv'].

Examples:

>>> import pandas as pd
>>> from centimators.feature_transformers import GroupStatsTransformer
>>> df = pd.DataFrame({
...     'feature1': [1, 2, 3],
...     'feature2': [4, 5, 6],
...     'feature3': [7, 8, 9],
...     'feature4': [10, 11, 12]
... })
>>> mapping = {'group1': ['feature1', 'feature2'], 'group2': ['feature3', 'feature4']}
>>> transformer = GroupStatsTransformer(feature_group_mapping=mapping, stats=['mean', 'std'])
>>> result = transformer.fit_transform(df)
>>> print(result)
   group1_groupstats_mean  group1_groupstats_std  group2_groupstats_mean  group2_groupstats_std
0                     2.5                2.12132                     8.5                2.12132
1                     3.5                2.12132                     9.5                2.12132
2                     4.5                2.12132                    10.5                2.12132
>>> transformer_mean_only = GroupStatsTransformer(feature_group_mapping=mapping, stats=['mean'])
>>> result_mean_only = transformer_mean_only.fit_transform(df)
>>> print(result_mean_only)
   group1_groupstats_mean  group2_groupstats_mean
0                  2.5                  8.5
1                  3.5                  9.5
2                  4.5                 10.5
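
As the minimum-column checks in the source below enforce, 'skew' needs at least three columns per group, so requesting it on these two-column groups raises a RuntimeWarning and yields all-NaN columns:

>>> transformer_skew = GroupStatsTransformer(feature_group_mapping=mapping, stats=['skew'])
>>> result_skew = transformer_skew.fit_transform(df)  # warns: needs >= 3 columns per group
>>> print(result_skew)
   group1_groupstats_skew  group2_groupstats_skew
0                     NaN                     NaN
1                     NaN                     NaN
2                     NaN                     NaN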
Source code in src/centimators/feature_transformers/stats.py
class GroupStatsTransformer(_BaseFeatureTransformer):
    """
    GroupStatsTransformer calculates statistical measures for defined feature groups.

    This transformer computes row-wise statistics (mean, standard deviation,
    skewness, kurtosis, range, and coefficient of variation) for each group of
    features specified in the feature_group_mapping.

    Args:
        feature_group_mapping (dict): Dictionary mapping group names to lists of
            feature columns. Example: {'group1': ['feature1', 'feature2'],
            'group2': ['feature3', 'feature4']}
        stats (list of str, optional): List of statistics to compute for each group.
            Defaults to all statistics. Valid options are 'mean', 'std',
            'skew', 'kurt', 'range', and 'cv'.

    Examples:
        >>> import pandas as pd
        >>> from centimators.feature_transformers import GroupStatsTransformer
        >>> df = pd.DataFrame({
        ...     'feature1': [1, 2, 3],
        ...     'feature2': [4, 5, 6],
        ...     'feature3': [7, 8, 9],
        ...     'feature4': [10, 11, 12]
        ... })
        >>> mapping = {'group1': ['feature1', 'feature2'], 'group2': ['feature3', 'feature4']}
        >>> transformer = GroupStatsTransformer(feature_group_mapping=mapping, stats=['mean', 'std'])
        >>> result = transformer.fit_transform(df)
        >>> print(result)
           group1_groupstats_mean  group1_groupstats_std  group2_groupstats_mean  group2_groupstats_std
        0                     2.5                2.12132                     8.5                2.12132
        1                     3.5                2.12132                     9.5                2.12132
        2                     4.5                2.12132                    10.5                2.12132
        >>> transformer_mean_only = GroupStatsTransformer(feature_group_mapping=mapping, stats=['mean'])
        >>> result_mean_only = transformer_mean_only.fit_transform(df)
        >>> print(result_mean_only)
           group1_groupstats_mean  group2_groupstats_mean
        0                  2.5                  8.5
        1                  3.5                  9.5
        2                  4.5                 10.5
    """

    def __init__(
        self,
        feature_group_mapping: dict,
        stats: list[str] = ["mean", "std", "skew", "kurt", "range", "cv"],
    ):
        super().__init__(feature_names=None)
        self.feature_group_mapping = feature_group_mapping
        self.groups = list(feature_group_mapping.keys())
        # Supported statistics
        valid_stats = ["mean", "std", "skew", "kurt", "range", "cv"]
        if not all(stat in valid_stats for stat in stats):
            raise ValueError(
                f"stats must be a list containing only {valid_stats}. Got {stats}"
            )
        self.stats = stats

    @nw.narwhalify(allow_series=True)
    def transform(self, X: FrameT, y=None) -> FrameT:
        """Calculates group statistics on the features.

        Args:
            X (FrameT): Input data frame.
            y (Any, optional): Ignored. Kept for compatibility.

        Returns:
            FrameT: Transformed data frame with group statistics features.
        """
        _expr_factories: dict[str, Callable[[list[str]], nw.Expr]] = {
            "mean": lambda cols: nw.mean_horizontal(*cols),
            "std": lambda cols: std_horizontal(*cols, ddof=1),
            "skew": lambda cols: skew_horizontal(*cols),
            "kurt": lambda cols: kurtosis_horizontal(*cols),
            "range": lambda cols: range_horizontal(*cols),
            "cv": lambda cols: coefficient_of_variation_horizontal(*cols),
        }

        _min_required_cols: dict[str, int] = {
            "mean": 1,
            "range": 1,
            "std": 2,  # ddof=1 ⇒ need at least 2 values for a finite result
            "cv": 2,  # depends on std
            "skew": 3,  # bias-corrected formula needs ≥3
            "kurt": 4,  # bias-corrected formula needs ≥4
        }

        stat_expressions: list[nw.Expr] = []

        for group, cols in self.feature_group_mapping.items():
            if not cols:
                raise ValueError(
                    f"No valid columns found for group '{group}' in the input frame."
                )

            n_cols = len(cols)

            for stat in self.stats:
                # Warn early if result is guaranteed to be NaN
                min_required = _min_required_cols[stat]
                if n_cols < min_required:
                    warnings.warn(
                        (
                            f"{self.__class__.__name__}: statistic '{stat}' for group "
                            f"'{group}' requires at least {min_required} feature column(s) "
                            f"but only {n_cols} provided – the resulting column will be NaN."
                        ),
                        RuntimeWarning,
                        stacklevel=2,
                    )

                expr = _expr_factories[stat](cols).alias(f"{group}_groupstats_{stat}")
                stat_expressions.append(expr)

        return X.select(stat_expressions)

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Return feature names for all groups.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of transformed feature names.
        """
        return [
            f"{group}_groupstats_{stat}" for group in self.groups for stat in self.stats
        ]

transform(X, y=None)

Calculates group statistics on the features.

Parameters:

- X (FrameT, required): Input data frame.
- y (Any, optional): Ignored. Kept for compatibility. Default: None.

Returns:

- FrameT: Transformed data frame with group statistics features.

Source code in src/centimators/feature_transformers/stats.py
@nw.narwhalify(allow_series=True)
def transform(self, X: FrameT, y=None) -> FrameT:
    """Calculates group statistics on the features.

    Args:
        X (FrameT): Input data frame.
        y (Any, optional): Ignored. Kept for compatibility.

    Returns:
        FrameT: Transformed data frame with group statistics features.
    """
    _expr_factories: dict[str, Callable[[list[str]], nw.Expr]] = {
        "mean": lambda cols: nw.mean_horizontal(*cols),
        "std": lambda cols: std_horizontal(*cols, ddof=1),
        "skew": lambda cols: skew_horizontal(*cols),
        "kurt": lambda cols: kurtosis_horizontal(*cols),
        "range": lambda cols: range_horizontal(*cols),
        "cv": lambda cols: coefficient_of_variation_horizontal(*cols),
    }

    _min_required_cols: dict[str, int] = {
        "mean": 1,
        "range": 1,
        "std": 2,  # ddof=1 ⇒ need at least 2 values for a finite result
        "cv": 2,  # depends on std
        "skew": 3,  # bias-corrected formula needs ≥3
        "kurt": 4,  # bias-corrected formula needs ≥4
    }

    stat_expressions: list[nw.Expr] = []

    for group, cols in self.feature_group_mapping.items():
        if not cols:
            raise ValueError(
                f"No valid columns found for group '{group}' in the input frame."
            )

        n_cols = len(cols)

        for stat in self.stats:
            # Warn early if result is guaranteed to be NaN
            min_required = _min_required_cols[stat]
            if n_cols < min_required:
                warnings.warn(
                    (
                        f"{self.__class__.__name__}: statistic '{stat}' for group "
                        f"'{group}' requires at least {min_required} feature column(s) "
                        f"but only {n_cols} provided – the resulting column will be NaN."
                    ),
                    RuntimeWarning,
                    stacklevel=2,
                )

            expr = _expr_factories[stat](cols).alias(f"{group}_groupstats_{stat}")
            stat_expressions.append(expr)

    return X.select(stat_expressions)

get_feature_names_out(input_features=None)

Return feature names for all groups.

Parameters:

- input_features (list[str], optional): Ignored. Kept for compatibility. Default: None.

Returns:

- list[str]: List of transformed feature names.

Source code in src/centimators/feature_transformers/stats.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Return feature names for all groups.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of transformed feature names.
    """
    return [
        f"{group}_groupstats_{stat}" for group in self.groups for stat in self.stats
    ]

centimators.feature_transformers.neutralization

Neutralization transformers for reducing feature exposure.

FeatureNeutralizer

Bases: _BaseFeatureTransformer

Classic feature neutralization by subtracting a linear model to reduce feature exposure.

This transformer neutralizes predictions by removing their linear relationship with specified features. For each era, it:

1. Gaussianizes the predictions (rank -> normalize -> inverse CDF)
2. Fits a linear model: prediction ~ features
3. Subtracts proportion * exposure from the predictions
4. Re-normalizes and scales to [0, 1]
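
The per-era core of steps 2 and 3 is an ordinary least-squares fit followed by partial residualization. A minimal numpy sketch mirroring the _neutralize helper shown in the source below (illustrative values; gaussianization and the final [0, 1] scaling are omitted):

import numpy as np

preds = np.array([0.2, -0.1, 0.5])           # gaussianized predictions for one era
features = np.array([[0.1], [0.4], [0.9]])   # feature matrix (n_samples x n_features)
proportion = 0.5

coeffs, *_ = np.linalg.lstsq(features, preds, rcond=None)  # fit prediction ~ features
exposure = features @ coeffs                               # linear component explained by the features
neutralized = preds - proportion * exposure                # remove that proportion of the exposure
neutralized = neutralized / np.std(neutralized)            # standardize to unit variance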

Parameters:

- proportion (float or list of float, optional): How much to neutralize, in range [0, 1]. 0 = no neutralization, 1 = full neutralization. If a list, creates multiple output columns (one per proportion). Default: 0.5.
- pred_name (str or list of str, optional): Name(s) of prediction column(s) to neutralize. Used for generating output column names. Default: 'prediction'.
- feature_names (list of str, optional): Names of feature columns to neutralize against. If None, all columns of X are used. Default: None.
- suffix (str, optional): Suffix to append to output column names. Default: None.
- n_jobs (int, optional): Number of parallel jobs. 1 = sequential, -1 = all cores. Default: 1.
- verbose (bool, optional): Show progress bar over eras. Default: False.

Examples:

>>> import pandas as pd
>>> from centimators.feature_transformers import FeatureNeutralizer
>>> # Sample data with eras, features, and predictions
>>> df = pd.DataFrame({
...     'era': ['era1', 'era1', 'era1', 'era2', 'era2', 'era2'],
...     'feature1': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
...     'feature2': [0.6, 0.5, 0.4, 0.3, 0.2, 0.1],
...     'prediction': [0.7, 0.8, 0.9, 0.6, 0.7, 0.8]
... })
>>> neutralizer = FeatureNeutralizer(
...     proportion=0.5,
...     pred_name='prediction',
...     feature_names=['feature1', 'feature2']
... )
>>> # Predictions to neutralize (can be separate from features)
>>> result = neutralizer.fit_transform(
...     df[['prediction']],
...     features=df[['feature1', 'feature2']],
...     era_series=df['era']
... )
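
Output columns are named {pred_name}_neutralized_{proportion}, with _{suffix} appended when a suffix is given, so the result above holds a single column:

>>> list(result.columns)
['prediction_neutralized_0.5']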
Source code in src/centimators/feature_transformers/neutralization.py
class FeatureNeutralizer(_BaseFeatureTransformer):
    """
    Classic feature neutralization by subtracting a linear model to reduce feature exposure.

    This transformer neutralizes predictions by removing their linear relationship with specified
    features. For each era, it:
    1. Gaussianizes the predictions (rank -> normalize -> inverse CDF)
    2. Fits a linear model: prediction ~ features
    3. Subtracts proportion * exposure from predictions
    4. Re-normalizes and scales to [0, 1]

    Args:
        proportion (float or list of float): How much to neutralize in range [0, 1].
            0 = no neutralization, 1 = full neutralization.
            If list, creates multiple output columns (one per proportion).
        pred_name (str or list of str): Name(s) of prediction column(s) to neutralize.
            Used for generating output column names.
        feature_names (list of str, optional): Names of feature columns to neutralize against.
            If None, all columns of X are used.
        suffix (str, optional): Suffix to append to output column names.
        n_jobs (int): Number of parallel jobs. 1 = sequential (default), -1 = all cores.
        verbose (bool): Show progress bar over eras. Default False.

    Examples:
        >>> import pandas as pd
        >>> from centimators.feature_transformers import FeatureNeutralizer
        >>> # Sample data with eras, features, and predictions
        >>> df = pd.DataFrame({
        ...     'era': ['era1', 'era1', 'era1', 'era2', 'era2', 'era2'],
        ...     'feature1': [0.1, 0.2, 0.3, 0.4, 0.5, 0.6],
        ...     'feature2': [0.6, 0.5, 0.4, 0.3, 0.2, 0.1],
        ...     'prediction': [0.7, 0.8, 0.9, 0.6, 0.7, 0.8]
        ... })
        >>> neutralizer = FeatureNeutralizer(
        ...     proportion=0.5,
        ...     pred_name='prediction',
        ...     feature_names=['feature1', 'feature2']
        ... )
        >>> # Predictions to neutralize (can be separate from features)
        >>> result = neutralizer.fit_transform(
        ...     df[['prediction']],
        ...     features=df[['feature1', 'feature2']],
        ...     era_series=df['era']
        ... )
    """

    def __init__(
        self,
        proportion: float | list[float] = 0.5,
        pred_name: str | list[str] = "prediction",
        feature_names: list[str] | None = None,
        suffix: str | None = None,
        n_jobs: int = 1,
        verbose: bool = False,
    ):
        # Normalize inputs to lists
        self.pred_names = [pred_name] if isinstance(pred_name, str) else pred_name
        self.proportions = [proportion] if isinstance(proportion, float) else proportion

        # Validate
        assert len(self.pred_names) == len(set(self.pred_names)), (
            "Duplicate pred_names found."
        )
        for prop in self.proportions:
            assert 0.0 <= prop <= 1.0, f"proportion should be in [0, 1]. Got {prop}."

        self.suffix = suffix
        self.n_jobs = n_jobs
        self.verbose = verbose

        # Generate output column names
        self._output_names = [
            (
                f"{pname}_neutralized_{prop}_{suffix}"
                if suffix
                else f"{pname}_neutralized_{prop}"
            )
            for pname in self.pred_names
            for prop in self.proportions
        ]

        # Initialize with feature_names for the features to neutralize against
        super().__init__(feature_names)

    @nw.narwhalify(allow_series=True)
    def transform(
        self,
        X: FrameT,
        y=None,
        features: FrameT | None = None,
        era_series: IntoSeries | None = None,
    ) -> FrameT:
        """Neutralizes predictions against features.

        Args:
            X: Input predictions to neutralize (shape: n_samples x n_predictions).
            y: Ignored. Kept for sklearn compatibility.
            features: DataFrame with features for neutralization.
                If None, uses X as both predictions and features.
            era_series: Series with era labels for grouping.
                If None, treats all data as a single era.

        Returns:
            DataFrame with neutralized predictions, scaled to [0, 1].
        """
        # If features not provided, use X as features
        if features is None:
            features = X

        # Convert to numpy for numerical operations
        predictions = _ensure_numpy(X)
        feature_array = _ensure_numpy(features)

        # Ensure predictions is 2D
        if predictions.ndim == 1:
            assert len(self.pred_names) == 1, (
                "predictions is 1D but multiple pred_names given"
            )
            predictions = predictions.reshape(-1, 1)
        else:
            assert predictions.shape[1] == len(self.pred_names), (
                f"predictions has {predictions.shape[1]} cols but {len(self.pred_names)} pred_names"
            )

        # Convert era_series to numpy
        if era_series is not None:
            eras = _ensure_numpy(era_series, allow_series=True)
        else:
            warnings.warn(
                "era_series not provided. Treating all data as a single era. "
                "This is fine for live inference (1 era) but may be incorrect "
                "for training data with multiple eras.",
                UserWarning,
            )
            eras = np.array(["X"] * len(predictions))

        # Process each prediction column and proportion
        if self.n_jobs == 1:
            # Sequential
            results = [
                self._neutralize_by_era(
                    predictions[:, pred_idx], feature_array, eras, prop, self.verbose
                )
                for pred_idx in range(len(self.pred_names))
                for prop in self.proportions
            ]
        else:
            # Parallel via joblib (disable verbose)
            tasks = [
                delayed(self._neutralize_by_era)(
                    predictions[:, pred_idx], feature_array, eras, prop, False
                )
                for pred_idx in range(len(self.pred_names))
                for prop in self.proportions
            ]
            results = Parallel(n_jobs=self.n_jobs)(tasks)

        # Stack results and convert back to dataframe with native type
        result_array = np.column_stack(results)

        # Create dictionary for dataframe construction (works with both pandas and polars)
        result_dict = {
            col_name: result_array[:, i]
            for i, col_name in enumerate(self._output_names)
        }

        # Get the native namespace to create the appropriate dataframe type
        native_namespace = nw.get_native_namespace(X)
        result_df = nw.from_native(
            native_namespace.DataFrame(result_dict),
            eager_only=True,
        )

        return result_df

    def _neutralize_by_era(
        self,
        predictions: np.ndarray,
        features: np.ndarray,
        eras: np.ndarray,
        proportion: float,
        verbose: bool = False,
    ) -> np.ndarray:
        """Neutralize predictions era by era."""
        unique_eras = np.unique(eras)
        neutralized = np.zeros_like(predictions)

        era_iter = tqdm(unique_eras, desc=f"prop={proportion}", disable=not verbose)
        for era in era_iter:
            mask = eras == era
            era_pred = predictions[mask]
            era_features = features[mask]

            # Gaussianize then neutralize
            era_pred_norm = _gaussianize(era_pred)
            era_pred_neut = self._neutralize(era_pred_norm, era_features, proportion)
            neutralized[mask] = era_pred_neut

        # Scale all neutralized predictions to [0, 1]
        return _min_max_scale(neutralized)

    @staticmethod
    def _neutralize(
        predictions: np.ndarray, features: np.ndarray, proportion: float
    ) -> np.ndarray:
        """Neutralize predictions by removing linear exposure to features.

        Args:
            predictions: Gaussianized predictions (1D)
            features: Feature matrix (2D)
            proportion: How much to neutralize [0, 1]

        Returns:
            Neutralized predictions, standardized to mean=0, std=1
        """
        # Fit linear model: predictions = features @ coeffs
        # Use lstsq to solve: features @ coeffs = predictions
        coeffs, _, _, _ = np.linalg.lstsq(features, predictions, rcond=None)

        # Compute exposure: features @ coeffs
        exposure = features @ coeffs

        # Subtract proportion of exposure
        neutralized = predictions - proportion * exposure

        # Standardize
        return neutralized / np.std(neutralized)

transform(X, y=None, features=None, era_series=None)

Neutralizes predictions against features.

Parameters:

- X (FrameT, required): Input predictions to neutralize (shape: n_samples x n_predictions).
- y (Any, optional): Ignored. Kept for sklearn compatibility. Default: None.
- features (FrameT | None, optional): DataFrame with features for neutralization. If None, uses X as both predictions and features. Default: None.
- era_series (IntoSeries | None, optional): Series with era labels for grouping. If None, treats all data as a single era. Default: None.

Returns:

- FrameT: DataFrame with neutralized predictions, scaled to [0, 1].

Source code in src/centimators/feature_transformers/neutralization.py
@nw.narwhalify(allow_series=True)
def transform(
    self,
    X: FrameT,
    y=None,
    features: FrameT | None = None,
    era_series: IntoSeries | None = None,
) -> FrameT:
    """Neutralizes predictions against features.

    Args:
        X: Input predictions to neutralize (shape: n_samples x n_predictions).
        y: Ignored. Kept for sklearn compatibility.
        features: DataFrame with features for neutralization.
            If None, uses X as both predictions and features.
        era_series: Series with era labels for grouping.
            If None, treats all data as a single era.

    Returns:
        DataFrame with neutralized predictions, scaled to [0, 1].
    """
    # If features not provided, use X as features
    if features is None:
        features = X

    # Convert to numpy for numerical operations
    predictions = _ensure_numpy(X)
    feature_array = _ensure_numpy(features)

    # Ensure predictions is 2D
    if predictions.ndim == 1:
        assert len(self.pred_names) == 1, (
            "predictions is 1D but multiple pred_names given"
        )
        predictions = predictions.reshape(-1, 1)
    else:
        assert predictions.shape[1] == len(self.pred_names), (
            f"predictions has {predictions.shape[1]} cols but {len(self.pred_names)} pred_names"
        )

    # Convert era_series to numpy
    if era_series is not None:
        eras = _ensure_numpy(era_series, allow_series=True)
    else:
        warnings.warn(
            "era_series not provided. Treating all data as a single era. "
            "This is fine for live inference (1 era) but may be incorrect "
            "for training data with multiple eras.",
            UserWarning,
        )
        eras = np.array(["X"] * len(predictions))

    # Process each prediction column and proportion
    if self.n_jobs == 1:
        # Sequential
        results = [
            self._neutralize_by_era(
                predictions[:, pred_idx], feature_array, eras, prop, self.verbose
            )
            for pred_idx in range(len(self.pred_names))
            for prop in self.proportions
        ]
    else:
        # Parallel via joblib (disable verbose)
        tasks = [
            delayed(self._neutralize_by_era)(
                predictions[:, pred_idx], feature_array, eras, prop, False
            )
            for pred_idx in range(len(self.pred_names))
            for prop in self.proportions
        ]
        results = Parallel(n_jobs=self.n_jobs)(tasks)

    # Stack results and convert back to dataframe with native type
    result_array = np.column_stack(results)

    # Create dictionary for dataframe construction (works with both pandas and polars)
    result_dict = {
        col_name: result_array[:, i]
        for i, col_name in enumerate(self._output_names)
    }

    # Get the native namespace to create the appropriate dataframe type
    native_namespace = nw.get_native_namespace(X)
    result_df = nw.from_native(
        native_namespace.DataFrame(result_dict),
        eager_only=True,
    )

    return result_df

centimators.feature_transformers.embedding

Embedding transformers for text and categorical features using DSPy.

EmbeddingTransformer

Bases: _BaseFeatureTransformer

EmbeddingTransformer embeds text and categorical features using DSPy's Embedder.

This transformer converts text or categorical columns into dense vector embeddings using either hosted embedding models (e.g., OpenAI) or custom embedding functions (e.g., local SentenceTransformers). The embeddings are expanded into multiple columns for sklearn compatibility.

Parameters:

- model (str or Callable, required): The embedding model to use. Can be a string for hosted models (e.g., "openai/text-embedding-3-small") or a callable function (e.g., SentenceTransformer.encode).
- feature_names (list[str] | None, optional): Names of columns to embed. If None, all columns are embedded. Default: None.
- categorical_mapping (dict[str, str] | None, optional): Optional mapping from categorical column names to text templates. For example, {"sector": "Company sector: {}"} will format the sector value as "Company sector: Technology" before embedding. Default: None.
- batch_size (int, optional): Batch size for embedding computation. Default: 200.
- caching (bool, optional): Whether to cache embeddings (for hosted models). Default: True.
- **embedder_kwargs: Additional keyword arguments passed to dspy.Embedder.

Examples:

>>> import polars as pl
>>> from centimators.feature_transformers import EmbeddingTransformer
>>> from sentence_transformers import SentenceTransformer
>>>
>>> # Example 1: Using a local model
>>> model = SentenceTransformer('all-MiniLM-L6-v2')
>>> df = pl.DataFrame({
...     'text': ['AI company', 'Bank', 'Pharma firm'],
...     'sector': ['Technology', 'Finance', 'Healthcare']
... })
>>>
>>> transformer = EmbeddingTransformer(
...     model=model.encode,
...     feature_names=['text', 'sector'],
...     categorical_mapping={'sector': 'Company sector: {}'}
... )
>>> embedded = transformer.fit_transform(df[['text', 'sector']])
>>> print(embedded.columns)  # text_embed_0, text_embed_1, ..., sector_embed_0, ...
>>>
>>> # Example 2: Using a hosted model
>>> transformer = EmbeddingTransformer(
...     model="openai/text-embedding-3-small",
...     feature_names=['text']
... )
>>> embedded = transformer.fit_transform(df[['text']])
Notes:

- Null values are skipped and filled with zero vectors
- Embedding dimension is inferred from the first batch
- Output columns follow the pattern: {feature_name}_embed_{dim_idx}
- Requires centimators[dspy] installation
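
Because embedding dimensions are inferred during transform, get_feature_names_out is only valid afterwards; a sketch (the dimension depends on the chosen model):

>>> names = transformer.get_feature_names_out()  # raises ValueError if called before transform()
>>> names[:2]
['text_embed_0', 'text_embed_1']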
Source code in src/centimators/feature_transformers/embedding.py
class EmbeddingTransformer(_BaseFeatureTransformer):
    """
    EmbeddingTransformer embeds text and categorical features using DSPy's Embedder.

    This transformer converts text or categorical columns into dense vector embeddings
    using either hosted embedding models (e.g., OpenAI) or custom embedding functions
    (e.g., local SentenceTransformers). The embeddings are expanded into multiple
    columns for sklearn compatibility.

    Args:
        model (str or Callable): The embedding model to use. Can be:
            - A string for hosted models (e.g., "openai/text-embedding-3-small")
            - A callable function (e.g., SentenceTransformer.encode)
        feature_names (list[str] | None): Names of columns to embed. If None,
            all columns are embedded.
        categorical_mapping (dict[str, str] | None): Optional mapping from categorical
            column names to text templates. For example:
            {"sector": "Company sector: {}"} will format the sector value as
            "Company sector: Technology" before embedding.
        batch_size (int): Batch size for embedding computation. Default: 200.
        caching (bool): Whether to cache embeddings (for hosted models). Default: True.
        **embedder_kwargs: Additional keyword arguments passed to dspy.Embedder.

    Examples:
        >>> import polars as pl
        >>> from centimators.feature_transformers import EmbeddingTransformer
        >>> from sentence_transformers import SentenceTransformer
        >>>
        >>> # Example 1: Using a local model
        >>> model = SentenceTransformer('all-MiniLM-L6-v2')
        >>> df = pl.DataFrame({
        ...     'text': ['AI company', 'Bank', 'Pharma firm'],
        ...     'sector': ['Technology', 'Finance', 'Healthcare']
        ... })
        >>>
        >>> transformer = EmbeddingTransformer(
        ...     model=model.encode,
        ...     feature_names=['text', 'sector'],
        ...     categorical_mapping={'sector': 'Company sector: {}'}
        ... )
        >>> embedded = transformer.fit_transform(df[['text', 'sector']])
        >>> print(embedded.columns)  # text_embed_0, text_embed_1, ..., sector_embed_0, ...
        >>>
        >>> # Example 2: Using a hosted model
        >>> transformer = EmbeddingTransformer(
        ...     model="openai/text-embedding-3-small",
        ...     feature_names=['text']
        ... )
        >>> embedded = transformer.fit_transform(df[['text']])

    Notes:
        - Null values are skipped and filled with zero vectors
        - Embedding dimension is inferred from the first batch
        - Output columns follow the pattern: `{feature_name}_embed_{dim_idx}`
        - Requires `centimators[dspy]` installation
    """

    def __init__(
        self,
        model,
        feature_names: list[str] | None = None,
        categorical_mapping: dict[str, str] | None = None,
        batch_size: int = 200,
        caching: bool = True,
        **embedder_kwargs,
    ):
        super().__init__(feature_names=feature_names)
        self.model = model
        self.categorical_mapping = categorical_mapping or {}
        self.batch_size = batch_size
        self.caching = caching
        self.embedder_kwargs = embedder_kwargs
        self._embedder = None
        self._embedding_dims = {}  # Track dimension per feature

    def fit(self, X: FrameT, y=None):
        """Fit the transformer and initialize the embedder.

        Args:
            X (FrameT): Input data frame.
            y: Ignored. Kept for compatibility.

        Returns:
            EmbeddingTransformer: The fitted transformer.
        """
        super().fit(X, y)

        # Initialize DSPy embedder
        self._embedder = dspy.Embedder(
            model=self.model,
            batch_size=self.batch_size,
            caching=self.caching,
            **self.embedder_kwargs,
        )

        return self

    @nw.narwhalify(allow_series=True)
    def transform(self, X: FrameT, y=None) -> FrameT:
        """Transform features by embedding them into dense vectors.

        Args:
            X (FrameT): Input data frame.
            y: Ignored. Kept for compatibility.

        Returns:
            FrameT: Transformed data frame with embedding columns expanded.
                Each input feature becomes multiple columns:
                {feature_name}_embed_0, {feature_name}_embed_1, etc.
        """
        if self._embedder is None:
            raise ValueError("Transformer not fitted. Call fit() first.")

        all_embedding_cols = []

        for feature_name in self.feature_names:
            # Extract column values
            col_values = X.select(nw.col(feature_name)).to_native()

            # Convert to list of strings
            if hasattr(col_values, "to_list"):
                values_list = col_values[feature_name].to_list()
            elif hasattr(col_values, "tolist"):
                values_list = col_values[feature_name].tolist()
            else:
                values_list = list(col_values[feature_name])

            # Apply categorical mapping if specified
            if feature_name in self.categorical_mapping:
                template = self.categorical_mapping[feature_name]
                values_list = [
                    template.format(val) if val is not None else None
                    for val in values_list
                ]
            else:
                # Convert to string
                values_list = [
                    str(val) if val is not None else None for val in values_list
                ]

            # Separate null and non-null indices
            non_null_indices = [
                i for i, val in enumerate(values_list) if val is not None
            ]
            non_null_values = [values_list[i] for i in non_null_indices]

            # Compute embeddings for non-null values
            if non_null_values:
                embeddings = self._embedder(non_null_values)
                embedding_dim = embeddings.shape[1]

                # Store dimension for this feature
                self._embedding_dims[feature_name] = embedding_dim

                # Create full embedding matrix with zeros for nulls
                full_embeddings = np.zeros(
                    (len(values_list), embedding_dim), dtype=np.float32
                )
                full_embeddings[non_null_indices] = embeddings
            else:
                # All nulls - can't infer dimension
                if feature_name not in self._embedding_dims:
                    raise ValueError(
                        f"Cannot determine embedding dimension for '{feature_name}' - "
                        f"all values are null. Ensure at least one non-null value exists."
                    )
                embedding_dim = self._embedding_dims[feature_name]
                full_embeddings = np.zeros(
                    (len(values_list), embedding_dim), dtype=np.float32
                )

            # Store embeddings for this feature
            for dim_idx in range(embedding_dim):
                col_name = f"{feature_name}_embed_{dim_idx}"
                all_embedding_cols.append(
                    (col_name, full_embeddings[:, dim_idx].tolist())
                )

        # Build a frame from all embedding columns and return it in the same backend as X
        if all_embedding_cols:
            columns_dict = {col_name: values for col_name, values in all_embedding_cols}
            return nw.from_dict(columns_dict, backend=nw.get_native_namespace(X))
        else:
            # Return empty frame with correct number of rows
            return nw.from_dict(
                {"_empty": [None] * len(X)}, backend=nw.get_native_namespace(X)
            )

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Return the output feature names.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of transformed feature names in the format
                {feature_name}_embed_{dim_idx}.

        Raises:
            ValueError: If called before transform() when dimensions are unknown.
        """
        output_names = []
        for feature_name in self.feature_names:
            if feature_name not in self._embedding_dims:
                raise ValueError(
                    f"Cannot determine output feature names for '{feature_name}' - "
                    f"call transform() first to infer embedding dimensions."
                )
            embedding_dim = self._embedding_dims[feature_name]
            for dim_idx in range(embedding_dim):
                output_names.append(f"{feature_name}_embed_{dim_idx}")
        return output_names
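A minimal end-to-end sketch, assuming the constructor accepts a DSPy/LiteLLM-style model string (the model name below is illustrative, not a library default) and that embedding-API credentials are already configured:

>>> import pandas as pd
>>> from centimators.feature_transformers import EmbeddingTransformer
>>> df = pd.DataFrame({'sector': ['tech', 'energy', None]})
>>> transformer = EmbeddingTransformer(
...     model='openai/text-embedding-3-small',
...     feature_names=['sector'],
... )
>>> embedded = transformer.fit_transform(df)
>>> # Columns: sector_embed_0, sector_embed_1, ... (one per embedding
>>> # dimension); the None row becomes an all-zero vector.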

fit(X, y=None)

Fit the transformer and initialize the embedder.

Parameters:

Name Type Description Default
X FrameT

Input data frame.

required
y

Ignored. Kept for compatibility.

None

Returns:

Name Type Description
EmbeddingTransformer

The fitted transformer.

Source code in src/centimators/feature_transformers/embedding.py
def fit(self, X: FrameT, y=None):
    """Fit the transformer and initialize the embedder.

    Args:
        X (FrameT): Input data frame.
        y: Ignored. Kept for compatibility.

    Returns:
        EmbeddingTransformer: The fitted transformer.
    """
    super().fit(X, y)

    # Initialize DSPy embedder
    self._embedder = dspy.Embedder(
        model=self.model,
        batch_size=self.batch_size,
        caching=self.caching,
        **self.embedder_kwargs,
    )

    return self
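The batch_size and caching arguments are forwarded directly to dspy.Embedder at fit time, so fitting builds the embedder but computes no embeddings. A hedged sketch (parameter values and model string are illustrative):

>>> transformer = EmbeddingTransformer(
...     model='openai/text-embedding-3-small',  # assumed model string
...     batch_size=100,  # texts sent to the embedding backend per batch
...     caching=True,    # reuse cached embeddings for repeated strings
...     feature_names=['sector'],
... )
>>> transformer.fit(df)  # initializes dspy.Embedder; no embedding calls yet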

transform(X, y=None)

Transform features by embedding them into dense vectors.

Parameters:

Name Type Description Default
X FrameT

Input data frame.

required
y

Ignored. Kept for compatibility.

None

Returns:

Name Type Description
FrameT FrameT

Transformed data frame with embedding columns expanded. Each input feature becomes multiple columns: {feature_name}_embed_0, {feature_name}_embed_1, etc.

Source code in src/centimators/feature_transformers/embedding.py
@nw.narwhalify(allow_series=True)
def transform(self, X: FrameT, y=None) -> FrameT:
    """Transform features by embedding them into dense vectors.

    Args:
        X (FrameT): Input data frame.
        y: Ignored. Kept for compatibility.

    Returns:
        FrameT: Transformed data frame with embedding columns expanded.
            Each input feature becomes multiple columns:
            {feature_name}_embed_0, {feature_name}_embed_1, etc.
    """
    if self._embedder is None:
        raise ValueError("Transformer not fitted. Call fit() first.")

    all_embedding_cols = []

    for feature_name in self.feature_names:
        # Extract column values
        col_values = X.select(nw.col(feature_name)).to_native()

        # Convert to list of strings
        if hasattr(col_values, "to_list"):
            values_list = col_values[feature_name].to_list()
        elif hasattr(col_values, "tolist"):
            values_list = col_values[feature_name].tolist()
        else:
            values_list = list(col_values[feature_name])

        # Apply categorical mapping if specified
        if feature_name in self.categorical_mapping:
            template = self.categorical_mapping[feature_name]
            values_list = [
                template.format(val) if val is not None else None
                for val in values_list
            ]
        else:
            # Convert to string
            values_list = [
                str(val) if val is not None else None for val in values_list
            ]

        # Separate null and non-null indices
        non_null_indices = [
            i for i, val in enumerate(values_list) if val is not None
        ]
        non_null_values = [values_list[i] for i in non_null_indices]

        # Compute embeddings for non-null values
        if non_null_values:
            embeddings = self._embedder(non_null_values)
            embedding_dim = embeddings.shape[1]

            # Store dimension for this feature
            self._embedding_dims[feature_name] = embedding_dim

            # Create full embedding matrix with zeros for nulls
            full_embeddings = np.zeros(
                (len(values_list), embedding_dim), dtype=np.float32
            )
            full_embeddings[non_null_indices] = embeddings
        else:
            # All nulls - can't infer dimension
            if feature_name not in self._embedding_dims:
                raise ValueError(
                    f"Cannot determine embedding dimension for '{feature_name}' - "
                    f"all values are null. Ensure at least one non-null value exists."
                )
            embedding_dim = self._embedding_dims[feature_name]
            full_embeddings = np.zeros(
                (len(values_list), embedding_dim), dtype=np.float32
            )

        # Store embeddings for this feature
        for dim_idx in range(embedding_dim):
            col_name = f"{feature_name}_embed_{dim_idx}"
            all_embedding_cols.append(
                (col_name, full_embeddings[:, dim_idx].tolist())
            )

    # Build a frame from all embedding columns and return it in the same backend as X
    if all_embedding_cols:
        columns_dict = {col_name: values for col_name, values in all_embedding_cols}
        return nw.from_dict(columns_dict, backend=nw.get_native_namespace(X))
    else:
        # Return empty frame with correct number of rows
        return nw.from_dict(
            {"_empty": [None] * len(X)}, backend=nw.get_native_namespace(X)
        )
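The categorical_mapping branch above formats raw category values through a template before embedding them, which can give short codes more semantic context. A hedged sketch (the column name and template are hypothetical):

>>> transformer = EmbeddingTransformer(
...     model='openai/text-embedding-3-small',  # assumed model string
...     feature_names=['sector'],
...     categorical_mapping={'sector': 'A company in the {} industry'},
... )
>>> # 'tech' is embedded as 'A company in the tech industry';
>>> # None values bypass the template and receive zero vectors.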

get_feature_names_out(input_features=None)

Return the output feature names.

Parameters:

Name Type Description Default
input_features list[str]

Ignored. Kept for compatibility.

None

Returns:

Type Description
list[str]

list[str]: List of transformed feature names in the format {feature_name}_embed_{dim_idx}.

Raises:

Type Description
ValueError

If called before transform() when dimensions are unknown.

Source code in src/centimators/feature_transformers/embedding.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Return the output feature names.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of transformed feature names in the format
            {feature_name}_embed_{dim_idx}.

    Raises:
        ValueError: If called before transform() when dimensions are unknown.
    """
    output_names = []
    for feature_name in self.feature_names:
        if feature_name not in self._embedding_dims:
            raise ValueError(
                f"Cannot determine output feature names for '{feature_name}' - "
                f"call transform() first to infer embedding dimensions."
            )
        embedding_dim = self._embedding_dims[feature_name]
        for dim_idx in range(embedding_dim):
            output_names.append(f"{feature_name}_embed_{dim_idx}")
    return output_names
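Because embedding dimensions are inferred lazily from the first embedding call, this method only works after transform() has run; calling it earlier raises the ValueError above:

>>> transformer.fit_transform(df)  # infers embedding dimensions
>>> transformer.get_feature_names_out()
>>> # ['sector_embed_0', 'sector_embed_1', ...]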

centimators.feature_transformers.dimreduction

Dimensionality reduction transformers for feature compression.

DimReducer

Bases: _BaseFeatureTransformer

DimReducer applies dimensionality reduction to features using PCA, t-SNE, or UMAP.

This transformer reduces the dimensionality of input features by projecting them into a lower-dimensional space using one of three methods: Principal Component Analysis (PCA), t-distributed Stochastic Neighbor Embedding (t-SNE), or Uniform Manifold Approximation and Projection (UMAP).

Parameters:

Name Type Description Default
method str

The dimensionality reduction method to use. Options are:
  • 'pca': Principal Component Analysis (linear, preserves global structure)
  • 'tsne': t-SNE (non-linear, preserves local structure, visualization)
  • 'umap': UMAP (non-linear, preserves local + global structure)
Default: 'pca'

'pca'
n_components int

Number of dimensions in the reduced space. Default: 2

2
feature_names list[str] | None

Names of columns to reduce. If None, all columns are used.

None
**reducer_kwargs

Additional keyword arguments passed to the underlying reducer (sklearn.decomposition.PCA, sklearn.manifold.TSNE, or umap.UMAP).

{}

Examples:

>>> import polars as pl
>>> from centimators.feature_transformers import DimReducer
>>> df = pl.DataFrame({
...     'feature1': [1.0, 2.0, 3.0, 4.0],
...     'feature2': [4.0, 5.0, 6.0, 7.0],
...     'feature3': [7.0, 8.0, 9.0, 10.0],
... })
>>>
>>> # PCA reduction
>>> reducer = DimReducer(method='pca', n_components=2)
>>> reduced = reducer.fit_transform(df)
>>> print(reduced.columns)  # ['dim_0', 'dim_1']
>>>
>>> # t-SNE for visualization
>>> reducer = DimReducer(method='tsne', n_components=2, random_state=42)
>>> reduced = reducer.fit_transform(df)
>>>
>>> # UMAP (requires umap-learn)
>>> reducer = DimReducer(method='umap', n_components=2, random_state=42)
>>> reduced = reducer.fit_transform(df)
Notes
  • PCA is deterministic and fast, suitable for preprocessing
  • t-SNE is stochastic and slower, primarily for visualization (does not support separate transform - uses fit_transform internally)
  • UMAP balances speed and quality, good for both preprocessing and visualization
  • UMAP requires the umap-learn package: uv add 'centimators[all]'
  • All methods work with any narwhals-compatible backend (pandas, polars, etc.)
Source code in src/centimators/feature_transformers/dimreduction.py
class DimReducer(_BaseFeatureTransformer):
    """
    DimReducer applies dimensionality reduction to features using PCA, t-SNE, or UMAP.

    This transformer reduces the dimensionality of input features by projecting them
    into a lower-dimensional space using one of three methods: Principal Component
    Analysis (PCA), t-distributed Stochastic Neighbor Embedding (t-SNE), or Uniform
    Manifold Approximation and Projection (UMAP).

    Args:
        method (str): The dimensionality reduction method to use. Options are:
            - 'pca': Principal Component Analysis (linear, preserves global structure)
            - 'tsne': t-SNE (non-linear, preserves local structure, visualization)
            - 'umap': UMAP (non-linear, preserves local + global structure)
            Default: 'pca'
        n_components (int): Number of dimensions in the reduced space. Default: 2
        feature_names (list[str] | None): Names of columns to reduce. If None,
            all columns are used.
        **reducer_kwargs: Additional keyword arguments passed to the underlying
            reducer (sklearn.decomposition.PCA, sklearn.manifold.TSNE, or umap.UMAP).

    Examples:
        >>> import polars as pl
        >>> from centimators.feature_transformers import DimReducer
        >>> df = pl.DataFrame({
        ...     'feature1': [1.0, 2.0, 3.0, 4.0],
        ...     'feature2': [4.0, 5.0, 6.0, 7.0],
        ...     'feature3': [7.0, 8.0, 9.0, 10.0],
        ... })
        >>>
        >>> # PCA reduction
        >>> reducer = DimReducer(method='pca', n_components=2)
        >>> reduced = reducer.fit_transform(df)
        >>> print(reduced.columns)  # ['dim_0', 'dim_1']
        >>>
        >>> # t-SNE for visualization
        >>> reducer = DimReducer(method='tsne', n_components=2, random_state=42)
        >>> reduced = reducer.fit_transform(df)
        >>>
        >>> # UMAP (requires umap-learn)
        >>> reducer = DimReducer(method='umap', n_components=2, random_state=42)
        >>> reduced = reducer.fit_transform(df)

    Notes:
        - PCA is deterministic and fast, suitable for preprocessing
        - t-SNE is stochastic and slower, primarily for visualization (does not support
          separate transform - uses fit_transform internally)
        - UMAP balances speed and quality, good for both preprocessing and visualization
        - UMAP requires the umap-learn package: `uv add 'centimators[all]'`
        - All methods work with any narwhals-compatible backend (pandas, polars, etc.)
    """

    def __init__(
        self,
        method: str = "pca",
        n_components: int = 2,
        feature_names: list[str] | None = None,
        **reducer_kwargs,
    ):
        super().__init__(feature_names=feature_names)

        valid_methods = ["pca", "tsne", "umap"]
        if method not in valid_methods:
            raise ValueError(f"method must be one of {valid_methods}, got '{method}'")

        self.method = method
        self.n_components = n_components
        self.reducer_kwargs = reducer_kwargs
        self._reducer = None

    def fit(self, X: FrameT, y=None):
        """Fit the dimensionality reduction model.

        Args:
            X (FrameT): Input data frame.
            y: Ignored. Kept for compatibility.

        Returns:
            DimReducer: The fitted transformer.
        """
        super().fit(X, y)

        # Initialize the appropriate reducer
        if self.method == "pca":
            self._reducer = PCA(n_components=self.n_components, **self.reducer_kwargs)
        elif self.method == "tsne":
            self._reducer = TSNE(n_components=self.n_components, **self.reducer_kwargs)
        elif self.method == "umap":
            try:
                import umap
            except ImportError as e:
                raise ImportError(
                    "DimReducer with method='umap' requires umap-learn. Install with:\n"
                    "  uv add 'centimators[all]'\n"
                    "or:\n"
                    "  pip install 'centimators[all]'"
                ) from e
            self._reducer = umap.UMAP(
                n_components=self.n_components, **self.reducer_kwargs
            )

        # Fit the reducer on the selected features
        X_native = nw.from_native(X)
        X_subset = X_native.select(self.feature_names)
        X_numpy = X_subset.to_numpy()

        # For t-SNE, we skip fit since it doesn't support separate fit/transform
        if self.method != "tsne":
            self._reducer.fit(X_numpy)

        return self

    @nw.narwhalify(allow_series=True)
    def transform(self, X: FrameT, y=None) -> FrameT:
        """Transform features by reducing their dimensionality.

        Args:
            X (FrameT): Input data frame.
            y: Ignored. Kept for compatibility.

        Returns:
            FrameT: Transformed data frame with reduced dimensionality.
                Columns are named 'dim_0', 'dim_1', ..., 'dim_{n_components-1}'.
        """
        if self._reducer is None:
            raise ValueError("Transformer not fitted. Call fit() first.")

        # Extract features and convert to numpy
        X_subset = X.select(self.feature_names)
        X_numpy = X_subset.to_numpy()

        # Apply dimensionality reduction
        # Note: t-SNE doesn't support transform(), so we use fit_transform
        if self.method == "tsne":
            X_reduced = self._reducer.fit_transform(X_numpy)
        else:
            X_reduced = self._reducer.transform(X_numpy)

        # Create output column names
        output_cols = {f"dim_{i}": X_reduced[:, i] for i in range(self.n_components)}

        # Return as narwhals DataFrame with the same backend as input
        return nw.from_dict(output_cols, backend=nw.get_native_namespace(X))

    def get_feature_names_out(self, input_features=None) -> list[str]:
        """Return the output feature names.

        Args:
            input_features (list[str], optional): Ignored. Kept for compatibility.

        Returns:
            list[str]: List of output feature names: ['dim_0', 'dim_1', ...].
        """
        return [f"dim_{i}" for i in range(self.n_components)]

fit(X, y=None)

Fit the dimensionality reduction model.

Parameters:

Name Type Description Default
X FrameT

Input data frame.

required
y

Ignored. Kept for compatibility.

None

Returns:

Name Type Description
DimReducer

The fitted transformer.

Source code in src/centimators/feature_transformers/dimreduction.py
def fit(self, X: FrameT, y=None):
    """Fit the dimensionality reduction model.

    Args:
        X (FrameT): Input data frame.
        y: Ignored. Kept for compatibility.

    Returns:
        DimReducer: The fitted transformer.
    """
    super().fit(X, y)

    # Initialize the appropriate reducer
    if self.method == "pca":
        self._reducer = PCA(n_components=self.n_components, **self.reducer_kwargs)
    elif self.method == "tsne":
        self._reducer = TSNE(n_components=self.n_components, **self.reducer_kwargs)
    elif self.method == "umap":
        try:
            import umap
        except ImportError as e:
            raise ImportError(
                "DimReducer with method='umap' requires umap-learn. Install with:\n"
                "  uv add 'centimators[all]'\n"
                "or:\n"
                "  pip install 'centimators[all]'"
            ) from e
        self._reducer = umap.UMAP(
            n_components=self.n_components, **self.reducer_kwargs
        )

    # Fit the reducer on the selected features
    X_native = nw.from_native(X)
    X_subset = X_native.select(self.feature_names)
    X_numpy = X_subset.to_numpy()

    # For t-SNE, we skip fit since it doesn't support separate fit/transform
    if self.method != "tsne":
        self._reducer.fit(X_numpy)

    return self

transform(X, y=None)

Transform features by reducing their dimensionality.

Parameters:

Name Type Description Default
X FrameT

Input data frame.

required
y

Ignored. Kept for compatibility.

None

Returns:

Name Type Description
FrameT FrameT

Transformed data frame with reduced dimensionality. Columns are named 'dim_0', 'dim_1', ..., 'dim_{n_components-1}'.

Source code in src/centimators/feature_transformers/dimreduction.py
@nw.narwhalify(allow_series=True)
def transform(self, X: FrameT, y=None) -> FrameT:
    """Transform features by reducing their dimensionality.

    Args:
        X (FrameT): Input data frame.
        y: Ignored. Kept for compatibility.

    Returns:
        FrameT: Transformed data frame with reduced dimensionality.
            Columns are named 'dim_0', 'dim_1', ..., 'dim_{n_components-1}'.
    """
    if self._reducer is None:
        raise ValueError("Transformer not fitted. Call fit() first.")

    # Extract features and convert to numpy
    X_subset = X.select(self.feature_names)
    X_numpy = X_subset.to_numpy()

    # Apply dimensionality reduction
    # Note: t-SNE doesn't support transform(), so we use fit_transform
    if self.method == "tsne":
        X_reduced = self._reducer.fit_transform(X_numpy)
    else:
        X_reduced = self._reducer.transform(X_numpy)

    # Create output column names
    output_cols = {f"dim_{i}": X_reduced[:, i] for i in range(self.n_components)}

    # Return as narwhals DataFrame with the same backend as input
    return nw.from_dict(output_cols, backend=nw.get_native_namespace(X))
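One practical consequence of the t-SNE branch above: transform() re-embeds whatever data it receives instead of projecting into a previously fitted space, so new rows get a fresh embedding (train_df and live_df below are hypothetical frames):

>>> reducer = DimReducer(method='tsne', n_components=2, random_state=42)
>>> train_embedding = reducer.fit_transform(train_df)  # embeds train_df
>>> live_embedding = reducer.transform(live_df)  # re-runs t-SNE from scratch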

get_feature_names_out(input_features=None)

Return the output feature names.

Parameters:

Name Type Description Default
input_features list[str]

Ignored. Kept for compatibility.

None

Returns:

Type Description
list[str]

list[str]: List of output feature names: ['dim_0', 'dim_1', ...].

Source code in src/centimators/feature_transformers/dimreduction.py
def get_feature_names_out(self, input_features=None) -> list[str]:
    """Return the output feature names.

    Args:
        input_features (list[str], optional): Ignored. Kept for compatibility.

    Returns:
        list[str]: List of output feature names: ['dim_0', 'dim_1', ...].
    """
    return [f"dim_{i}" for i in range(self.n_components)]

centimators.feature_transformers.penalization

Feature penalization transformers using iterative optimization (requires JAX).

FeaturePenalizer

Bases: _BaseFeatureTransformer

Feature penalization using iterative optimization to cap feature exposure.

Unlike FeatureNeutralizer which subtracts a fixed proportion of linear exposure, this transformer uses gradient descent to find the minimal adjustment that caps all feature exposures below a threshold. This preserves more of the original signal while ensuring no single feature dominates.

For each era, it:
  1. Gaussianizes the predictions (rank -> normalize -> inverse CDF)
  2. Trains a linear model to subtract from predictions such that |exposure to any feature| <= max_exposure
  3. Re-normalizes and scales to [0, 1]
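A minimal sketch of step 1, assuming the standard rank-to-Gaussian recipe and that scipy is available (the library's private _gaussianize helper may differ in details):

import numpy as np
from scipy.stats import norm

def gaussianize_sketch(preds: np.ndarray) -> np.ndarray:
    ranks = preds.argsort().argsort() + 1  # 1-based ranks
    uniform = (ranks - 0.5) / len(preds)   # map ranks into the open interval (0, 1)
    return norm.ppf(uniform)               # inverse normal CDF -> standard normal scores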

Parameters:

Name Type Description Default
max_exposure float or list of float

Maximum allowed feature exposure in [0, 1]. Lower = more aggressive penalization. If list, creates multiple outputs.

0.1
pred_name str or list of str

Name(s) of prediction column(s) to penalize.

'prediction'
feature_names list of str

Names of feature columns.

None
suffix str

Suffix to append to output column names.

None
lr float

Learning rate for Adamax optimizer. Default 1e-3.

0.001
max_iters int

Maximum optimization iterations per era.

100000
tol float

Early stopping tolerance for loss.

1e-07
n_jobs int

Number of parallel jobs. 1 = sequential, -1 = all cores.

1
verbose bool

Show progress bar over eras. Default False.

False

Examples:

>>> import numpy as np
>>> import pandas as pd
>>> from centimators.feature_transformers import FeaturePenalizer
>>> df = pd.DataFrame({
...     'era': ['era1'] * 50 + ['era2'] * 50,
...     'feature1': np.random.randn(100),
...     'feature2': np.random.randn(100),
...     'prediction': np.random.randn(100)
... })
>>> penalizer = FeaturePenalizer(
...     max_exposure=0.1,
...     pred_name='prediction',
...     feature_names=['feature1', 'feature2']
... )
>>> result = penalizer.fit_transform(
...     df[['prediction']],
...     features=df[['feature1', 'feature2']],
...     era_series=df['era']
... )
Source code in src/centimators/feature_transformers/penalization.py
class FeaturePenalizer(_BaseFeatureTransformer):
    """
    Feature penalization using iterative optimization to cap feature exposure.

    Unlike FeatureNeutralizer which subtracts a fixed proportion of linear exposure,
    this transformer uses gradient descent to find the minimal adjustment that caps
    all feature exposures below a threshold. This preserves more of the original
    signal while ensuring no single feature dominates.

    For each era, it:
    1. Gaussianizes the predictions (rank -> normalize -> inverse CDF)
    2. Trains a linear model to subtract from predictions such that
       |exposure to any feature| <= max_exposure
    3. Re-normalizes and scales to [0, 1]

    Args:
        max_exposure (float or list of float): Maximum allowed feature exposure in [0, 1].
            Lower = more aggressive penalization. If list, creates multiple outputs.
        pred_name (str or list of str): Name(s) of prediction column(s) to penalize.
        feature_names (list of str, optional): Names of feature columns.
        suffix (str, optional): Suffix to append to output column names.
        lr (float): Learning rate for Adamax optimizer. Default 1e-3.
        max_iters (int): Maximum optimization iterations per era.
        tol (float): Early stopping tolerance for loss.
        n_jobs (int): Number of parallel jobs. 1 = sequential, -1 = all cores.
        verbose (bool): Show progress bar over eras. Default False.

    Examples:
        >>> import numpy as np
        >>> import pandas as pd
        >>> from centimators.feature_transformers import FeaturePenalizer
        >>> df = pd.DataFrame({
        ...     'era': ['era1'] * 50 + ['era2'] * 50,
        ...     'feature1': np.random.randn(100),
        ...     'feature2': np.random.randn(100),
        ...     'prediction': np.random.randn(100)
        ... })
        >>> penalizer = FeaturePenalizer(
        ...     max_exposure=0.1,
        ...     pred_name='prediction',
        ...     feature_names=['feature1', 'feature2']
        ... )
        >>> result = penalizer.fit_transform(
        ...     df[['prediction']],
        ...     features=df[['feature1', 'feature2']],
        ...     era_series=df['era']
        ... )
    """

    def __init__(
        self,
        max_exposure: float | list[float] = 0.1,
        pred_name: str | list[str] = "prediction",
        feature_names: list[str] | None = None,
        suffix: str | None = None,
        lr: float = 1e-3,
        max_iters: int = 100_000,
        tol: float = 1e-7,
        n_jobs: int = 1,
        verbose: bool = False,
    ):
        # Normalize inputs to lists
        self.pred_names = [pred_name] if isinstance(pred_name, str) else pred_name
        self.max_exposures = (
            [max_exposure] if isinstance(max_exposure, float) else max_exposure
        )

        # Validate
        assert len(self.pred_names) == len(set(self.pred_names)), (
            "Duplicate pred_names found."
        )
        for exp in self.max_exposures:
            assert 0.0 <= exp <= 1.0, f"max_exposure should be in [0, 1]. Got {exp}."

        self.suffix = suffix
        self.lr = lr
        self.max_iters = max_iters
        self.tol = tol
        self.n_jobs = n_jobs
        self.verbose = verbose

        # Generate output column names
        self._output_names = [
            (
                f"{pname}_penalized_{exp}_{suffix}"
                if suffix
                else f"{pname}_penalized_{exp}"
            )
            for pname in self.pred_names
            for exp in self.max_exposures
        ]

        super().__init__(feature_names)

    @nw.narwhalify(allow_series=True)
    def transform(
        self,
        X: FrameT,
        y=None,
        features: FrameT | None = None,
        era_series: IntoSeries | None = None,
    ) -> FrameT:
        """Penalize predictions to cap feature exposure.

        Args:
            X: Input predictions to penalize (shape: n_samples x n_predictions).
            y: Ignored. Kept for sklearn compatibility.
            features: DataFrame with features for penalization.
            era_series: Series with era labels for grouping.

        Returns:
            DataFrame with penalized predictions, scaled to [0, 1].
        """
        if features is None:
            features = X

        predictions = _ensure_numpy(X)
        feature_array = _ensure_numpy(features)

        if predictions.ndim == 1:
            assert len(self.pred_names) == 1
            predictions = predictions.reshape(-1, 1)
        else:
            assert predictions.shape[1] == len(self.pred_names)

        if era_series is not None:
            eras = _ensure_numpy(era_series, allow_series=True)
        else:
            warnings.warn(
                "era_series not provided. Treating all data as a single era. "
                "This is fine for live inference (1 era) but may be incorrect "
                "for training data with multiple eras.",
                UserWarning,
            )
            eras = np.array(["X"] * len(predictions))

        # Process each prediction column and max_exposure
        if self.n_jobs == 1:
            results = [
                self._penalize_by_era(
                    predictions[:, pred_idx], feature_array, eras, max_exp, self.verbose
                )
                for pred_idx in range(len(self.pred_names))
                for max_exp in self.max_exposures
            ]
        else:
            # Disable verbose in parallel mode
            tasks = [
                delayed(self._penalize_by_era)(
                    predictions[:, pred_idx], feature_array, eras, max_exp, False
                )
                for pred_idx in range(len(self.pred_names))
                for max_exp in self.max_exposures
            ]
            results = Parallel(n_jobs=self.n_jobs)(tasks)

        result_array = np.column_stack(results)
        result_dict = {
            col_name: result_array[:, i]
            for i, col_name in enumerate(self._output_names)
        }

        native_namespace = nw.get_native_namespace(X)
        return nw.from_native(
            native_namespace.DataFrame(result_dict),
            eager_only=True,
        )

    def _penalize_by_era(
        self,
        predictions: np.ndarray,
        features: np.ndarray,
        eras: np.ndarray,
        max_exposure: float,
        verbose: bool = False,
    ) -> np.ndarray:
        """Penalize predictions era by era."""
        unique_eras = np.unique(eras)
        penalized = np.zeros_like(predictions)

        era_iter = tqdm(
            unique_eras, desc=f"max_exp={max_exposure}", disable=not verbose
        )
        for era in era_iter:
            mask = eras == era
            era_pred = predictions[mask]
            era_features = features[mask]

            # Gaussianize then penalize
            era_pred_norm = _gaussianize(era_pred)
            era_pred_pen = self._reduce_exposure(
                era_pred_norm, era_features, max_exposure
            )
            # Standardize within era
            era_pred_pen = era_pred_pen / np.std(era_pred_pen)
            penalized[mask] = era_pred_pen

        return _min_max_scale(penalized)

    def _reduce_exposure(
        self,
        prediction: np.ndarray,
        features: np.ndarray,
        max_exp: float,
    ) -> np.ndarray:
        """
        Learn a linear adjustment to predictions that caps feature exposure.

        Uses Adamax optimization with full JIT compilation via lax.while_loop
        to find weights such that:
            neutralized = prediction - features @ weights
        has |exposure to any feature| <= max_exp.
        """
        feats = jnp.asarray(features - 0.5, dtype=jnp.float32)
        pred = jnp.asarray(prediction, dtype=jnp.float32)[:, None]
        n_features = feats.shape[1]

        # Target: clamp current exposures to [-max_exp, max_exp]
        target_exp = jnp.clip(self._exposures(feats, pred), -max_exp, max_exp)

        # Adamax hyperparameters
        beta1, beta2 = 0.9, 0.999
        eps = 1e-7
        lr = self.lr
        tol = self.tol
        max_iters = self.max_iters

        def loss_fn(w):
            neutralized = pred - feats @ w
            exps = self._exposures(feats, neutralized)
            pos_excess = jax.nn.relu(jax.nn.relu(exps) - jax.nn.relu(target_exp))
            neg_excess = jax.nn.relu(jax.nn.relu(-exps) - jax.nn.relu(-target_exp))
            return jnp.sum(pos_excess + neg_excess)

        def cond_fn(state):
            w, m, u, t, loss = state
            return (loss >= tol) & (t < max_iters)

        def body_fn(state):
            w, m, u, t, _ = state
            loss, grads = jax.value_and_grad(loss_fn)(w)
            m_new = beta1 * m + (1 - beta1) * grads
            u_new = jnp.maximum(beta2 * u, jnp.abs(grads))
            m_hat = m_new / (1 - beta1 ** (t + 1))
            w_new = w - lr * m_hat / (u_new + eps)
            return w_new, m_new, u_new, t + 1, loss

        @jax.jit
        def optimize():
            init_state = (
                jnp.zeros((n_features, 1)),  # weights
                jnp.zeros((n_features, 1)),  # m (first moment)
                jnp.zeros((n_features, 1)),  # u (infinity norm)
                jnp.array(0),  # t (iteration)
                jnp.array(float("inf")),  # loss
            )
            w, m, u, t, loss = lax.while_loop(cond_fn, body_fn, init_state)
            return pred - feats @ w

        neutralized = optimize()
        return np.asarray(neutralized).squeeze()

    @staticmethod
    def _exposures(x: jnp.ndarray, y: jnp.ndarray) -> jnp.ndarray:
        """Correlation between features (x) and predictions (y)."""
        x = x - jnp.mean(x, axis=0)
        x = x / jnp.linalg.norm(x, axis=0)
        y = y - jnp.mean(y, axis=0)
        y = y / jnp.linalg.norm(y, axis=0)
        return x.T @ y
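A tiny numeric check of the exposure definition: after centering and L2-normalizing each column, the dot product equals the Pearson correlation (values below are illustrative):

>>> import numpy as np
>>> x = np.array([[1.0], [2.0], [3.0], [4.0]])
>>> y = np.array([[1.1], [1.9], [3.2], [3.8]])
>>> xc = (x - x.mean(0)) / np.linalg.norm(x - x.mean(0), axis=0)
>>> yc = (y - y.mean(0)) / np.linalg.norm(y - y.mean(0), axis=0)
>>> (xc.T @ yc).item()  # ~0.99
>>> np.corrcoef(x.ravel(), y.ravel())[0, 1]  # identical value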

transform(X, y=None, features=None, era_series=None)

Penalize predictions to cap feature exposure.

Parameters:

Name Type Description Default
X FrameT

Input predictions to penalize (shape: n_samples x n_predictions).

required
y

Ignored. Kept for sklearn compatibility.

None
features FrameT | None

DataFrame with features for penalization.

None
era_series IntoSeries | None

Series with era labels for grouping.

None

Returns:

Type Description
FrameT

DataFrame with penalized predictions, scaled to [0, 1].

Source code in src/centimators/feature_transformers/penalization.py
@nw.narwhalify(allow_series=True)
def transform(
    self,
    X: FrameT,
    y=None,
    features: FrameT | None = None,
    era_series: IntoSeries | None = None,
) -> FrameT:
    """Penalize predictions to cap feature exposure.

    Args:
        X: Input predictions to penalize (shape: n_samples x n_predictions).
        y: Ignored. Kept for sklearn compatibility.
        features: DataFrame with features for penalization.
        era_series: Series with era labels for grouping.

    Returns:
        DataFrame with penalized predictions, scaled to [0, 1].
    """
    if features is None:
        features = X

    predictions = _ensure_numpy(X)
    feature_array = _ensure_numpy(features)

    if predictions.ndim == 1:
        assert len(self.pred_names) == 1
        predictions = predictions.reshape(-1, 1)
    else:
        assert predictions.shape[1] == len(self.pred_names)

    if era_series is not None:
        eras = _ensure_numpy(era_series, allow_series=True)
    else:
        warnings.warn(
            "era_series not provided. Treating all data as a single era. "
            "This is fine for live inference (1 era) but may be incorrect "
            "for training data with multiple eras.",
            UserWarning,
        )
        eras = np.array(["X"] * len(predictions))

    # Process each prediction column and max_exposure
    if self.n_jobs == 1:
        results = [
            self._penalize_by_era(
                predictions[:, pred_idx], feature_array, eras, max_exp, self.verbose
            )
            for pred_idx in range(len(self.pred_names))
            for max_exp in self.max_exposures
        ]
    else:
        # Disable verbose in parallel mode
        tasks = [
            delayed(self._penalize_by_era)(
                predictions[:, pred_idx], feature_array, eras, max_exp, False
            )
            for pred_idx in range(len(self.pred_names))
            for max_exp in self.max_exposures
        ]
        results = Parallel(n_jobs=self.n_jobs)(tasks)

    result_array = np.column_stack(results)
    result_dict = {
        col_name: result_array[:, i]
        for i, col_name in enumerate(self._output_names)
    }

    native_namespace = nw.get_native_namespace(X)
    return nw.from_native(
        native_namespace.DataFrame(result_dict),
        eager_only=True,
    )
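Output columns follow the f"{pred_name}_penalized_{max_exposure}" pattern from the constructor, one per (prediction, exposure) pair. For example, with the df from the class docstring:

>>> penalizer = FeaturePenalizer(
...     max_exposure=[0.05, 0.1],
...     pred_name='prediction',
...     feature_names=['feature1', 'feature2'],
... )
>>> result = penalizer.fit_transform(
...     df[['prediction']],
...     features=df[['feature1', 'feature2']],
...     era_series=df['era'],
... )
>>> list(result.columns)
>>> # ['prediction_penalized_0.05', 'prediction_penalized_0.1']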