Custom pipeline step

This notebook shows how to create, save, and locally debug a custom pipeline step.

1. Create the pipeline step class

The following class is a template to define a custom pipeline step.

Each pipeline step should inherit from the base class PipelineStep. At initialization, the parent class expects an output collector, a name, and two dictionaries with the input and output data specifications.

This template class has a method run_step_once, in which the user defines how the step runs a single time. The class inherits the method periodic_step from PipelineStep, which calls run_step_once at the time interval specified in the custom step configuration file (see below). The run method is what OctaiPipe executes when the custom class is called. It is an abstract method inherited from PipelineStep and must be implemented for every pipeline step. In run, the user decides whether the step runs once only (by calling run_step_once) or periodically (by calling periodic_step).

import logging
from octaipipe_core.pipeline.pipeline_step import PipelineStep


class CustomStep(PipelineStep):

    def __init__(self, **kwargs):
        '''
        Class constructor. It calls the PipelineStep constructor and
        sets up a logger for emitting log messages.

        Args:
            Add a description of the constructor arguments.
        '''
        super().__init__(**kwargs)
        # Named _logger to match the self._logger calls in run().
        self._logger = logging.getLogger(__name__)

    def run(self, **kwargs):
        '''
        Runs the steps defined in the custom class.
        Use the "run_specs" section of the configuration
        file template to store the settings information
        related to this function.

        Args:
            Add a description of the arguments.

        Returns:
            None.

        '''

        self._logger.info('Starting custom step')

        periodic_run, period = self._run_periodic_or_once(**kwargs)

        if periodic_run:
            # periodic run
            self.periodic_step(period, **kwargs)
        else:
            # run once
            self.run_step_once(**kwargs)

        self._logger.info('Finished custom step')

    def run_step_once(self, **kwargs):
        '''
        Runs the step a single time. Replace the body with the
        logic of the custom step.
        '''
        return
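
The relationship between run, periodic_step, and run_step_once can be sketched without OctaiPipe installed. The class below is a hypothetical stand-in, not the PipelineStep implementation: the names StandInStep, periodic_run, period_seconds, and max_runs are assumptions made for illustration, and max_runs exists only so that the loop terminates.

```python
import time


class StandInStep:
    """Hypothetical stand-in for a pipeline step; not OctaiPipe code."""

    def __init__(self):
        self.calls = 0  # counts how often run_step_once was executed

    def run_step_once(self, **kwargs):
        # The custom logic of the step would go here.
        self.calls += 1

    def periodic_step(self, period_seconds, max_runs=3, **kwargs):
        # Assumption: call run_step_once repeatedly, waiting period_seconds
        # between calls. max_runs keeps this sketch finite.
        for _ in range(max_runs):
            self.run_step_once(**kwargs)
            time.sleep(period_seconds)

    def run(self, periodic_run=False, period_seconds=0.0, **kwargs):
        # Dispatch in the same way as the template's run method.
        if periodic_run:
            self.periodic_step(period_seconds, **kwargs)
        else:
            self.run_step_once(**kwargs)


single = StandInStep()
single.run()  # runs the step once

repeated = StandInStep()
repeated.run(periodic_run=True, period_seconds=0.0, max_runs=3)
print(single.calls, repeated.calls)
```

In the real template, the decision between the two branches comes from the configuration file rather than from keyword arguments passed by hand.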

2. Create a configuration file for the new pipeline step

Use the run_specs configuration section to store the settings of the run function. The file must have the .yml extension.

The input and output data specs must each contain a datastore_type, which specifies the data storage used for loading and writing data. The input and output can use different datastore types. If the step is to be run periodically, also specify run_interval in run_specs, either in minutes (e.g. 2m) or in seconds (e.g. 10s).

name:  # Name of the configuration file

input_data_specs:
  default:
  - datastore_type:
    settings:
      query_type:
      query_template_path:
      query_config:
        start:
        bucket:
        measurement:
        fields:
        tags:

output_data_specs:
  default:
  - datastore_type: influxdb
    settings:
      bucket:
      measurement:
  - datastore_type: local
    settings:
      file_path:

run_specs:
  run_interval:
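
The run_interval format described above (a number followed by m for minutes or s for seconds) can be illustrated with a small parser. This is a sketch only: run_interval_to_seconds is a hypothetical helper, not part of OctaiPipe, and how OctaiPipe itself reads the value is not covered by this notebook.

```python
import re

# Seconds per unit for the two units mentioned in this notebook.
_UNIT_SECONDS = {"s": 1, "m": 60}


def run_interval_to_seconds(run_interval: str) -> int:
    """Convert a value such as '2m' or '10s' into a number of seconds."""
    match = re.fullmatch(r"(\d+)([sm])", run_interval.strip())
    if match is None:
        raise ValueError(
            f"Unsupported run_interval {run_interval!r}; expected e.g. '2m' or '10s'"
        )
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]


print(run_interval_to_seconds("2m"))
print(run_interval_to_seconds("10s"))
```

Checking the value before deployment makes a typo such as 2min fail early instead of at run time.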

3. Save the new pipeline step

Save and register the pipeline step. To share the created pipeline step publicly, set public_pipeline_step to True.

Run the following command after deploying the container to create your custom pipeline step:

octaipipe_new_step --file-path {Path to the file with the class} --step-name {Name of the pipeline step}
  • --file-path: path to the pipeline step Python file.

  • --step-name: name of the pipeline step.
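
When registration is scripted, the command can be assembled in Python before it is executed. The sketch below only builds and prints the command, using the two flags documented above. The function build_new_step_command, the file path, and the step name custom_model_training are hypothetical placeholders.

```python
import shlex
from typing import List


def build_new_step_command(file_path: str, step_name: str) -> List[str]:
    """Return the octaipipe_new_step call as an argument list."""
    return [
        "octaipipe_new_step",
        "--file-path", file_path,
        "--step-name", step_name,
    ]


# Placeholder values for illustration only.
command = build_new_step_command(
    "./custom_steps/model_train_custom_step.py",
    "custom_model_training",
)
print(shlex.join(command))
```

The resulting list can be passed to subprocess.run(command, check=True) inside the deployed container, where the octaipipe_new_step command is available.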

4. Local debugging for custom steps

OctaiPipe can run a custom pipeline step locally without registering it. To do so, store the pipeline step Python file in a folder and create an empty __init__.py script in the same folder. Then run octaistep with the appropriate arguments:

octaistep --local-step-path {Path to the file with the class} --step-name {Name of the pipeline step} --config-path {Configuration file path}

Here the arguments are:

  • --local-step-path or alias -l: path to the pipeline step Python file, i.e. --file-path in the previous section. This argument takes precedence over --step-name.

  • --step-name: name of an existing (native or registered on database) pipeline step. If --local-step-path is not specified, or if no class is successfully loaded from the custom step file, OctaiPipe reverts to loading a pipeline step based on --step-name.

  • --config-path: path to the configuration file for the step (see section 2).

At least one of --local-step-path and --step-name must be specified, otherwise an error message appears.

Example:

Consider a custom model training step Python script model_train_custom_step.py stored in the ./custom_steps folder, which also contains an empty __init__.py script and the configuration file model_train_custom_config.yml.

  • octaistep --local-step-path ./custom_steps/model_train_custom_step.py --config-path ./custom_steps/model_train_custom_config.yml runs the custom model training step.

  • octaistep --step-name model_training --config-path ./custom_steps/model_train_custom_config.yml runs the native model training step.

  • octaistep --local-step-path ./custom_steps/model_train_custom_step.py --step-name model_training --config-path ./custom_steps/model_train_custom_config.yml runs the custom step, because --local-step-path takes precedence over --step-name.

  • octaistep  --config-path ./custom_steps/model_train_custom_config.yml results in an error, as neither --local-step-path nor --step-name is specified.
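
The precedence rules behind these four examples can be summarised as a small decision function. This is a sketch of the behaviour described in this section, not OctaiPipe's implementation: resolve_step_source and the flag local_class_loaded (whether a class was successfully loaded from the local file) are hypothetical names.

```python
from typing import Optional


def resolve_step_source(
    local_step_path: Optional[str],
    step_name: Optional[str],
    local_class_loaded: bool = True,
) -> str:
    """Return which step would be run for a given argument combination."""
    if local_step_path is None and step_name is None:
        raise ValueError(
            "At least one of --local-step-path and --step-name must be specified"
        )
    # The local file takes precedence when a class can be loaded from it.
    if local_step_path is not None and local_class_loaded:
        return f"local:{local_step_path}"
    # Otherwise fall back to the native or registered step name.
    if step_name is not None:
        return f"named:{step_name}"
    raise ValueError("No class loaded from the local file and no --step-name given")


path = "./custom_steps/model_train_custom_step.py"
print(resolve_step_source(path, None))
print(resolve_step_source(None, "model_training"))
print(resolve_step_source(path, "model_training"))
```

Calling resolve_step_source(None, None) raises a ValueError, mirroring the last example above.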