Guide - Custom Pipeline Steps#

This page contains a few example custom pipeline steps, showing how they can be implemented for different requirements. Each example includes the code and an explanation, so you can see how to create your own steps to run in Kubeflow or deploy to an edge device.

For detailed documentation on how to create custom pipeline steps, see the guide: Custom Pipeline Steps

To test out custom steps locally before registering them to the database, you can use the run_step_local function from the develop submodule, specifying the path to the custom step file with the local_step_path argument.

Testing custom steps#
from octaipipe import develop


step_name: str = 'my_custom_step'
local_step_path: str = './path/to/custom_step_file.py'
config_path: str = './path/to/custom_config_file.yml'

develop.run_step_local(step_name=step_name,
                       config_path=config_path,
                       local_step_path=local_step_path)

Parent class PipelineStep#

Central to making custom steps is the OctaiPipe parent class PipelineStep, a base class providing standard methods for loading and writing data. Any custom step should inherit from PipelineStep and initialize it with the following arguments:

  • name: the name of the pipeline step. Can be any string.

  • input_data_specs: dictionary with specifications for data loading, used to set up self._sources, which is a list of Source objects (each has a loader attribute which is used to load input data).

  • output_data_specs: list of dictionaries with specifications for data writing, used to set up self._sinks, which is a list of Sink objects (each has a writer attribute which is used to write output data).

In the example custom steps, we usually catch these arguments in kwargs. For information on kwargs, see for example this Stackoverflow post.

As mentioned, the PipelineStep includes some key methods that can be used in custom steps. These are as follows:

  • self._load_data(): Method which uses self._input_data_specs to load data
    from the storage type in datastore_type. To load data from different sources (e.g.
    from different tables in a SQL database), the attribute self._input_data_specs
    can be altered.
  • self._write_data(): Method which uses self._sinks to write
    data to the datastore_type(s) defined in the output_data_specs list. This method takes a Pandas
    DataFrame as input, e.g. self._write_data(my_data_frame).
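
Below is a minimal sketch of how these pieces fit together. It is not one of the registered OctaiPipe steps, and the class name is just an example; it inherits from PipelineStep, passes the config specs on via kwargs, loads data with self._load_data() and writes it straight back out with self._write_data().

Minimal custom step sketch#
import logging
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class MyCustomStep(PipelineStep):

    def __init__(self, **kwargs):
        # name, input_data_specs and output_data_specs arrive in kwargs and
        # are used to initialize the parent PipelineStep class
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def run(self, **kwargs):
        # load data as defined by input_data_specs
        data = self._load_data()

        # write the (unchanged) data as defined by output_data_specs
        self._write_data(data)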

Example Custom Steps#

The following example use-cases are shown:

  1. Custom step config

  2. Custom data loading

  3. Custom data preprocessing

  4. Custom model evaluation

  5. Custom periodic step

Custom step config#

We start off by creating a custom step config, showing the connection between the methods in a custom step and the step config.

To customize inputs, it is important to understand the two main methods required in a custom step:

The __init__ method#

The __init__ method initializes the custom step class as well as the required parent class, the PipelineStep class. Everything in your config file will be handed to the __init__ method except the run_specs. For example, in the below config for preprocessing, the run_specs will be popped from the config, and name, input_data_specs, output_data_specs, and preprocessing_specs are handed to the __init__ method.

name: preprocessing

input_data_specs:
  default:
  - datastore_type: influxdb
    settings:
      query_type: dataframe # influx/dataframe/stream/stream_dataframe/csv
      query_template_path: ./configs/data/influx_query.txt
      query_config:
        start: "2020-05-20T13:30:00.000Z"
        stop: "2020-05-20T13:35:00.000Z"
        bucket: sensors-raw
        measurement: cat
        tags: {}

output_data_specs:
  default:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket-1
      measurement: testv1

run_specs:
  save_results: True
  # run_interval: 2s
  target_label: RUL
  label_type: "int"
  onnx_pred: False
  train_val_test_split:
    to_split: false
    split_ratio:
      training: 0.6
      validation: 0.2
      testing: 0.2

preprocessing_specs:
  steps: # full list of steps will be in technical documentation
    - interpolate_missing_data
    - remove_constant_columns
    - make_target
    - encode_categorical_columns
    - normalise
  degradation_model: linear # linear, pw_linear
  initial_RUL: 10
  var_threshold: 0
  preprocessors_specs:
    - type: onehot_encoder
      load_existing: False
      name: onehot_encoder_test0
      # version: "2.3"
      categorical_columns:
        - "cat_col1"
        - "cat_col2"
    - type: minmax_scaler
      load_existing: False
      name: scaler_test0
      # version: "1.1"

Let’s also have a look at the first few lines of the preprocessing step’s __init__ method to understand what happens to the configs. Here, the preprocessing_specs are explicitly named and used later on in the method, whereas the rest are stored in the kwargs dictionary and are used to initialize the parent PipelineStep class with super().__init__(**kwargs). For more information on the PipelineStep, see Custom Pipeline Steps.

First lines of preprocessing init#
def __init__(self, preprocessing_specs,  **kwargs):
    super().__init__(**kwargs)
    self._logger = logging.getLogger(__name__)

To hand any custom arguments to your custom step's __init__ method, simply add them to the config outside the run_specs. In the example config below, we have added the two arguments str_to_print and print_times. How these are used in the custom step is shown further down in Putting it all together.

name: custom_step

input_data_specs:
  default:
  - datastore_type: local
    settings:
      query_type: csv
      query_config:
        filepath_or_buffer: data_in.csv

output_data_specs:
  default:
  - datastore_type: local
    settings:
      file_path: data_out.csv
      write_config: {}

str_to_print: 'Hello World!'
print_times: 5

run_specs: {}

The run method#

The run method is the second required method of any OctaiPipe pipeline step. It is where the step's logic is executed: once the step has been initialized (the __init__ method has run), the run method runs from top to bottom. To pass arguments to the run method, simply add fields to the run_specs of your config file.

In the example below, the fields print_at_run and sleep_timer have been added to the run_specs.

name: custom_step

input_data_specs:
  default:
  - datastore_type: local
    settings:
      query_type: csv
      query_config:
        filepath_or_buffer: data_in.csv

output_data_specs:
  default:
  - datastore_type: local
    settings:
      file_path: data_out.csv
      write_config: {}

str_to_print: 'Hello World!'
print_times: 5

run_specs:
  print_at_run: 'Hello World at run time!'
  sleep_timer: 2

Putting it all together#

Now that we have created our custom config, we write a custom pipeline step that uses the configs we set. Below is a step that prints str_to_print a total of print_times times in the __init__ method. At run time, the run method is executed and the value of print_at_run is printed 5 times, pausing for sleep_timer seconds after each print:

Custom config step file#
import logging
import time
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomConfigStep(PipelineStep):

    def __init__(self, str_to_print: str, print_times: int,  **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

        # print str_to_print the number of times given by print_times
        for i in range(print_times):
            print(str_to_print)

    def run(self, print_at_run: str, sleep_timer: int, **kwargs):
        # print print_at_run 5 times, pausing sleep_timer seconds after each print
        for i in range(5):
            print(print_at_run)
            time.sleep(sleep_timer)
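
To try the step out locally, you can use run_step_local from the develop submodule, as shown at the top of this page. The file names below are placeholders, and we assume here that step_name matches the name field in the config; point config_path at the config above and local_step_path at the step file.

Running the custom config step locally#
from octaipipe import develop


develop.run_step_local(step_name='custom_step',
                       config_path='./custom_config_step.yml',
                       local_step_path='./custom_config_step.py')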

Custom data loading#

While OctaiPipe includes support for several database layers (Data Loading and Writing Utilities), users might need to extend the base package with additional data loading functionalities.

The custom step below, with its accompanying custom config file, requests input data from an API whose URL is given by the custom config field custom_data_url in the run_specs. We assume that the returned object has a .json() method that returns a list of dictionaries. We turn this object into a Pandas dataframe and save it to a CSV file using the native OctaiPipe data writing functionality: self._write_data().

In this way, we show how a data loading method not native to OctaiPipe can be used, saving the data in a format which allows the user to then use further OctaiPipe pipeline steps with native data loading/writing methods.

NOTE that input_data_specs still need to be specified as they are expected arguments for the PipelineStep. However, they are not used in this class as we do not call self._load_data().

Custom data load config file#
name: custom_step

input_data_specs:
  default:
  - datastore_type: local
    settings:
      query_type: csv
      query_config:
        filepath_or_buffer: 'dummy'

output_data_specs:
  default:
  - datastore_type: local
    settings:
      file_path: data_out.csv
      write_config:
        index: false

run_specs:
  custom_data_url: 'https://custom-data-api-url.net/'
Custom data load step file#
import logging
import requests
import pandas as pd
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomDataLoadStep(PipelineStep):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def run(self, custom_data_url: str, **kwargs):
        # request data from API
        response = requests.get(custom_data_url)
        response = response.json()

        # make dataframe from response object
        data = pd.DataFrame(response)

        # write data with native method using output_data_specs
        self._write_data(data)
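
The step above does not handle request failures. A slightly more defensive version of the run method might set a timeout and check the response status before parsing; the sketch below is one way to do this and is not part of the original example.

    def run(self, custom_data_url: str, **kwargs):
        # request data from API, failing early on HTTP errors
        response = requests.get(custom_data_url, timeout=30)
        response.raise_for_status()

        # make dataframe from the JSON response (assumed to be a list of dictionaries)
        data = pd.DataFrame(response.json())

        # write data with native method using output_data_specs
        self._write_data(data)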

Custom data preprocessing#

OctaiPipe includes a handpicked set of preprocessing functionalities in the native preprocessing step (Preprocessing Step). However, many datasets require very customized preprocessing, which makes them perfect candidates for a custom pipeline step. The below code pulls a dataset from a local CSV file and runs a custom preprocessing method which picks out every third row in the dataset (set by the custom config field every_n_row). The resulting dataframe is saved to another CSV.

Custom preprocessing#
name: custom_step

input_data_specs:
  default:
  - datastore_type: local
    settings:
      query_type: csv
      query_config:
        filepath_or_buffer: './data_in.csv'

output_data_specs:
  default:
  - datastore_type: local
    settings:
      file_path: ./data_out.csv
      write_config:
        index: false

run_specs:
  every_n_row: 3
Custom preprocessing file#
import logging
import pandas as pd
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomPreprocessingStep(PipelineStep):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def _custom_preprocess(self, data, every_n_row: int):
        return data.iloc[::every_n_row, :]

    def run(self, every_n_row: int, **kwargs):
        # get input data with native _load_data method
        data = self._load_data()

        # run custom preprocessing
        data_preprocessed = self._custom_preprocess(data, every_n_row)

        # write preprocessed data with native method using output_data_specs
        self._write_data(data_preprocessed)
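
As a note on the slicing used above, data.iloc[::every_n_row, :] keeps every n-th row starting from the first one. For example, with every_n_row set to 3:

import pandas as pd

df = pd.DataFrame({'a': range(6)})
print(df.iloc[::3, :])  # keeps rows 0 and 3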

Custom model evaluation#

OctaiPipe includes its own model evaluation step (Model Evaluation Step), but sometimes users need to extend this with their own evaluation metrics or methods. In addition, an example of a custom model evaluation step shows how to work with native OctaiPipe models in custom steps, allowing users to also create custom model training and inference steps if necessary.

The following step is a custom model evaluation step which loads a previously trained model named example_model of version 1.0. To retrieve the trained model from OctaiPipe's cloud model store, we use type_model_mapping, which maps the config name of a model (e.g. 'ridge_reg') to the respective OctaiPipe model class. We then build the model with the build method.

We evaluate the model against a dataset pulled with the native load_data method. To evaluate the model, we run the model’s predict method. This is OctaiPipe’s implementation of getting model predictions. If the original model object is needed (e.g. the sklearn class), this can be found using the model._estimator attribute.

The custom evaluation method returns the max value of the predictions as the score. This is a completely arbitrary metric, but shows how a custom metric can be implemented.

Custom model evaluation#
name: custom_step

input_data_specs:
  default:
  - datastore_type: local
    settings:
      query_type: csv
      query_config:
        filepath_or_buffer: './data_in.csv'

model_specs:
  name: example_model
  type: ridge_reg
  version: "1.0"

run_specs: {}
Custom model evaluation step file#
import logging
from octaipipe.model_classes.model_mapping import type_model_mapping
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomEvaluationStep(PipelineStep):

    def __init__(self, model_specs, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

        # get the model id and instantiate the model object
        model_type = model_specs['type']
        output_path = self._out_coll._path
        model_specs.update({'output_path': output_path,
                            'load_existing': True,
                            'model_load_specs': {
                                'version': model_specs.pop('version')}})
        self._model = type_model_mapping(model_type)(**model_specs)

        self._model.build()

    def _custom_metric(self, y_pred):
        return max(y_pred)

    def run(self, **kwargs):
        # get input data with native _load_data method
        data = self._load_data()

        y_pred = self._model.predict(data)

        score = self._custom_metric(y_pred)

        self._logger.info(f'Model got score: {score}')
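
If you need the original model object rather than the OctaiPipe wrapper, for example to use scikit-learn metrics directly, you can reach it through the _estimator attribute mentioned above. The helper below is a minimal sketch of this idea, assuming the ridge_reg model wraps a scikit-learn estimator; it is not part of the example step.

from sklearn.metrics import r2_score


def custom_metric(model, features, y_true):
    # use the wrapped scikit-learn estimator directly via the _estimator attribute
    y_pred = model._estimator.predict(features)
    return r2_score(y_true, y_pred)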

Custom periodic step#

While it is useful to run a pipeline step once for data processing, training, or other tasks, we often want to run pipeline steps repeatedly in production environments. For example, data might be coming in from a machine over time and we want to run preprocessing as new data comes in.

To do this, we define a periodic pipeline step. It works the same way as a regular pipeline step, but we let the run method go through a loop which periodically pulls data, runs preprocessing and appends it to the output file. We use the custom preprocessing step from Custom data preprocessing.

Note that we set mode: a in the write config to append the data to the existing file.

Custom periodic preprocessing#
name: custom_step

input_data_specs:
  default:
  - datastore_type: local
    settings:
      query_type: csv
      query_config:
        filepath_or_buffer: './data_in.csv'

output_data_specs:
  default:
  - datastore_type: local
    settings:
      file_path: ./data_out.csv
      write_config:
        index: false
        mode: a

run_specs:
  every_n_row: 3
  run_every_n_s: 10
Custom periodic preprocessing file#
import logging
import time
import pandas as pd
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomPeriodicPreprocessingStep(PipelineStep):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def _custom_preprocess(self, data, every_n_row: int):
        return data.iloc[::every_n_row, :]

    def run(self, every_n_row: int, run_every_n_s: int, **kwargs):

        while True:
            # get input data with native _load_data method
            data = self._load_data()

            # run custom preprocessing
            data_preprocessed = self._custom_preprocess(data, every_n_row)

            # write preprocessed data with native method using output_data_specs
            self._write_data(data_preprocessed)

            time.sleep(run_every_n_s)
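
As written, the loop above runs until the step is stopped externally. If you want the step to terminate on its own after a fixed number of iterations, one option is to add an extra field to run_specs (the max_runs field below is hypothetical, not a native OctaiPipe setting) and bound the loop, as in this sketch of an alternative run method:

    def run(self, every_n_row: int, run_every_n_s: int, max_runs: int, **kwargs):
        # repeat the load/preprocess/write cycle max_runs times, pausing
        # run_every_n_s seconds after each iteration
        for _ in range(max_runs):
            data = self._load_data()
            data_preprocessed = self._custom_preprocess(data, every_n_row)
            self._write_data(data_preprocessed)
            time.sleep(run_every_n_s)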