OctaiPipe Steps#

Pipeline steps are the building blocks of the OctaiPipe framework. A pipeline step is an object that processes data: it has a data input and a data output. There is a range of native pipeline steps available in OctaiPipe, and the user can also create their own steps.

[Diagram: OctaiPipe pipeline steps and their interaction with the database]

OctaiPipe Pipeline Steps#

With pipeline steps, testing and debugging become easier, as each step has a well-defined scope and function. Modularity also eliminates redundant computation and allows the user to accelerate the model development phase. It further enables full auditability and reproducibility of experiment results, since each step's configuration has a single entry point: a templatized configuration file. As illustrated in the diagram, each pipeline step interacts with the database to load or write data.

Pipeline Steps#

A typical OctaiPipe machine learning workflow consists of multiple pipeline steps. To execute a step, simply use the OctaiStep command:

octaistep --step-name <step name> --config-path <config path>

Alternatively, you can run a step from Python using the OctaiPipe develop submodule:

Running OctaiStep#
from octaipipe import develop


step_name: str = 'preprocessing'
config_path: str = './path/to/preprocessing_config.yml'

develop.run_step_local(step_name=step_name,
                       config_path=config_path)

OctaiPipe currently provides the following native Pipeline Steps. To run one of these steps locally, use the step names below (a usage example follows the list):

  • Preprocessing: preprocessing

  • Feature engineering: feature_engineering

  • Model training: model_training

  • Model evaluation: model_evaluation

  • Model inference: model_inference

  • WASM Model inference: model_inference

  • Data drift: data_drift

  • Feature selection: feature_selection

  • AutoML: cannot run locally, no local step name

  • Custom step: user-defined step name
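
For example, a local run of the feature engineering step would look as follows (the config path is only a placeholder for your own file):

from octaipipe import develop

# Run the feature engineering step locally.
# The config path below is a placeholder; point it at your own step config.
develop.run_step_local(step_name='feature_engineering',
                       config_path='./configs/feature_engineering_config.yml')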

Data Input and Output Specs#

Every pipeline step loads data from a data source and writes data out. In the config files, the I/O specs, as discussed in Data Loading and Writing Utilities, are given under input_data_specs and output_data_specs respectively. These fields are present in the config files of all the Pipeline Steps.
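
As a rough, illustrative skeleton (the full set of top-level fields varies from step to step, so only the two I/O blocks are shown here), the specs sit at the top level of a step's config file:

# Illustrative skeleton only; the other top-level fields depend on the step.
input_data_specs:
  datastore_type: influxdb     # see Input Data Specs below
output_data_specs:
  - datastore_type: influxdb   # see Output Data Specs below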

Input Data Specs#

OctaiPipe provides a range of options to satisfy your storage needs. In the current release, the input data queried by each of the pipeline steps can be stored in an InfluxDB or SQL database. The argument datastore_type specifies the data storage type and takes one of two options: influxdb or sql. We look at these two cases separately.

  • InfluxDB Database:

Here is an example of the input_data_specs section that reads from an InfluxDB database:

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/query.txt
  query_values:
    start: "2022-01-05T17:10:00.000Z"
    stop: "2022-01-05T17:40:00.000Z"
    bucket: live-metrics
    measurement: def-metrics
    tags:
      MY_TAG: value_0
  data_converter: {}

query_type is the type of the data loaded from InfluxDB. It can be ‘influx’, ‘csv’, ‘dataframe’, ‘stream’ or ‘stream_dataframe’, corresponding to the query, query_csv, query_data_frame, query_stream and query_data_frame_stream methods respectively.
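
As an illustrative aside (plain Python, not OctaiPipe code), the correspondence described above can be written out as a mapping from query_type to the InfluxDB Python client's QueryApi method:

# Correspondence between query_type values and influxdb-client QueryApi
# methods, as described above. Illustrative only.
QUERY_TYPE_TO_METHOD = {
    'influx': 'query',
    'csv': 'query_csv',
    'dataframe': 'query_data_frame',
    'stream': 'query_stream',
    'stream_dataframe': 'query_data_frame_stream',
}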

data_converter defines the specs for the data converter used to convert the data returned from the Influx query into the form your pipeline steps require. You do not have to use a converter; for example, you can do all the necessary manipulations as part of your query. Learn more about converters in Influx Flat Converter.

In the example above, we have specified both the start and stop timestamps of the query. A typical use case for this is a model training pipeline. During inference, and in some other situations, we may instead want to run a pipeline step at a regular interval, using data within a certain past time window from now. To do this, specify the input_data_specs as follows:

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query_periodic.txt
  query_values:
    start: 2m
    bucket: live-metrics
    measurement: def-metrics
    tags:
      MY_TAG: value_0
  data_converter: {}

That is, we do not specify stop in query_values, and for start, instead of a timestamp, we specify the size of the past time window ending now, in minutes (e.g. 2m) or seconds (e.g. 10s), over which to query the data. In addition, in the run_specs of the pipeline step, one needs to specify the run_interval; see the pages of the individual pipeline steps to which this applies.
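
As a rough sketch only (the exact run_specs layout differs between steps and should be checked on the individual step pages), a periodic step might pair the 2m query window above with a matching run interval:

# Illustrative assumption: run_specs fields vary by step; see the
# individual step pages for the exact schema and value format.
run_specs:
  run_interval: 2m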

  • SQL Database:

Here is an example of the input_data_specs section that reads from a SQL database:

input_data_specs:
  datastore_type: sql
  query_values:
    table: my_table
    cols:
      - col1
      - col2
    conditions:
      - "col1=1"
      - "col2<4"

Output Data Specs#

The output of the OctaiPipe pipeline steps can be saved to various Data Store types. The output_data_specs field expects a list of specs, and data will be written to each of the sinks defined.

Here is an example of the output_data_specs section that writes to InfluxDB, SQL and CSV sinks:

output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket-1
      measurement: testv1
  - datastore_type: sql
    settings:
      db_table: test_table
      data_type: dataframe
  - datastore_type: csv
    settings:
      file_path: test-bucket-1
      delimiter: ','
      append: true