Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
models/
.ipynb_checkpoints
*.gz
*.csv
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"python.pythonPath": "/home/yasha/anaconda3/bin/python"
}
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
# Function Templates and Examples

This repo stores ML and data processing related functions and examples
This repo stores ML and data processing related functions and examples:

## open archive
Open an archive and extract its contents.

## archive to parquet
Retrieve an archive, extract its contents and store as a parquet file.

## xgboost model server
Load a serialized xgboost model and deploy as a model server.


78 changes: 78 additions & 0 deletions datagen/binary_classes/binary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# Copyright 2019 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from typing import Optional, List, Any
from sklearn.datasets import make_classification

from mlrun.execution import MLClientCtx


def create_binary_classification(
    context: MLClientCtx = None,
    n_samples: int = 100_000,
    m_features: int = 20,
    features_hdr: Optional[List[str]] = None,
    weight: float = 0.50,
    random_state=1,
    filename: Optional[str] = None,
    target_path: str = "",
    key: str = "",
    **sk_params,
):
    """Create a binary classification sample dataset and save it as parquet.

    If no filename is given it defaults to
    'simdata-{n_samples}X{m_features}.parquet' inside ``target_path``
    (``n_samples`` is rendered in exponent notation, e.g. '1e05').
    A caller-supplied ``filename`` is used as given; it is not joined
    with ``target_path``.

    All of the scikit-learn parameters can be set using **sk_params.

    :param context:      function context; when None the file is written
                         but no artifact is logged
    :param n_samples:    number of rows/samples
    :param m_features:   number of cols/features
    :param features_hdr: header for features array, must contain exactly
                         ``m_features`` names
    :param weight:       fraction of samples assigned to the first class
    :param random_state: rng seed (see https://scikit-learn.org/stable/glossary.html#term-random-state)
    :param filename:     optional name for stored data file
    :param target_path:  destination folder for the file
    :param key:          key of data in artifact store
    :param sk_params:    keyword arguments for scikit-learn's 'make_classification'

    :raises ValueError:  if ``features_hdr`` does not have ``m_features`` entries

    Returns filename of created data (includes path).
    """
    # Fail fast on a bad header instead of after generating the whole dataset.
    if features_hdr and len(features_hdr) != m_features:
        raise ValueError(
            f"features_hdr has {len(features_hdr)} names, expected {m_features}"
        )

    # os.makedirs("") raises FileNotFoundError, so only create a folder when
    # one was actually requested (the default means "current directory").
    if target_path:
        os.makedirs(target_path, exist_ok=True)

    if not filename:
        # "+" is dropped so that 1e+05 becomes the filesystem-friendlier 1e05
        name = f"simdata-{n_samples:0.0e}X{m_features}.parquet".replace("+", "")
        filename = os.path.join(target_path, name)

    features, labels = make_classification(
        n_samples=n_samples,
        n_features=m_features,
        # a single weight is given for two classes; scikit-learn infers the
        # weight of the remaining class
        weights=[weight],
        n_classes=2,
        random_state=random_state,
        **sk_params,
    )

    # make dataframes, add column names, concatenate (X, y)
    X = pd.DataFrame(features)
    if features_hdr:
        X.columns = features_hdr
    else:
        X.columns = ["feat_" + str(x) for x in range(m_features)]

    y = pd.DataFrame(labels, columns=["labels"])
    data = pd.concat([X, y], axis=1)

    pq.write_table(pa.Table.from_pandas(data), filename)

    # context defaults to None, so logging must be optional
    if context is not None:
        context.log_artifact(key, target_path=filename)

    return filename
43 changes: 43 additions & 0 deletions fileutils/arc_to_parquet/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
## arc_to_parquet

Retrieve a remote archive and save locally as a parquet file, [source](arc_to_parquet.py)

Usage example:

```python
# load function from Github
xfn = mlrun.import_function('https://github.com/mlrun/functions/master/fileutils/arc_to_parquet/arc_to_parquet.yaml')

# configure function: mount on Iguazio data fabric, set as interactive (return stdout)
xfn.apply(mlrun.mount_v3io())
xfn.interactive = True

# create and run the task
target_path = '/User/mlrun/functions/parquet'
archive = 'https://fpsignals-public.s3.amazonaws.com/x_test_50.csv.gz'

arc_to_parq_task = mlrun.NewTask('arc2parq',
handler='arc_to_parquet',
params={
'target_path': target_path,
'name' : 'x_test_50.csv',
'key' : 'raw_data',
'archive_url': archive})
# run
run = xfn.run(arc_to_parq_task)
```

Output:

```
[mlrun] 2020-01-09 21:28:47,515 starting run arc2parq uid=ed20cbdcddb3473882507594f69e6180 -> http://mlrun-api:8080
[mlrun] 2020-01-09 21:29:03,735 destination file does not exist, downloading
[mlrun] 2020-01-09 21:29:03,873 saved table to /User/mlrun/functions/parquet/x_test_50.parquet
[mlrun] 2020-01-09 21:29:03,873 logging /User/mlrun/functions/parquet/x_test_50.parquet to context

[mlrun] 2020-01-09 21:29:03,898 run executed, status=completed
...
to track results use .show() or .logs() or in CLI:
!mlrun get run ed20cbdcddb3473882507594f69e6180 , !mlrun logs ed20cbdcddb3473882507594f69e6180
[mlrun] 2020-01-09 21:29:06,867 run executed, status=completed
```
69 changes: 69 additions & 0 deletions fileutils/arc_to_parquet/arc_to_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import os
from pathlib import Path
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa

from mlrun.execution import MLClientCtx
from typing import IO, AnyStr, Union, List, Optional


def arc_to_parquet(
    context: MLClientCtx,
    archive_url: Union[str, Path, IO[AnyStr]],
    header: Optional[List[str]] = None,
    target_path: str = "",
    name: str = "",
    chunksize: int = 10_000,
    log_data: bool = True,
    add_uid: bool = False,
    key: str = "raw_data",
) -> None:
    """Open a file/object archive and save as a parquet file.

    The archive is read with pandas.read_csv in chunks and appended to a
    single parquet file. If the destination file already exists nothing is
    downloaded and the existing file is reused.

    :param context:     function context
    :param archive_url: any valid string path consistent with the path variable
                        of pandas.read_csv, including strings as file paths, as urls,
                        pathlib.Path objects, etc...
    :param header:      column names
    :param target_path: destination folder of table
    :param name:        name of the file to be saved locally; '.parquet' is
                        appended when missing
    :param chunksize:   (10_000) row size retrieved per iteration
    :param log_data:    (True) if True, log the data so that it is available
                        at the next step
    :param add_uid:     (False) add the metadata uid to the target_path so that
                        runs can be identified
    :param key:         key in artifact store (when log_data=True)
    """
    if not name.endswith(".parquet"):
        name += ".parquet"

    uid = context.uid if add_uid else ""

    dest_dir = os.path.join(target_path, uid)
    dest_path = os.path.join(dest_dir, name)

    # os.makedirs("") raises FileNotFoundError; an empty folder means the
    # current directory, which needs no creation.
    if dest_dir:
        os.makedirs(dest_dir, exist_ok=True)

    if not os.path.isfile(dest_path):
        context.logger.info("destination file does not exist, downloading")
        pqwriter = None
        completed = False
        try:
            for df in pd.read_csv(archive_url, chunksize=chunksize, names=header):
                table = pa.Table.from_pandas(df)
                if pqwriter is None:
                    # the schema of the first chunk defines the file schema
                    pqwriter = pq.ParquetWriter(dest_path, table.schema)
                pqwriter.write_table(table)
            completed = True
        finally:
            # always release the file handle, even when a chunk fails
            if pqwriter is not None:
                pqwriter.close()
            # A half-written file would be mistaken for a finished download
            # by the isfile() check on the next run, so discard it.
            if not completed and os.path.isfile(dest_path):
                os.remove(dest_path)

        context.logger.info(f"saved table to {dest_path}")
    else:
        context.logger.info("destination file already exists")

    if log_data:
        context.logger.info(f"assign data to {key} in artifact store")
        context.log_artifact(key, target_path=dest_path)
13 changes: 13 additions & 0 deletions fileutils/arc_to_parquet/arc_to_parquet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
kind: job
metadata:
name: arc_to_parquet
spec:
description: 'archive to parquet and log'
build:
functionSourceCode: IyBHZW5lcmF0ZWQgYnkgbnVjbGlvLmV4cG9ydC5OdWNsaW9FeHBvcnRlciBvbiAyMDIwLTAxLTA5IDE3OjA3CgppbXBvcnQgb3MKCmZyb20gbWxydW4uZXhlY3V0aW9uIGltcG9ydCBNTENsaWVudEN0eApmcm9tIHR5cGluZyBpbXBvcnQgSU8sIEFueVN0ciwgVW5pb24sIExpc3QKZnJvbSBwYXRobGliIGltcG9ydCBQYXRoCgppbXBvcnQgcGFuZGFzIGFzIHBkCmltcG9ydCBweWFycm93LnBhcnF1ZXQgYXMgcHEKaW1wb3J0IHB5YXJyb3cgYXMgcGEKCmRlZiBhcmNfdG9fcGFycXVldCgKICAgIGNvbnRleHQ6IE1MQ2xpZW50Q3R4LAogICAgYXJjaGl2ZV91cmw6IFVuaW9uW3N0ciwgUGF0aCwgSU9bQW55U3RyXV0sCiAgICBoZWFkZXI6IFVuaW9uW05vbmUsIExpc3Rbc3RyXV0gPSBOb25lLAogICAgdGFyZ2V0X3BhdGg6IHN0ciA9ICIiLAogICAgbmFtZTogc3RyID0gIiIsCiAgICBjaHVua3NpemU6IGludCA9IDEwXzAwMCwKICAgIGxvZ19kYXRhOiBib29sID0gVHJ1ZSwKICAgIGtleTogc3RyID0gJ3Jhd19kYXRhJwopIC0+IE5vbmU6CiAgICAiIiJPcGVuIGEgZmlsZS9vYmplY3QgYXJjaGl2ZSBhbmQgc2F2ZSBhcyBhIHBhcnF1ZXQgZmlsZS4KICAgIAogICAgQXJnczoKICAgIDpwYXJhbSBjb250ZXh0OiAgICAgZnVuY3Rpb24gY29udGV4dAogICAgOnBhcmFtIGFyY2hpdmVfdXJsOiBhbnkgdmFsaWQgc3RyaW5nIHBhdGggY29uc2lzdGVudCB3aXRoIHRoZSBwYXRoIHZhcmlhYmxlCiAgICAgICAgICAgICAgICAgICAgICAgIG9mIHBhbmRhcy5yZWFkX2Nzdi4gbmNsdWRpbmcgc3RyaW5ncyBhcyBmaWxlIHBhdGhzLCBhcyB1cmxzLCAKICAgICAgICAgICAgICAgICAgICAgICAgcGF0aGxpYi5QYXRoIG9iamVjdHMsIGV0Yy4uLgogICAgOnBhcmFtIGhlYWRlcjogICAgICBjb2x1bW4gbmFtZXMKICAgIDpwYXJhbSB0YXJnZXRfcGF0aDogZGVzdGluYXRpb24gZm9sZGVyIG9mIHRhYmxlCiAgICA6cGFyYW0gbmFtZTogICAgICAgIG5hbWUgZmlsZSB0byBiZSBzYXZlZCBsb2NhbGx5LCBhbHNvCiAgICA6cGFyYW0gY2h1bmtzaXplOiAgICgwKSByb3cgc2l6ZSByZXRyaWV2ZWQgcGVyIGl0ZXJhdGlvbgogICAgOnBhcmFtIGxvZ19kYXRhOiAgICAoVHJ1ZSkgaWYgVHJ1ZSwgbG9nIHRoZSBkYXRhIHNvIHRoYXQgaXQgaXMgYXZhaWxhYmxlCiAgICAgICAgICAgICAgICAgICAgICAgIGF0IHRoZSBuZXh0IHN0ZXAKICAgICIiIgogICAgb3MubWFrZWRpcnModGFyZ2V0X3BhdGgsIGV4aXN0X29rPVRydWUpCgogICAgaWYgbm90IG5hbWUuZW5kc3dpdGgoIi5wYXJxdWV0Iik6CiAgICAgICAgbmFtZSArPSAiLnBhcnF1ZXQiCgogICAgZGVzdF9wYXRoID0gb3MucGF0aC5qb2luKHRhcmdldF9wYXRoLCBuYW1lKQoKICAgIGlmIG5vdCBvcy5wYXRoLmlzZmlsZShkZXN0X3BhdGgpOgogICAgICAgIGNvbnRleHQubG9nZ2VyLmluZm8oImRlc3RpbmF0aW9uIGZpbGUgZG9lcyBub3QgZXhpc3QsIGRvd25sb2FkaW5nIikKICAgICAg
ICBwcXdyaXRlciA9IE5vbmUKICAgICAgICBmb3IgaSwgZGYgaW4gZW51bWVyYXRlKAogICAgICAgICAgICBwZC5yZWFkX2NzdihhcmNoaXZlX3VybCwgY2h1bmtzaXplPWNodW5rc2l6ZSwgbmFtZXM9aGVhZGVyKQogICAgICAgICk6CiAgICAgICAgICAgIHRhYmxlID0gcGEuVGFibGUuZnJvbV9wYW5kYXMoZGYpCiAgICAgICAgICAgIGlmIGkgPT0gMDoKICAgICAgICAgICAgICAgIHBxd3JpdGVyID0gcHEuUGFycXVldFdyaXRlcihkZXN0X3BhdGgsIHRhYmxlLnNjaGVtYSkKICAgICAgICAgICAgcHF3cml0ZXIud3JpdGVfdGFibGUodGFibGUpCgogICAgICAgIGlmIHBxd3JpdGVyOgogICAgICAgICAgICBwcXdyaXRlci5jbG9zZSgpCgogICAgICAgIGNvbnRleHQubG9nZ2VyLmluZm8oZiJzYXZlZCB0YWJsZSB0byB7ZGVzdF9wYXRofSIpCiAgICBlbHNlOgogICAgICAgIGNvbnRleHQubG9nZ2VyLmluZm8oImRlc3RpbmF0aW9uIGZpbGUgZXhpc3RzIikKCiAgICBpZiBsb2dfZGF0YToKICAgICAgICBjb250ZXh0LmxvZ2dlci5pbmZvKGYibG9nZ2luZyB7ZGVzdF9wYXRofSB0byBjb250ZXh0IikKICAgICAgICBjb250ZXh0LmxvZ19hcnRpZmFjdChrZXksIHRhcmdldF9wYXRoPWRlc3RfcGF0aCkKCg==
base_image: python:3.6-jessie
commands:
- pip install -q mlrun
- pip install -q pyarrow
- pip install -q numpy
- pip install -q pandas
12 changes: 6 additions & 6 deletions fileutils/README.md → fileutils/open_archive/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,23 @@

## open_archive

Example function which can open a remote zip archive into a local target folder, [see source](file_utils.py).
Open a remote zip archive into a local target folder, [source](file_utils.py).

Usage example:

```python
# load function from Github
xfn = mlrun.import_function('https://github.com/mlrun/functions/master/fileutils/function.yaml')
xfn = mlrun.import_function('https://github.com/mlrun/functions/master/fileutils/open_archive/function.yaml')

# configure it: mount on iguazio fabric, set as interactive (return stdout)
xfn.apply(mlrun.mount_v3io())
xfn.interactive = True

# create and run the task
images_path = '/User/mlrun/examples/images'
images_path = '/User/mlrun/functions/images'
open_archive_task = mlrun.NewTask('download', handler='open_archive',
params={'target_dir': images_path},
inputs={'archive_url': 'http://iguazio-sample-data.s3.amazonaws.com/catsndogs.zip'})
params={'target_dir': images_path},
inputs={'archive_url': 'http://iguazio-sample-data.s3.amazonaws.com/catsndogs.zip'})

# run
run = xfn.run(open_archive_task)
Expand All @@ -40,4 +40,4 @@ type result.show() to see detailed results/progress or use CLI:
!mlrun get run --uid 2ec277feb3b644e2a45c92ce8cb2537a
[mlrun] 2019-10-28 22:31:03,699 run executed, status=completed

```
```
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ def open_archive(context,

context.logger.info(f'extracted archive to {target_dir}')
context.log_artifact('content', target_path=target_dir)


File renamed without changes.
Loading