Guide - Custom Pipeline Steps#
This page walks through a few example custom pipeline steps, showing how they can be implemented for different requirements. Each example includes the code and an explanation, so you can see how to create your own custom steps to run in Kubeflow or deploy to an edge device.
For detailed documentation on how to create custom pipeline steps, see the guide: Custom Pipeline Steps
To test out custom steps locally before registering them to the database, you can use the run_step_local function from the develop submodule, specifying the path to the custom step file with the local_step_path argument.
from octaipipe import develop

step_name: str = 'my_custom_step'
local_step_path: str = './path/to/custom_step_file.py'
config_path: str = './path/to/custom_config_file.yml'

develop.run_step_local(step_name=step_name,
                       config_path=config_path,
                       local_step_path=local_step_path)
Parent class PipelineStep#
Central to making custom steps is the OctaiPipe parent class PipelineStep, a base class providing standard methods for loading and writing data. Any custom step should inherit from PipelineStep and initialize it with the following arguments:
- name: the name of the pipeline step. Can be any string.
- input_data_specs: dictionary with specifications for data loading, used to set up self._sources, which is a list containing Source objects (each has a loader attribute, which is used to load input data).
- output_data_specs: list of dictionaries with specifications for data writing, used to set up self._sinks, which is a list containing Sink objects (each has a writer attribute, which is used to write output data).
We usually catch these in the kwargs part of example custom steps. For information on kwargs, see for example this Stack Overflow post.
As mentioned, the PipelineStep includes some key methods that can be used in custom steps. These are as follows:
- self._load_data(): method which uses self._input_data_specs to load data from the storage type in datastore_type. To load data from different sources (e.g. from different tables in a SQL database), the attribute self._input_data_specs can be altered.
- self._write_data(): method which uses self._sinks to write data to the datastore_type(s) defined in the output_data_specs list. This method takes a Pandas DataFrame as input, e.g. self._write_data(my_data_frame).
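To make these pieces concrete, below is a minimal sketch of a custom step that inherits from PipelineStep and uses both methods. The class name and the pass-through logic are illustrative only; the import path is the one used in the examples further down this page.

import logging

from octaipipe_core.pipeline.pipeline_step import PipelineStep


class MinimalCustomStep(PipelineStep):

    def __init__(self, **kwargs):
        # name, input_data_specs and output_data_specs arrive in kwargs
        # and are handed on to the PipelineStep parent class
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def run(self, **kwargs):
        # load data from the source(s) defined in input_data_specs
        data = self._load_data()

        # any custom logic would go here; this sketch passes the data through

        # write data to the sink(s) defined in output_data_specs
        self._write_data(data)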
Example Custom Steps#
The following example use-cases are shown:
- Custom step config
- Custom data loading
- Custom data preprocessing
- Custom model evaluation
- Custom periodic step
Custom step config#
We start off by creating a custom step config, showing the connection between methods in the custom steps and the step config.
To customize inputs, it is important to understand the two main methods required in a custom step:
The __init__ method#
The __init__ method initializes the custom step class as well as the required parent class, the PipelineStep class. Everything in your config file will be handed to the __init__ method except the run_specs. For example, in the below config for preprocessing, the run_specs will be popped from the config and name, input_data_specs, output_data_specs, and preprocessing_specs are handed to the __init__ method.
name: preprocessing

input_data_specs:
  default:
    - datastore_type: influxdb
      settings:
        query_type: dataframe # influx/dataframe/stream/stream_dataframe/csv
        query_template_path: ./configs/data/influx_query.txt
        query_config:
          start: "2020-05-20T13:30:00.000Z"
          stop: "2020-05-20T13:35:00.000Z"
          bucket: sensors-raw
          measurement: cat
          tags: {}

output_data_specs:
  default:
    - datastore_type: influxdb
      settings:
        bucket: test-bucket-1
        measurement: testv1

run_specs:
  save_results: True
  # run_interval: 2s
  target_label: RUL
  label_type: "int"
  onnx_pred: False
  train_val_test_split:
    to_split: false
    split_ratio:
      training: 0.6
      validation: 0.2
      testing: 0.2

preprocessing_specs:
  steps: # full list of steps will be in technical documentation
    - interpolate_missing_data
    - remove_constant_columns
    - make_target
    - encode_categorical_columns
    - normalise
  degradation_model: linear # linear, pw_linear
  initial_RUL: 10
  var_threshold: 0
  preprocessors_specs:
    - type: onehot_encoder
      load_existing: False
      name: onehot_encoder_test0
      # version: "2.3"
      categorical_columns:
        - "cat_col1"
        - "cat_col2"
    - type: minmax_scaler
      load_existing: False
      name: scaler_test0
      # version: "1.1"
Let’s also have a look at the first few lines of the preprocessing step’s __init__ method to understand what happens to the configs. Here, the preprocessing_specs are explicitly named and used later on in the method, whereas the rest are stored in the kwargs dictionary and are used to initialize the parent PipelineStep class with super().__init__(**kwargs). For more information on the PipelineStep, see Custom Pipeline Steps.
def __init__(self, preprocessing_specs, **kwargs):
    super().__init__(**kwargs)
    self._logger = logging.getLogger(__name__)
To hand any custom arguments to your custom step’s __init__ method, simply add them to the config outside the run_specs. In the example config below, we have added the two arguments str_to_print and print_times. How these are used is shown in the custom step later on this page.
name: custom_step

input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv
        query_config:
          filepath_or_buffer: data_in.csv

output_data_specs:
  default:
    - datastore_type: local
      settings:
        file_path: data_out.csv
        write_config: {}

str_to_print: 'Hello World!'
print_times: 5

run_specs: {}
The run method#
The run method is the second required method of any OctaiPipe pipeline step. This is where the logic of the step is executed. After the step has been initialized (the __init__ method has run), the run method runs from top to bottom. To give arguments to the run method, simply add fields to the run_specs of your config file.

In the example below, the fields print_at_run and sleep_timer have been added to the run_specs.
name: custom_step

input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv
        query_config:
          filepath_or_buffer: data_in.csv

output_data_specs:
  default:
    - datastore_type: local
      settings:
        file_path: data_out.csv
        write_config: {}

str_to_print: 'Hello World!'
print_times: 5

run_specs:
  print_at_run: 'Hello World at run time!'
  sleep_timer: 2
Putting it all together#
Now that we have created our custom config, we write a custom pipeline step that uses
the configs we set. Below is a step that prints str_to_print
in the __init__
method print_times
times. At run time, the run
method is executed and the
value for print_at_run
is printed 5 times, pausing for sleep_timer
seconds
after each print. The step is below:
import logging
import time

from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomConfigStep(PipelineStep):

    def __init__(self, str_to_print: str, print_times: int, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)
        for i in range(print_times):
            print(str_to_print)

    def run(self, print_at_run: str, sleep_timer: int, **kwargs):
        for i in range(5):
            print(print_at_run)
            time.sleep(sleep_timer)
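To try this step out before registering it, you could run it with run_step_local from the develop submodule, as shown at the top of this page. The step name and file paths below are placeholders for wherever you saved the step and its config.

from octaipipe import develop

develop.run_step_local(step_name='custom_config_step',
                       config_path='./path/to/custom_config_file.yml',
                       local_step_path='./path/to/custom_step_file.py')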
Custom data loading#
While OctaiPipe includes support for several database layers (Data Loading and Writing Utilities), users might need to extend the base package with additional data loading functionalities.
The custom step below, with its accompanying custom config file, makes a request for input data to the API URL given by the custom config field custom_data_url in the run_specs. We assume that the returned object has a .json() method that returns a list of dictionaries. We turn this object into a Pandas dataframe and save it to a CSV file using the native OctaiPipe data writing functionality: self._write_data().
In this way, we show how a data loading method not native to OctaiPipe can be used, saving the data in a format which allows the user to then use further OctaiPipe pipeline steps with native data loading/writing methods.
NOTE that input_data_specs still need to be specified as they are expected arguments for the PipelineStep. However, they are not used in this class as we do not call self._load_data().
name: custom_step

input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv
        query_config:
          filepath_or_buffer: 'dummy'

output_data_specs:
  default:
    - datastore_type: local
      settings:
        file_path: data_out.csv
        write_config:
          index: false

run_specs:
  custom_data_url: 'https://custom-data-api-url.net/'
import logging

import requests
import pandas as pd

from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomDataLoadStep(PipelineStep):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def run(self, custom_data_url: str, **kwargs):
        # request data from API
        response = requests.get(custom_data_url)
        response = response.json()

        # make dataframe from response object
        data = pd.DataFrame(response)

        # write data with native method using output_data_specs
        self._write_data(data)
Custom data preprocessing#
OctaiPipe includes a handpicked set of preprocessing functionalities in the native preprocessing step (Preprocessing Step). However, many datasets require very customized preprocessing, making this a perfect candidate for a custom pipeline step.

The code below pulls a dataset from a local CSV file and runs a custom preprocessing method which picks out every n-th row of the dataset (n is set by the custom config field every_n_row, 3 in the config below). The resulting dataframe is saved to another CSV.
name: custom_step

input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv
        query_config:
          filepath_or_buffer: './data_in.csv'

output_data_specs:
  default:
    - datastore_type: local
      settings:
        file_path: ./data_out.csv
        write_config:
          index: false

run_specs:
  every_n_row: 3
import logging

import pandas as pd

from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomPreprocessingStep(PipelineStep):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def _custom_preprocess(self, data, every_n_row: int):
        return data.iloc[::every_n_row, :]

    def run(self, every_n_row: int, **kwargs):
        # get input data with native _load_data method
        data = self._load_data()

        # run custom preprocessing
        data_preprocessed = self._custom_preprocess(data, every_n_row)

        # write preprocessed data with native method using output_data_specs
        self._write_data(data_preprocessed)
Custom model evaluation#
OctaiPipe includes its own model evaluation step (Model Evaluation Step), but sometimes users need to extend this with their own evaluation metrics or methods. In addition, an example of a custom model evaluation step shows how to work with native OctaiPipe models in custom steps, allowing users to create custom model training and inference steps as well if necessary.
The following step is a custom model evaluation step which loads a previously trained model named example_model of version 1.0. To retrieve the trained model from OctaiPipe’s cloud model store, we use the type_model_mapping, which is a dictionary matching the config name of a model (e.g. 'ridge_reg') to the respective OctaiPipe model class. We then build the model with the build method.

We evaluate the model against a dataset pulled with the native _load_data method. To evaluate the model, we run the model’s predict method. This is OctaiPipe’s implementation of getting model predictions. If the original model object is needed (e.g. the sklearn class), it can be found using the model._estimator attribute.

The custom evaluation method returns the max value of the predictions as the score. This is a completely arbitrary metric, but it shows how a custom metric can be implemented.
name: custom_step

input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv
        query_config:
          filepath_or_buffer: './data_in.csv'

model_specs:
  name: example_model
  type: ridge_reg
  version: "1.0"

run_specs: {}
import logging

from octaipipe.model_classes.model_mapping import type_model_mapping
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomEvaluationStep(PipelineStep):

    def __init__(self, model_specs, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

        # get the model id and instantiate the model object
        model_type = model_specs['type']
        output_path = self._out_coll._path
        model_specs.update({'output_path': output_path,
                            'load_existing': True,
                            'model_load_specs': {
                                'version': model_specs.pop('version')}})
        self._model = type_model_mapping(model_type)(**model_specs)
        self._model.build()

    def _custom_metric(self, y_pred):
        return max(y_pred)

    def run(self, **kwargs):
        # get input data with native _load_data method
        data = self._load_data()

        y_pred = self._model.predict(data)
        score = self._custom_metric(y_pred)
        self._logger.info(f'Model got score: {score}')
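As mentioned above, the underlying model object can be reached through the _estimator attribute if your metric needs it. The snippet below is a sketch only; it assumes the ridge_reg model wraps a fitted scikit-learn estimator that exposes the usual coef_ attribute.

# inside CustomEvaluationStep, after self._model.build() has run
estimator = self._model._estimator

# assuming a fitted sklearn ridge regression, log its learned coefficients
self._logger.info(f'Estimator coefficients: {estimator.coef_}')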
Custom periodic step#
While it is great to run a pipeline step once for data processing, training, or other tasks, we often want to run pipeline steps repeatedly in production environments. For example, data might be coming in from a machine over time and we want to run preprocessing as new data comes in.
To do this, we define a periodic pipeline step. It works the same way as a regular pipeline step, but we let the run method go through a loop which periodically pulls data, runs preprocessing and appends it to the output file. We use the custom preprocessing step from Custom data preprocessing.

Note that we set mode: a in the write config to append the data to the existing file.
name: custom_step

input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv
        query_config:
          filepath_or_buffer: './data_in.csv'

output_data_specs:
  default:
    - datastore_type: local
      settings:
        file_path: ./data_out.csv
        write_config:
          index: false
          mode: a

run_specs:
  every_n_row: 3
  run_every_n_s: 10
import logging
import time

import pandas as pd

from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomPeriodicPreprocessingStep(PipelineStep):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._logger = logging.getLogger(__name__)

    def _custom_preprocess(self, data, every_n_row: int):
        return data.iloc[::every_n_row, :]

    def run(self, every_n_row: int, run_every_n_s: int, **kwargs):
        while True:
            # get input data with native _load_data method
            data = self._load_data()

            # run custom preprocessing
            data_preprocessed = self._custom_preprocess(data, every_n_row)

            # write preprocessed data with native method using output_data_specs
            self._write_data(data_preprocessed)

            time.sleep(run_every_n_s)