OctaiPipe Steps#
Pipeline steps are the building blocks of the OctaiPipe framework. A pipeline step is an object that processes data: it takes data in and writes data out. OctaiPipe provides a range of native pipeline steps, and users can also create their own.
Pipeline steps make testing and debugging easier, since each step has a well-defined scope and function. Modularity also eliminates redundant computation and speeds up the model development phase, and it makes experiment results fully auditable and reproducible, since each step configuration has a single entry point: a templatized configuration file. As illustrated in the diagram, each pipeline step interacts with the database to load or write data.
Pipeline Steps#
A typical OctaiPipe machine learning workflow consists of multiple pipeline steps. To execute a step, simply use the OctaiStep command:
octaistep --step-name <step name> --config-path <config path>
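For example, to run the preprocessing step against a config file in a local configs directory (the path here is illustrative, not a required layout):
octaistep --step-name preprocessing --config-path ./configs/preprocessing_config.yml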
Alternatively, you can use the Python command from the OctaiPipe develop submodule:
from octaipipe import develop

step_name: str = 'preprocessing'
config_path: str = './path/to/preprocessing_config.yml'

develop.run_step_local(step_name=step_name,
                       config_path=config_path)
OctaiPipe currently provides the following native pipeline steps. To run one of them locally, use the step names below; a short runnable example follows the list.
Preprocessing: preprocessing
Feature engineering: feature_engineering
Model training: model_training
Model evaluation: model_evaluation
Model inference: model_inference
WASM model inference: model_inference
Android model inference: image_inference
Data drift: data_drift
Feature selection: feature_selection
Data writing: data_writing
AutoML: cannot be run locally; no local step name
Custom step: user-defined step name
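For instance, using one of the step names above, the model evaluation step can be run locally as follows (the config path is illustrative):

from octaipipe import develop

develop.run_step_local(step_name='model_evaluation',
                       config_path='./configs/model_evaluation_config.yml')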
Data Input and Output Specs#
Every pipeline step loads data from a data source and writes some data out. In the config files, the I/O specs, as discussed in Data Loading and Writing Utilities, are given in input_data_specs and output_data_specs respectively. These are present in the config files of all the pipeline steps.
Input Data Specs#
OctaiPipe provides a range of options to satisfy your storage needs. In the current release, the input data queried by each of the pipeline steps can be stored in an InfluxDB or SQL database. The argument datastore_type specifies the data storage type; it takes one of two options: influxdb or sql. We look at these two cases separately.
InfluxDB Database:
Here is an example of the input_data_specs section that reads from an InfluxDB database:
input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/query.txt
  query_values:
    start: "2022-01-05T17:10:00.000Z"
    stop: "2022-01-05T17:40:00.000Z"
    bucket: live-metrics
    measurement: def-metrics
    tags:
      MY_TAG: value_0
  data_converter: {}
query_type is the type of the data loaded from InfluxDB. It can be 'influx', 'csv', 'dataframe', 'stream' or 'stream_dataframe', corresponding to the query, query_csv, query_data_frame, query_stream and query_data_frame_stream methods respectively.
data_converter defines the specs for the data converter used to transform the data returned by the Influx query into the form your pipeline steps require. You do not have to use a converter; for example, you can do all the necessary manipulations as part of your query. Learn more about converters in the Influx Flat Converter section.
In the above example, we have specified both the start and stop timestamps of the query. A typical use case for this is a model training pipeline. During inference, and in some other situations, we may instead want to run a pipeline step at a regular interval, using data within a certain past time window from now. To do this, specify the input_data_specs like the following:
input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query_periodic.txt
  query_values:
    start: 2m
    bucket: live-metrics
    measurement: def-metrics
    tags:
      MY_TAG: value_0
  data_converter: {}
Namely, we do not specify stop in query_values, and for start, instead of a timestamp, we give the size of the past time window ending now, in minutes (e.g. 2m) or seconds (e.g. 10s), over which to query the data. In addition, in the run_specs of the pipeline step, one needs to specify the run_interval; see the pages of the individual pipeline steps to which this applies.
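As an illustration, the relevant part of run_specs for a step polled every two minutes might look like the sketch below. The exact set of run_specs fields varies by step, and the interval value format shown here is an assumption based on the window syntax above:

run_specs:
  run_interval: 2m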
SQL Database:
Here is an example of the input_data_specs section that reads from a SQL database:
input_data_specs:
  datastore_type: sql
  query_values:
    table: my_table
    cols:
      - col1
      - col2
    conditions:
      - "col1=1"
      - "col2<4"
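The spec above selects the listed columns from my_table, filtered by the given conditions. Purely to illustrate these semantics, here is a minimal Python sketch of the query such a spec corresponds to; note that joining the conditions with AND is an assumption for illustration, not documented behaviour:

query_values = {
    'table': 'my_table',
    'cols': ['col1', 'col2'],
    'conditions': ['col1=1', 'col2<4'],
}

# Build the equivalent SQL statement from the spec
sql = 'SELECT {} FROM {}'.format(', '.join(query_values['cols']),
                                 query_values['table'])
if query_values['conditions']:
    sql += ' WHERE ' + ' AND '.join(query_values['conditions'])

print(sql)  # SELECT col1, col2 FROM my_table WHERE col1=1 AND col2<4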
Output Data Specs#
The output of the OctaiPipe pipeline steps can be saved to various data store types. The output_data_specs field expects a list of specs; data will be written to each of the sinks defined.
Here is an example of the output_data_specs section that writes to InfluxDB, SQL and CSV:
output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket-1
      measurement: testv1
  - datastore_type: sql
    settings:
      db_table: test_table
      data_type: dataframe
  - datastore_type: csv
    settings:
      file_path: test-bucket-1
      delimiter: ','
      append: true
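To make the fan-out behaviour concrete, here is a minimal Python sketch of what writing to each sink amounts to. This is not OctaiPipe's internal writer code; the helper below is hypothetical, and only the CSV branch is spelled out:

import pandas as pd

def write_to_sink(spec: dict, data: pd.DataFrame) -> None:
    # Hypothetical helper: dispatch on datastore_type; every sink in
    # output_data_specs receives the same output data.
    if spec['datastore_type'] == 'csv':
        s = spec['settings']
        data.to_csv(s['file_path'],
                    sep=s.get('delimiter', ','),
                    mode='a' if s.get('append') else 'w',
                    index=False)
    else:
        # influxdb/sql sinks would be written via their respective clients
        pass

output_data_specs = [
    {'datastore_type': 'csv',
     'settings': {'file_path': 'out.csv', 'delimiter': ',', 'append': True}},
]

df = pd.DataFrame({'col1': [1, 2]})
for spec in output_data_specs:
    write_to_sink(spec, df)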