From 5ddf4c5c6f566fd98ca61f794c0402d1d0a7ce56 Mon Sep 17 00:00:00 2001 From: yonishelach Date: Tue, 18 Jun 2024 12:15:59 +0300 Subject: [PATCH] [Feature-selection] Replace matplotlib with plotly --- feature_selection/feature_selection.py | 52 +++++-------------- feature_selection/function.yaml | 17 +++---- feature_selection/item.yaml | 4 +- feature_selection/requirements.txt | 4 +- feature_selection/test_feature_selection.py | 56 ++++++++++++++------- 5 files changed, 59 insertions(+), 74 deletions(-) diff --git a/feature_selection/feature_selection.py b/feature_selection/feature_selection.py index 630a09694..30fa8f904 100644 --- a/feature_selection/feature_selection.py +++ b/feature_selection/feature_selection.py @@ -13,17 +13,15 @@ # limitations under the License. # import json -import os -import matplotlib.pyplot as plt import mlrun import mlrun.datastore -import mlrun.utils import mlrun.feature_store as fs +import mlrun.utils import numpy as np import pandas as pd -import seaborn as sns -from mlrun.artifacts import PlotArtifact +import plotly.express as px +from mlrun.artifacts import PlotlyArtifact from mlrun.datastore.targets import ParquetTarget # MLRun utils from mlrun.utils.helpers import create_class @@ -42,15 +40,6 @@ } -def _clear_current_figure(): - """ - Clear matplotlib current figure. 
- """ - plt.cla() - plt.clf() - plt.close() - - def show_values_on_bars(axs, h_v="v", space=0.4): def _show_on_single_plot(ax_): if h_v == "v": @@ -74,33 +63,18 @@ def _show_on_single_plot(ax_): def plot_stat(context, stat_name, stat_df): - _clear_current_figure() - - # Add chart - ax = plt.axes() - stat_chart = sns.barplot( + sorted_df = stat_df.sort_values(stat_name) + fig = px.bar( + data_frame=sorted_df, x=stat_name, - y="index", - data=stat_df.sort_values(stat_name, ascending=False).reset_index(), - ax=ax, + y=sorted_df.index, + title=f"{stat_name} feature scores", + color=stat_name, ) - plt.tight_layout() - - for p in stat_chart.patches: - width = p.get_width() - plt.text( - 5 + p.get_width(), - p.get_y() + 0.55 * p.get_height(), - "{:1.2f}".format(width), - ha="center", - va="center", - ) - context.log_artifact( - PlotArtifact(f"{stat_name}", body=plt.gcf()), - local_path=os.path.join("plots", "feature_selection", f"{stat_name}.html"), + item=PlotlyArtifact(key=stat_name, figure=fig), + local_path=f"{stat_name}.html", ) - _clear_current_figure() def feature_selection( @@ -115,7 +89,6 @@ def feature_selection( sample_ratio: float = None, output_vector_name: float = None, ignore_type_errors: bool = False, - is_feature_vector: bool = False, ): """ Applies selected feature selection statistical functions or models on our 'df_artifact'. @@ -138,10 +111,9 @@ def feature_selection( model name (ex. LinearSVC), formalized json (contains 'CLASS', 'FIT', 'META') or a path to such json file. :param max_scaled_scores: produce feature scores table scaled with max_scaler. - :param sample_ratio: percentage of the dataset the user whishes to compute the feature selection process on. + :param sample_ratio: percentage of the dataset the user wishes to compute the feature selection process on. :param output_vector_name: creates a new feature vector containing only the identifies features. 
:param ignore_type_errors: skips datatypes that are neither float nor int within the feature vector. - :param is_feature_vector: bool stating if the data is passed as a feature vector. """ stat_filters = stat_filters or DEFAULT_STAT_FILTERS model_filters = model_filters or DEFAULT_MODEL_FILTERS diff --git a/feature_selection/function.yaml b/feature_selection/function.yaml index 0851f54d3..aca1f0c0c 100644 --- a/feature_selection/function.yaml +++ b/feature_selection/function.yaml @@ -2,7 +2,7 @@ kind: job metadata: name: feature-selection tag: '' - hash: 6dba16d062d81f78d3d210fee75edfe8b1def9b3 + hash: 5815ef4c27a1f08c9d8d3f88ad6bd4c9cb5c7f4a project: '' labels: author: orz @@ -14,7 +14,7 @@ spec: args: [] image: mlrun/mlrun build: - functionSourceCode: # Copyright 2019 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os

import matplotlib.pyplot as plt
import mlrun
import mlrun.datastore
import mlrun.utils
import mlrun.feature_store as fs
import numpy as np
import pandas as pd
import seaborn as sns
from mlrun.artifacts import PlotArtifact
from mlrun.datastore.targets import ParquetTarget
# MLRun utils
from mlrun.utils.helpers import create_class
# Feature selection strategies
from sklearn.feature_selection import SelectFromModel, SelectKBest
# Scale feature scores
from sklearn.preprocessing import MinMaxScaler
# SKLearn estimators list
from sklearn.utils import all_estimators

DEFAULT_STAT_FILTERS = ["f_classif", "mutual_info_classif", "chi2", "f_regression"]
DEFAULT_MODEL_FILTERS = {
    "LinearSVC": "LinearSVC",
    "LogisticRegression": "LogisticRegression",
    "ExtraTreesClassifier": "ExtraTreesClassifier",
}


def _clear_current_figure():
    """
    Reset matplotlib's current figure.

    Calls, in order, `plt.cla()`, `plt.clf()` and `plt.close()`.
    """
    for reset_step in (plt.cla, plt.clf, plt.close):
        reset_step()


def show_values_on_bars(axs, h_v="v", space=0.4):
    """
    Write every bar's value, truncated to an integer, as a text label on its axes.

    :param axs:   a single axes object, or a numpy array of axes objects (every
                  element of the array is annotated).
    :param h_v:   "v" labels each bar with its height, horizontally centered on the bar;
                  "h" labels each bar with its width, placed `space` past the bar's end.
                  Any other value leaves the axes untouched.
    :param space: horizontal gap added after the bar's end when `h_v` is "h".
    """

    def _annotate(axis):
        if h_v not in ("v", "h"):
            return
        vertical = h_v == "v"
        for bar in axis.patches:
            # Both orientations anchor the label at the bar's y position plus its height
            label_y = bar.get_y() + bar.get_height()
            if vertical:
                label_x = bar.get_x() + bar.get_width() / 2
                axis.text(label_x, label_y, int(bar.get_height()), ha="center")
            else:
                label_x = bar.get_x() + bar.get_width() + float(space)
                axis.text(label_x, label_y, int(bar.get_width()), ha="left")

    targets = axs.flat if isinstance(axs, np.ndarray) else [axs]
    for axis in targets:
        _annotate(axis)


def plot_stat(context, stat_name, stat_df):
    """
    Draw a seaborn bar plot of the `stat_name` scores and log it as a plot artifact.

    :param context:   the function context, used to log the artifact.
    :param stat_name: name of the score column in `stat_df`; also used as the
                      artifact key and as the html file name.
    :param stat_df:   dataframe holding a `stat_name` column.
    """
    _clear_current_figure()

    # Add chart: rows are ordered from the highest score down, and the chart
    # reads the labels from the "index" column produced by reset_index()
    axes = plt.axes()
    ordered_scores = stat_df.sort_values(stat_name, ascending=False).reset_index()
    chart = sns.barplot(x=stat_name, y="index", data=ordered_scores, ax=axes)
    plt.tight_layout()

    # Write the two-decimal score next to every bar
    for bar in chart.patches:
        bar_width = bar.get_width()
        plt.text(
            5 + bar_width,
            bar.get_y() + 0.55 * bar.get_height(),
            "{:1.2f}".format(bar_width),
            ha="center",
            va="center",
        )

    artifact = PlotArtifact(f"{stat_name}", body=plt.gcf())
    target_path = os.path.join("plots", "feature_selection", f"{stat_name}.html")
    context.log_artifact(artifact, local_path=target_path)
    _clear_current_figure()


def feature_selection(
    context,
    df_artifact,
    k: int = 5,
    min_votes: float = 0.5,
    label_column: str = None,
    stat_filters: list = None,
    model_filters: dict = None,
    max_scaled_scores: bool = True,
    sample_ratio: float = None,
    output_vector_name: str = None,
    ignore_type_errors: bool = False,
    is_feature_vector: bool = False,
):
    """
    Applies selected feature selection statistical functions or models on our 'df_artifact'.

    Each statistical function or model will vote for its best K selected features.
    If a feature has >= 'min_votes' votes, it will be selected.

    :param context:             the function context.
    :param df_artifact:         dataframe to pass as input.
    :param k:                   number of top features to select from each statistical
                                function or model.
    :param min_votes:           minimal number of votes (from a model or by statistical
                                function) needed for a feature to be selected.
                                Can be specified by percentage of votes or absolute
                                number of votes.
    :param label_column:        ground-truth (y) labels.
    :param stat_filters:        statistical functions to apply to the features
                                (from sklearn.feature_selection).
    :param model_filters:       models to use for feature evaluation, can be specified by
                                model name (ex. LinearSVC), formalized json (contains 'CLASS',
                                'FIT', 'META') or a path to such json file.
    :param max_scaled_scores:   produce feature scores table scaled with max_scaler.
    :param sample_ratio:        percentage of the dataset the user wishes to compute the feature selection process on.
    :param output_vector_name:  creates a new feature vector containing only the identified features.
    :param ignore_type_errors:  skips datatypes that are neither float nor int within the feature vector.
    :param is_feature_vector:   bool stating if the data is passed as a feature vector. The passed value
                                is ignored: it is recomputed from the store URI of 'df_artifact'.

    :raises ValueError: if no label column can be determined, if 'k' is smaller than 1 or bigger than
                        the number of features, or if object-typed columns exist while
                        'ignore_type_errors' is False.
    """
    stat_filters = stat_filters or DEFAULT_STAT_FILTERS
    model_filters = model_filters or DEFAULT_MODEL_FILTERS
    # The input counts as a feature vector when its store URI carries the feature-vector prefix
    store_uri_prefix, _ = mlrun.datastore.parse_store_uri(df_artifact.artifact_url)
    is_feature_vector = mlrun.utils.StorePrefix.FeatureVector == store_uri_prefix

    # Look inside meta.spec.label_feature to identify the label_column if the user did not specify it
    if label_column is None:
        if is_feature_vector:
            label_column = df_artifact.meta.spec.label_feature.split(".")[1]
        else:
            raise ValueError("No label_column was given, please add a label_column.")

    # Use the feature vector as dataframe
    df = df_artifact.as_df()

    # Ensure k is not bigger than the total number of features. The label column
    # is still part of the dataframe at this point but it is not a feature.
    num_features = df.shape[1] - int(label_column in df.columns)
    if k > num_features:
        raise ValueError(
            f"K cannot be bigger than the total number of features ({num_features}). Please choose a smaller K."
        )
    elif k < 1:
        raise ValueError("K cannot be smaller than 1. Please choose a bigger K.")

    # Create a sample dataframe of the original feature vector
    if sample_ratio:
        df = (
            df.groupby(label_column)
            .apply(lambda x: x.sample(frac=sample_ratio))
            .reset_index(drop=True)
        )
        df = df.dropna()

    # Set feature vector and labels
    y = df.pop(label_column)
    X = df

    if np.object_ in list(X.dtypes) and ignore_type_errors is False:
        raise ValueError(
            f"{df.select_dtypes(include=['object']).columns.tolist()} are neither float or int."
        )

    # Create selected statistical estimators
    stat_functions_list = {
        stat_name: SelectKBest(
            score_func=create_class(f"sklearn.feature_selection.{stat_name}"), k=k
        )
        for stat_name in stat_filters
    }
    requires_abs = ["chi2"]

    # Run statistic filters
    selected_features_agg = {}
    stats_df = pd.DataFrame(index=X.columns).dropna()

    for stat_name, stat_func in stat_functions_list.items():
        try:
            # Only the filters listed in 'requires_abs' (chi2 rejects negative
            # values) are fitted on absolute values; the others see the raw data.
            features = abs(X) if stat_name in requires_abs else X
            stat = stat_func.fit(features, y)

            # Collect stat function results
            stat_df = pd.DataFrame(
                index=X.columns, columns=[stat_name], data=stat.scores_
            )
            plot_stat(context, stat_name, stat_df)
            stats_df = stats_df.join(stat_df)

            # Select K Best features
            selected_features = X.columns[stat_func.get_support()]
            selected_features_agg[stat_name] = selected_features

        except Exception as e:
            # Best effort: a failing filter is reported and simply casts no votes
            context.logger.info(f"Couldn't calculate {stat_name} because of: {e}")

    # Create models from class name / json file / json params
    all_sklearn_estimators = dict(all_estimators()) if len(model_filters) > 0 else {}
    selected_models = {}
    for model_name, model in model_filters.items():
        if ".json" in model:
            # Close the file deterministically instead of leaking the handle
            with open(model, "r") as model_file:
                current_model = json.load(model_file)
            classifier_class = create_class(current_model["META"]["class"])
            selected_models[model_name] = classifier_class(**current_model["CLASS"])
        elif model in all_sklearn_estimators:
            # Look the estimator up by the value that was just checked, so the
            # dictionary key may be any display name
            selected_models[model_name] = all_sklearn_estimators[model]()

        else:
            try:
                current_model = json.loads(model)
                classifier_class = create_class(current_model["META"]["class"])
                selected_models[model_name] = classifier_class(**current_model["CLASS"])
            except Exception as e:
                context.logger.info(f"unable to load {model} because of: {e}")

    # Run model filters
    models_df = pd.DataFrame(index=X.columns)
    for model_name, model in selected_models.items():

        if model_name == "LogisticRegression":
            model.set_params(solver="liblinear")

        # Train model and get feature importance
        select_from_model = SelectFromModel(model).fit(X, y)
        feature_idx = select_from_model.get_support()
        feature_names = X.columns[feature_idx]
        selected_features_agg[model_name] = feature_names.tolist()

        # Collect model feature importance
        fitted_estimator = select_from_model.estimator_
        if hasattr(fitted_estimator, "coef_"):
            importance = fitted_estimator.coef_
        elif hasattr(fitted_estimator, "feature_importances_"):
            importance = fitted_estimator.feature_importances_
        else:
            # Nothing to report for this model; its votes were already recorded above
            context.logger.info(
                f"{model_name} exposes neither coef_ nor feature_importances_, skipping its scores"
            )
            continue

        # A 2-D result keeps its first row (as before); a 1-D result is already
        # one score per feature and must not be indexed down to a scalar.
        if np.ndim(importance) > 1:
            importance = importance[0]

        stat_df = pd.DataFrame(index=X.columns, columns=[model_name], data=importance)
        models_df = models_df.join(stat_df)

        plot_stat(context, model_name, stat_df)

    # Create feature_scores DF with stat & model filters scores
    result_matrix_df = pd.concat([stats_df, models_df], axis=1, sort=False)
    context.log_dataset(
        key="feature_scores",
        df=result_matrix_df,
        local_path="feature_scores.parquet",
        format="parquet",
    )
    if max_scaled_scores:
        # Infinite scores are turned into NaN before scaling
        normalized_df = result_matrix_df.replace([np.inf, -np.inf], np.nan).values
        min_max_scaler = MinMaxScaler()
        normalized_df = min_max_scaler.fit_transform(normalized_df)
        normalized_df = pd.DataFrame(
            data=normalized_df,
            columns=result_matrix_df.columns,
            index=result_matrix_df.index,
        )
        context.log_dataset(
            key="max_scaled_scores_feature_scores",
            df=normalized_df,
            local_path="max_scaled_scores_feature_scores.parquet",
            format="parquet",
        )

    # Create feature count DataFrame: every score column is replaced by a 0/1 vote
    for test_name in selected_features_agg:
        result_matrix_df[test_name] = [
            1 if x in selected_features_agg[test_name] else 0 for x in X.columns
        ]
    result_matrix_df.loc[:, "num_votes"] = result_matrix_df.sum(axis=1)
    context.log_dataset(
        key="selected_features_count",
        df=result_matrix_df,
        local_path="selected_features_count.parquet",
        format="parquet",
    )

    # How many votes are needed for a feature to be selected?
    if isinstance(min_votes, int):
        votes_needed = min_votes
    else:
        # A non-int value is a ratio, clamped to [0, 1], of the configured filters
        num_filters = len(stat_filters) + len(model_filters)
        votes_needed = int(np.floor(num_filters * max(min(min_votes, 1), 0)))
    context.logger.info(f"votes needed to be selected: {votes_needed}")

    # Create final feature dataframe
    selected_features = result_matrix_df[
        result_matrix_df.num_votes >= votes_needed
    ].index.tolist()
    good_feature_df = df.loc[:, selected_features]
    final_df = pd.concat([good_feature_df, y], axis=1)
    context.log_dataset(
        key="selected_features",
        df=final_df,
        local_path="selected_features.parquet",
        format="parquet",
    )

    # Creating a new feature vector containing only the identified top features
    if is_feature_vector and df_artifact.meta.spec.features and output_vector_name:
        # Selecting the top K features from our top feature dataframe
        # NOTE(review): head(k) takes the first k rows in column order, the frame
        # is not sorted by num_votes here - confirm this is the intended choice.
        selected_features = result_matrix_df.head(k).index

        # Match the selected feature names to the FS Feature annotations
        matched_selections = [
            feature
            for feature in list(df_artifact.meta.spec.features)
            for selected in list(selected_features)
            if feature.endswith(selected)
        ]

        # Defining our new feature vector
        top_features_fv = fs.FeatureVector(
            output_vector_name,
            matched_selections,
            label_feature="labels.label",
            description="feature vector composed strictly of our top features",
        )

        # Saving
        top_features_fv.save()
        fs.get_offline_features(top_features_fv, target=ParquetTarget())

        # Logging our new feature vector URI
        context.log_result("top_features_vector", top_features_fv.uri)
 + functionSourceCode: # Copyright 2019 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json

import mlrun
import mlrun.datastore
import mlrun.feature_store as fs
import mlrun.utils
import numpy as np
import pandas as pd
import plotly.express as px
from mlrun.artifacts import PlotlyArtifact
from mlrun.datastore.targets import ParquetTarget
# MLRun utils
from mlrun.utils.helpers import create_class
# Feature selection strategies
from sklearn.feature_selection import SelectFromModel, SelectKBest
# Scale feature scores
from sklearn.preprocessing import MinMaxScaler
# SKLearn estimators list
from sklearn.utils import all_estimators

DEFAULT_STAT_FILTERS = ["f_classif", "mutual_info_classif", "chi2", "f_regression"]
DEFAULT_MODEL_FILTERS = {
    "LinearSVC": "LinearSVC",
    "LogisticRegression": "LogisticRegression",
    "ExtraTreesClassifier": "ExtraTreesClassifier",
}


def show_values_on_bars(axs, h_v="v", space=0.4):
    """
    Write every bar's value, truncated to an integer, as a text label on its axes.

    :param axs:   a single axes object, or a numpy array of axes objects (every
                  element of the array is annotated).
    :param h_v:   "v" labels each bar with its height, horizontally centered on the bar;
                  "h" labels each bar with its width, placed `space` past the bar's end.
                  Any other value leaves the axes untouched.
    :param space: horizontal gap added after the bar's end when `h_v` is "h".
    """

    def _annotate(axis):
        if h_v not in ("v", "h"):
            return
        vertical = h_v == "v"
        for bar in axis.patches:
            # Both orientations anchor the label at the bar's y position plus its height
            label_y = bar.get_y() + bar.get_height()
            if vertical:
                label_x = bar.get_x() + bar.get_width() / 2
                axis.text(label_x, label_y, int(bar.get_height()), ha="center")
            else:
                label_x = bar.get_x() + bar.get_width() + float(space)
                axis.text(label_x, label_y, int(bar.get_width()), ha="left")

    targets = axs.flat if isinstance(axs, np.ndarray) else [axs]
    for axis in targets:
        _annotate(axis)


def plot_stat(context, stat_name, stat_df):
    """
    Build a plotly bar chart of the `stat_name` scores and log it as an artifact.

    :param context:   the function context, used to log the artifact.
    :param stat_name: name of the score column in `stat_df`; also used as the
                      artifact key, the color dimension and the html file name.
    :param stat_df:   dataframe holding a `stat_name` column; its index supplies
                      the y values of the chart.
    """
    # Rows are ordered by score (ascending) before being handed to plotly
    ascending_scores = stat_df.sort_values(stat_name)
    bar_chart = px.bar(
        data_frame=ascending_scores,
        x=stat_name,
        y=ascending_scores.index,
        color=stat_name,
        title=f"{stat_name} feature scores",
    )
    artifact = PlotlyArtifact(key=stat_name, figure=bar_chart)
    context.log_artifact(item=artifact, local_path=f"{stat_name}.html")


def feature_selection(
    context,
    df_artifact,
    k: int = 5,
    min_votes: float = 0.5,
    label_column: str = None,
    stat_filters: list = None,
    model_filters: dict = None,
    max_scaled_scores: bool = True,
    sample_ratio: float = None,
    output_vector_name: str = None,
    ignore_type_errors: bool = False,
):
    """
    Applies selected feature selection statistical functions or models on our 'df_artifact'.

    Each statistical function or model will vote for its best K selected features.
    If a feature has >= 'min_votes' votes, it will be selected.

    :param context:             the function context.
    :param df_artifact:         dataframe to pass as input.
    :param k:                   number of top features to select from each statistical
                                function or model.
    :param min_votes:           minimal number of votes (from a model or by statistical
                                function) needed for a feature to be selected.
                                Can be specified by percentage of votes or absolute
                                number of votes.
    :param label_column:        ground-truth (y) labels.
    :param stat_filters:        statistical functions to apply to the features
                                (from sklearn.feature_selection).
    :param model_filters:       models to use for feature evaluation, can be specified by
                                model name (ex. LinearSVC), formalized json (contains 'CLASS',
                                'FIT', 'META') or a path to such json file.
    :param max_scaled_scores:   produce feature scores table scaled with max_scaler.
    :param sample_ratio:        percentage of the dataset the user wishes to compute the feature selection process on.
    :param output_vector_name:  creates a new feature vector containing only the identified features.
    :param ignore_type_errors:  skips datatypes that are neither float nor int within the feature vector.

    :raises ValueError: if no label column can be determined, if 'k' is smaller than 1 or bigger than
                        the number of features, or if object-typed columns exist while
                        'ignore_type_errors' is False.
    """
    stat_filters = stat_filters or DEFAULT_STAT_FILTERS
    model_filters = model_filters or DEFAULT_MODEL_FILTERS
    # The input counts as a feature vector when its store URI carries the feature-vector prefix
    store_uri_prefix, _ = mlrun.datastore.parse_store_uri(df_artifact.artifact_url)
    is_feature_vector = mlrun.utils.StorePrefix.FeatureVector == store_uri_prefix

    # Look inside meta.spec.label_feature to identify the label_column if the user did not specify it
    if label_column is None:
        if is_feature_vector:
            label_column = df_artifact.meta.spec.label_feature.split(".")[1]
        else:
            raise ValueError("No label_column was given, please add a label_column.")

    # Use the feature vector as dataframe
    df = df_artifact.as_df()

    # Ensure k is not bigger than the total number of features. The label column
    # is still part of the dataframe at this point but it is not a feature.
    num_features = df.shape[1] - int(label_column in df.columns)
    if k > num_features:
        raise ValueError(
            f"K cannot be bigger than the total number of features ({num_features}). Please choose a smaller K."
        )
    elif k < 1:
        raise ValueError("K cannot be smaller than 1. Please choose a bigger K.")

    # Create a sample dataframe of the original feature vector
    if sample_ratio:
        df = (
            df.groupby(label_column)
            .apply(lambda x: x.sample(frac=sample_ratio))
            .reset_index(drop=True)
        )
        df = df.dropna()

    # Set feature vector and labels
    y = df.pop(label_column)
    X = df

    if np.object_ in list(X.dtypes) and ignore_type_errors is False:
        raise ValueError(
            f"{df.select_dtypes(include=['object']).columns.tolist()} are neither float or int."
        )

    # Create selected statistical estimators
    stat_functions_list = {
        stat_name: SelectKBest(
            score_func=create_class(f"sklearn.feature_selection.{stat_name}"), k=k
        )
        for stat_name in stat_filters
    }
    requires_abs = ["chi2"]

    # Run statistic filters
    selected_features_agg = {}
    stats_df = pd.DataFrame(index=X.columns).dropna()

    for stat_name, stat_func in stat_functions_list.items():
        try:
            # Only the filters listed in 'requires_abs' (chi2 rejects negative
            # values) are fitted on absolute values; the others see the raw data.
            features = abs(X) if stat_name in requires_abs else X
            stat = stat_func.fit(features, y)

            # Collect stat function results
            stat_df = pd.DataFrame(
                index=X.columns, columns=[stat_name], data=stat.scores_
            )
            plot_stat(context, stat_name, stat_df)
            stats_df = stats_df.join(stat_df)

            # Select K Best features
            selected_features = X.columns[stat_func.get_support()]
            selected_features_agg[stat_name] = selected_features

        except Exception as e:
            # Best effort: a failing filter is reported and simply casts no votes
            context.logger.info(f"Couldn't calculate {stat_name} because of: {e}")

    # Create models from class name / json file / json params
    all_sklearn_estimators = dict(all_estimators()) if len(model_filters) > 0 else {}
    selected_models = {}
    for model_name, model in model_filters.items():
        if ".json" in model:
            # Close the file deterministically instead of leaking the handle
            with open(model, "r") as model_file:
                current_model = json.load(model_file)
            classifier_class = create_class(current_model["META"]["class"])
            selected_models[model_name] = classifier_class(**current_model["CLASS"])
        elif model in all_sklearn_estimators:
            # Look the estimator up by the value that was just checked, so the
            # dictionary key may be any display name
            selected_models[model_name] = all_sklearn_estimators[model]()

        else:
            try:
                current_model = json.loads(model)
                classifier_class = create_class(current_model["META"]["class"])
                selected_models[model_name] = classifier_class(**current_model["CLASS"])
            except Exception as e:
                context.logger.info(f"unable to load {model} because of: {e}")

    # Run model filters
    models_df = pd.DataFrame(index=X.columns)
    for model_name, model in selected_models.items():

        if model_name == "LogisticRegression":
            model.set_params(solver="liblinear")

        # Train model and get feature importance
        select_from_model = SelectFromModel(model).fit(X, y)
        feature_idx = select_from_model.get_support()
        feature_names = X.columns[feature_idx]
        selected_features_agg[model_name] = feature_names.tolist()

        # Collect model feature importance
        fitted_estimator = select_from_model.estimator_
        if hasattr(fitted_estimator, "coef_"):
            importance = fitted_estimator.coef_
        elif hasattr(fitted_estimator, "feature_importances_"):
            importance = fitted_estimator.feature_importances_
        else:
            # Nothing to report for this model; its votes were already recorded above
            context.logger.info(
                f"{model_name} exposes neither coef_ nor feature_importances_, skipping its scores"
            )
            continue

        # A 2-D result keeps its first row (as before); a 1-D result is already
        # one score per feature and must not be indexed down to a scalar.
        if np.ndim(importance) > 1:
            importance = importance[0]

        stat_df = pd.DataFrame(index=X.columns, columns=[model_name], data=importance)
        models_df = models_df.join(stat_df)

        plot_stat(context, model_name, stat_df)

    # Create feature_scores DF with stat & model filters scores
    result_matrix_df = pd.concat([stats_df, models_df], axis=1, sort=False)
    context.log_dataset(
        key="feature_scores",
        df=result_matrix_df,
        local_path="feature_scores.parquet",
        format="parquet",
    )
    if max_scaled_scores:
        # Infinite scores are turned into NaN before scaling
        normalized_df = result_matrix_df.replace([np.inf, -np.inf], np.nan).values
        min_max_scaler = MinMaxScaler()
        normalized_df = min_max_scaler.fit_transform(normalized_df)
        normalized_df = pd.DataFrame(
            data=normalized_df,
            columns=result_matrix_df.columns,
            index=result_matrix_df.index,
        )
        context.log_dataset(
            key="max_scaled_scores_feature_scores",
            df=normalized_df,
            local_path="max_scaled_scores_feature_scores.parquet",
            format="parquet",
        )

    # Create feature count DataFrame: every score column is replaced by a 0/1 vote
    for test_name in selected_features_agg:
        result_matrix_df[test_name] = [
            1 if x in selected_features_agg[test_name] else 0 for x in X.columns
        ]
    result_matrix_df.loc[:, "num_votes"] = result_matrix_df.sum(axis=1)
    context.log_dataset(
        key="selected_features_count",
        df=result_matrix_df,
        local_path="selected_features_count.parquet",
        format="parquet",
    )

    # How many votes are needed for a feature to be selected?
    if isinstance(min_votes, int):
        votes_needed = min_votes
    else:
        # A non-int value is a ratio, clamped to [0, 1], of the configured filters
        num_filters = len(stat_filters) + len(model_filters)
        votes_needed = int(np.floor(num_filters * max(min(min_votes, 1), 0)))
    context.logger.info(f"votes needed to be selected: {votes_needed}")

    # Create final feature dataframe
    selected_features = result_matrix_df[
        result_matrix_df.num_votes >= votes_needed
    ].index.tolist()
    good_feature_df = df.loc[:, selected_features]
    final_df = pd.concat([good_feature_df, y], axis=1)
    context.log_dataset(
        key="selected_features",
        df=final_df,
        local_path="selected_features.parquet",
        format="parquet",
    )

    # Creating a new feature vector containing only the identified top features
    if is_feature_vector and df_artifact.meta.spec.features and output_vector_name:
        # Selecting the top K features from our top feature dataframe
        # NOTE(review): head(k) takes the first k rows in column order, the frame
        # is not sorted by num_votes here - confirm this is the intended choice.
        selected_features = result_matrix_df.head(k).index

        # Match the selected feature names to the FS Feature annotations
        matched_selections = [
            feature
            for feature in list(df_artifact.meta.spec.features)
            for selected in list(selected_features)
            if feature.endswith(selected)
        ]

        # Defining our new feature vector
        top_features_fv = fs.FeatureVector(
            output_vector_name,
            matched_selections,
            label_feature="labels.label",
            description="feature vector composed strictly of our top features",
        )

        # Saving
        top_features_fv.save()
        fs.get_offline_features(top_features_fv, target=ParquetTarget())

        # Logging our new feature vector URI
        context.log_result("top_features_vector", top_features_fv.uri)
 commands: [] code_origin: '' origin_filename: '' @@ -30,7 +30,7 @@ spec: - name: space default: 0.4 outputs: [] - lineno: 54 + lineno: 43 has_varargs: false has_kwargs: false plot_stat: @@ -41,7 +41,7 @@ spec: - name: stat_name - name: stat_df outputs: [] - lineno: 76 + lineno: 65 has_varargs: false has_kwargs: false feature_selection: @@ -88,7 +88,7 @@ spec: default: true - name: sample_ratio type: float - doc: percentage of the dataset the user whishes to compute the feature selection + doc: percentage of the dataset the user wishes to compute the feature selection process on. default: null - name: output_vector_name @@ -99,18 +99,13 @@ spec: type: bool doc: skips datatypes that are neither float nor int within the feature vector. default: false - - name: is_feature_vector - type: bool - doc: bool stating if the data is passed as a feature vector. - default: false outputs: [] - lineno: 106 + lineno: 80 has_varargs: false has_kwargs: false description: Select features through multiple Statistical and Model filters default_handler: feature_selection disable_auto_mount: false - clone_target_dir: '' env: [] priority_class_name: '' preemption_mode: prevent diff --git a/feature_selection/item.yaml b/feature_selection/item.yaml index 7e80a417b..ced618e00 100644 --- a/feature_selection/item.yaml +++ b/feature_selection/item.yaml @@ -12,7 +12,7 @@ labels: author: orz maintainers: [] marketplaceType: '' -mlrunVersion: 1.1.0 +mlrunVersion: 1.6.3 name: feature-selection platformVersion: 3.5.0 spec: @@ -22,4 +22,4 @@ spec: kind: job requirements: [] url: '' -version: 1.4.0 +version: 1.5.0 diff --git a/feature_selection/requirements.txt b/feature_selection/requirements.txt index 961f64ea4..a13fc8ce6 100644 --- a/feature_selection/requirements.txt +++ b/feature_selection/requirements.txt @@ -1,5 +1,3 @@ scikit-learn~=1.0.2 -matplotlib -seaborn scikit-plot - +plotly~=5.4.0 diff --git a/feature_selection/test_feature_selection.py b/feature_selection/test_feature_selection.py 
index 6289648f2..3032b3193 100644 --- a/feature_selection/test_feature_selection.py +++ b/feature_selection/test_feature_selection.py @@ -12,14 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from mlrun import code_to_function -from pathlib import Path +import os import shutil +from pathlib import Path -METRICS_PATH = 'data/metrics.pq' -ARTIFACTS_PATH = 'artifacts' -RUNS_PATH = 'runs' -SCHEDULES_PATH = 'schedules' +import mlrun + +METRICS_PATH = "data/metrics.pq" +ARTIFACTS_PATH = "artifacts" +RUNS_PATH = "runs" +SCHEDULES_PATH = "schedules" +PLOTS_PATH = os.path.abspath("./artifacts/feature-selection-feature-selection/0") + + +def _validate_paths(paths): + """ + Check if all the expected plot are saved + """ + base_folder = PLOTS_PATH + for path in paths: + full_path = os.path.join(base_folder, path) + if Path(full_path).is_file(): + print(f"{path} exist") + else: + raise FileNotFoundError(f"{path} not found!") + return True def _delete_outputs(paths): @@ -29,20 +46,23 @@ def _delete_outputs(paths): def test_run_local_feature_selection(): - fn = code_to_function(name='test_run_local_feature_selection', - filename="feature_selection.py", - handler="feature_selection", - kind="local", - ) - fn.spec.command = "feature_selection.py" + fn = mlrun.import_function("function.yaml") run = fn.run( params={ - 'k': 2, - 'min_votes': 0.3, - 'label_column': 'is_error', + "k": 2, + "min_votes": 0.3, + "label_column": "is_error", }, - inputs={'df_artifact': 'data/metrics.pq'}, - artifact_path='artifacts/', + inputs={"df_artifact": "data/metrics.pq"}, + artifact_path="artifacts/", + local=True, + ) + assert _validate_paths( + [ + "chi2.html", + "f_classif.html", + "f_regression.html", + "mutual_info_classif.html", + ] ) - assert run.artifact('feature_scores').get() and run.artifact('selected_features').get() _delete_outputs({ARTIFACTS_PATH, RUNS_PATH, SCHEDULES_PATH})