Custom pipeline step#
This notebook shows the steps to follow for creating and saving a custom pipeline step
1. Create the pipeline step class#
The following class is a template to define a custom pipeline step.
Each pipeline step should inherit from the base class PipelineStep
. At initialization, the parent class expects an output collector, a name, and two dictionaries with the input and output data specifications.
This template class has a method run_step_once in which the user defines how the step is run once. The class inherits the method periodic_step from PipelineStep which calls run_step_once at a time interval specified in the custom step config file (see below). The run method, which OctaiPipe executes when the custom class is called, This is an abstract method inherited from the PipelineStep class and has to be implemented for every pipeline step. The user defines in this run method whether the step is intended to be run once only (by calling run_step_once), or also be run periodically by calling the periodic_step inherited from PipelineStep that calls run_step_once at a regular interval.
[ ]:
import logging
from octaipipe_core.pipeline.pipeline_step import PipelineStep
class CustomStep(PipelineStep):
def __init__(self, **kwargs):
'''
Class constructor. It includes a statement to inherint from
PipelineStep class and a logger for releasing log messages.
Args:
Add a description of the constructor arguments.
'''
super().__init__(**kwargs)
self.logger = logging.getLogger(__name__)
def run(self, **kwargs):
'''
Runs the steps defined in the custom class.
Use the "run_specs" section of the configuration
file template to store the settings information
related to this function.
Args:
Add a description of the arguments.
Returns:
None.
'''
self._logger.info('Starting custom step')
periodic_run, period = self._run_periodic_or_once(**kwargs)
if periodic_run:
# periodic run
self.periodic_step(period, **kwargs)
else:
# run once
self.run_step_once(**kwargs)
self._logger.info('Finished custom steps')
def run_step_once(self, **kwargs):
return
2. Create a configuration file for the new pipeline step#
Use the run_specs configuration section to save settings information of the run function. The extension of the file must be YML.
The input and output data specs must contain a datastore_type, which specifies the data storage for loading and writing data. These can be different types. If the step is to be run periodically, in run_specs one also specifies run_interval, either in minute (e.g. 2m), or in second (e.g. 10s).
name: # Name of the configuration file
input_data_specs:
default:
- datastore_type:
settings:
query_type:
query_template_path:
query_config:
start:
bucket:
measurement:
fields:
tags:
output_data_specs:
default:
- datastore_type: influxdb
settings:
bucket:
measurement:
- datastore_type: local
settings:
file_path:
run_specs:
run_interval:
3. Save the new pipeline step#
Save and register the pipeline step. The user has the option to publicly share the created pipeline step. For that, set the public_pipeline_step to True.
Run the following command after deploying the container to create your custom pipeline step:
octaipipe_new_step --file-path {Path to the file with the class} --step-name {Name of the pipeline step}
file-path
= path to the pipeline step python filestep-name
= name of the pipeline step.
4. Local debugging for custom steps#
Octaipipe allows for locally running a custom pipeline step without having to register it. To do so, store the pipeline step Python file in a folder, and in the same folder, create an empty __init__.py
script. Once it is prepared, run octaistep with the appropriate arguments:
octaistep --local-step-path {Path to the file with the class} --step-name {Name of the pipeline step} --config-path {Configuration file path}
Here the arguments are:
--local-step-path
or alias-l
: path to the pipeline step python file, i.e. file_path in the previous section. This argument takes precedent over--step-name
.--step-name
: name of an existing (native or registered on database) pipeline step. If--local-step-path
is not specified, or if no class is successfully loaded from the custom step file, OctaiPipe reverts to loading a pipeline step based on--step-name
.--config-path
: path to the configuration file, i.e.config_file_path
in the previous section.
At least one of --local-step-path
and --step-name
must be specified, otherwise an error message appears.
Example:
Consider a custom model training step Python script model_train_custom_step.py
stored in the ./custom_steps
folder, which also contains an empty __init__.py
script and the configuration file model_train_custom_config.yml
.
octaistep --local-step-path ./custom_steps/model_train_custom_step.py --config-path ./custom_steps/model_train_custom_config.yml
runs the custom model training step.octaistep --step-name model_training --config-path ./custom_steps/model_train_custom_config.yml
runs native model training stepoctaistep --local-step-path ./custom_steps/model_train_custom_step.py --step-name model_training --config-path ./custom_steps/model_train_custom_config.yml
runs the custom stepoctaistep --config-path ./custom_steps/model_train_custom_config.yml
results in an error, as neither--local-step-path
nor--step-name
is specified.