End-to-end cloud training to edge deployment example - classification#
This notebook contains everything needed to run an end-to-end test of OctaiPipe, training a model in the cloud and deploying it to your own edge devices.
In order to run the notebook, you will need to have access to one or more devices to deploy to. In this example, we will use a portion of the C-MAPSS dataset provided in the Tutorials
folder of the Jupyter Notebook server. You can also use a dataset of your choice, but this would require adjusting the input and output data specs in the configuration files.
Note that this notebook exists in the OctaiPipe Jupyter Notebook Server in Kubeflow and can be run from there end-to-end. All data files and configs are included in the folder. If you wish to run the notebook from another path or with other configs, make sure to adjust the code in the notebook to fit your setup.
The notebook helps you do the following:
Load C-MAPSS data and write to Influx on one device for training
Preprocess data and train a model using a simple OctaiPipe pipeline
Deploy model out to edge device(s) with added Grafana instance
Write “live” data to all devices and view inference in Grafana
Register devices#
If you haven’t already, register your edge devices/computers via the OctaiPipe web portal. Search “register devices” in the documentation. This example uses InfluxDB as a data source, but you can configure any data source you prefer.
Load C-MAPSS data and write to Influx on device for training#
Here we load a small portion of the C-MAPSS dataset, which is included in the OctaiPipe Notebook Servers under the Tutorials folder. This notebook is also included in that folder and should run from there end-to-end. If you run this notebook from another folder, make sure to adjust the data path passed to the load_cmapss function.
In order to train a classifier, we need to create a categorical output variable. We generate a categorical target by mapping RUL to three classes: 0 for RUL 0-63, 1 for RUL 64-124, and 2 for RUL 125 and above. This relates to the RUL clipping, which you can find more information about here. The target is set up as integer labels, matching the label_type of "int" used in the preprocessing config.
We also select one of the Influx instances created for the devices and write the data to it to use for training. To do this, enter the ID of the device on which you wish to store the dataset as the influx_device_id variable below.
[ ]:
from octaipipe.helper_funcs.example_utils import (
    load_cmapss,
    write_data_to_devices
)

influx_device_id: str = 'REPLACE_WITH_DEVICE_ID'  # e.g. my_device_0

data_path: str = './train_FD001.txt'
data = load_cmapss(data_path)

def rul_class_map(rul):
    if rul < 64:
        return 0
    elif rul < 125:
        return 1
    else:
        return 2

data['RUL_cat'] = data['RUL'].map(rul_class_map)
data.drop('RUL', axis=1, inplace=True)

write_data_to_devices(influx_device_id, data)
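As a quick sanity check, the cut-offs in rul_class_map reproduce the mapping described above (0 for RUL 0-63, 1 for 64-124, 2 for 125 and above):

```python
def rul_class_map(rul):
    if rul < 64:
        return 0
    elif rul < 125:
        return 1
    else:
        return 2

# Values straddling each boundary fall into the expected classes
checks = {0: 0, 63: 0, 64: 1, 124: 1, 125: 2, 200: 2}
assert all(rul_class_map(r) == c for r, c in checks.items())
print('mapping OK')
```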
Preprocess data and train a model#
Now that we have set up our devices, we can build a simple OctaiPipe pipeline which preprocesses the data and trains a model on the preprocessed data.
We will be using the OctaiKube library, which helps run OctaiPipe pipelines in Kubeflow.
To run a pipeline step in OctaiPipe, we need to define two main things: the step name and the path to the configuration file. To get an example configuration file, we use the octaikube function get_example_step_config
which takes the step name as input.
After filling out the preprocessing and model_training configs to fit our data, we save them as ./configs/preprocessing_cat.yml and ./configs/model_training_cat.yml. Below are the two config files shown as text for the C-MAPSS dataset. If you’re using your own dataset or if you have configured the C-MAPSS data before adding it to your device, these config files would need to be amended. For more information about the model type, see the XGBoost documentation.
For information on these pipeline steps and their configs, see the documentation for the model training step and preprocessing step. If you wish to get information on available OctaiPipe pipeline steps or how to make your own pipeline steps see the relevant documentation.
Note down the model version that is created during training, as this will be used later on in inference.
The ./configs/data/influx_query.txt
file is also pasted below for reference.
./configs/preprocessing_cat.yml
name: preprocessing
input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query.txt
  query_values:
    start: "2023-01-01T00:00:00.000Z"
    stop: "2023-01-01T05:00:00.000Z"
    bucket: test-bucket
    measurement: sensors-raw
    tags: {}
  data_converter: {}
output_data_specs:
  datastore_type: influxdb
  bucket: test-bucket
  measurement: sensors-preprocessed
run_specs:
  save_results: True
  target_label: RUL_cat
  label_type: "int"
  onnx_pred: False
  train_val_test_split:
    to_split: false
    split_ratio:
      training: 0.6
      validation: 0.2
      testing: 0.2
preprocessing_specs:
  steps:
    - normalise
  preprocessors_specs:
    - type: minmax_scaler
      load_existing: False
      name: scaler_example
  degradation_model: pw_linear
  initial_RUL: 125
./configs/model_training_cat.yml
name: model_training
input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query.txt
  query_values:
    start: "2023-01-01T00:00:00.000Z"
    stop: "2023-01-01T05:00:00.000Z"
    bucket: test-bucket
    measurement: sensors-preprocessed
    tags: {}
  data_converter: {}
output_data_specs:
  datastore_type: influxdb
  bucket: test-bucket
  measurement: tmp
model_specs:
  type: xgb_class
  load_existing: false
  name: test_model_cat
  model_load_specs:
    version: '1.1'
  params:
    learning_rate: 0.1
run_specs:
  save_results: True
  target_label: RUL_cat
  grid_search:
    do_grid_search: false
    grid_definition: ./configs/model_grids/xgb_class.yml
    metric: 'accuracy'
./configs/data/influx_query.txt
from(bucket:[bucket])
  |> range(start: [start], stop: [stop])
  |> filter(fn:(r) => r._measurement == [measurement])
  |> filter(fn:(r) => [fields])
  |> keep(columns: ["_time", "_field", "_value"])
  |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
  |> drop(columns: ["_start", "_stop", "_batch",
                    "table", "result", "_measurement",
                    "_result", "id"])
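The bracketed placeholders ([bucket], [start], and so on) are filled in from the query_values in the config. The substitution itself is handled internally by OctaiPipe, but conceptually it amounts to a simple string replacement, sketched here with hypothetical values:

```python
# Illustration only: OctaiPipe performs this substitution internally.
query_template = (
    'from(bucket:[bucket])\n'
    '  |> range(start: [start], stop: [stop])\n'
    '  |> filter(fn:(r) => r._measurement == [measurement])'
)

query_values = {
    'bucket': '"test-bucket"',
    'start': '2023-01-01T00:00:00.000Z',
    'stop': '2023-01-01T05:00:00.000Z',
    'measurement': '"sensors-raw"',
}

# Replace each [key] placeholder with its configured value
query = query_template
for key, value in query_values.items():
    query = query.replace(f'[{key}]', value)

print(query)
```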
Once we have set up the config files and query template, we can run the pipeline using the OctaiKube utils function run_pipeline. We specifically set the environment variables so that we can reach Influx through Kubeflow, and specify which OctaiPipe image to use. If you have added data to one of the devices set up in step 2, you can reach it with the org name testOrg, the token token123asd123asd, and the device's URL on port 8086.
Note that we also have to set the NAMESPACE
environment variable. This is the Kubeflow workspace you are using, often your firstname-lastname.
[ ]:
import octaikube as ock
import os

os.environ['NAMESPACE'] = 'your-namespace'

influx_device_ip_address: str = 'insert_device_ip_here'

# Make sure to change the image name to your organization's image
ock.utils.run_pipeline(name="test_pipeline",
                       exp_name="test_exp",
                       steps=[('preprocessing', './configs/preprocessing_cat.yml'),
                              ('model_training', './configs/model_training_cat.yml')],
                       env_vars={'INFLUX_URL': f'http://{influx_device_ip_address}:8086/',
                                 'INFLUX_ORG': 'testOrg',
                                 'INFLUX_TOKEN': 'token123asd123asd'},
                       image_name='stableoctaipiperepo.azurecr.io/master:latest')
[ ]:
from octaipipe import models
import pandas as pd
# Check all models with name 'test_model' to find version
mods = [mod for mod in models.find_all_models() if mod['modelName'] == 'test_model_cat']
mods = pd.DataFrame(mods).sort_values(by='version', ascending=False)
print(f'Available models are: \n{mods}\n')
# Get scalers
scalers = [mod for mod in models.find_all_models() if mod['modelName'] == 'scaler_example']
scalers = pd.DataFrame(scalers).sort_values(by='version', ascending=False)
print(f'Available scalers are: \n{scalers}')
Deploy model out to edge devices#
Once the model has been trained, we will deploy the model back out to our edge devices that we set up in steps 1 and 2.
The model version is retrieved from the output of the model training. This can be found in Kubeflow pipelines either from the Kubeflow UI or by pressing the Run details
link from the model training pipeline. Substitute the model version in the model inference config file. The inference config file, as in training, is pasted below together with the preprocessing config. We also paste the inference query template as well as the deployment config.
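If you prefer to pick out the latest version programmatically, the idea can be sketched on plain dictionaries. The record shape below (modelName and version keys) is assumed from the model-listing cell earlier in this notebook; the version numbers are hypothetical:

```python
# Hypothetical records in the shape returned by models.find_all_models()
mods = [
    {'modelName': 'test_model_cat', 'version': '1.1'},
    {'modelName': 'test_model_cat', 'version': '1.2'},
    {'modelName': 'scaler_example', 'version': '4.0'},
]

# Pick the highest version for our model, comparing versions numerically
latest = max(
    (m for m in mods if m['modelName'] == 'test_model_cat'),
    key=lambda m: tuple(int(p) for p in m['version'].split('.')),
)
print(latest['version'])  # '1.2' for this hypothetical list
```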
It is worth noting that for preprocessing we do not specify the target_label
as this is not present in the inference data.
The deployment config specifies what the deployment should look like, including the pipeline steps and their config file paths. We also specify that we wish to deploy Grafana to each edge device. This means we will get an instance of Grafana running on each device with a default dashboard, which tracks time-to-failure predictions (such as for the C-MAPSS dataset).
We also add a deployment name and description to the deployment by setting the name
and description
parameters when running the deployment function.
./configs/preprocessing_inference_cat.yml
name: preprocessing
input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/inference_query.txt
  query_values:
    start: -1m
    bucket: test-bucket
    measurement: sensors-live
  data_converter: {}
output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket
      measurement: live-preprocessed
run_specs:
  save_results: True
  target_label:
  label_type: "int"
  onnx_pred: False
  train_val_test_split:
    to_split: false
    split_ratio:
      training: 0.6
      validation: 0.2
      testing: 0.2
  run_interval: 10s
preprocessing_specs:
  steps:
    - normalise
  preprocessors_specs:
    - type: minmax_scaler
      load_existing: True
      name: scaler_example
      version: "4.0" # Check with model module
  degradation_model: pw_linear
  initial_RUL: 125
./configs/model_inference_cat.yml
name: model_inference
input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/inference_query.txt
  query_values:
    start: 30s
    bucket: test-bucket
    measurement: live-preprocessed
  data_converter: {}
output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket
      measurement: live-predictions
  - datastore_type: local
    settings:
      path: "./logs/results"
model_specs:
  name: test_model_cat
  type: xgb_class
  version: "1.1" # check training output for version number
run_specs:
  prediction_period: 5s
  save_results: True
  onnx_pred: false
./configs/data/inference_query.txt
from(bucket:[bucket])
  |> range(start: [start])
  |> filter(fn:(r) => r._measurement == [measurement])
  |> filter(fn:(r) => [fields])
  |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
  |> drop(columns: ["_start", "_stop", "_batch",
                    "table", "result", "_measurement",
                    "_result", "id"])
./configs/deployment_config_cat.yml
name: deployment_config
device_ids:
  - device_01
  - device_02
  - device_03
image_name: stableoctaipiperepo.azurecr.io/master:latest
env: {}
datasources:
  environment:
    - INFLUX_ORG
    - INFLUX_URL
    - INFLUX_TOKEN
    - INFLUX_DEFAULT_BUCKET
  env_file:
    # - ./credentials/.env
grafana_deployment: edge
grafana_cloud_config_path:
pipelines:
  - preprocessing:
      config_path: preprocessing_inference_cat.yml
  - model_inference:
      config_path: model_inference_cat.yml
[ ]:
from octaipipe.deployment.edge.octaideploy import octaideploy

octaideploy(compose_action='start',
            config_path='./configs/deployment_config_cat.yml',
            name='RUL classification',
            description='RUL classification model deployed for inference')
Write live data to all devices and view inference#
In this step, we use a simple script to feed the C-MAPSS dataset piecewise to the Influx databases on the devices registered in previous steps. This simulates live data arriving at the devices as it would in a production environment. It supplies the inference deployment on the devices with input data, which then produces predictions for Grafana to plot.
Since this is just a toy example, we feed the same dataset to the model as we did during training, just to see that the edge deployment works.
Start the live data feeding by entering your device IDs into the devices
list and run the write_live_data
function. Note that this will run until the dataset reaches its end (total runtime around 30 minutes). If you wish to stop it before this, simply interrupt the cell.
The deployment with octaideploy above should generate a URL at which the Grafana instance for each device can be reached. The initial Grafana username and password are both admin, but this can be changed after first login.
To check out Grafana, go to the URL provided and enter the username and password. There is currently no default classification dashboard in OctaiPipe, but if you wish to make one yourself, you can do so from the Grafana UI using the inference data as input.
[ ]:
from octaipipe.helper_funcs.example_utils import (
    write_live_data, load_cmapss
)

# Fill this with your device IDs as strings
devices: list = ['device_0', 'device_1']

data_path: str = './train_FD001.txt'
data = load_cmapss(data_path)

write_live_data(devices, data)
Remove deployment#
Finally, we run Octaideploy once more to remove the edge deployment from each device. This is done by providing Octaideploy with the down
keyword and deployment ID shown in the deployment outputs.
To also remove the Grafana instance, we add the _grafana
suffix to the deployment ID.
[ ]:
octaideploy(compose_action='down', deployment_id='someID_grafana')
Remove Influx instances#
This is an optional step to remove the Influx instances from devices once the example is done. If you wish to keep the Influx instances on the devices, you can skip this step.
We simply send a custom command to kill the influxdb container and remove the image from the device.
[ ]:
from octaipipe.client.device_client import DeviceClient

custom_cmd = 'docker kill influxdb && docker image rm influxdb:latest'
device_list: list = ['testdev1']

dev_client = DeviceClient()
for dev in device_list:
    dev_client.connect_device(dev)
    dev_client.execute_command(custom_cmd)