End-to-end cloud training to edge deployment example - classification#

This notebook contains everything you need to run an end-to-end test of OctaiPipe, training a model in the cloud and deploying it out to your own edge devices.

To run the notebook, you will need access to one or more devices to deploy to. In this example, we use a portion of the C-MAPSS dataset provided in the Tutorials folder of the Jupyter Notebook server. You can also use a dataset of your choice, but this requires adjusting the input and output data specs in the configuration files.

Note that this notebook exists in the OctaiPipe Jupyter Notebook Server in Kubeflow and can be run from there end-to-end. All data files and configs are included in the folder. If you wish to run the notebook from another path or with other configs, make sure to adjust the code in the notebook to fit your setup.

The notebook helps you do the following:

  1. Load C-MAPSS data and write to Influx on one device for training

  2. Preprocess data and train a model using a simple OctaiPipe pipeline

  3. Deploy model out to edge device(s) with an added Grafana instance

  4. Write “live” data to all devices and view inference in Grafana

  5. Remove deployment from device(s)

  6. Remove Influx instances from devices

Register devices#

If you haven’t already, register your edge devices/computers via the OctaiPipe web portal; search for “register devices” in the documentation. This example uses InfluxDB as a data source, but you can configure any data source you prefer.
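
If you want to verify that a registered device is reachable before proceeding, a quick connectivity check along the lines below can help. This is only a sketch: it reuses the DeviceClient methods that appear in the cleanup step at the end of this notebook, and the echo command is a placeholder.

from octaipipe.client.device_client import DeviceClient


# Optional sanity check that a registered device accepts commands;
# DeviceClient usage mirrors the cleanup step at the end of this notebook.
dev_client = DeviceClient()
dev_client.connect_device('REPLACE_WITH_DEVICE_ID')  # e.g. my_device_0
dev_client.execute_command('echo "device reachable"')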

Load C-MAPSS data and write to Influx on device for training#

Here we load a small portion of the C-MAPSS dataset, which is included in the OctaiPipe Notebook Servers under the Tutorials folder. This notebook is also included in that folder and should run from there end-to-end. If you run it from another folder, make sure to adjust the data path passed to the load_cmapss function.

In order to train a classifier, we need to create a categorical output variable. We generate a categorical target by mapping RUL to three classes: 0 for RUL 0-63, 1 for RUL 64-124, and 2 for RUL 125 (the clipped maximum). This relates to the RUL clipping, which you can find more information about here. The target is set up as integer labels, matching the label_type: "int" setting in the preprocessing configs below, so that it works with the built-in OctaiPipe label encoding functionality in preprocessing.

We also select one of the Influx instances created for the devices and write the data to it to use for training. To do this, set the influx_device_id variable below to the device ID of the device on which you wish to store the dataset.

[ ]:
from octaipipe.helper_funcs.example_utils import (
    load_cmapss,
    write_data_to_devices
)


influx_device_id: str = 'REPLACE_WITH_DEVICE_ID' # e.g. my_device_0

data_path: str = './train_FD001.txt'
data = load_cmapss(data_path)


# Map continuous RUL to the three classes described above
def rul_class_map(rul):
    if rul < 64:
        return 0
    elif rul < 125:
        return 1
    else:
        return 2


data['RUL_cat'] = data['RUL'].map(rul_class_map)
data.drop('RUL', axis=1, inplace=True)

write_data_to_devices(influx_device_id, data)

Preprocess data and train a model#

Now that we have set up our devices, we can build a simple OctaiPipe pipeline which preprocesses the data and trains a model on the preprocessed data.

We will be using the OctaiKube library, which helps run OctaiPipe pipelines in Kubeflow.

To run a pipeline step in OctaiPipe, we need to define two main things: the step name and the path to the configuration file. To get an example configuration file, we use the octaikube function get_example_step_config which takes the step name as input.
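
For example, fetching example configs might look like the sketch below. The exact module path of get_example_step_config (top-level octaikube vs. octaikube.utils) and whether it prints, returns, or writes the config to disk are assumptions here; check the OctaiKube documentation if the call differs in your version.

import octaikube as ock


# Fetch example configs by step name to use as starting points
# (assumption: get_example_step_config is exposed at the top level).
ock.get_example_step_config('preprocessing')
ock.get_example_step_config('model_training')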

After filling out the preprocessing and model_training configs to fit our data, we save them as ./configs/preprocessing_cat.yml and ./configs/model_training_cat.yml. Below are the two config files shown as text for the C-MAPSS dataset. If you’re using your own dataset, or if you have transformed the C-MAPSS data before adding it to your device, these config files will need to be amended. For more information about the model type, see the XGBoost documentation.

For information on these pipeline steps and their configs, see the documentation for the model training step and preprocessing step. If you wish to get information on available OctaiPipe pipeline steps or how to make your own pipeline steps, see the relevant documentation.

Note down the model version that is created during training, as this will be used later on in inference.

The ./configs/data/influx_query.txt file is also pasted below for reference.

./configs/preprocessing_cat.yml

name: preprocessing

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query.txt
  query_values:
    start: "2023-01-01T00:00:00.000Z"
    stop: "2023-01-01T05:00:00.000Z"
    bucket: test-bucket
    measurement: sensors-raw
    tags: {}
  data_converter: {}

output_data_specs:
  datastore_type: influxdb
  bucket: test-bucket
  measurement: sensors-preprocessed

run_specs:
  save_results: True
  target_label: RUL_cat
  label_type: "int"
  onnx_pred: False
  train_val_test_split:
    to_split: false
    split_ratio:
      training: 0.6
      validation: 0.2
      testing: 0.2

preprocessing_specs:
  steps:
    - normalise
  preprocessors_specs:
    - type: minmax_scaler
      load_existing: False
      name: scaler_example
  degradation_model: pw_linear
  initial_RUL: 125

./configs/model_training_cat.yml

name: model_training

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query.txt
  query_values:
    start: "2023-01-01T00:00:00.000Z"
    stop: "2023-01-01T05:00:00.000Z"
    bucket: test-bucket
    measurement: sensors-preprocessed
    tags: {}
  data_converter: {}

output_data_specs:
  datastore_type: influxdb
  bucket: test-bucket
  measurement: tmp

model_specs:
  type: xgb_class
  load_existing: false
  name: test_model_cat
  model_load_specs:
    version: '1.1'
  params:
    learning_rate: 0.1

run_specs:
  save_results: True
  target_label: RUL_cat
  grid_search:
    do_grid_search: false
    grid_definition: ./configs/model_grids/xgb_class.yml
    metric: 'accuracy'

./configs/data/influx_query.txt

from(bucket:[bucket])
    |> range(start: [start], stop: [stop])
    |> filter(fn:(r) => r._measurement == [measurement])
    |> filter(fn:(r) => [fields])
    |> keep(columns: ["_time", "_field", "_value"])
    |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
    |> drop(columns: ["_start", "_stop", "_batch",
                      "table", "result", "_measurement",
                      "_result", "id"])

Once we have set up the config files and query template, we can run the pipeline using the OctaiKube utils function run_pipeline. We explicitly set the environment variables so that we can reach Influx from Kubeflow, and specify which OctaiPipe image to use. If you have added data to one of the devices set up earlier, you can use the org name testOrg, the token token123asd123asd, and the device’s URL with port 8086 to reach it.

Note that we also have to set the NAMESPACE environment variable. This is the Kubeflow workspace you are using, often your firstname-lastname.

[ ]:
import octaikube as ock
import os


os.environ['NAMESPACE'] = 'your-namespace'

influx_device_ip_address: str = 'insert_device_ip_here'
# Make sure to change the image name to your organization's image
ock.utils.run_pipeline(name="test_pipeline",
                       exp_name="test_exp",
                       steps=[('preprocessing', './configs/preprocessing_cat.yml'),
                              ('model_training', './configs/model_training_cat.yml')],
                       env_vars={'INFLUX_URL': f'http://{influx_device_ip_address}:8086/',
                                 'INFLUX_ORG': 'testOrg',
                                 'INFLUX_TOKEN': 'token123asd123asd'},
                       image_name='stableoctaipiperepo.azurecr.io/master:latest')
[ ]:
from octaipipe import models
import pandas as pd


# Check all models with name 'test_model_cat' to find version
mods = [mod for mod in models.find_all_models() if mod['modelName'] == 'test_model_cat']
mods = pd.DataFrame(mods).sort_values(by='version', ascending=False)
print(f'Available models are: \n{mods}\n')

# Get scalers
scalers = [mod for mod in models.find_all_models() if mod['modelName'] == 'scaler_example']
scalers = pd.DataFrame(scalers).sort_values(by='version', ascending=False)
print(f'Available scalers are: \n{scalers}')

Deploy model out to edge devices#

Once the model has been trained, we will deploy the model back out to our edge devices that we set up in steps 1 and 2.

The model version is retrieved from the output of the model training. You can find it in Kubeflow Pipelines, either from the Kubeflow UI or by clicking the Run details link from the model training pipeline. Substitute the model version into the model inference config file. As in training, the inference config is pasted below, together with the preprocessing config, the inference query template, and the deployment config.

It is worth noting that for preprocessing we do not specify the target_label, as this is not present in the inference data.

The deployment config specifies what the deployment should look like, including the pipeline steps and their config file paths. We also specify that we wish to deploy Grafana to each edge device. This means we will get an instance of Grafana running on each device with a default dashboard, which tracks time-to-failure predictions (such as for the C-MAPSS dataset).

We also add a deployment name and description to the deployment by setting the name and description parameters when running the deployment function.

./configs/preprocessing_inference_cat.yml

name: preprocessing

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/inference_query.txt
  query_values:
    start: -1m
    bucket: test-bucket
    measurement: sensors-live
  data_converter:
    {}

output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket
      measurement: live-preprocessed

run_specs:
  save_results: True
  target_label:
  label_type: "int"
  onnx_pred: False
  train_val_test_split:
    to_split: false
    split_ratio:
      training: 0.6
      validation: 0.2
      testing: 0.2
  run_interval: 10s

preprocessing_specs:
  steps:
    - normalise
  preprocessors_specs:
    - type: minmax_scaler
      load_existing: True
      name: scaler_example
      version: "4.0" # Check with model module
  degradation_model: pw_linear
  initial_RUL: 125

./configs/model_inference_cat.yml

name: model_inference

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/inference_query.txt
  query_values:
    start: 30s
    bucket: test-bucket
    measurement: live-preprocessed
  data_converter: {}

output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket
      measurement: live-predictions
  - datastore_type: local
    settings:
      path: "./logs/results"

model_specs:
  name: test_model_cat
  type: xgb_class
  version: "1.1" # check training output for version number

run_specs:
  prediction_period: 5s
  save_results: True
  onnx_pred: false

./configs/data/inference_query.txt

from(bucket:[bucket])
    |> range(start: [start])
    |> filter(fn:(r) => r._measurement == [measurement])
    |> filter(fn:(r) => [fields])
    |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
    |> drop(columns: ["_start", "_stop", "_batch",
                      "table", "result", "_measurement",
                      "_result", "id"])

./configs/deployment_config_cat.yml

name: deployment_config

device_ids:
  - device_01
  - device_02
  - device_03

image_name: stableoctaipiperepo.azurecr.io/master:latest

env: {}

datasources:
  environment:
    - INFLUX_ORG
    - INFLUX_URL
    - INFLUX_TOKEN
    - INFLUX_DEFAULT_BUCKET
  env_file:
    # - ./credentials/.env

grafana_deployment: edge
grafana_cloud_config_path:

pipelines:
  - preprocessing:
      config_path: preprocessing_inference_cat.yml
  - model_inference:
      config_path: model_inference_cat.yml
[ ]:
from octaipipe.deployment.edge.octaideploy import octaideploy


octaideploy(compose_action='start',
            config_path='./configs/deployment_config_cat.yml',
            name='RUL classification',
            description='RUL classification model deployed for inference')

Write live data to all devices and view inference#

In this step, we use a simple script to feed the C-MAPSS dataset piecewise to the Influx databases on the devices registered in previous steps. This simulates live data coming into the devices the way it would in a production environment. It supplies the inference deployment on the devices with input data, which will then produce predictions for Grafana to plot.

Since this is a toy example, we feed the same dataset to the model as we did during training, simply to see that the edge deployment works.

Start the live data feeding by entering your device IDs into the devices list and running the write_live_data function. Note that this will run until the dataset reaches its end (total runtime around 30 minutes). If you wish to stop it before then, simply interrupt the cell.

The deployment with Octaideploy above should generate a URL at which the Grafana instance for each device can be reached. The initial username and password for Grafana are both admin, but these can be changed after first login.

To check out Grafana, go to the URL provided and enter the username and password. There is currently no default classification dashboard in OctaiPipe, but if you wish to make one yourself, you can do so from the Grafana UI using the inference data as input.

[ ]:
from octaipipe.helper_funcs.example_utils import (
    write_live_data, load_cmapss
)


# Fill this with your device IDs as strings
devices: list = ['device_0', 'device_1']

data_path: str = './train_FD001.txt'
data = load_cmapss(data_path)

write_live_data(devices, data)
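
Once live data is flowing and the deployment is producing output, you can also sanity-check predictions outside Grafana by querying the live-predictions measurement directly. The sketch below uses the influxdb-client Python package with the example credentials from the training step; the exact fields written by the inference step are not assumed here, so inspect the returned frame.

from influxdb_client import InfluxDBClient


# Sketch: read recent predictions straight from a device's InfluxDB.
# Uses the example credentials from earlier in this notebook; replace
# the IP with one of your devices and make sure influxdb-client is
# installed in your environment.
url = 'http://insert_device_ip_here:8086/'
flux = '''
from(bucket: "test-bucket")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "live-predictions")
'''

with InfluxDBClient(url=url, token='token123asd123asd', org='testOrg') as client:
    frame = client.query_api().query_data_frame(flux)

print(frame.head())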

Remove deployment#

Finally, we run Octaideploy once more to remove the edge deployment from each device. This is done by calling octaideploy with the down compose action and the deployment ID shown in the deployment outputs.

To also remove the Grafana instance, we add the _grafana suffix to the deployment ID.

[ ]:
octaideploy(compose_action='down', deployment_id='someID_grafana')
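
If you would rather keep Grafana running and only take down the pipeline containers, passing the deployment ID without the suffix should, based on the description above, remove just the inference deployment. Treat this as an assumption and check the deployment outputs for the exact IDs.

# Assumption based on the description above: the un-suffixed deployment
# ID brings down only the pipeline containers, leaving Grafana in place.
octaideploy(compose_action='down', deployment_id='someID')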

Remove Influx instances#

This is an optional step to remove the Influx instances from devices once the example is done. If you wish to keep the Influx instances on the devices, you can skip this step.

We simply send a custom command to kill the InfluxDB container and remove its image from the device.

[ ]:
from octaipipe.client.device_client import DeviceClient


custom_cmd = 'docker kill influxdb && docker image rm influxdb:latest'

# Run the cleanup command on each device in the list
device_list: list = ['testdev1']
dev_client = DeviceClient()
for dev in device_list:
    dev_client.connect_device(dev)
    dev_client.execute_command(custom_cmd)