diff --git a/model_monitoring_batch/function.yaml b/model_monitoring_batch/function.yaml deleted file mode 100644 index 1009c4e72..000000000 --- a/model_monitoring_batch/function.yaml +++ /dev/null @@ -1,128 +0,0 @@ -kind: job -metadata: - name: model-monitoring-batch - tag: '' - hash: 0a07259a35b487d80169a53e31ce0c62dc288c2c - project: '' - categories: - - monitoring -spec: - command: '' - args: [] - image: mlrun/mlrun - env: [] - default_handler: handler - entry_points: - compute: - name: compute - doc: '' - parameters: - - name: self - default: '' - - name: capping - default: null - - name: kld_scaling - default: 0.0001 - outputs: - - default: '' - type: float - lineno: 64 - dict_to_histogram: - name: dict_to_histogram - doc: '' - parameters: - - name: self - default: '' - - name: histogram_dict - default: '' - outputs: - - default: '' - lineno: 112 - compute_metrics_over_df: - name: compute_metrics_over_df - doc: '' - parameters: - - name: self - default: '' - - name: base_histogram - default: '' - - name: latest_histogram - default: '' - outputs: - - default: '' - lineno: 129 - compute_drift_from_histograms: - name: compute_drift_from_histograms - doc: '' - parameters: - - name: self - default: '' - - name: feature_stats - default: '' - - name: current_stats - default: '' - outputs: - - default: '' - lineno: 140 - post_init: - name: post_init - doc: '' - parameters: - - name: self - default: '' - outputs: - - default: '' - lineno: 283 - run: - name: run - doc: '' - parameters: - - name: self - default: '' - outputs: - - default: '' - lineno: 295 - check_for_drift: - name: check_for_drift - doc: '' - parameters: - - name: self - default: '' - - name: drift_result - default: '' - - name: endpoint - default: '' - outputs: - - default: '' - lineno: 421 - get_last_created_dir: - name: get_last_created_dir - doc: '' - parameters: - - name: fs - default: '' - - name: endpoint_dir - default: '' - outputs: - - default: '' - lineno: 447 - handler: - name: handler - doc: 
'' - parameters: - - name: context - type: MLClientCtx - default: '' - outputs: - - default: '' - lineno: 453 - description: '' - build: - functionSourceCode: import json
import os
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional, List, Dict

import numpy as np
import pandas as pd
import v3io
from mlrun import get_run_db
from mlrun import store_manager
from mlrun.data_types.infer import DFDataInfer, InferOptions
from mlrun.run import MLClientCtx
from mlrun.utils import logger, config
from mlrun.utils.model_monitoring import EndpointType, parse_model_endpoint_store_prefix
from mlrun.utils.v3io_clients import get_v3io_client, get_frames_client
from sklearn.preprocessing import KBinsDiscretizer

# strptime-style layout handed to pd.to_datetime when the last "timestamp" value of an
# endpoint's parquet data is converted before being written to the TSDB.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z"


@dataclass
class TotalVarianceDistance:
    """
    Total variation distance between two distributions taken over periods t and u.

    The measure is symmetric: swapping ``distrib_t`` and ``distrib_u`` yields the same value.
    """

    # Distribution observed over time span t.
    distrib_t: np.ndarray
    # Distribution observed over time span u.
    distrib_u: np.ndarray

    def compute(self) -> float:
        """Return half of the summed absolute element-wise difference of the two distributions."""
        absolute_gap = np.abs(self.distrib_t - self.distrib_u)
        return np.sum(absolute_gap) / 2


@dataclass
class HellingerDistance:
    """
    Hellinger distance, an f-divergence measure related to the Kullback-Leibler (KL) divergence.

    In contrast to KL divergence it is symmetric and bounded over a probability space.
    """

    # Distribution observed over time span t.
    distrib_t: np.ndarray
    # Distribution observed over time span u.
    distrib_u: np.ndarray

    def compute(self) -> float:
        """Return sqrt(0.5 * sum((sqrt(u) - sqrt(t)) ** 2)) for the two stored distributions."""
        root_gap = np.sqrt(self.distrib_u) - np.sqrt(self.distrib_t)
        squared_total = (root_gap ** 2).sum()
        return np.sqrt(0.5 * squared_total)


@dataclass
class KullbackLeiblerDivergence:
    """
    KL Divergence (or relative entropy) is a measure of how one probability distribution differs from another.
    On its own it is an asymmetric measure (thus it's not a metric) and it doesn't satisfy the triangle inequality.
    ``compute`` adds both directions, KL(t || u) + KL(u || t), so the returned value does not depend on
    the order of the two distributions. A value of 0 indicates two identical distributions.
    """

    # Distribution observed over time span t.
    distrib_t: np.ndarray
    # Distribution observed over time span u.
    distrib_u: np.ndarray

    @staticmethod
    def _directed_divergence(source, target, kld_scaling):
        """Return sum(source * log(source / target)) taken over the non-zero bins of ``source``."""
        # Zero bins of the target are swapped for kld_scaling so the ratio stays finite.
        safe_target = np.where(target != 0, target, kld_scaling)
        # np.where evaluates both of its branches, so log(0) and 0 * -inf are still computed for
        # the bins that are discarded right after. Silence those floating point warnings locally;
        # the selected values are not affected.
        with np.errstate(divide="ignore", invalid="ignore"):
            contributions = np.where(
                source != 0, source * np.log(source / safe_target), 0
            )
        return np.sum(contributions)

    def compute(self, capping=None, kld_scaling=0.0001) -> float:
        """
        Compute the symmetrised KL divergence of the two stored distributions.

        :param capping:     Value returned instead of the result when the result is infinite.
                            A falsy value (None, 0) disables the replacement.
        :param kld_scaling: Stand-in used for zero bins of the distribution in the denominator.

        :returns: KL(t || u) + KL(u || t), or ``capping`` when that sum is infinite and capping is set.
        """
        result = self._directed_divergence(
            self.distrib_t, self.distrib_u, kld_scaling
        ) + self._directed_divergence(self.distrib_u, self.distrib_t, kld_scaling)
        if capping:
            return capping if result == float("inf") else result
        return result


class VirtualDrift:
    """
    Computes drift measures between a reference set of feature histograms and a newer one.

    Every metric registered in ``self.metrics`` is evaluated per feature and then summarised
    across the shared features as a sum, a mean and, when ``feature_weights`` is set, a
    weighted value.
    """

    def __init__(
        self,
        prediction_col: Optional[str] = None,
        label_col: Optional[str] = None,
        feature_weights: Optional[List[float]] = None,
        inf_capping: Optional[float] = 10,
    ):
        """
        :param prediction_col:  Name of the prediction column. When set, its drift is computed
                                separately and stored under that name in the result.
        :param label_col:       Name of the label column, handled the same way as ``prediction_col``.
        :param feature_weights: Optional weights that are dotted with the per-feature metric values.
        :param inf_capping:     Stored as ``self.capping``.
        """
        self.prediction_col = prediction_col
        self.label_col = label_col
        self.feature_weights = feature_weights
        self.capping = inf_capping
        self.discretizers: Dict[str, KBinsDiscretizer] = {}
        # Metric name -> class taking (base, latest) distributions and exposing compute()
        self.metrics = {
            "tvd": TotalVarianceDistance,
            "hellinger": HellingerDistance,
            "kld": KullbackLeiblerDivergence,
        }

    def dict_to_histogram(self, histogram_dict):
        """
        Convert a statistics dictionary into a DataFrame of normalised histograms.

        :param histogram_dict: Mapping of feature name to its statistics. Only ``stats["hist"][0]``
                               (the value counts) is read from each entry.

        :returns: DataFrame with one column per feature, each column divided by its own sum.
        """
        # Get features value counts
        histograms = pd.concat(
            [
                pd.DataFrame(data=stats["hist"][0], columns=[feature])
                for feature, stats in histogram_dict.items()
            ],
            axis=1,
        )
        # To Distribution
        return histograms / histograms.sum()

    def compute_metrics_over_df(self, base_histogram, latest_histogram):
        """
        Evaluate every registered metric for each column of ``base_histogram``.

        :param base_histogram:   DataFrame of reference distributions, one column per feature.
        :param latest_histogram: DataFrame of current distributions holding the same columns.

        :returns: ``{metric_name: {feature: value}}``.
        """
        drift_measures = {}
        for metric_name, metric in self.metrics.items():
            drift_measures[metric_name] = {
                feature: metric(
                    base_histogram.loc[:, feature], latest_histogram.loc[:, feature]
                ).compute()
                for feature in base_histogram
            }
        return drift_measures

    def compute_drift_from_histograms(self, feature_stats, current_stats):
        """
        Compute per-feature and aggregated drift measures between two statistics dictionaries.

        :param feature_stats: Reference statistics, in the format ``dict_to_histogram`` reads.
        :param current_stats: Current statistics, in the same format.

        :returns: Mapping holding ``{feature: {metric: value}}`` for every shared feature plus the
                  flat keys ``<metric>_sum``, ``<metric>_mean`` and, when feature weights are set,
                  ``<metric>_weighted_mean``.

        :raises ValueError: When the two dictionaries share no feature.
        :raises KeyError:   When ``label_col`` / ``prediction_col`` is set but is not a shared feature.
        """
        # Process histogram dictionaries to Dataframe of the histograms
        # with Feature histogram as cols
        base_histogram = self.dict_to_histogram(feature_stats)
        latest_histogram = self.dict_to_histogram(current_stats)

        # Verify all the features exist between datasets
        base_features = set(base_histogram.columns)
        latest_features = set(latest_histogram.columns)

        features_common = list(base_features.intersection(latest_features))
        feature_difference = list(base_features ^ latest_features)

        if not features_common:
            raise ValueError(
                f"No common features found: {base_features} <> {latest_features}"
            )

        base_histogram = base_histogram.drop(
            feature_difference, axis=1, errors="ignore"
        )
        latest_histogram = latest_histogram.drop(
            feature_difference, axis=1, errors="ignore"
        )

        # Compute the drift per feature
        features_drift_measures = self.compute_metrics_over_df(
            base_histogram.loc[:, features_common],
            latest_histogram.loc[:, features_common],
        )

        drift_result = defaultdict(dict)

        for metric_name, per_feature in features_drift_measures.items():
            for feature in features_common:
                drift_result[feature][metric_name] = per_feature[feature]

            # Total drift measures over all shared features, in features_common order
            feature_values = [per_feature[feature] for feature in features_common]
            drift_result[f"{metric_name}_sum"] = np.sum(feature_values)
            drift_result[f"{metric_name}_mean"] = np.mean(feature_values)

            # Add weighted mean by given feature weights if provided
            if self.feature_weights:
                drift_result[f"{metric_name}_weighted_mean"] = np.dot(
                    feature_values, self.feature_weights
                )

        for column in (self.label_col, self.prediction_col):
            if not column:
                continue
            # Select with a list so a single-column DataFrame (not a Series) is passed on;
            # compute_metrics_over_df iterates over DataFrame columns.
            column_measures = self.compute_metrics_over_df(
                base_histogram.loc[:, [column]],
                latest_histogram.loc[:, [column]],
            )
            for metric_name, values in column_measures.items():
                drift_result[column][metric_name] = values[column]

        return drift_result


class BatchProcessor:
    """
    Batch job logic that computes drift measures for the active model endpoints of a project.

    For every active endpoint the most recent parquet directory is read, current feature
    histograms are inferred from it and compared with the endpoint's stored ``feature_stats``.
    The outcome is written to the endpoints KV table and the TSDB table, and is also pushed
    to the log stream when drift is possible or detected.
    """

    def __init__(
        self,
        context: MLClientCtx,
        project: str,
        model_monitoring_access_key: str,
        v3io_access_key: str,
    ):
        """
        :param context:                     The MLRun context of the running job.
        :param project:                     Project whose model endpoints are processed.
        :param model_monitoring_access_key: Model monitoring access key; falls back to
                                            ``v3io_access_key`` when empty.
        :param v3io_access_key:             Access key used for the v3io and frames clients.
        """
        self.context = context
        self.project = project

        self.v3io_access_key = v3io_access_key
        self.model_monitoring_access_key = (
            model_monitoring_access_key or v3io_access_key
        )

        self.virtual_drift = VirtualDrift(inf_capping=10)

        # All three targets are derived from the same configured prefix template
        template = config.model_endpoint_monitoring.store_prefixes.default

        kv_path = template.format(project=self.project, kind="endpoints")
        _, self.kv_container, self.kv_path = parse_model_endpoint_store_prefix(kv_path)

        tsdb_path = template.format(project=project, kind="events")
        _, self.tsdb_container, self.tsdb_path = parse_model_endpoint_store_prefix(
            tsdb_path
        )

        stream_path = template.format(project=self.project, kind="log_stream")
        _, self.stream_container, self.stream_path = parse_model_endpoint_store_prefix(
            stream_path
        )

        self.parquet_path = config.model_endpoint_monitoring.store_prefixes.user_space.format(
            project=project, kind="parquet"
        )

        # Only booleans are logged for the access keys, never the keys themselves
        logger.info(
            "Initializing BatchProcessor",
            project=project,
            model_monitoring_access_key_initalized=bool(model_monitoring_access_key),
            v3io_access_key_initialized=bool(v3io_access_key),
            parquet_path=self.parquet_path,
            kv_container=self.kv_container,
            kv_path=self.kv_path,
            tsdb_container=self.tsdb_container,
            tsdb_path=self.tsdb_path,
            stream_container=self.stream_container,
            stream_path=self.stream_path,
        )

        # Thresholds used when an endpoint carries no monitor configuration of its own
        self.default_possible_drift_threshold = (
            config.model_endpoint_monitoring.drift_thresholds.default.possible_drift
        )
        self.default_drift_detected_threshold = (
            config.model_endpoint_monitoring.drift_thresholds.default.drift_detected
        )

        self.db = get_run_db()
        self.v3io = get_v3io_client(access_key=self.v3io_access_key)
        self.frames = get_frames_client(
            address=config.v3io_framesd,
            container=self.tsdb_container,
            token=self.v3io_access_key,
        )
        # Last exception raised while processing an endpoint in run(), None when there was none
        self.exception = None

    def post_init(self):
        """
        Create the log stream with a single shard.

        A 400 response whose body mentions "ResourceInUse" is tolerated; every other response
        is handed to ``raise_for_status`` with the status list ``[409, 204, 403]``.
        """
        response = self.v3io.create_stream(
            container=self.stream_container,
            path=self.stream_path,
            shard_count=1,
            raise_for_status=v3io.dataplane.RaiseForStatus.never,
            access_key=self.v3io_access_key,
        )

        # NOTE(review): presumably "ResourceInUse" means the stream already exists - confirm
        if not (response.status_code == 400 and "ResourceInUse" in str(response.body)):
            response.raise_for_status([409, 204, 403])

    def run(self):
        """
        Compute and store drift measures for every active endpoint that has parquet data.

        A failure while listing endpoints, or a missing parquet root, ends the run early after
        logging. A failure while handling a single endpoint is logged and kept in
        ``self.exception`` (the last one wins), and processing continues with the next endpoint.
        """
        try:
            endpoints = self.db.list_model_endpoints(self.project)
        except Exception as e:
            logger.error("Failed to list endpoints", exc=e)
            return

        active_endpoints = {
            endpoint.metadata.uid
            for endpoint in endpoints.endpoints
            if endpoint.spec.active
        }

        store, sub = store_manager.get_or_create_store(self.parquet_path)
        # Part of the parquet path that precedes the store-relative sub path
        prefix = self.parquet_path.replace(sub, "")
        fs = store.get_filesystem(silent=False)

        if not fs.exists(sub):
            logger.warn(f"{sub} does not exist")
            return

        for endpoint_dir in fs.ls(sub):
            # The endpoint id is whatever follows the last "=" of the directory name
            endpoint_id = endpoint_dir["name"].split("=")[-1]
            if endpoint_id not in active_endpoints:
                continue

            try:
                # Descend four directory levels, taking the last-sorted entry at each one
                last_year = self.get_last_created_dir(fs, endpoint_dir)
                last_month = self.get_last_created_dir(fs, last_year)
                last_day = self.get_last_created_dir(fs, last_month)
                last_hour = self.get_last_created_dir(fs, last_day)

                full_path = f"{prefix}{last_hour['name']}"

                logger.info(f"Now processing {full_path}")

                endpoint = self.db.get_model_endpoint(
                    project=self.project, endpoint_id=endpoint_id
                )

                if endpoint.status.endpoint_type == EndpointType.ROUTER:
                    # endpoint.status.feature_stats is None
                    logger.info(f"{endpoint_id} is router skipping")
                    continue

                df = pd.read_parquet(full_path)
                timestamp = df["timestamp"].iloc[-1]

                named_features_df = pd.DataFrame(list(df["named_features"]))

                current_stats = DFDataInfer.get_stats(
                    df=named_features_df, options=InferOptions.Histogram
                )

                drift_result = self.virtual_drift.compute_drift_from_histograms(
                    feature_stats=endpoint.status.feature_stats,
                    current_stats=current_stats,
                )

                logger.info("Drift result", drift_result=drift_result)

                drift_status, drift_measure = self.check_for_drift(
                    drift_result=drift_result, endpoint=endpoint
                )

                logger.info(
                    "Drift status",
                    endpoint_id=endpoint_id,
                    drift_status=drift_status,
                    drift_measure=drift_measure,
                )

                # Only drifting endpoints are reported on the stream
                if drift_status in ("POSSIBLE_DRIFT", "DRIFT_DETECTED"):
                    self.v3io.stream.put_records(
                        container=self.stream_container,
                        stream_path=self.stream_path,
                        records=[
                            {
                                "data": json.dumps(
                                    {
                                        "endpoint_id": endpoint_id,
                                        "drift_status": drift_status,
                                        "drift_measure": drift_measure,
                                        "drift_per_feature": {**drift_result},
                                    }
                                )
                            }
                        ],
                    )

                self.v3io.kv.update(
                    container=self.kv_container,
                    table_path=self.kv_path,
                    key=endpoint_id,
                    attributes={
                        "current_stats": json.dumps(current_stats),
                        "drift_measures": json.dumps(drift_result),
                        "drift_status": drift_status,
                    },
                )

                tsdb_drift_measures = {
                    "endpoint_id": endpoint_id,
                    "timestamp": pd.to_datetime(timestamp, format=TIME_FORMAT),
                    "record_type": "drift_measures",
                    "tvd_mean": drift_result["tvd_mean"],
                    "kld_mean": drift_result["kld_mean"],
                    "hellinger_mean": drift_result["hellinger_mean"],
                }

                self.frames.write(
                    backend="tsdb",
                    table=self.tsdb_path,
                    dfs=pd.DataFrame.from_dict([tsdb_drift_measures]),
                    index_cols=["timestamp", "endpoint_id", "record_type"],
                )

                logger.info(f"Done updating drift measures {full_path}")

            except Exception as e:
                # Keep going with the other endpoints; the handler re-raises the stored exception
                logger.error(f"Exception for endpoint {endpoint_id}", exc=e)
                self.exception = e

    def check_for_drift(self, drift_result, endpoint):
        """
        Classify an endpoint's drift from its mean TVD and mean Hellinger measures.

        :param drift_result: Mapping that may hold ``tvd_mean`` and ``hellinger_mean``.
        :param endpoint:     Model endpoint; ``endpoint.spec.monitor_configuration`` may override
                             the ``possible_drift`` and ``drift_detected`` thresholds.

        :returns: Tuple of the drift status ("NO_DRIFT", "POSSIBLE_DRIFT" or "DRIFT_DETECTED")
                  and the drift measure, which is the average of the two means, or 0.0 when
                  either of them is missing.
        """
        tvd_mean = drift_result.get("tvd_mean")
        hellinger_mean = drift_result.get("hellinger_mean")

        drift_mean = 0.0
        # Compare against None so a legitimate 0.0 measure is not mistaken for a missing one
        if tvd_mean is not None and hellinger_mean is not None:
            drift_mean = (tvd_mean + hellinger_mean) / 2

        monitor_configuration = endpoint.spec.monitor_configuration or {}

        # Each threshold is read from its own key and falls back to its own default
        possible_drift = monitor_configuration.get(
            "possible_drift", self.default_possible_drift_threshold
        )
        drift_detected = monitor_configuration.get(
            "drift_detected", self.default_drift_detected_threshold
        )

        drift_status = "NO_DRIFT"
        if drift_mean >= drift_detected:
            drift_status = "DRIFT_DETECTED"
        elif drift_mean >= possible_drift:
            drift_status = "POSSIBLE_DRIFT"

        return drift_status, drift_mean

    @staticmethod
    def get_last_created_dir(fs, endpoint_dir):
        """
        Return the entry of ``endpoint_dir`` whose name sorts last.

        :param fs:           Filesystem object whose ``ls`` returns entries with a "name" key.
        :param endpoint_dir: Entry (with a "name" key) of the directory to list.

        :returns: The listed entry with the greatest value after the last "=" of its name.
        """
        dirs = fs.ls(endpoint_dir["name"])
        # String comparison: assumes the values after "=" are zero-padded - TODO confirm
        last_dir = sorted(dirs, key=lambda k: k["name"].split("=")[-1])[-1]
        return last_dir


def handler(context: MLClientCtx):
    """
    Job entry point: build a BatchProcessor for the context's project and run it once.

    The access keys are read from the MODEL_MONITORING_ACCESS_KEY and V3IO_ACCESS_KEY
    environment variables. If the processor recorded an exception during the run, it is
    re-raised here once the run has finished.

    :param context: The MLRun context of the running job.
    """
    processor = BatchProcessor(
        context=context,
        project=context.project,
        model_monitoring_access_key=os.environ.get("MODEL_MONITORING_ACCESS_KEY"),
        v3io_access_key=os.environ.get("V3IO_ACCESS_KEY"),
    )
    processor.post_init()
    processor.run()

    failure = processor.exception
    if failure:
        raise failure
 - commands: [] - code_origin: https://github.com/katyakats/functions.git#c5315901af56bce3f113041ad540591ee86ac00a:/Users/katyak/work/functions/model_monitoring_batch/model_monitoring_batch.py - origin_filename: /Users/katyak/work/functions/model_monitoring_batch/model_monitoring_batch.py - disable_auto_mount: false - priority_class_name: '' - affinity: null -verbose: false diff --git a/model_monitoring_batch/item.yaml b/model_monitoring_batch/item.yaml deleted file mode 100644 index a417ef149..000000000 --- a/model_monitoring_batch/item.yaml +++ /dev/null @@ -1,23 +0,0 @@ -apiVersion: v1 -categories: -- monitoring -description: '' -doc: '' -example: model_monitoring_batch.ipynb -generationDate: 2022-08-28:17-25 -hidden: false -icon: '' -labels: {} -maintainers: [] -marketplaceType: '' -mlrunVersion: 1.1.0 -name: model-monitoring-batch -platformVersion: 3.5.0 -spec: - filename: model_monitoring_batch.py - handler: handler - image: mlrun/mlrun - kind: job - requirements: [] -url: '' -version: 1.1.0 diff --git a/model_monitoring_batch/model_monitoring_batch.ipynb b/model_monitoring_batch/model_monitoring_batch.ipynb deleted file mode 100644 index f6e470072..000000000 --- a/model_monitoring_batch/model_monitoring_batch.ipynb +++ /dev/null @@ -1,91 +0,0 @@ -{ - "cells": [ - { - "cell_type": "markdown", - "source": [ - "# Model Monitoring\n", - "## Export function yaml" - ], - "metadata": { - "collapsed": false - } - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "from mlrun import code_to_function\n", - "from mlrun.runtimes import RemoteRuntime\n", - "\n", - "fn: RemoteRuntime = code_to_function(\n", - " name=\"model-monitoring-batch\",\n", - " kind=\"job\",\n", - " image=\"mlrun/mlrun\",\n", - " filename=\"model_monitoring_batch.py\",\n", - " handler=\"handler\",\n", - ")\n", - "\n", - "fn.export(\"model_monitoring_batch.yaml\")" - ] - }, - { - "cell_type": "markdown", - "source": [ - 
"## Deploy Batch Processing" - ], - "metadata": { - "collapsed": false - } - }, - { - "cell_type": "code", - "execution_count": null, - "outputs": [], - "source": [ - "from mlrun import import_function\n", - "from mlrun.platforms import mount_v3io\n", - "from mlrun.runtimes import KubejobRuntime\n", - "\n", - "\n", - "# Set project name\n", - "project = \"\"\n", - "\n", - "fn: KubejobRuntime = import_function(\"hub://model_monitoring_batch\")\n", - "fn.metadata.project = project\n", - "fn.apply(mount_v3io())\n", - "fn.run(name='model-monitoring-batch', schedule=\"0 */1 * * *\", params={\"project\": project})" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%%\n" - } - } - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 2 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython2", - "version": "2.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} \ No newline at end of file diff --git a/model_monitoring_batch/model_monitoring_batch.py b/model_monitoring_batch/model_monitoring_batch.py deleted file mode 100644 index 66c19de35..000000000 --- a/model_monitoring_batch/model_monitoring_batch.py +++ /dev/null @@ -1,477 +0,0 @@ -# Copyright 2019 Iguazio -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -import json -import os -from collections import defaultdict -from dataclasses import dataclass -from typing import Optional, List, Dict - -import numpy as np -import pandas as pd -import v3io -from mlrun import get_run_db -from mlrun import store_manager -from mlrun.data_types.infer import DFDataInfer, InferOptions -from mlrun.run import MLClientCtx -from mlrun.utils import logger, config -from mlrun.utils.model_monitoring import EndpointType, parse_model_endpoint_store_prefix -from mlrun.utils.v3io_clients import get_v3io_client, get_frames_client -from sklearn.preprocessing import KBinsDiscretizer - -TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f%z" - - -@dataclass -class TotalVarianceDistance: - """ - Provides a symmetric drift distance between two periods t and u - Z - vector of random variables - Pt - Probability distribution over time span t - """ - - distrib_t: np.ndarray - distrib_u: np.ndarray - - def compute(self) -> float: - return np.sum(np.abs(self.distrib_t - self.distrib_u)) / 2 - - -@dataclass -class HellingerDistance: - """ - Hellinger distance is an f divergence measure, similar to the Kullback-Leibler (KL) divergence. - However, unlike KL Divergence the Hellinger divergence is symmetric and bounded over a probability space. - """ - - distrib_t: np.ndarray - distrib_u: np.ndarray - - def compute(self) -> float: - return np.sqrt( - 0.5 * ((np.sqrt(self.distrib_u) - np.sqrt(self.distrib_t)) ** 2).sum() - ) - - -@dataclass -class KullbackLeiblerDivergence: - """ - KL Divergence (or relative entropy) is a measure of how one probability distribution differs from another. - It is an asymmetric measure (thus it's not a metric) and it doesn't satisfy the triangle inequality. - KL Divergence of 0, indicates two identical distributions. 
- """ - - distrib_t: np.ndarray - distrib_u: np.ndarray - - def compute(self, capping=None, kld_scaling=0.0001) -> float: - t_u = np.sum( - np.where( - self.distrib_t != 0, - (self.distrib_t) - * np.log( - self.distrib_t - / np.where(self.distrib_u != 0, self.distrib_u, kld_scaling) - ), - 0, - ) - ) - u_t = np.sum( - np.where( - self.distrib_u != 0, - (self.distrib_u) - * np.log( - self.distrib_u - / np.where(self.distrib_t != 0, self.distrib_t, kld_scaling) - ), - 0, - ) - ) - result = t_u + u_t - if capping: - return capping if result == float("inf") else result - return result - - -class VirtualDrift: - def __init__( - self, - prediction_col: Optional[str] = None, - label_col: Optional[str] = None, - feature_weights: Optional[List[float]] = None, - inf_capping: Optional[float] = 10, - ): - self.prediction_col = prediction_col - self.label_col = label_col - self.feature_weights = feature_weights - self.capping = inf_capping - self.discretizers: Dict[str, KBinsDiscretizer] = {} - self.metrics = { - "tvd": TotalVarianceDistance, - "hellinger": HellingerDistance, - "kld": KullbackLeiblerDivergence, - } - - def dict_to_histogram(self, histogram_dict): - histograms = {} - for feature, stats in histogram_dict.items(): - histograms[feature] = stats["hist"][0] - - # Get features value counts - histograms = pd.concat( - [ - pd.DataFrame(data=hist, columns=[feature]) - for feature, hist in histograms.items() - ], - axis=1, - ) - # To Distribution - histograms = histograms / histograms.sum() - return histograms - - def compute_metrics_over_df(self, base_histogram, latest_histogram): - drift_measures = {} - for metric_name, metric in self.metrics.items(): - drift_measures[metric_name] = { - feature: metric( - base_histogram.loc[:, feature], latest_histogram.loc[:, feature] - ).compute() - for feature in base_histogram - } - return drift_measures - - def compute_drift_from_histograms(self, feature_stats, current_stats): - # Process histogram dictionaries to Dataframe of the 
histograms - # with Feature histogram as cols - base_histogram = self.dict_to_histogram(feature_stats) - latest_histogram = self.dict_to_histogram(current_stats) - - # Verify all the features exist between datasets - base_features = set(base_histogram.columns) - latest_features = set(latest_histogram.columns) - - features_common = list(base_features.intersection(latest_features)) - feature_difference = list(base_features ^ latest_features) - - if not features_common: - raise ValueError( - f"No common features found: {base_features} <> {latest_features}" - ) - - base_histogram = base_histogram.drop( - feature_difference, axis=1, errors="ignore" - ) - latest_histogram = latest_histogram.drop( - feature_difference, axis=1, errors="ignore" - ) - - # Compute the drift per feature - features_drift_measures = self.compute_metrics_over_df( - base_histogram.loc[:, features_common], - latest_histogram.loc[:, features_common], - ) - - # Compute total drift measures for features - for metric_name in self.metrics.keys(): - feature_values = list(features_drift_measures[metric_name].values()) - features_drift_measures[metric_name]["total_sum"] = np.sum(feature_values) - features_drift_measures[metric_name]["total_mean"] = np.mean(feature_values) - - # Add weighted mean by given feature weights if provided - if self.feature_weights: - features_drift_measures[metric_name]["total_weighted_mean"] = np.dot( - feature_values, self.feature_weights - ) - - drift_result = defaultdict(dict) - - for feature in features_common: - for metric, values in features_drift_measures.items(): - drift_result[feature][metric] = values[feature] - sum = features_drift_measures[metric]["total_sum"] - mean = features_drift_measures[metric]["total_mean"] - drift_result[f"{metric}_sum"] = sum - drift_result[f"{metric}_mean"] = mean - if self.feature_weights: - metric_measure = features_drift_measures[metric] - weighted_mean = metric_measure["total_weighted_mean"] - drift_result[f"{metric}_weighted_mean"] = 
weighted_mean - - if self.label_col: - label_drift_measures = self.compute_metrics_over_df( - base_histogram.loc[:, self.label_col], - latest_histogram.loc[:, self.label_col], - ) - for metric, values in label_drift_measures.items(): - drift_result[self.label_col][metric] = values[metric] - - if self.prediction_col: - prediction_drift_measures = self.compute_metrics_over_df( - base_histogram.loc[:, self.prediction_col], - latest_histogram.loc[:, self.prediction_col], - ) - for metric, values in prediction_drift_measures.items(): - drift_result[self.prediction_col][metric] = values[metric] - - return drift_result - - -class BatchProcessor: - def __init__( - self, - context: MLClientCtx, - project: str, - model_monitoring_access_key: str, - v3io_access_key: str, - ): - self.context = context - self.project = project - - self.v3io_access_key = v3io_access_key - self.model_monitoring_access_key = ( - model_monitoring_access_key or v3io_access_key - ) - - self.virtual_drift = VirtualDrift(inf_capping=10) - - template = config.model_endpoint_monitoring.store_prefixes.default - - kv_path = template.format(project=self.project, kind="endpoints") - _, self.kv_container, self.kv_path = parse_model_endpoint_store_prefix(kv_path) - - tsdb_path = template.format(project=project, kind="events") - _, self.tsdb_container, self.tsdb_path = parse_model_endpoint_store_prefix( - tsdb_path - ) - - stream_path = template.format(project=self.project, kind="log_stream") - _, self.stream_container, self.stream_path = parse_model_endpoint_store_prefix( - stream_path - ) - - self.parquet_path = config.model_endpoint_monitoring.store_prefixes.user_space.format( - project=project, kind="parquet" - ) - - logger.info( - "Initializing BatchProcessor", - project=project, - model_monitoring_access_key_initalized=bool(model_monitoring_access_key), - v3io_access_key_initialized=bool(v3io_access_key), - parquet_path=self.parquet_path, - kv_container=self.kv_container, - kv_path=self.kv_path, - 
tsdb_container=self.tsdb_container, - tsdb_path=self.tsdb_path, - stream_container=self.stream_container, - stream_path=self.stream_path, - ) - - self.default_possible_drift_threshold = ( - config.model_endpoint_monitoring.drift_thresholds.default.possible_drift - ) - self.default_drift_detected_threshold = ( - config.model_endpoint_monitoring.drift_thresholds.default.drift_detected - ) - - self.db = get_run_db() - self.v3io = get_v3io_client(access_key=self.v3io_access_key) - self.frames = get_frames_client( - address=config.v3io_framesd, - container=self.tsdb_container, - token=self.v3io_access_key, - ) - self.exception = None - - def post_init(self): - response = self.v3io.create_stream( - container=self.stream_container, - path=self.stream_path, - shard_count=1, - raise_for_status=v3io.dataplane.RaiseForStatus.never, - access_key=self.v3io_access_key, - ) - - if not (response.status_code == 400 and "ResourceInUse" in str(response.body)): - response.raise_for_status([409, 204, 403]) - - def run(self): - - try: - endpoints = self.db.list_model_endpoints(self.project) - except Exception as e: - logger.error("Failed to list endpoints", exc=e) - return - - active_endpoints = set() - for endpoint in endpoints.endpoints: - if endpoint.spec.active: - active_endpoints.add(endpoint.metadata.uid) - - store, sub = store_manager.get_or_create_store(self.parquet_path) - prefix = self.parquet_path.replace(sub, "") - fs = store.get_filesystem(silent=False) - - if not fs.exists(sub): - logger.warn( - f"{sub} does not exist" - ) - return - - for endpoint_dir in fs.ls(sub): - endpoint_id = endpoint_dir["name"].split("=")[-1] - if endpoint_id not in active_endpoints: - continue - - try: - last_year = self.get_last_created_dir(fs, endpoint_dir) - last_month = self.get_last_created_dir(fs, last_year) - last_day = self.get_last_created_dir(fs, last_month) - last_hour = self.get_last_created_dir(fs, last_day) - - full_path = f"{prefix}{last_hour['name']}" - - logger.info(f"Now 
processing {full_path}") - - endpoint = self.db.get_model_endpoint( - project=self.project, endpoint_id=endpoint_id - ) - - if endpoint.status.endpoint_type == EndpointType.ROUTER: - # endpoint.status.feature_stats is None - logger.info(f"{endpoint_id} is router skipping") - continue - - df = pd.read_parquet(full_path) - timestamp = df["timestamp"].iloc[-1] - - named_features_df = list(df["named_features"]) - named_features_df = pd.DataFrame(named_features_df) - - current_stats = DFDataInfer.get_stats( - df=named_features_df, options=InferOptions.Histogram - ) - - drift_result = self.virtual_drift.compute_drift_from_histograms( - feature_stats=endpoint.status.feature_stats, - current_stats=current_stats, - ) - - logger.info("Drift result", drift_result=drift_result) - - drift_status, drift_measure = self.check_for_drift( - drift_result=drift_result, endpoint=endpoint - ) - - logger.info( - "Drift status", - endpoint_id=endpoint_id, - drift_status=drift_status, - drift_measure=drift_measure, - ) - - if drift_status == "POSSIBLE_DRIFT" or drift_status == "DRIFT_DETECTED": - self.v3io.stream.put_records( - container=self.stream_container, - stream_path=self.stream_path, - records=[ - { - "data": json.dumps( - { - "endpoint_id": endpoint_id, - "drift_status": drift_status, - "drift_measure": drift_measure, - "drift_per_feature": {**drift_result}, - } - ) - } - ], - ) - - self.v3io.kv.update( - container=self.kv_container, - table_path=self.kv_path, - key=endpoint_id, - attributes={ - "current_stats": json.dumps(current_stats), - "drift_measures": json.dumps(drift_result), - "drift_status": drift_status, - }, - ) - - tsdb_drift_measures = { - "endpoint_id": endpoint_id, - "timestamp": pd.to_datetime(timestamp, format=TIME_FORMAT), - "record_type": "drift_measures", - "tvd_mean": drift_result["tvd_mean"], - "kld_mean": drift_result["kld_mean"], - "hellinger_mean": drift_result["hellinger_mean"], - } - - self.frames.write( - backend="tsdb", - table=self.tsdb_path, - 
dfs=pd.DataFrame.from_dict([tsdb_drift_measures]), - index_cols=["timestamp", "endpoint_id", "record_type"], - ) - - logger.info(f"Done updating drift measures {full_path}") - - except Exception as e: - logger.error(f"Exception for endpoint {endpoint_id}") - self.exception = e - - def check_for_drift(self, drift_result, endpoint): - tvd_mean = drift_result.get("tvd_mean") - hellinger_mean = drift_result.get("hellinger_mean") - - drift_mean = 0.0 - if tvd_mean and hellinger_mean: - drift_mean = (tvd_mean + hellinger_mean) / 2 - - monitor_configuration = endpoint.spec.monitor_configuration or {} - - possible_drift = monitor_configuration.get( - "possible_drift", self.default_possible_drift_threshold - ) - drift_detected = monitor_configuration.get( - "possible_drift", self.default_drift_detected_threshold - ) - - drift_status = "NO_DRIFT" - if drift_mean >= drift_detected: - drift_status = "DRIFT_DETECTED" - elif drift_mean >= possible_drift: - drift_status = "POSSIBLE_DRIFT" - - return drift_status, drift_mean - - @staticmethod - def get_last_created_dir(fs, endpoint_dir): - dirs = fs.ls(endpoint_dir["name"]) - last_dir = sorted(dirs, key=lambda k: k["name"].split("=")[-1])[-1] - return last_dir - - -def handler(context: MLClientCtx): - batch_processor = BatchProcessor( - context=context, - project=context.project, - model_monitoring_access_key=os.environ.get("MODEL_MONITORING_ACCESS_KEY"), - v3io_access_key=os.environ.get("V3IO_ACCESS_KEY"), - ) - batch_processor.post_init() - batch_processor.run() - if batch_processor.exception: - raise batch_processor.exception diff --git a/model_monitoring_batch/requirements.txt b/model_monitoring_batch/requirements.txt deleted file mode 100644 index 06d315b02..000000000 --- a/model_monitoring_batch/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -v3io -scikit-learn \ No newline at end of file diff --git a/validate_great_expectations/README.md b/validate_great_expectations/README.md deleted file mode 100644 index 
5f113be58..000000000 --- a/validate_great_expectations/README.md +++ /dev/null @@ -1,53 +0,0 @@ -# Great Expectations Validation -![Great Expectations Logo](doc/great-expectations-logo-full-size.png) - -Run data validation via Great Expectations. Will validate a given dataset with a given set of expectations, run the validation, and log the output HTML data doc in MLRun. - -## Prerequisites - -See [1_set_expectations.ipynb](1_set_expectations.ipynb) for a full example. - -- Initialized a Great Expectations project -- Configured at least one Datasource i.e. `my_datasource` -- Created at least one Expectation Suite i.e. `my_suite` -- Created a Checkpoint i.e. `my_checkpoint` - -## Usage - -See [2_validate_expectations.ipynb](2_validate_expectations.ipynb) for a full example. - -```python -import mlrun - -fn = mlrun.import_function("hub://great_expectations") -run = fn.run( - inputs={"data": "https://s3.wasabisys.com/iguazio/data/iris/iris.data.raw.csv"}, - params={ - "expectation_suite_name": "test_suite", - "data_asset_name": "iris_dataset", - }, -) -``` - -## All Configuration -Inputs -```rst -:param data: Data to validate. Can be local or remote link. -``` - -Parameters -```rst -:param expectation_suite_name: Name of expectation suite to validate against. -:param data_asset_name: Name of dataset in Great Expectations. -:param datasource_name: Name of datasource to use for validation. -:param data_connector_name: Name of data connector to use for validation. -:param datasource_config: Full configuration for datasource. For use with custom - data sources other than the default pandas datasource. -:param batch_identifiers: Custom metadata for identifying particular batches of - data. For use when not using the default batch identifiers. -:param root_directory: Path to underlying Great Expectations project. Defaults to - MLRun project artifact path if not specified. -:param checkpoint_name: Name of checkpoint to use for validation. 
-:param checkpoint_config: Full configuration for checkpoint. For use with custome - checkpoint config other than the default. -``` \ No newline at end of file diff --git a/validate_great_expectations/doc/great-expectations-logo-full-size.png b/validate_great_expectations/doc/great-expectations-logo-full-size.png deleted file mode 100644 index 625fc92bc..000000000 Binary files a/validate_great_expectations/doc/great-expectations-logo-full-size.png and /dev/null differ diff --git a/validate_great_expectations/function.yaml b/validate_great_expectations/function.yaml deleted file mode 100644 index f3f1d3fce..000000000 --- a/validate_great_expectations/function.yaml +++ /dev/null @@ -1,170 +0,0 @@ -kind: job -metadata: - name: validate-great-expectations - tag: '' - hash: 82d0b647d443eb6e643d9dbfc8c0a650d74da018 - project: '' - labels: - author: nicks - framework: great-expectations - categories: - - data-validation - - data-analysis -spec: - command: '' - args: [] - image: '' - build: - functionSourceCode: import os
import shutil

import mlrun

from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)
from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult


def get_default_datasource_config(
    datasource_name: str, data_connector_name: str
) -> dict:
    """
    Build the stock pandas datasource configuration that is used for
    validation when no custom datasource configuration is supplied.

    :param datasource_name:     Name of datasource.
    :param data_connector_name: Name of data connector.

    :returns: Configuration for default datasource.
    """
    # Execution engine section: the pandas engine from Great Expectations.
    pandas_engine = {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    }

    # Single data connector, registered under the requested connector name.
    runtime_connector = {
        "class_name": "RuntimeDataConnector",
        "module_name": "great_expectations.datasource.data_connector",
        "batch_identifiers": ["default_identifier_name"],
    }

    return {
        "name": f"{datasource_name}",
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": pandas_engine,
        "data_connectors": {f"{data_connector_name}": runtime_connector},
    }


def get_default_checkpoint_config(checkpoint_name: str) -> dict:
    """
    Build the stock checkpoint configuration that is used for
    validation when no custom checkpoint configuration is supplied.

    :param checkpoint_name: Name of checkpoint.

    :returns: Configuration for default checkpoint.
    """
    default_checkpoint_config = dict(
        name=checkpoint_name,
        config_version=1.0,
        class_name="SimpleCheckpoint",
        run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    )
    return default_checkpoint_config


def get_data_doc_path(checkpoint_result: CheckpointResult) -> str:
    """
    Extract the location of the generated data doc from a
    checkpoint result.

    :param checkpoint_result: Great Expectations checkpoint result.

    :returns: The "local_site" entry of the first validation result's
              "update_data_docs" action, with every "file://" removed.
    """
    # Only the first validation result identifier is consulted.
    first_result_id = checkpoint_result.list_validation_result_identifiers()[0]
    run_result = checkpoint_result["run_results"][first_result_id]
    local_site_url = run_result["actions_results"]["update_data_docs"]["local_site"]

    # Drop the URL scheme so that a plain filesystem path remains.
    return local_site_url.replace("file://", "")


def validate_expectations(
    context: mlrun.MLClientCtx,
    data: mlrun.DataItem,
    expectation_suite_name: str,
    data_asset_name: str,
    datasource_name: str = "pandas_datasource",
    data_connector_name: str = "default_runtime_data_connector_name",
    datasource_config: dict = None,
    batch_identifiers: dict = None,
    root_directory: str = None,
    checkpoint_name: str = None,
    checkpoint_config: dict = None,
) -> None:
    """
    Main function to validate an input dataset, datasource, data connector,
    and expectation suite.

    Runs the Great Expectations validation and logs
    whether the validation was a success as well as the output page
    of the data docs.

    :param context:                MLRun context.
    :param data:                   Data to validate. Can be local or remote link.
    :param expectation_suite_name: Name of expectation suite to validate against.
    :param data_asset_name:        Name of dataset in Great Expectations.
    :param datasource_name:        Name of datasource to use for validation. The batch
                                   request always refers to this name, so a custom
                                   ``datasource_config`` must define a datasource
                                   with the same name.
    :param data_connector_name:    Name of data connector to use for validation. The
                                   batch request always refers to this name as well.
    :param datasource_config:      Full configuration for datasource. For use with custom
                                   data sources other than the default pandas datasource.
    :param batch_identifiers:      Custom metadata for identifying particular batches of
                                   data. For use when not using the default batch identifiers.
    :param root_directory:         Path to underlying Great Expectations project. Defaults to
                                   ``/v3io/projects/<project>/great_expectations`` if not
                                   specified.
    :param checkpoint_name:        Name of checkpoint to run. If not specified, the name
                                   found in ``checkpoint_config`` is used, falling back to
                                   ``<data_asset_name>_checkpoint``.
    :param checkpoint_config:      Full configuration for checkpoint. For use with custom
                                   checkpoint config other than the default.
    """

    # Get data
    df = data.as_df()

    # Use default root directory for project if not specified
    root_directory = (
        root_directory or f"/v3io/projects/{context.project}/great_expectations"
    )

    # Load great expectations context
    ge_context = BaseDataContext(
        project_config=DataContextConfig(
            store_backend_defaults=FilesystemStoreBackendDefaults(
                root_directory=root_directory
            )
        )
    )

    # Look the suite up before doing anything else; the returned suite
    # itself is not used by this function.
    ge_context.get_expectation_suite(expectation_suite_name=expectation_suite_name)

    # Add default data source if not specified
    if not datasource_config:
        datasource_config = get_default_datasource_config(
            datasource_name, data_connector_name
        )
    ge_context.add_datasource(**datasource_config)

    # Get data batch
    batch_request = RuntimeBatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name=data_asset_name,
        runtime_parameters={"batch_data": df},
        batch_identifiers=(
            batch_identifiers or {"default_identifier_name": "default_identifier"}
        ),
    )

    # The validator object is not needed afterwards; the call is kept so that
    # the sequence of Great Expectations calls is the same as before.
    ge_context.get_validator(
        batch_request=batch_request,
        expectation_suite_name=expectation_suite_name,
    )

    # Resolve which checkpoint to add and run.
    default_checkpoint_name = f"{data_asset_name}_checkpoint"
    if checkpoint_config:
        # Work on a copy so the caller's dict is never modified.
        checkpoint_config = dict(checkpoint_config)
        # An explicit checkpoint_name still wins. Without one, run the
        # checkpoint that the custom config defines: the previous fallback to
        # the default name ran a checkpoint this call had not added.
        checkpoint_name = (
            checkpoint_name
            or checkpoint_config.get("name")
            or default_checkpoint_name
        )
        checkpoint_config.setdefault("name", checkpoint_name)
    else:
        checkpoint_name = checkpoint_name or default_checkpoint_name
        checkpoint_config = get_default_checkpoint_config(checkpoint_name)

    # Add checkpoint
    ge_context.add_checkpoint(**checkpoint_config)

    # Run expectation suite on checkpoint
    checkpoint_result = ge_context.run_checkpoint(
        checkpoint_name=checkpoint_name,
        validations=[
            {
                "batch_request": batch_request,
                "expectation_suite_name": expectation_suite_name,
            }
        ],
    )

    # Log success
    context.log_result("validated", checkpoint_result["success"])

    # Log data doc
    data_doc_path = get_data_doc_path(checkpoint_result)
    context.log_artifact("validation_results", target_path=data_doc_path)
 - base_image: mlrun/mlrun - commands: - - python -m pip install great-expectations==0.15.41 - code_origin: https://github.com/igz-us-sales/functions.git#c7b44af35294494a531a014f3d02a28eff3f4105:/User/functions/validate_great_expectations/validate_great_expectations.py - origin_filename: /User/functions/validate_great_expectations/validate_great_expectations.py - entry_points: - get_default_datasource_config: - name: get_default_datasource_config - doc: 'Convenience function to get the default pandas datasource config - - for use in validating expectations.' - parameters: - - name: datasource_name - type: str - doc: Name of datasource. - default: '' - - name: data_connector_name - type: str - doc: Name of data connector. - default: '' - outputs: - - default: '' - doc: Configuration for default datasource. - type: dict - lineno: 15 - get_default_checkpoint_config: - name: get_default_checkpoint_config - doc: 'Convenience function to get the default checkpoint config for - - use in validating expectations.' - parameters: - - name: checkpoint_name - type: str - doc: Name of checkpoint. - default: '' - outputs: - - default: '' - doc: Configuration for default checkpoint. - type: dict - lineno: 46 - get_data_doc_path: - name: get_data_doc_path - doc: 'Convenience function to get the path of the output - - data doc from a checkpoint result.' - parameters: - - name: checkpoint_result - type: CheckpointResult - doc: Great Expectations checkpoint result. - default: '' - outputs: - - default: '' - doc: Absolute path to new data doc. - type: str - lineno: 63 - validate_expectations: - name: validate_expectations - doc: 'Main function to validate an input dataset, datasource, data connector, - - and expectation suite. - - - Runs the Great Expectation validation and logs - - whether the validation was a success as well as the output page - - of the data docs.' - parameters: - - name: context - type: MLClientCtx - doc: MLRun context. 
- default: '' - - name: data - type: DataItem - doc: Data to validate. Can be local or remote link. - default: '' - - name: expectation_suite_name - type: str - doc: Name of expectation suite to validate against. - default: '' - - name: data_asset_name - type: str - doc: Name of dataset in Great Expectations. - default: '' - - name: datasource_name - type: str - doc: Name of datasource to use for validation. - default: pandas_datasource - - name: data_connector_name - type: str - doc: Name of data connector to use for validation. - default: default_runtime_data_connector_name - - name: datasource_config - type: dict - doc: Full configuration for datasource. For use with custom data sources other - than the default pandas datasource. - default: null - - name: batch_identifiers - type: dict - doc: Custom metadata for identifying particular batches of data. For use when - not using the default batch identifiers. - default: null - - name: root_directory - type: str - doc: Path to underlying Great Expectations project. Defaults to MLRun project - artifact path if not specified. - default: null - - name: checkpoint_name - type: str - doc: Name of checkpoint to use for validation. - default: null - - name: checkpoint_config - type: dict - doc: Full configuration for checkpoint. For use with custome checkpoint config - other than the default. 
- default: null - outputs: - - default: '' - lineno: 80 - description: Validate a dataset using Great Expectations - default_handler: validate_expectations - disable_auto_mount: false - env: [] - resources: - requests: - memory: 1Mi - cpu: 25m - limits: - memory: 20Gi - cpu: '2' - priority_class_name: igz-workload-medium - preemption_mode: prevent - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: app.iguazio.com/lifecycle - operator: NotIn - values: - - preemptible - - key: eks.amazonaws.com/capacityType - operator: NotIn - values: - - SPOT - - key: node-lifecycle - operator: NotIn - values: - - spot - tolerations: null - security_context: {} -verbose: false diff --git a/validate_great_expectations/item.yaml b/validate_great_expectations/item.yaml deleted file mode 100644 index 2c1a98b51..000000000 --- a/validate_great_expectations/item.yaml +++ /dev/null @@ -1,26 +0,0 @@ -apiVersion: v1 -categories: -- data-validation -- data-analysis -description: Validate a dataset using Great Expectations -doc: '' -example: validate_great_expectations.ipynb -generationDate: 2022-04-26:12-28 -hidden: false -icon: '' -labels: - author: nicks - framework: great-expectations -maintainers: [] -marketplaceType: '' -mlrunVersion: 1.1.0 -name: validate-great-expectations -platformVersion: 3.5.2 -spec: - filename: validate_great_expectations.py - handler: validate_expectations - image: mlrun/mlrun - kind: job - requirements: [great-expectations==0.15.41] -url: '' -version: 1.1.0 \ No newline at end of file diff --git a/validate_great_expectations/requirements.txt b/validate_great_expectations/requirements.txt deleted file mode 100644 index d1b8ef94b..000000000 --- a/validate_great_expectations/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -great-expectations==0.15.41 \ No newline at end of file diff --git a/validate_great_expectations/test_validate_great_expectations.py 
b/validate_great_expectations/test_validate_great_expectations.py deleted file mode 100644 index 0c54a6ec9..000000000 --- a/validate_great_expectations/test_validate_great_expectations.py +++ /dev/null @@ -1,209 +0,0 @@ -import os -import sys -from pathlib import Path -import shutil -import mlrun - -import pandas as pd -from great_expectations.core.batch import RuntimeBatchRequest -from great_expectations.data_context import BaseDataContext -from great_expectations.data_context.types.base import ( - DataContextConfig, - FilesystemStoreBackendDefaults, -) -from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult - -from validate_great_expectations import ( - get_default_datasource_config, - get_default_checkpoint_config, - get_data_doc_path, -) - - -DATA_ASSET_NAME = "iris_dataset" -DATA_PATH = "https://s3.wasabisys.com/iguazio/data/iris/iris.data.raw.csv" -EXPECTATION_SUITE_NAME = "test_suite" -ROOT_DIRECTORY = f"/tmp/great_expectations" -DATASOURCE_NAME = "pandas_datasource" -DATA_CONNECTOR_NAME = "default_runtime_data_connector_name" - - -def test_get_default_datasource_config(): - datasource_name = "my_datasource" - data_connector_name = "my_dataconnector" - - expected_datasource_config = { - "name": f"{datasource_name}", - "class_name": "Datasource", - "module_name": "great_expectations.datasource", - "execution_engine": { - "module_name": "great_expectations.execution_engine", - "class_name": "PandasExecutionEngine", - }, - "data_connectors": { - f"{data_connector_name}": { - "class_name": "RuntimeDataConnector", - "module_name": "great_expectations.datasource.data_connector", - "batch_identifiers": ["default_identifier_name"], - }, - }, - } - - assert ( - get_default_datasource_config( - datasource_name=datasource_name, data_connector_name=data_connector_name - ) - == expected_datasource_config - ) - - -def test_get_default_checkpoint_config(): - checkpoint_name = "my_checkpoint" - - expected_checkpoint_config = { - "name": 
checkpoint_name, - "config_version": 1.0, - "class_name": "SimpleCheckpoint", - "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template", - } - - assert ( - get_default_checkpoint_config(checkpoint_name=checkpoint_name) - == expected_checkpoint_config - ) - - -def set_expectations(fail=False): - ge_context = BaseDataContext( - project_config=DataContextConfig( - store_backend_defaults=FilesystemStoreBackendDefaults( - root_directory=ROOT_DIRECTORY - ) - ) - ) - - datasource_config = { - "name": f"{DATASOURCE_NAME}", - "class_name": "Datasource", - "module_name": "great_expectations.datasource", - "execution_engine": { - "module_name": "great_expectations.execution_engine", - "class_name": "PandasExecutionEngine", - }, - "data_connectors": { - f"{DATA_CONNECTOR_NAME}": { - "class_name": "RuntimeDataConnector", - "module_name": "great_expectations.datasource.data_connector", - "batch_identifiers": ["default_identifier_name"], - }, - }, - } - ge_context.add_datasource(**datasource_config) - - ge_context.create_expectation_suite( - expectation_suite_name=EXPECTATION_SUITE_NAME, overwrite_existing=True - ) - - df = pd.read_csv(DATA_PATH) - - batch_request = RuntimeBatchRequest( - datasource_name=DATASOURCE_NAME, - data_connector_name=DATA_CONNECTOR_NAME, - data_asset_name=DATA_ASSET_NAME, - runtime_parameters={"batch_data": df}, - batch_identifiers={"default_identifier_name": "default_identifier"}, - ) - - validator = ge_context.get_validator( - batch_request=batch_request, - expectation_suite_name=EXPECTATION_SUITE_NAME, - ) - - validator.expect_column_values_to_not_be_null(column="sepal length (cm)") - validator.expect_column_values_to_not_be_null(column="sepal width (cm)") - validator.expect_column_values_to_be_between( - column="sepal width (cm)", min_value=2, max_value=4.4 - ) - if fail: - validator.expect_column_values_to_be_between( - column="sepal length (cm)", min_value=0, max_value=5 - ) - - validator.save_expectation_suite(discard_failed_expectations=False) 
- - -def cleanup_expectations(): - dirpath = Path(ROOT_DIRECTORY) - if dirpath.exists() and dirpath.is_dir(): - shutil.rmtree(dirpath) - - -def run_expectations(): - fn = mlrun.import_function("function.yaml") - run = fn.run( - inputs={"data": "https://s3.wasabisys.com/iguazio/data/iris/iris.data.raw.csv"}, - params={ - "expectation_suite_name": EXPECTATION_SUITE_NAME, - "data_asset_name": DATA_ASSET_NAME, - "root_directory": ROOT_DIRECTORY, - "datasource_name": DATASOURCE_NAME, - "data_connector_name": DATA_CONNECTOR_NAME, - }, - local=True, - ) - return run - - -def test_validate_expectations_pass(): - # Setup - set_expectations(fail=False) - run = run_expectations() - - # Check that great expectations directory structure was successfully created - dirpath = Path(ROOT_DIRECTORY) - assert dirpath.exists() - assert dirpath.is_dir() - - # Check that run outptuts were successfully saved - assert "validated" in run.outputs - assert "validation_results" in run.outputs - - # Check that validation passed - assert run.outputs["validated"] == True - - # Assert that data docs were saved in run - assert run.outputs["validation_results"].endswith(".html") - - # Assert that data docs exist on filesystem - dirpath = Path(run.outputs["validation_results"]) - assert dirpath.exists() - - # Tear down - cleanup_expectations() - -def test_validate_expectations_fail(): - # Setup - set_expectations(fail=True) - run = run_expectations() - - # Check that great expectations directory structure was successfully created - dirpath = Path(ROOT_DIRECTORY) - assert dirpath.exists() - assert dirpath.is_dir() - - # Check that run outptuts were successfully saved - assert "validated" in run.outputs - assert "validation_results" in run.outputs - - # Check that validation passed - assert run.outputs["validated"] == False - - # Assert that data docs were saved in run - assert run.outputs["validation_results"].endswith(".html") - - # Assert that data docs exist on filesystem - dirpath = 
Path(run.outputs["validation_results"]) - assert dirpath.exists() - - # Tear down - cleanup_expectations() \ No newline at end of file diff --git a/validate_great_expectations/validate_great_expectations.ipynb b/validate_great_expectations/validate_great_expectations.ipynb deleted file mode 100644 index cd07a033f..000000000 --- a/validate_great_expectations/validate_great_expectations.ipynb +++ /dev/null @@ -1,934 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "07e810dc", - "metadata": {}, - "outputs": [], - "source": [ - "import mlrun\n", - "import pandas as pd\n", - "from great_expectations.core.batch import RuntimeBatchRequest\n", - "from great_expectations.data_context import BaseDataContext\n", - "from great_expectations.data_context.types.base import (\n", - " DataContextConfig,\n", - " FilesystemStoreBackendDefaults,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "f57b4a0e", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "> 2023-03-03 22:08:23,289 [info] loaded project great-expectations from MLRun DB\n" - ] - } - ], - "source": [ - "project = mlrun.get_or_create_project(\"great-expectations\", context=\"./\")" - ] - }, - { - "cell_type": "markdown", - "id": "6f721976", - "metadata": {}, - "source": [ - "### Config" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "63852ffe", - "metadata": {}, - "outputs": [], - "source": [ - "data_asset_name = \"iris_dataset\"\n", - "data_path = \"https://s3.wasabisys.com/iguazio/data/iris/iris.data.raw.csv\"\n", - "expectation_suite_name = \"test_suite\"\n", - "root_directory = f\"/v3io/projects/{project.name}/great_expectations\"" - ] - }, - { - "cell_type": "markdown", - "id": "b4a3da34", - "metadata": {}, - "source": [ - "### Intialize Great Expectations Context" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "fb9c3956", - "metadata": {}, - "outputs": [], - 
"source": [ - "ge_context = BaseDataContext(\n", - " project_config=DataContextConfig(\n", - " store_backend_defaults=FilesystemStoreBackendDefaults(\n", - " root_directory=root_directory\n", - " )\n", - " )\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "42f80798", - "metadata": {}, - "source": [ - "### Add Pandas Datasource" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "id": "dc3a01f3", - "metadata": {}, - "outputs": [], - "source": [ - "datasource_name = \"pandas_datasource\"\n", - "data_connector_name = \"default_runtime_data_connector_name\"" - ] - }, - { - "cell_type": "code", - "execution_count": 6, - "id": "b893a260", - "metadata": {}, - "outputs": [], - "source": [ - "datasource_config = {\n", - " \"name\": f\"{datasource_name}\",\n", - " \"class_name\": \"Datasource\",\n", - " \"module_name\": \"great_expectations.datasource\",\n", - " \"execution_engine\": {\n", - " \"module_name\": \"great_expectations.execution_engine\",\n", - " \"class_name\": \"PandasExecutionEngine\",\n", - " },\n", - " \"data_connectors\": {\n", - " f\"{data_connector_name}\": {\n", - " \"class_name\": \"RuntimeDataConnector\",\n", - " \"module_name\": \"great_expectations.datasource.data_connector\",\n", - " \"batch_identifiers\": [\"default_identifier_name\"],\n", - " },\n", - " },\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": 7, - "id": "0358a4ac", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ge_context.add_datasource(**datasource_config)" - ] - }, - { - "cell_type": "markdown", - "id": "8c8406cf", - "metadata": {}, - "source": [ - "### Create Expectation Suite" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "f68fb7e9", - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{\n", - " \"data_asset_type\": null,\n", - " \"meta\": {\n", - " 
\"great_expectations_version\": \"0.15.41\"\n", - " },\n", - " \"ge_cloud_id\": null,\n", - " \"expectations\": [],\n", - " \"expectation_suite_name\": \"test_suite\"\n", - "}" - ] - }, - "execution_count": 8, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "ge_context.create_expectation_suite(\n", - " expectation_suite_name=expectation_suite_name, overwrite_existing=True\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "c805fb0b", - "metadata": {}, - "source": [ - "### Get Data Batch" - ] - }, - { - "cell_type": "code", - "execution_count": 9, - "id": "a2a7c0ed", - "metadata": {}, - "outputs": [], - "source": [ - "df = pd.read_csv(data_path)" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "id": "9838eb91", - "metadata": {}, - "outputs": [], - "source": [ - "batch_request = RuntimeBatchRequest(\n", - " datasource_name=datasource_name,\n", - " data_connector_name=data_connector_name,\n", - " data_asset_name=data_asset_name,\n", - " runtime_parameters={\"batch_data\": df},\n", - " batch_identifiers={\"default_identifier_name\": \"default_identifier\"},\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "b65f9642", - "metadata": {}, - "source": [ - "### Get Validator" - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "8f8a1b3a", - "metadata": {}, - "outputs": [], - "source": [ - "validator = ge_context.get_validator(\n", - " batch_request=batch_request,\n", - " expectation_suite_name=expectation_suite_name,\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "a5e9f68c", - "metadata": {}, - "source": [ - "### Add Expectations" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "7c3b44aa", - "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "789561a733774d34a821d4a57e18e9b9", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "Calculating Metrics: 0%| | 0/6 [00:00 2023-03-03 22:09:01,338 
[info] function spec saved to path: function.yaml\n" - ] - }, - { - "data": { - "text/plain": [ - "" - ] - }, - "execution_count": 15, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "fn.export()" - ] - }, - { - "cell_type": "code", - "execution_count": 16, - "id": "a54cac0e", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "> 2023-03-03 22:09:13,830 [info] Started building image: .mlrun/func-great-expectations-validate-expectations:latest\n", - "\u001b[36mINFO\u001b[0m[0000] Retrieving image manifest mlrun/mlrun:1.1.0 \n", - "\u001b[36mINFO\u001b[0m[0000] Retrieving image mlrun/mlrun:1.1.0 from registry index.docker.io \n", - "\u001b[36mINFO\u001b[0m[0000] Built cross stage deps: map[] \n", - "\u001b[36mINFO\u001b[0m[0000] Retrieving image manifest mlrun/mlrun:1.1.0 \n", - "\u001b[36mINFO\u001b[0m[0000] Returning cached image manifest \n", - "\u001b[36mINFO\u001b[0m[0000] Executing 0 build triggers \n", - "\u001b[36mINFO\u001b[0m[0000] Unpacking rootfs as cmd RUN python -m pip install great-expectations==0.15.41 requires it. \n", - "\u001b[36mINFO\u001b[0m[0021] RUN python -m pip install great-expectations==0.15.41 \n", - "\u001b[36mINFO\u001b[0m[0021] Taking snapshot of full filesystem... 
\n", - "\u001b[36mINFO\u001b[0m[0033] cmd: /bin/sh \n", - "\u001b[36mINFO\u001b[0m[0033] args: [-c python -m pip install great-expectations==0.15.41] \n", - "\u001b[36mINFO\u001b[0m[0033] Running: [/bin/sh -c python -m pip install great-expectations==0.15.41] \n", - "Collecting great-expectations==0.15.41\n", - " Downloading great_expectations-0.15.41-py3-none-any.whl (5.2 MB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 5.2/5.2 MB 122.0 MB/s eta 0:00:00\n", - "Requirement already satisfied: pyparsing>=2.4 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (3.0.9)\n", - "Collecting ipywidgets>=7.5.1\n", - " Downloading ipywidgets-8.0.4-py3-none-any.whl (137 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 137.8/137.8 KB 249.8 MB/s eta 0:00:00\n", - "Requirement already satisfied: notebook>=6.4.10 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (6.4.12)\n", - "Requirement already satisfied: mistune>=0.8.4 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (2.0.4)\n", - "Collecting tqdm>=4.59.0\n", - " Downloading tqdm-4.65.0-py3-none-any.whl (77 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 77.1/77.1 KB 230.4 MB/s eta 0:00:00\n", - "Requirement already satisfied: urllib3<1.27,>=1.25.4 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (1.26.12)\n", - "Requirement already satisfied: jinja2>=2.10 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (3.0.3)\n", - "Requirement already satisfied: scipy>=0.19.0 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (1.7.3)\n", - "Collecting tzlocal>=1.2\n", - " Downloading tzlocal-4.2-py3-none-any.whl (19 kB)\n", - "Requirement already satisfied: typing-extensions>=3.10.0.0 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (4.3.0)\n", - "Requirement already satisfied: requests>=2.20 in /usr/local/lib/python3.7/site-packages (from 
great-expectations==0.15.41) (2.28.1)\n", - "Requirement already satisfied: importlib-metadata>=1.7.0 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (4.12.0)\n", - "Requirement already satisfied: nbformat>=5.0 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (5.4.0)\n", - "Collecting altair<5,>=4.0.0\n", - " Downloading altair-4.2.2-py3-none-any.whl (813 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 813.6/813.6 KB 278.3 MB/s eta 0:00:00\n", - "Requirement already satisfied: numpy>=1.18.5 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (1.21.6)\n", - "Requirement already satisfied: pydantic<2.0,>=1.0 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (1.10.1)\n", - "Collecting jsonpatch>=1.22\n", - " Downloading jsonpatch-1.32-py2.py3-none-any.whl (12 kB)\n", - "Requirement already satisfied: cryptography>=3.2 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (3.3.2)\n", - "Requirement already satisfied: jsonschema>=2.5.1 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (3.2.0)\n", - "Requirement already satisfied: pytz>=2021.3 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (2022.2.1)\n", - "Collecting ruamel.yaml<0.17.18,>=0.16\n", - " Downloading ruamel.yaml-0.17.17-py3-none-any.whl (109 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 109.1/109.1 KB 247.4 MB/s eta 0:00:00\n", - "Requirement already satisfied: Ipython>=7.16.3 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (7.34.0)\n", - "Collecting marshmallow<4.0.0,>=3.7.1\n", - " Downloading marshmallow-3.19.0-py3-none-any.whl (49 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 49.1/49.1 KB 216.6 MB/s eta 0:00:00\n", - "Requirement already satisfied: Click>=7.1.2 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (8.0.4)\n", - "Collecting 
makefun<2,>=1.7.0\n", - " Downloading makefun-1.15.1-py2.py3-none-any.whl (22 kB)\n", - "Requirement already satisfied: python-dateutil>=2.8.1 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (2.8.2)\n", - "Collecting colorama>=0.4.3\n", - " Downloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)\n", - "Requirement already satisfied: pandas>=1.1.0 in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (1.3.5)\n", - "Requirement already satisfied: packaging in /usr/local/lib/python3.7/site-packages (from great-expectations==0.15.41) (21.3)\n", - "Requirement already satisfied: toolz in /usr/local/lib/python3.7/site-packages (from altair<5,>=4.0.0->great-expectations==0.15.41) (0.12.0)\n", - "Requirement already satisfied: entrypoints in /usr/local/lib/python3.7/site-packages (from altair<5,>=4.0.0->great-expectations==0.15.41) (0.4)\n", - "Requirement already satisfied: six>=1.4.1 in /usr/local/lib/python3.7/site-packages (from cryptography>=3.2->great-expectations==0.15.41) (1.16.0)\n", - "Requirement already satisfied: cffi>=1.12 in /usr/local/lib/python3.7/site-packages (from cryptography>=3.2->great-expectations==0.15.41) (1.15.1)\n", - "Requirement already satisfied: zipp>=0.5 in /usr/local/lib/python3.7/site-packages (from importlib-metadata>=1.7.0->great-expectations==0.15.41) (3.8.1)\n", - "Requirement already satisfied: prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0 in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (3.0.31)\n", - "Requirement already satisfied: backcall in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (0.2.0)\n", - "Requirement already satisfied: matplotlib-inline in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (0.1.6)\n", - "Requirement already satisfied: pexpect>4.3 in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) 
(4.8.0)\n", - "Requirement already satisfied: setuptools>=18.5 in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (57.5.0)\n", - "Requirement already satisfied: decorator in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (5.1.1)\n", - "Requirement already satisfied: jedi>=0.16 in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (0.18.1)\n", - "Requirement already satisfied: pygments in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (2.13.0)\n", - "Requirement already satisfied: traitlets>=4.2 in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (5.3.0)\n", - "Requirement already satisfied: pickleshare in /usr/local/lib/python3.7/site-packages (from Ipython>=7.16.3->great-expectations==0.15.41) (0.7.5)\n", - "Requirement already satisfied: ipykernel>=4.5.1 in /usr/local/lib/python3.7/site-packages (from ipywidgets>=7.5.1->great-expectations==0.15.41) (6.15.2)\n", - "Collecting jupyterlab-widgets~=3.0\n", - " Downloading jupyterlab_widgets-3.0.5-py3-none-any.whl (384 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 384.3/384.3 KB 282.6 MB/s eta 0:00:00\n", - "Collecting widgetsnbextension~=4.0\n", - " Downloading widgetsnbextension-4.0.5-py3-none-any.whl (2.0 MB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.0/2.0 MB 264.7 MB/s eta 0:00:00\n", - "Requirement already satisfied: MarkupSafe>=2.0 in /usr/local/lib/python3.7/site-packages (from jinja2>=2.10->great-expectations==0.15.41) (2.1.1)\n", - "Collecting jsonpointer>=1.9\n", - " Downloading jsonpointer-2.3-py2.py3-none-any.whl (7.8 kB)\n", - "Requirement already satisfied: pyrsistent>=0.14.0 in /usr/local/lib/python3.7/site-packages (from jsonschema>=2.5.1->great-expectations==0.15.41) (0.18.1)\n", - "Requirement already satisfied: attrs>=17.4.0 in /usr/local/lib/python3.7/site-packages (from 
jsonschema>=2.5.1->great-expectations==0.15.41) (22.1.0)\n", - "Requirement already satisfied: jupyter-core in /usr/local/lib/python3.7/site-packages (from nbformat>=5.0->great-expectations==0.15.41) (4.11.1)\n", - "Requirement already satisfied: fastjsonschema in /usr/local/lib/python3.7/site-packages (from nbformat>=5.0->great-expectations==0.15.41) (2.16.1)\n", - "Requirement already satisfied: ipython-genutils in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (0.2.0)\n", - "Requirement already satisfied: nbconvert>=5 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (7.0.0)\n", - "Requirement already satisfied: Send2Trash>=1.8.0 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (1.8.0)\n", - "Requirement already satisfied: prometheus-client in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (0.14.1)\n", - "Requirement already satisfied: tornado>=6.1 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (6.2)\n", - "Requirement already satisfied: nest-asyncio>=1.5 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (1.5.5)\n", - "Requirement already satisfied: pyzmq>=17 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (23.2.1)\n", - "Requirement already satisfied: jupyter-client>=5.3.4 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (7.3.5)\n", - "Requirement already satisfied: terminado>=0.8.3 in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (0.15.0)\n", - "Requirement already satisfied: argon2-cffi in /usr/local/lib/python3.7/site-packages (from notebook>=6.4.10->great-expectations==0.15.41) (21.3.0)\n", - "Requirement already satisfied: idna<4,>=2.5 in 
/usr/local/lib/python3.7/site-packages (from requests>=2.20->great-expectations==0.15.41) (3.3)\n", - "Requirement already satisfied: certifi>=2017.4.17 in /usr/local/lib/python3.7/site-packages (from requests>=2.20->great-expectations==0.15.41) (2022.6.15)\n", - "Requirement already satisfied: charset-normalizer<3,>=2 in /usr/local/lib/python3.7/site-packages (from requests>=2.20->great-expectations==0.15.41) (2.1.1)\n", - "Collecting ruamel.yaml.clib>=0.1.2\n", - " Downloading ruamel.yaml.clib-0.2.7-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl (500 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 500.1/500.1 KB 251.9 MB/s eta 0:00:00\n", - "Collecting pytz-deprecation-shim\n", - " Downloading pytz_deprecation_shim-0.1.0.post0-py2.py3-none-any.whl (15 kB)\n", - "Collecting backports.zoneinfo\n", - " Downloading backports.zoneinfo-0.2.1-cp37-cp37m-manylinux1_x86_64.whl (70 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 70.7/70.7 KB 212.6 MB/s eta 0:00:00\n", - "Requirement already satisfied: pycparser in /usr/local/lib/python3.7/site-packages (from cffi>=1.12->cryptography>=3.2->great-expectations==0.15.41) (2.21)\n", - "Requirement already satisfied: psutil in /usr/local/lib/python3.7/site-packages (from ipykernel>=4.5.1->ipywidgets>=7.5.1->great-expectations==0.15.41) (5.9.2)\n", - "Requirement already satisfied: debugpy>=1.0 in /usr/local/lib/python3.7/site-packages (from ipykernel>=4.5.1->ipywidgets>=7.5.1->great-expectations==0.15.41) (1.6.3)\n", - "Requirement already satisfied: parso<0.9.0,>=0.8.0 in /usr/local/lib/python3.7/site-packages (from jedi>=0.16->Ipython>=7.16.3->great-expectations==0.15.41) (0.8.3)\n", - "Requirement already satisfied: nbclient>=0.5.0 in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (0.6.7)\n", - "Requirement already satisfied: beautifulsoup4 in /usr/local/lib/python3.7/site-packages (from 
nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (4.11.1)\n", - "Requirement already satisfied: pandocfilters>=1.4.1 in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (1.5.0)\n", - "Requirement already satisfied: tinycss2 in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (1.1.1)\n", - "Requirement already satisfied: jupyterlab-pygments in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (0.2.2)\n", - "Requirement already satisfied: defusedxml in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (0.7.1)\n", - "Requirement already satisfied: bleach in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (5.0.1)\n", - "Requirement already satisfied: lxml in /usr/local/lib/python3.7/site-packages (from nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (4.9.1)\n", - "Requirement already satisfied: ptyprocess>=0.5 in /usr/local/lib/python3.7/site-packages (from pexpect>4.3->Ipython>=7.16.3->great-expectations==0.15.41) (0.7.0)\n", - "Requirement already satisfied: wcwidth in /usr/local/lib/python3.7/site-packages (from prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0->Ipython>=7.16.3->great-expectations==0.15.41) (0.2.5)\n", - "Requirement already satisfied: argon2-cffi-bindings in /usr/local/lib/python3.7/site-packages (from argon2-cffi->notebook>=6.4.10->great-expectations==0.15.41) (21.2.0)\n", - "Collecting tzdata\n", - " Downloading tzdata-2022.7-py2.py3-none-any.whl (340 kB)\n", - " ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 340.1/340.1 KB 258.0 MB/s eta 0:00:00\n", - "Requirement already satisfied: soupsieve>1.2 in /usr/local/lib/python3.7/site-packages (from beautifulsoup4->nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (2.3.2.post1)\n", - "Requirement 
already satisfied: webencodings in /usr/local/lib/python3.7/site-packages (from bleach->nbconvert>=5->notebook>=6.4.10->great-expectations==0.15.41) (0.5.1)\n", - "Installing collected packages: makefun, widgetsnbextension, tzdata, tqdm, ruamel.yaml.clib, jupyterlab-widgets, jsonpointer, colorama, backports.zoneinfo, ruamel.yaml, pytz-deprecation-shim, marshmallow, jsonpatch, tzlocal, altair, ipywidgets, great-expectations\n", - "Successfully installed altair-4.2.2 backports.zoneinfo-0.2.1 colorama-0.4.6 great-expectations-0.15.41 ipywidgets-8.0.4 jsonpatch-1.32 jsonpointer-2.3 jupyterlab-widgets-3.0.5 makefun-1.15.1 marshmallow-3.19.0 pytz-deprecation-shim-0.1.0.post0 ruamel.yaml-0.17.17 ruamel.yaml.clib-0.2.7 tqdm-4.65.0 tzdata-2022.7 tzlocal-4.2 widgetsnbextension-4.0.5\n", - "WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv\n", - "WARNING: You are using pip version 22.0.4; however, version 23.0.1 is available.\n", - "You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.\n", - "\u001b[36mINFO\u001b[0m[0039] Taking snapshot of full filesystem... 
\n", - "\u001b[36mINFO\u001b[0m[0042] Pushing image to docker-registry.default-tenant.app.us-sales-350.iguazio-cd1.com:80/mlrun/func-great-expectations-validate-expectations:latest \n", - "\u001b[36mINFO\u001b[0m[0042] Pushed docker-registry.default-tenant.app.us-sales-350.iguazio-cd1.com:80/mlrun/func-great-expectations-validate-expectations@sha256:1e3b2615cc8f2dc39062037c0a27299e15d12d3011d50a9e8214ec34b84c21a2 \n", - "\n" - ] - }, - { - "data": { - "text/plain": [ - "BuildStatus(ready=True, outputs={'image': '.mlrun/func-great-expectations-validate-expectations:latest'})" - ] - }, - "execution_count": 16, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "project.build_function(fn)" - ] - }, - { - "cell_type": "markdown", - "id": "df71ef7c", - "metadata": {}, - "source": [ - "### Run Validation" - ] - }, - { - "cell_type": "code", - "execution_count": 17, - "id": "857021cf", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "> 2023-03-03 22:10:40,839 [info] starting run validate-expectations-validate_expectations uid=436372d741034d678145c63fecfe4450 DB=http://mlrun-api:8080\n", - "> 2023-03-03 22:10:41,124 [info] Job is running in the background, pod: validate-expectations-validate-expectations-tx9xb\n", - "> 2023-03-03 22:10:55,088 [info] run executed, status=completed\n", - "Calculating Metrics: 100%|██████████| 19/19 [00:00<00:00, 323.87it/s]\n", - "final state: completed\n" - ] - }, - { - "data": { - "text/html": [ - "\n", - "
\n", - "
\n", - "\n", - "\n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - " \n", - "
projectuiditerstartstatenamelabelsinputsparametersresultsartifacts
great-expectations0Mar 03 22:10:51completedvalidate-expectations-validate_expectations
v3io_user=nick
kind=job
owner=nick
mlrun/client_version=1.1.0
host=validate-expectations-validate-expectations-tx9xb
data
expectation_suite_name=test_suite
data_asset_name=iris_dataset
validated=False
validation_results
\n", - "
\n", - "
\n", - "
\n", - " Title\n", - " ×\n", - "
\n", - " \n", - "
\n", - "
\n" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n" - ] - }, - { - "data": { - "text/html": [ - " > to track results use the .show() or .logs() methods or click here to open in UI" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "> 2023-03-03 22:11:01,178 [info] run executed, status=completed\n" - ] - } - ], - "source": [ - "run = fn.run(\n", - " inputs={\"data\": \"https://s3.wasabisys.com/iguazio/data/iris/iris.data.raw.csv\"},\n", - " params={\n", - " \"expectation_suite_name\": \"test_suite\",\n", - " \"data_asset_name\": \"iris_dataset\",\n", - " },\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "dec9172c", - "metadata": {}, - "source": [ - "### View Data Doc" - ] - }, - { - "cell_type": "code", - "execution_count": 18, - "id": "8b90a4c7", - "metadata": {}, - "outputs": [], - "source": [ - "import os\n", - "from IPython.display import IFrame" - ] - }, - { - "cell_type": "code", - "execution_count": 19, - "id": "3c59d69e", - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "\n", - " \n", - " " - ], - "text/plain": [ - "" - ] - }, - "execution_count": 19, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "IFrame(src=os.path.relpath(run.outputs[\"validation_results\"]), width=1000, height=800)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "a3caea0c", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python [conda env:root] *", - "language": "python", - "name": "conda-root-py" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": 
"ipython3", - "version": "3.7.6" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/validate_great_expectations/validate_great_expectations.py b/validate_great_expectations/validate_great_expectations.py deleted file mode 100644 index 1e48df031..000000000 --- a/validate_great_expectations/validate_great_expectations.py +++ /dev/null @@ -1,197 +0,0 @@ -import os -import shutil - -import mlrun - -from great_expectations.core.batch import RuntimeBatchRequest -from great_expectations.data_context import BaseDataContext -from great_expectations.data_context.types.base import ( - DataContextConfig, - FilesystemStoreBackendDefaults, -) -from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult - - -def get_default_datasource_config( - datasource_name: str, data_connector_name: str -) -> dict: - """ - Convenience function to get the default pandas datasource config - for use in validating expectations. - - :param datasource_name: Name of datasource. - :param data_connector_name: Name of data connector. - - :returns: Configuration for default datasource. - """ - default_datasource_config = { - "name": f"{datasource_name}", - "class_name": "Datasource", - "module_name": "great_expectations.datasource", - "execution_engine": { - "module_name": "great_expectations.execution_engine", - "class_name": "PandasExecutionEngine", - }, - "data_connectors": { - f"{data_connector_name}": { - "class_name": "RuntimeDataConnector", - "module_name": "great_expectations.datasource.data_connector", - "batch_identifiers": ["default_identifier_name"], - }, - }, - } - return default_datasource_config - - -def get_default_checkpoint_config(checkpoint_name: str) -> dict: - """ - Convenience function to get the default checkpoint config for - use in validating expectations. - - :param checkpoint_name: Name of checkpoint. - - :returns: Configuration for default checkpoint. 
- """ - return { - "name": checkpoint_name, - "config_version": 1.0, - "class_name": "SimpleCheckpoint", - "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template", - } - - -def get_data_doc_path(checkpoint_result: CheckpointResult) -> str: - """ - Convenience function to get the path of the output - data doc from a checkpoint result. - - :param checkpoint_result: Great Expectations checkpoint result. - - :returns: Absolute path to new data doc. - """ - result_id = checkpoint_result.list_validation_result_identifiers()[0] - data_doc_path = checkpoint_result["run_results"][result_id]["actions_results"][ - "update_data_docs" - ]["local_site"] - data_doc_path = data_doc_path.replace("file://", "") - return data_doc_path - - -def validate_expectations( - context: mlrun.MLClientCtx, - data: mlrun.DataItem, - expectation_suite_name: str, - data_asset_name: str, - datasource_name: str = "pandas_datasource", - data_connector_name: str = "default_runtime_data_connector_name", - datasource_config: dict = None, - batch_identifiers: dict = None, - root_directory: str = None, - checkpoint_name: str = None, - checkpoint_config: dict = None, -) -> None: - """ - Main function to validate an input dataset, datasource, data connector, - and expectation suite. - - Runs the Great Expectation validation and logs - whether the validation was a success as well as the output page - of the data docs. - - :param context: MLRun context. - :param data: Data to validate. Can be local or remote link. - :param expectation_suite_name: Name of expectation suite to validate against. - :param data_asset_name: Name of dataset in Great Expectations. - :param datasource_name: Name of datasource to use for validation. - :param data_connector_name: Name of data connector to use for validation. - :param datasource_config: Full configuration for datasource. For use with custom - data sources other than the default pandas datasource. 
- :param batch_identifiers: Custom metadata for identifying particular batches of - data. For use when not using the default batch identifiers. - :param root_directory: Path to underlying Great Expectations project. Defaults to - MLRun project artifact path if not specified. - :param checkpoint_name: Name of checkpoint to use for validation. - :param checkpoint_config: Full configuration for checkpoint. For use with custome - checkpoint config other than the default. - """ - - # Get data - df = data.as_df() - - # Use default root directory for project if not specified - root_directory = ( - root_directory - if root_directory - else f"/v3io/projects/{context.project}/great_expectations" - ) - - # Load great expectations context - ge_context = BaseDataContext( - project_config=DataContextConfig( - store_backend_defaults=FilesystemStoreBackendDefaults( - root_directory=root_directory - ) - ) - ) - - # Get expectation suite - ge_context.get_expectation_suite(expectation_suite_name=expectation_suite_name) - - # Add default data source if not specified - datasource_config = ( - datasource_config - if datasource_config - else get_default_datasource_config(datasource_name, data_connector_name) - ) - ge_context.add_datasource(**datasource_config) - - # Get data batch - batch_identifiers = ( - batch_identifiers - if batch_identifiers - else {"default_identifier_name": "default_identifier"} - ) - batch_request = RuntimeBatchRequest( - datasource_name=datasource_name, - data_connector_name=data_connector_name, - data_asset_name=data_asset_name, - runtime_parameters={"batch_data": df}, - batch_identifiers=batch_identifiers, - ) - - # Get validator - validator = ge_context.get_validator( - batch_request=batch_request, - expectation_suite_name=expectation_suite_name, - ) - - # Use default checkpoint name and config if not specified - checkpoint_name = ( - checkpoint_name if checkpoint_name else f"{data_asset_name}_checkpoint" - ) - checkpoint_config = ( - checkpoint_config - if 
checkpoint_config - else get_default_checkpoint_config(checkpoint_name) - ) - - # Add checkpoint - ge_context.add_checkpoint(**checkpoint_config) - - # Run expectation suite on checkpoint - checkpoint_result = ge_context.run_checkpoint( - checkpoint_name=checkpoint_name, - validations=[ - { - "batch_request": batch_request, - "expectation_suite_name": expectation_suite_name, - } - ], - ) - - # Log success - context.log_result("validated", checkpoint_result["success"]) - - # Log data doc - data_doc_path = get_data_doc_path(checkpoint_result) - context.log_artifact("validation_results", target_path=data_doc_path)