Running OctaiPipe Pipelines in Kubeflow

The purpose of this guide is to illustrate how to use OctaiKube to prepare and run an OctaiPipe pipeline in Kubeflow. OctaiPipe is an end-to-end distributed Edge AI platform optimized for training, deploying and managing models in any IoT environment. We needed a way to make running our machine learning pipelines on Kubernetes clusters simpler, more coordinated and containerized, and that’s where the Kubeflow platform helps. However, to avoid overloading the OctaiPipe code with Kubeflow-specific code, we developed the OctaiKube library, which acts as an interface between OctaiPipe and Kubeflow. It includes simple functions to load the OctaiPipe library, prepare and run pipeline steps as Kubeflow components, and assemble them into one pipeline that can be scheduled to run periodically, depending on the user’s needs.

In this notebook, we will outline how to use OctaiKube to prepare and run an ML pipeline on a Kubernetes cluster.

OctaiPipe provides a set of ML steps that covers most of the functionalities that a data scientist could need to build an ML workflow. You can use OctaiKube’s interface functions to list the available octaisteps.

import octaikube as ock  # assumed import: the alias ock is used throughout this guide
ock.get_available_steps()
['preprocessing',
'feature_engineering',
'model_training',
'model_evaluation',
'model_inference',
'data_drift',
'clustering']

Let’s explore how to prepare the configuration file needed for the Model training step.

Preparing Configuration files

Schema Definition

We can initialise configuration files for any of these steps in two ways:

1 - We can load a strict definition of the config. This gives you a template showing the expected data type for each item.

ock.get_def_step_config(step_name='model_training')
'configs/model_training.yml'

Quite straightforward, right? In one line of code, you get a template of the configuration file for that step. Each field’s default value is the data type expected for that item, so all you have to do is replace these placeholders with values that suit your data. The fields are explained in the Octaipipe Steps section of the documentation. The configuration file that was created looks as follows:

name: <class 'str'>
input_data_specs:
    query_type: <class 'str'>
    query_template_path: <class 'str'>
    query_values:
        start: <class 'str'>
        stop: <class 'str'>
        bucket: <class 'str'>
        measurement: <class 'str'>
        fields: <class 'list'>
        tags: <class 'dict'>
    data_converter:
        name: <class 'str'>
        args: <class 'dict'>
output_data_specs:
    - datastore_type: <class 'str'>
      settings: <class 'dict'>
model_specs:
    load_existing: <class 'bool'>
    name: <class 'str'>
    type: <class 'str'>
    model_load_specs:
        version: <class 'str'>
    params: <class 'dict'>
run_specs:
    save_results: <class 'bool'>
    target_label: <class 'str'>
    grid_search:
        do_grid_search: <class 'bool'>
        metric: <class 'str'>
    deployment_container:
        build: <class 'bool'>
        gh_user: <class 'str'>
        repo: <class 'str'>
        workflow_id: <class 'int'>
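
Once the placeholders in the template have been replaced, it can be useful to check that each value has the type the template asked for. The snippet below is a minimal sketch of such a check, assuming the template and the filled-in config have already been loaded into Python dictionaries. The helper `find_type_errors` is hypothetical and is not part of OctaiKube, and the template used here is only a small slice of the one shown above.

```python
# Hypothetical helper, not part of OctaiKube: compares a filled-in config
# against a template whose leaf values are the expected Python types.

def find_type_errors(config, template, path=""):
    """Return a list of 'path: problem' strings; empty if config matches."""
    errors = []
    for key, expected in template.items():
        here = f"{path}.{key}" if path else key
        if key not in config:
            errors.append(f"{here}: missing")
        elif isinstance(expected, dict):
            # Nested section: recurse if the config also has a mapping here.
            if isinstance(config[key], dict):
                errors.extend(find_type_errors(config[key], expected, here))
            else:
                errors.append(f"{here}: expected a mapping")
        elif not isinstance(config[key], expected):
            errors.append(
                f"{here}: expected {expected.__name__}, "
                f"got {type(config[key]).__name__}"
            )
    return errors


# A small slice of the model_training template shown above.
template = {
    "name": str,
    "model_specs": {"load_existing": bool, "name": str, "params": dict},
    "run_specs": {"save_results": bool, "target_label": str},
}

# A filled-in config with one deliberate mistake (save_results is a string).
filled = {
    "name": "model_training",
    "model_specs": {
        "load_existing": False,
        "name": "test_model",
        "params": {"alpha": 2.0},
    },
    "run_specs": {"save_results": "yes", "target_label": "EDDY_BOTTOM_PSD1"},
}

errors = find_type_errors(filled, template)
print(errors)
```

Here the only reported problem is `run_specs.save_results`, because the string `"yes"` was supplied where a boolean is expected.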

Example Config

2 - Alternatively, we can load an example of a filled-in config and change only the values we need.

ock.get_example_step_config(step_name='model_training')
'configs/model_training_example.yml'

The example file looks as follows:

name: model_training

input_data_specs:
  query_type: dataframe
  query_template_path: ./configs/data/query.txt
  query_values:
    start: '2022-01-05T17:10:00.000Z'
    stop: '2022-01-05T17:40:00.000Z'
    bucket: live-metrics
    measurement: def-metrics
    tags:
      MY_TAG: value_0
  data_converter: {}

output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket-1
      measurement: testv1

model_specs:
  load_existing: false
  name: test_model
  type: ridge_reg
  model_load_specs:
    version: '1.0'
  params:
    alpha: 2.0

run_specs:
  save_results: true
  target_label: EDDY_BOTTOM_PSD1
  grid_search:
    do_grid_search: true
    metric: mse
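
Changing a few values in the example config can also be done programmatically. The following is a minimal sketch, assuming the example file has been loaded into a nested Python dictionary. The helper `with_overrides` is hypothetical (it is not an OctaiKube function), and the replacement values `my_model` and `MY_TARGET` are placeholders.

```python
import copy

# Hypothetical helper, not part of OctaiKube: applies nested overrides to a
# config dictionary without modifying the original.

def with_overrides(config, overrides):
    """Return a deep copy of config with the nested overrides applied."""
    result = copy.deepcopy(config)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            # Merge into the existing section instead of replacing it.
            result[key] = with_overrides(result[key], value)
        else:
            result[key] = value
    return result


# Part of the example model_training config shown above.
example = {
    "model_specs": {
        "name": "test_model",
        "type": "ridge_reg",
        "params": {"alpha": 2.0},
    },
    "run_specs": {"save_results": True, "target_label": "EDDY_BOTTOM_PSD1"},
}

# Placeholder values: adapt these to your own model and data.
mine = with_overrides(example, {
    "model_specs": {"name": "my_model", "params": {"alpha": 0.5}},
    "run_specs": {"target_label": "MY_TARGET"},
})
print(mine)
```

Fields that are not overridden, such as `type: ridge_reg`, keep their example values, and the original dictionary is left unchanged.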

Running a Pipeline

Next, we can check out the kfp component for each of the steps that we would like to include in our ML pipeline. In brief, a pipeline component is a containerized application that performs one step in a pipeline’s workflow: it takes inputs and gives back outputs. Pipeline components in Kubeflow are defined through component specifications, which include the following:

  • The component’s interface, its inputs and outputs.

  • The component’s implementation, the container image and the command to execute.

  • The component’s metadata, such as the name and description of the component.

Use the following to print out the specification of the pipeline component that runs an OctaiPipe step:

ock.get_component(step_name='model_training')
"{'name': 'Model Training Step', 'inputs': [{'name': 'config_path', 'type': 'String', 'description': 'Step Configuration'}, {'name': 'trigger', 'type': 'Data'}], 'outputs': [{'name': 'output', 'type': 'Data'}], 'implementation': {'container': {'image': 'mltestcanv.azurecr.io/config-fix:latest', 'command': ['octaistep', '--step-name', 'model_training', '--config-path', {'inputValue': 'config_path'}, '--trigger', {'inputValue': 'trigger'}, '--output-path', {'outputPath': 'output'}]}}}"

The same output, formatted for readability:

name: Model Training Step

inputs:
- {name: config_path, type: String, description: 'Step Configuration'}
- {name: trigger, type: Data}

outputs:
- {name: output, type: Data}

implementation:
  container:
    image: mltestcanv.azurecr.io/config-fix:latest
    command: [
      octaistep,
      --step-name,
      model_training,
      --config-path,
      {inputValue: config_path},
      --trigger,
      {inputValue: trigger},
      --output-path,
      {outputPath: output}
    ]
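
To make the placeholders in the command more concrete, the sketch below shows how an `inputValue` entry is replaced by the value of the named input and how an `outputPath` entry is replaced by a file path for the named output. This is only an illustration of the idea in plain Python. The helper `resolve_command` is hypothetical, the actual substitution is performed by Kubeflow Pipelines, and the input values used here are illustrative.

```python
# Hypothetical helper for illustration only: Kubeflow Pipelines performs this
# substitution itself when it turns a component into a container invocation.

def resolve_command(command, input_values, output_paths):
    """Replace inputValue/outputPath placeholders with concrete strings."""
    resolved = []
    for part in command:
        if isinstance(part, dict) and "inputValue" in part:
            resolved.append(str(input_values[part["inputValue"]]))
        elif isinstance(part, dict) and "outputPath" in part:
            resolved.append(output_paths[part["outputPath"]])
        else:
            resolved.append(part)
    return resolved


# The command from the model_training component specification above.
command = [
    "octaistep", "--step-name", "model_training",
    "--config-path", {"inputValue": "config_path"},
    "--trigger", {"inputValue": "trigger"},
    "--output-path", {"outputPath": "output"},
]

# Illustrative values for the two inputs and the single output.
argv = resolve_command(
    command,
    input_values={"config_path": "configs/model_training.yml", "trigger": "None"},
    output_paths={"output": "/tmp/outputs/output/data"},
)
print(" ".join(argv))
```

The resulting list is the argument vector that the container would be started with for these particular values.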

Next, all you have to do is assemble your steps into a linear pipeline. To do this, supply a list of (step name, configuration file path) pairs to the run_pipeline function from the utils package. This function takes as arguments the name of the pipeline, the name of the experiment in Kubeflow and the list of steps in the pipeline, and it returns a kfp run object. It first creates kfp components from the list of steps, gets the paths to the configuration files uploaded to the blob and then runs the pipeline.

steps = [('preprocessing', 'preprocess_config_path.yml'),
         ('model_training', 'model_train_config_path.yml'),
         ('model_evaluation', 'model_eval_config_path.yml')]
ock.utils.run_pipeline('test', 'exp1', steps)
RunPipelineResult(run_id=e60a7b2c-b7e5-46a8-a4db-b5e7f5a34028)
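
In a linear pipeline, each step waits for the one before it. Given that every component has a `trigger` input and an `output`, one way to picture the chaining is that each step's trigger is the previous step's output, while the first step has no trigger. The sketch below illustrates that idea in plain Python. The helper `chain_steps` and the `<step>-output` naming are hypothetical and do not describe OctaiKube's internal implementation.

```python
# Hypothetical helper for illustration: derives the trigger of each step in a
# linear pipeline from the step that precedes it.

def chain_steps(steps):
    """Return (step_name, config_path, trigger) triples in pipeline order."""
    chained = []
    previous = None
    for step_name, config_path in steps:
        # The first step has no upstream step, so its trigger is None.
        trigger = f"{previous}-output" if previous else None
        chained.append((step_name, config_path, trigger))
        previous = step_name
    return chained


steps = [('preprocessing', 'preprocess_config_path.yml'),
         ('model_training', 'model_train_config_path.yml'),
         ('model_evaluation', 'model_eval_config_path.yml')]

for name, config, trigger in chain_steps(steps):
    print(f"{name}: config={config}, trigger={trigger}")
```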

With OctaiKube you can do even more!

The run_pipeline function does a single run or execution of a pipeline. You can track the progress of the run by looking at its details page on the Kubeflow Pipelines UI, where you can see the runtime graph, output artifacts, and logs for each step in the run.

However, sometimes you might need to create a repeatable run of a pipeline and that’s where the utils function recurring_pipeline_run helps. The configuration for a recurring run includes a copy of a pipeline with all parameter values specified and a run trigger. You can start a recurring run inside any experiment, and it will periodically start a new copy of the run configuration.

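from octaikube.utils import recurring_pipeline_run
# As reflected in the output below: experiment 'exp1', pipeline 'test', interval of 1 second
recurring_pipeline_run('exp1', 'test', 1, namespace='cyrine-chemsi')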
{'created_at': datetime.datetime(2022, 7, 26, 8, 28, 53, tzinfo=tzlocal()),
'description': None,
'enabled': True,
'error': None,
'id': 'a1a62111-f70a-4d78-88a1-a25909a4eb3b',
'max_concurrency': '1',
'mode': None,
'name': 'test_26/07/2022-08:28:53',
'no_catchup': None,
'pipeline_spec': {'parameters': [{'name': 'dummy', 'value': 'dummy'}],
                'pipeline_id': 'ebd8c7e0-4c71-4f0f-b7fc-9cbddd2f233c',
                'pipeline_manifest': None,
                'pipeline_name': 'test',
                'workflow_manifest': '{"kind":"Workflow","apiVersion":"argoproj.io/v1alpha1","metadata":{"generateName":"test-","creationTimestamp":null,"labels":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.13"},"annotations":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.13","pipelines.kubeflow.org/pipeline_compilation_time":"2022-07-22T15:57:14.341968","pipelines.kubeflow.org/pipeline_spec":"{\\"inputs\\": '
                                        '[{\\"name\\": \\"dummy\\"}], '
                                        '\\"name\\": '
                                        '\\"test\\"}"}},"spec":{"templates":[{"name":"feature-engineering-step","inputs":{"parameters":[{"name":"preprocessing-step-output"}]},"outputs":{"parameters":[{"name":"feature-engineering-step-output","valueFrom":{"path":"/tmp/outputs/output/data"}}],"artifacts":[{"name":"feature-engineering-step-output","path":"/tmp/outputs/output/data"}]},"metadata":{"annotations":{"pipelines.kubeflow.org/arguments.parameters":"{\\"config_path\\": '
                                        '\\"blob {\\\\\\"container\\\\\\": '
                                        '\\\\\\"configurations\\\\\\",       '
                                        '\\\\\\"blob_name\\\\\\": '
                                        '\\\\\\"step_config_c9aa0cb2.yml\\\\\\"}\\", '
                                        '\\"trigger\\": '
                                        '\\"{{inputs.parameters.preprocessing-step-output}}\\"}","pipelines.kubeflow.org/component_ref":"{\\"digest\\": '
                                        '\\"36d71cabdb89cb528343a6ba650cc2ba9a032d50f52f1892c78cb5430251ba0d\\"}","pipelines.kubeflow.org/component_spec":"{\\"implementation\\": '
                                        '{\\"container\\": {\\"command\\": '
                                        '[\\"octaistep\\", \\"--step-name\\", '
                                        '\\"feature_engineering\\", '
                                        '\\"--config-path\\", '
                                        '{\\"inputValue\\": '
                                        '\\"config_path\\"}, \\"--trigger\\", '
                                        '{\\"inputValue\\": \\"trigger\\"}, '
                                        '\\"--output-path\\", '
                                        '{\\"outputPath\\": \\"output\\"}], '
                                        '\\"image\\": '
                                        '\\"mltestcanv.azurecr.io/config-fix:latest\\"}}, '
                                        '\\"inputs\\": [{\\"description\\": '
                                        '\\"Step Configuration\\", \\"name\\": '
                                        '\\"config_path\\", \\"type\\": '
                                        '\\"String\\"}, {\\"name\\": '
                                        '\\"trigger\\", \\"type\\": '
                                        '\\"Data\\"}], \\"name\\": \\"Feature '
                                        'Engineering Step\\", \\"outputs\\": '
                                        '[{\\"name\\": \\"output\\", '
                                        '\\"type\\": '
                                        '\\"Data\\"}]}"},"labels":{"pipelines.kubeflow.org/enable_caching":"true","pipelines.kubeflow.org/kfp_sdk_version":"1.8.13","pipelines.kubeflow.org/pipeline-sdk-type":"kfp"}},"container":{"name":"","image":"mltestcanv.azurecr.io/config-fix:latest","command":["octaistep","--step-name","feature_engineering","--config-path","blob '
                                        '{\\"container\\": '
                                        '\\"configurations\\",       '
                                        '\\"blob_name\\": '
                                        '\\"step_config_c9aa0cb2.yml\\"}","--trigger","{{inputs.parameters.preprocessing-step-output}}","--output-path","/tmp/outputs/output/data"],"resources":{},"imagePullPolicy":"Always"}},{"name":"model-training-step","inputs":{"parameters":[{"name":"feature-engineering-step-output"}]},"outputs":{"artifacts":[{"name":"model-training-step-output","path":"/tmp/outputs/output/data"}]},"metadata":{"annotations":{"pipelines.kubeflow.org/arguments.parameters":"{\\"config_path\\": '
                                        '\\"blob {\\\\\\"container\\\\\\": '
                                        '\\\\\\"configurations\\\\\\",       '
                                        '\\\\\\"blob_name\\\\\\": '
                                        '\\\\\\"step_config_b6ea6699.yml\\\\\\"}\\", '
                                        '\\"trigger\\": '
                                        '\\"{{inputs.parameters.feature-engineering-step-output}}\\"}","pipelines.kubeflow.org/component_ref":"{\\"digest\\": '
                                        '\\"a5598fea0bd9fc9cbb26570c92964bfc4eb65aada6675cdece37f1599733bdad\\"}","pipelines.kubeflow.org/component_spec":"{\\"implementation\\": '
                                        '{\\"container\\": {\\"command\\": '
                                        '[\\"octaistep\\", \\"--step-name\\", '
                                        '\\"model_training\\", '
                                        '\\"--config-path\\", '
                                        '{\\"inputValue\\": '
                                        '\\"config_path\\"}, \\"--trigger\\", '
                                        '{\\"inputValue\\": \\"trigger\\"}, '
                                        '\\"--output-path\\", '
                                        '{\\"outputPath\\": \\"output\\"}], '
                                        '\\"image\\": '
                                        '\\"mltestcanv.azurecr.io/config-fix:latest\\"}}, '
                                        '\\"inputs\\": [{\\"description\\": '
                                        '\\"Step Configuration\\", \\"name\\": '
                                        '\\"config_path\\", \\"type\\": '
                                        '\\"String\\"}, {\\"name\\": '
                                        '\\"trigger\\", \\"type\\": '
                                        '\\"Data\\"}], \\"name\\": \\"Model '
                                        'Training Step\\", \\"outputs\\": '
                                        '[{\\"name\\": \\"output\\", '
                                        '\\"type\\": '
                                        '\\"Data\\"}]}"},"labels":{"pipelines.kubeflow.org/enable_caching":"true","pipelines.kubeflow.org/kfp_sdk_version":"1.8.13","pipelines.kubeflow.org/pipeline-sdk-type":"kfp"}},"container":{"name":"","image":"mltestcanv.azurecr.io/config-fix:latest","command":["octaistep","--step-name","model_training","--config-path","blob '
                                        '{\\"container\\": '
                                        '\\"configurations\\",       '
                                        '\\"blob_name\\": '
                                        '\\"step_config_b6ea6699.yml\\"}","--trigger","{{inputs.parameters.feature-engineering-step-output}}","--output-path","/tmp/outputs/output/data"],"resources":{},"imagePullPolicy":"Always"}},{"name":"preprocessing-step","inputs":{},"outputs":{"parameters":[{"name":"preprocessing-step-output","valueFrom":{"path":"/tmp/outputs/output/data"}}],"artifacts":[{"name":"preprocessing-step-output","path":"/tmp/outputs/output/data"}]},"metadata":{"annotations":{"pipelines.kubeflow.org/arguments.parameters":"{\\"config_path\\": '
                                        '\\"blob {\\\\\\"container\\\\\\": '
                                        '\\\\\\"configurations\\\\\\",       '
                                        '\\\\\\"blob_name\\\\\\": '
                                        '\\\\\\"step_config_3d946d66.yml\\\\\\"}\\", '
                                        '\\"trigger\\": '
                                        '\\"None\\"}","pipelines.kubeflow.org/component_ref":"{\\"digest\\": '
                                        '\\"60b657eb09ad5d5250be5b5449afeeb3176ff586cbc1dd7587a0a50d34e897b8\\"}","pipelines.kubeflow.org/component_spec":"{\\"implementation\\": '
                                        '{\\"container\\": {\\"command\\": '
                                        '[\\"octaistep\\", \\"--step-name\\", '
                                        '\\"preprocessing\\", '
                                        '\\"--config-path\\", '
                                        '{\\"inputValue\\": '
                                        '\\"config_path\\"}, \\"--trigger\\", '
                                        '{\\"inputValue\\": \\"trigger\\"}, '
                                        '\\"--output-path\\", '
                                        '{\\"outputPath\\": \\"output\\"}], '
                                        '\\"image\\": '
                                        '\\"mltestcanv.azurecr.io/config-fix:latest\\"}}, '
                                        '\\"inputs\\": [{\\"description\\": '
                                        '\\"Step Configuration\\", \\"name\\": '
                                        '\\"config_path\\", \\"type\\": '
                                        '\\"String\\"}, {\\"default\\": '
                                        '\\"None\\", \\"name\\": '
                                        '\\"trigger\\", \\"optional\\": true, '
                                        '\\"type\\": \\"Data\\"}], \\"name\\": '
                                        '\\"Preprocessing Step\\", '
                                        '\\"outputs\\": [{\\"name\\": '
                                        '\\"output\\", \\"type\\": '
                                        '\\"Data\\"}]}"},"labels":{"pipelines.kubeflow.org/enable_caching":"true","pipelines.kubeflow.org/kfp_sdk_version":"1.8.13","pipelines.kubeflow.org/pipeline-sdk-type":"kfp"}},"container":{"name":"","image":"mltestcanv.azurecr.io/config-fix:latest","command":["octaistep","--step-name","preprocessing","--config-path","blob '
                                        '{\\"container\\": '
                                        '\\"configurations\\",       '
                                        '\\"blob_name\\": '
                                        '\\"step_config_3d946d66.yml\\"}","--trigger","None","--output-path","/tmp/outputs/output/data"],"resources":{},"imagePullPolicy":"Always"}},{"name":"test","inputs":{},"outputs":{},"metadata":{},"dag":{"tasks":[{"name":"feature-engineering-step","template":"feature-engineering-step","arguments":{"parameters":[{"name":"preprocessing-step-output","value":"{{tasks.preprocessing-step.outputs.parameters.preprocessing-step-output}}"}]},"dependencies":["preprocessing-step"]},{"name":"model-training-step","template":"model-training-step","arguments":{"parameters":[{"name":"feature-engineering-step-output","value":"{{tasks.feature-engineering-step.outputs.parameters.feature-engineering-step-output}}"}]},"dependencies":["feature-engineering-step"]},{"name":"preprocessing-step","template":"preprocessing-step","arguments":{}}]}}],"entrypoint":"test","arguments":{"parameters":[{"name":"dummy"}]},"serviceAccountName":"pipeline-runner"},"status":{"startedAt":null,"finishedAt":null}}'},
'resource_references': [{'key': {'id': 'ecf382d0-a884-4c71-90d0-0db247250dd2',
                                'type': 'EXPERIMENT'},
                        'name': 'exp1',
                        'relationship': 'OWNER'},
                        {'key': {'id': 'ebd8c7e0-4c71-4f0f-b7fc-9cbddd2f233c',
                                'type': 'PIPELINE_VERSION'},
                        'name': 'test',
                        'relationship': 'CREATOR'}],
'service_account': 'default-editor',
'status': 'NO_STATUS',
'trigger': {'cron_schedule': None,
            'periodic_schedule': {'end_time': None,
                                'interval_second': '1',
                                'start_time': None}},
'updated_at': datetime.datetime(2022, 7, 26, 8, 28, 53, tzinfo=tzlocal())}
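
The `periodic_schedule` entry in the output above carries an `interval_second` value, which is the time between successive runs. As a rough illustration of what such a schedule means, the sketch below lists run times spaced by a fixed interval. The helper `next_run_times` is hypothetical, the one-hour interval is an arbitrary example, and the sketch does not reproduce the exact behaviour of the Kubeflow scheduler (for example, when the first run starts).

```python
from datetime import datetime, timedelta

# Hypothetical helper for illustration: lists run times spaced by a fixed
# interval, counting from a given start time.

def next_run_times(start, interval_second, count):
    """Return `count` datetimes, each `interval_second` after the previous."""
    step = timedelta(seconds=interval_second)
    return [start + i * step for i in range(1, count + 1)]


# Creation time taken from the output above; the interval is an example value.
created = datetime(2022, 7, 26, 8, 28, 53)
times = next_run_times(created, interval_second=3600, count=3)
for t in times:
    print(t.isoformat())
```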