Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extensively to store intermediate and final task/pipeline artifacts. Furthermore
to serve models directly from object storage.

prokube.ai comes pre-configured with integrated object storage. Alternatively, admins can configure pipelines
to use other instances of object storage (e.g. self-hosted MinIO, AWS S3, GCS, etc.).
to use other instances of object storage (e.g. self-hosted S3-compatible storage, AWS S3, GCS, etc.).
Many S3 libraries use environment variables for their configuration — those are usually:
`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `S3_ENDPOINT`. They are likely already
available in your environment. You can also ask your admin about them.
Expand Down
2 changes: 1 addition & 1 deletion mlflow/mlflow-kfp-example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
"outputs": [],
"source": [
"def add_env_vars_to_tasks(task_list: list[dsl.PipelineTask]) -> None:\n",
" \"\"\"Adds environment variables for MinIO to the MLflow tasks\"\"\"\n",
" \"\"\"Adds object storage environment variables to the MLflow tasks\"\"\"\n",
" for task in task_list:\n",
" use_secret_as_env(\n",
" task,\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,21 @@
"metadata": {},
"outputs": [],
"source": [
"# Configuration for MinIO bucket\n",
"# Change to your MinIO bucket name, which is <namespace>-data by default\n",
"# You can check your buckets by opening a terminal in the notebook and run 'mc ls minio'\n",
"s3_bucket = \"\"\n",
"s3_dataset_path = f\"{s3_bucket}/mobile-price-classification\"\n",
"if not s3_bucket:\n",
" raise ValueError(\"Please set the 's3_bucket' variable to your MinIO bucket name.\")"
"# Configuration for object storage bucket\n",
"# Uses the default prokube bucket for this namespace: <namespace>-data\n",
"# You can check your buckets by opening your object storage browser or using the configured storage CLI alias.\n",
"with open(\"/var/run/secrets/kubernetes.io/serviceaccount/namespace\", \"r\") as namespace_file:\n",
" namespace = namespace_file.read().strip()\n",
"s3_bucket = f\"{namespace}-data\"\n",
"s3_dataset_path = f\"{s3_bucket}/mobile-price-classification\""
]
},
{
"cell_type": "markdown",
"id": "65946d3d",
"metadata": {},
"source": [
"## Download Dataset From Kagglehub and Upload to MinIO with Python"
"## Download Dataset From Kagglehub and Upload to Object Storage with Python"
]
},
{
Expand All @@ -132,14 +132,14 @@
"metadata": {},
"outputs": [],
"source": [
"# Upload dataset to MinIO\n",
"# Upload dataset to object storage\n",
"if not os.getenv(\"AWS_ACCESS_KEY_ID\") or not os.getenv(\"AWS_SECRET_ACCESS_KEY\"):\n",
" raise ValueError(\"AWS credentials not found in environment variables.\")\n",
"\n",
"# Initialize S3 filesystem\n",
"s3 = s3fs.S3FileSystem()\n",
"\n",
"# Upload the dataset to MinIO\n",
"# Upload the dataset to object storage\n",
"s3.put(f\"{dataset_path}/train.csv\", f\"{s3_dataset_path}/train.csv\")\n",
"s3.put(f\"{dataset_path}/test.csv\", f\"{s3_dataset_path}/test.csv\")\n",
"\n",
Expand Down Expand Up @@ -174,16 +174,16 @@
" base_image=\"python:3.9\",\n",
")\n",
"def read_data(\n",
" minio_train_data_path: str,\n",
" minio_test_data_path: str,\n",
" train_data_path: str,\n",
" test_data_path: str,\n",
" train_df: Output[Dataset],\n",
" test_df: Output[Dataset], \n",
"):\n",
" \"\"\"Reads training and test data writes it to pipeline artifacts as parquet.\"\"\"\n",
" import pandas as pd\n",
"\n",
" df_train = pd.read_csv(minio_train_data_path)\n",
" df_test = pd.read_csv(minio_test_data_path)\n",
" df_train = pd.read_csv(train_data_path)\n",
" df_test = pd.read_csv(test_data_path)\n",
" \n",
" df_train.to_parquet(train_df.path)\n",
" df_test.to_parquet(test_df.path)"
Expand Down Expand Up @@ -592,8 +592,8 @@
"source": [
"@dsl.pipeline\n",
"def mobile_price_classification_pipeline(\n",
" minio_train_data_path: str,\n",
" minio_test_data_path: str,\n",
" train_data_path: str,\n",
" test_data_path: str,\n",
" test_size: float = 0.5,\n",
" C: List = [1, 0.1, 0.25, 0.5, 2, 0.75],\n",
" kernel: List = [\"linear\", \"rbf\"],\n",
Expand All @@ -618,11 +618,11 @@
" \n",
" # Step 1: Read the data\n",
" read_data_task = read_data(\n",
" minio_train_data_path=minio_train_data_path,\n",
" minio_test_data_path=minio_test_data_path,\n",
" train_data_path=train_data_path,\n",
" test_data_path=test_data_path,\n",
" )\n",
" # Use the cluster internal s3 endpoint\n",
" read_data_task.set_env_variable('AWS_ENDPOINT_URL',\"http://minio.minio\")\n",
" read_data_task.set_env_variable('AWS_ENDPOINT_URL', f'http://{os.environ[\"S3_ENDPOINT\"]}')\n",
" # Use Kubernetes secrets to provide AWS credentials to the read_data component\n",
" kubernetes.use_secret_as_env(\n",
" read_data_task,\n",
Expand Down Expand Up @@ -702,8 +702,8 @@
"\n",
"# Define the arguments to be passed to the pipeline\n",
"args = dict(\n",
" minio_train_data_path=f\"s3://{s3_dataset_path}/train.csv\",\n",
" minio_test_data_path=f\"s3://{s3_dataset_path}/test.csv\",\n",
" train_data_path=f\"s3://{s3_dataset_path}/train.csv\",\n",
" test_data_path=f\"s3://{s3_dataset_path}/test.csv\",\n",
" test_size=0.2,\n",
" C=[1, 0.1, 0.25, 0.5, 2, 0.75],\n",
" kernel=[\"linear\", \"rbf\"],\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@
"metadata": {},
"outputs": [],
"source": [
"# Configuration for MinIO bucket\n",
"# Change to your MinIO bucket name, which is <namespace>-data by default\n",
"# You can check your buckets by opening a terminal in the notebook and run 'mc ls minio'\n",
"s3_bucket = \"\"\n",
"# Configuration for object storage bucket\n",
"# Uses the default prokube bucket for this namespace: <namespace>-data\n",
"# You can check your buckets by opening your object storage browser or using the configured storage CLI alias.\n",
"with open(\"/var/run/secrets/kubernetes.io/serviceaccount/namespace\", \"r\") as namespace_file:\n",
" namespace = namespace_file.read().strip()\n",
"s3_bucket = f\"{namespace}-data\"\n",
"s3_dataset_path = f\"{s3_bucket}/mobile-price-classification\"\n",
"if not s3_bucket:\n",
" raise ValueError(\"Please set the 's3_bucket' variable to your MinIO bucket name.\")\n",
"\n",
"minio_train_data_path=f\"s3://{s3_dataset_path}/train.csv\"\n",
"minio_test_data_path=f\"s3://{s3_dataset_path}/test.csv\""
"train_data_path=f\"s3://{s3_dataset_path}/train.csv\"\n",
"test_data_path=f\"s3://{s3_dataset_path}/test.csv\""
]
},
{
Expand Down Expand Up @@ -89,7 +89,7 @@
"id": "65946d3d",
"metadata": {},
"source": [
"## Download Dataset From Kagglehub and Upload to MinIO with Python"
"## Download Dataset From Kagglehub and Upload to Object Storage with Python"
]
},
{
Expand All @@ -112,14 +112,14 @@
"metadata": {},
"outputs": [],
"source": [
"# Upload dataset to MinIO\n",
"# Upload dataset to object storage\n",
"if not os.getenv(\"AWS_ACCESS_KEY_ID\") or not os.getenv(\"AWS_SECRET_ACCESS_KEY\"):\n",
" raise ValueError(\"AWS credentials not found in environment variables.\")\n",
"\n",
"# Initialize S3 filesystem\n",
"s3 = s3fs.S3FileSystem()\n",
"\n",
"# Upload the dataset to MinIO\n",
"# Upload the dataset to object storage\n",
"s3.put(f\"{dataset_path}/train.csv\", f\"{s3_dataset_path}/train.csv\")\n",
"s3.put(f\"{dataset_path}/test.csv\", f\"{s3_dataset_path}/test.csv\")\n",
"\n",
Expand Down Expand Up @@ -149,9 +149,9 @@
"metadata": {},
"outputs": [],
"source": [
"# The storage options are preconfigured for the cluster internal MinIO, so we can read the data directly without additional configuration\n",
"df_train = pd.read_csv(minio_train_data_path)\n",
"df_test = pd.read_csv(minio_test_data_path)"
"# The storage options are preconfigured for the cluster internal object storage, so we can read the data directly without additional configuration\n",
"df_train = pd.read_csv(train_data_path)\n",
"df_test = pd.read_csv(test_data_path)"
]
},
{
Expand Down Expand Up @@ -346,7 +346,7 @@
"import plotly.express as px\n",
"\n",
"# Load the fitted scaler and preprocess test data\n",
"x_test = pd.read_csv(minio_test_data_path)\n",
"x_test = pd.read_csv(test_data_path)\n",
"x_test = x_test.drop('id', axis=1)\n",
"x_test = pd.DataFrame(scaler.transform(x_test), columns=x_test.columns)\n",
"\n",
Expand Down
44 changes: 22 additions & 22 deletions pipelines/lightweight-components/mobile-price-classifications.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@
"metadata": {},
"outputs": [],
"source": [
"# Configuration for MinIO bucket\n",
"# Change to your MinIO bucket name, which is <namespace>-data by default\n",
"# You can check your buckets by opening a terminal in the notebook and run 'mc ls minio'\n",
"s3_bucket = \"\"\n",
"s3_dataset_path = f\"{s3_bucket}/mobile-price-classification\"\n",
"if not s3_bucket:\n",
" raise ValueError(\"Please set the 's3_bucket' variable to your MinIO bucket name.\")"
"# Configuration for object storage bucket\n",
"# Uses the default prokube bucket for this namespace: <namespace>-data\n",
"# You can check your buckets by opening your object storage browser or using the configured storage CLI alias.\n",
"with open(\"/var/run/secrets/kubernetes.io/serviceaccount/namespace\", \"r\") as namespace_file:\n",
" namespace = namespace_file.read().strip()\n",
"s3_bucket = f\"{namespace}-data\"\n",
"s3_dataset_path = f\"{s3_bucket}/mobile-price-classification\""
]
},
{
"cell_type": "markdown",
"id": "65946d3d",
"metadata": {},
"source": [
"## Download Dataset From Kagglehub and Upload to MinIO with Python"
"## Download Dataset From Kagglehub and Upload to Object Storage with Python"
]
},
{
Expand All @@ -84,14 +84,14 @@
"metadata": {},
"outputs": [],
"source": [
"# Upload dataset to MinIO\n",
"# Upload dataset to object storage\n",
"if not os.getenv(\"AWS_ACCESS_KEY_ID\") or not os.getenv(\"AWS_SECRET_ACCESS_KEY\"):\n",
" raise ValueError(\"AWS credentials not found in environment variables.\")\n",
"\n",
"# Initialize S3 filesystem\n",
"s3 = s3fs.S3FileSystem()\n",
"\n",
"# Upload the dataset to MinIO\n",
"# Upload the dataset to object storage\n",
"s3.put(f\"{dataset_path}/train.csv\", f\"{s3_dataset_path}/train.csv\")\n",
"s3.put(f\"{dataset_path}/test.csv\", f\"{s3_dataset_path}/test.csv\")\n",
"\n",
Expand Down Expand Up @@ -126,16 +126,16 @@
" base_image=\"python:3.9\",\n",
")\n",
"def read_data(\n",
" minio_train_data_path: str,\n",
" minio_test_data_path: str,\n",
" train_data_path: str,\n",
" test_data_path: str,\n",
" train_df: Output[Dataset],\n",
" test_df: Output[Dataset], \n",
"):\n",
" \"\"\"Reads training and test data writes it to pipeline artifacts as parquet.\"\"\"\n",
" import pandas as pd\n",
"\n",
" df_train = pd.read_csv(minio_train_data_path)\n",
" df_test = pd.read_csv(minio_test_data_path)\n",
" df_train = pd.read_csv(train_data_path)\n",
" df_test = pd.read_csv(test_data_path)\n",
" \n",
" df_train.to_parquet(train_df.path)\n",
" df_test.to_parquet(test_df.path)"
Expand Down Expand Up @@ -490,8 +490,8 @@
"source": [
"@dsl.pipeline\n",
"def mobile_price_classification_pipeline(\n",
" minio_train_data_path: str,\n",
" minio_test_data_path: str,\n",
" train_data_path: str,\n",
" test_data_path: str,\n",
" test_size: float = 0.5,\n",
" C: List = [1, 0.1, 0.25, 0.5, 2, 0.75],\n",
" kernel: List = [\"linear\", \"rbf\"],\n",
Expand All @@ -516,11 +516,11 @@
" \n",
" # Step 1: Read the data\n",
" read_data_task = read_data(\n",
" minio_train_data_path=minio_train_data_path,\n",
" minio_test_data_path=minio_test_data_path,\n",
" train_data_path=train_data_path,\n",
" test_data_path=test_data_path,\n",
" )\n",
" # Use the cluster internal s3 endpoint\n",
" read_data_task.set_env_variable('AWS_ENDPOINT_URL',\"http://minio.minio\")\n",
" read_data_task.set_env_variable('AWS_ENDPOINT_URL', f'http://{os.environ[\"S3_ENDPOINT\"]}')\n",
" # Use Kubernetes secrets to provide AWS credentials to the read_data component\n",
" kubernetes.use_secret_as_env(\n",
" read_data_task,\n",
Expand Down Expand Up @@ -596,8 +596,8 @@
"\n",
"# Define the arguments to be passed to the pipeline\n",
"args = dict(\n",
" minio_train_data_path=f\"s3://{s3_dataset_path}/train.csv\",\n",
" minio_test_data_path=f\"s3://{s3_dataset_path}/test.csv\",\n",
" train_data_path=f\"s3://{s3_dataset_path}/train.csv\",\n",
" test_data_path=f\"s3://{s3_dataset_path}/test.csv\",\n",
" test_size=0.2,\n",
" C=[1, 0.1, 0.25, 0.5, 2, 0.75],\n",
" kernel=[\"linear\", \"rbf\"],\n",
Expand All @@ -623,7 +623,7 @@
"metadata": {},
"source": [
"# Debugging with Lightweight Components\n",
"Debugging can be challenging when using lightweight components in Kubeflow Pipelines. A practical approach is to download the artifacts from the steps preceding the failing one, from MinIO, and then run the functions used in the components locally. You can easily copy the paths to these artifacts from the Kubeflow Pipelines UI. Once you have these paths, you can use them as shown in the example below to download and read in Pandas DataFrames or perform similar operations. Make sure to adjust the paths according to your specific setup requirements."
"Debugging can be challenging when using lightweight components in Kubeflow Pipelines. A practical approach is to download the artifacts from the steps preceding the failing one from object storage, and then run the functions used in the components locally. You can easily copy the paths to these artifacts from the Kubeflow Pipelines UI. Once you have these paths, you can use them as shown in the example below to download and read in Pandas DataFrames or perform similar operations. Make sure to adjust the paths according to your specific setup requirements."
]
},
{
Expand Down
3 changes: 1 addition & 2 deletions pipelines/lightweight-python-package/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,13 @@ COMPONENTS_IMAGE = "<your-registry>/mobile-price-classification:v1"

Follow the same dataset preparation steps as in the lightweight components example:
1. Download the dataset from Kaggle
2. Upload to your MinIO bucket
2. Upload to your default object storage bucket (`<namespace>-data`)

### 4. Run the Pipeline

**From within the cluster (e.g., from a Kubeflow Notebook):**

```sh
# Update s3_bucket in submit-cluster.py first
python submit-cluster.py
```

Expand Down
21 changes: 11 additions & 10 deletions pipelines/lightweight-python-package/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Dict, List

from kfp import dsl
Expand All @@ -9,17 +10,17 @@

@dsl.component(base_image=COMPONENTS_IMAGE)
def read_data(
minio_train_data_path: str,
minio_test_data_path: str,
train_data_path: str,
test_data_path: str,
train_df: Output[Dataset],
test_df: Output[Dataset],
):
"""Reads training and test data and writes it to pipeline artifacts as parquet."""
from mobile_price_classification import read_data as _read_data

_read_data(
minio_train_data_path=minio_train_data_path,
minio_test_data_path=minio_test_data_path,
train_data_path=train_data_path,
test_data_path=test_data_path,
train_output_path=train_df.path,
test_output_path=test_df.path,
)
Expand Down Expand Up @@ -155,8 +156,8 @@ def test_model(

@dsl.pipeline
def mobile_price_classification_pipeline(
minio_train_data_path: str,
minio_test_data_path: str,
train_data_path: str,
test_data_path: str,
test_size: float = 0.5,
C: List = [1, 0.1, 0.25, 0.5, 2, 0.75],
kernel: List = ["linear", "rbf"],
Expand All @@ -182,11 +183,11 @@ def mobile_price_classification_pipeline(

# Step 1: Read the data
read_data_task = read_data(
minio_train_data_path=minio_train_data_path,
minio_test_data_path=minio_test_data_path,
train_data_path=train_data_path,
test_data_path=test_data_path,
)
# Use the cluster internal s3 endpoint
read_data_task.set_env_variable('AWS_ENDPOINT_URL',"http://minio.minio")
# Use the cluster internal object storage endpoint
read_data_task.set_env_variable('AWS_ENDPOINT_URL', f'http://{os.environ["S3_ENDPOINT"]}')
# Use Kubernetes secrets to provide AWS credentials to the read_data component
kubernetes.use_secret_as_env(
read_data_task,
Expand Down
Loading