Data Loading and Writing Utilities#

The database layer of OctaiPipe provides an interface for connecting to databases to load and write data to and from storage. This is beneficial as it enables:

Storing large volumes of data - Databases can store larger volumes of data than local systems, so an interface for retrieving and writing data is crucial for solutions that run at scale.

Sharing data - In order to share data and results, a shared file system that can be reached by multiple users is needed. Being able to connect to a shared database enables this.

Interaction with the database#

Currently, OctaiPipe has support for InfluxDB and Azure SQL. For each database type, there is a client, source (data loader), and a sink (data writer) class. A diagram showing how the client, source, and sink for InfluxDB interact with the database is shown below. The diagram shows how the InfluxDataLoader and InfluxDataWriter inherit from the base Source and Sink classes. They both also inherit from the InfluxClient, which connects to InfluxDB. It is worth noting here that the Source and Sink do not use the same InfluxClient, but set up individual connections.

Diagram of class structure InfluxDB.

The client is set up to connect to the database and is unique for each database type. For example, the InfluxClient class sets up a connection to InfluxDB using connection credentials provided in configs or from environment variables.

The source class is used to load data from the database, and each specific source (e.g. InfluxDataLoader) inherits from the Source base class. The source base class has the abstract methods load and _compose_query, which need to be implemented by each child class and are unique for each database type. Apart from this, the load_from_query and load_from_config methods allow users to load data directly using a query or from a template populated by a config file. A typical template and config for InfluxDB and SQL are shown below:

InfluxDB Configuration#

# Influx template
from(bucket: [bucket])
|> range(start: [start], stop: [stop])
|> filter(fn:(r) => r._measurement == [measurement])
|> filter(fn: (r) => r["TAG1"] == [TAG1])
|> filter(fn:(r) => [fields])
|> drop(columns: ["_start", "_stop", "_batch",
                  "table", "result", "_measurement",
                  "table", "result"])

# Influx load config
input_data_specs:
  datastore_type: influxdb
  query_template_path: ./configs/data/influx_query.txt
  query_type: dataframe
  query_values:
    start: "2022-01-01T00:00:00.000Z"
    stop: "2022-01-02T00:00:00.000Z"
    bucket: bucket_1
    measurement: metric_1
    tags:
      TAG1: some_tag
  data_converter: {}

# Influx write config
output_data_specs:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket
      measurement: live-model-predictions

SQL Configuration#

% SQL template
SELECT [cols] FROM [table] WHERE [conditions];

# SQL loading config
input_data_specs:
  query_values:
    table: my_table
    cols:
     - col1
     - col2
    conditions:
     - "col1=1"
     - "col2<4"

# SQL writing config
output_data_specs:
  - datastore_type: sql
    settings:
      data_type: dataframe
      db_table: output-table

For both the SQL and InfluxDB templates above, the keys within brackets (e.g. [table]) are replaced with the value (in this case “my_table”).
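The substitution described above can be sketched in a few lines of Python. This is an illustration only, not OctaiPipe's implementation: the function name fill_template is hypothetical, and the way list values are joined (", " for cols, " AND " for conditions) is an assumption made for the example.

```python
import re

# Hypothetical separators for list-valued keys; the real behaviour may differ.
LIST_SEPARATORS = {"cols": ", ", "conditions": " AND "}


def fill_template(template, query_values):
    """Replace each [key] placeholder with the matching value from query_values."""

    def replace(match):
        key = match.group(1)
        if key not in query_values:
            # Leave placeholders without a value untouched.
            return match.group(0)
        value = query_values[key]
        if isinstance(value, list):
            separator = LIST_SEPARATORS.get(key, ", ")
            return separator.join(str(item) for item in value)
        return str(value)

    # Only bare identifiers in brackets count as placeholders,
    # so quoted entries such as ["TAG1"] are not touched.
    return re.sub(r"\[(\w+)\]", replace, template)


sql_template = "SELECT [cols] FROM [table] WHERE [conditions];"
sql_values = {
    "table": "my_table",
    "cols": ["col1", "col2"],
    "conditions": ["col1=1", "col2<4"],
}
query = fill_template(sql_template, sql_values)
print(query)
```

With the SQL config shown above, the placeholder [table] becomes my_table, and the two list values are joined into a single column list and a single WHERE clause.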

The sink class is used to write data to a database and each sink (e.g. InfluxDataWriter) inherits from the Sink base class. Each Sink class must implement the abstract method write to define how data is written to the database type.
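The relationship between the base classes and their children can be illustrated with Python's abc module. The sketch below is a simplified stand-in, not the OctaiPipe source: the method signatures, the ListSource and ListSink classes, and the in-memory "database" are all hypothetical and only show how abstract methods force each child class to supply its own logic.

```python
from abc import ABC, abstractmethod


class Source(ABC):
    """Simplified stand-in for a source base class (signatures are assumed)."""

    @abstractmethod
    def load(self, query):
        """Run the query and return the data."""

    @abstractmethod
    def _compose_query(self, template, query_values):
        """Build a query string from a template and config values."""


class Sink(ABC):
    """Simplified stand-in for a sink base class (signature is assumed)."""

    @abstractmethod
    def write(self, data):
        """Write the data to the store."""


class ListSource(Source):
    """Hypothetical source that reads one field from in-memory rows."""

    def __init__(self, rows):
        self.rows = rows

    def _compose_query(self, template, query_values):
        return template.replace("[field]", query_values["field"])

    def load(self, query):
        return [row[query] for row in self.rows]


class ListSink(Sink):
    """Hypothetical sink that collects written values in a list."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.extend(data)


source = ListSource([{"temp": 20}, {"temp": 22}])
values = source.load(source._compose_query("[field]", {"field": "temp"}))
sink = ListSink()
sink.write(values)
print(sink.written)
```

Because load, _compose_query and write are abstract, instantiating Source or Sink directly raises a TypeError, which is what guarantees that every database type provides its own implementation.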

MQTT Configuration#

MQTT is a lightweight messaging protocol used to send messages through a message broker. A publisher adds messages with a specific topic to a queue on the broker and clients that subscribe to the same topic can read them. It is fast and lightweight, often used for real-time, IoT applications.

OctaiPipe implements MQTT using Eclipse’s Mosquitto MQTT broker. This can be installed on devices using the install_mosquitto_on_devices function. The function takes either device_ids or device_groups, a list of device IDs or device groups to start the broker on.

To use MQTT for data loading and writing, you can use OctaiPipe’s built in MQTT data loading and writing functionalities.

MQTT Data Loading#

An example data loading configuration is shown below. The connection_params dictionary contains information given to the client on initialization, mainly used to connect to the broker. Topic is the only required argument, specifying which topic to subscribe to. The broker_address is the hostname of the device running the broker. If using MQTT with OctaiPipe edge deployments or Federated Learning, this should be set to mosquitto. The port is the port to connect on. This is 1883 by default when setting Mosquitto up with OctaiPipe. Username and password are the credentials for the broker, if it is set up to require them. Client ID is the name the client connects with. A default is assigned if not given. The user can also set use_tls to True in order to connect using TLS encryption.

For MQTT, datastore_type should be mqtt in lowercase. The query_template_path is not used, and can be left empty. The return type is specified by the query_type field and can be either dataframe to return a Pandas DataFrame or json to return a list of dictionaries.

The query_values dictionary configures data loading at runtime. Time to wait for messages when loading data is set with loop_seconds (defaults to 0). If a key in the message retrieved is the index of the data, this can be specified with index_key. Finally, if the topic needs to be set or changed when calling the load functionality, this can be set with topic. This can be used for example in FL, to specify one topic for the input_data_specs and one for evaluation_data_specs. The add_tags argument is a list specifying any tags to add to the data when reading. For a dataframe each tag is added as a new column, and for JSON it is added to each record dictionary. If messages are nested in a dictionary where the data needs to be accessed from a specific key, the message_name argument can be used to retrieve the data.

Input data specs for MQTT#
connection_params:
  topic: sensor-3
  broker_address: mosquitto
  port: 1883
  username: mqtt-user-1
  password: alligator1
  client_id: sensor-3-subscriber
  use_tls: false
datastore_type: mqtt
query_template_path: # this field is expected as input but not used
query_type: dataframe # dataframe/json
query_values:
  loop_seconds: 1
  index_key: time
  topic: sensor-3
  add_tags: []
  message_name:
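To make the role of message_name concrete, the sketch below decodes a few JSON payloads into a list of dictionaries, which is the shape described for query_type json. It is an assumption-laden illustration, not OctaiPipe's loader: the function decode_messages and the example payloads are hypothetical, and index_key and add_tags handling are left out.

```python
import json


def decode_messages(payloads, message_name=None):
    """Turn raw MQTT payloads (JSON text or bytes) into a list of records."""
    records = []
    for payload in payloads:
        message = json.loads(payload)
        if message_name is not None:
            # The data is nested under a specific key of the message.
            message = message[message_name]
        if isinstance(message, list):
            # One message carried a whole dataset.
            records.extend(message)
        else:
            records.append(message)
    return records


# Hypothetical payloads where the data sits under the "data_field" key.
payloads = [
    b'{"data_field": {"time": 1, "value": 0.5}, "tagId": "data_tag_0"}',
    b'{"data_field": {"time": 2, "value": 0.7}, "tagId": "data_tag_0"}',
]
records = decode_messages(payloads, message_name="data_field")
print(records)
```

Leaving message_name empty, as in the config above, corresponds to treating each message as the data itself.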

MQTT Data Writing#

Below is an example of an MQTT data writing config for inference, i.e. output_data_specs for MQTT. The connection_params are the same as for data loading, except the topic specifies where to publish messages to rather than what to subscribe to. Topic has to be specified either here or in the mqtt parameters below.

The datastore_type should be mqtt. The mqtt dictionary specifies the writing configurations at runtime. The topic is the topic messages should be published to, and will override a topic specified in connection_params. To write a full dataset as one message, set write_as_one to True. Otherwise, each record (row) will be written as a single message. The write_index field specifies whether to add the index of a Pandas DataFrame as a field when writing. To save the last message sent to the queue, retain can be set to True. This can be useful when we want to add messages to be consumed by a subscriber that is yet to connect. A dictionary of tags can be added to send the message as a dictionary with the data in the field with name message_name (defaults to “data”) and additional key-value pairs for each tag.

Output data specs for MQTT#
connection_params:
  broker_address: mosquitto
  port: 1883
  username: mqtt-user-1
  password: alligator1
  client_id: sensor-3-publisher
  use_tls: false
datastore_type: mqtt
mqtt:
  topic: sensor-3
  write_as_one: false
  write_index: true
  retain: false
  tags:
    tagId: data_tag_0
  message_name: data_field
name: prediction
units: ''
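The effect of write_as_one, tags and message_name on the published messages can be sketched as follows. This is a minimal illustration under stated assumptions, not the OctaiPipe writer: the function build_payloads is hypothetical, records are plain dictionaries rather than DataFrame rows, and JSON is assumed as the wire format.

```python
import json


def build_payloads(records, write_as_one=False, tags=None, message_name="data"):
    """Return the JSON messages that would be published for the given records."""

    def wrap(data):
        if tags:
            # Data goes under message_name, each tag becomes its own key.
            return json.dumps({message_name: data, **tags})
        return json.dumps(data)

    if write_as_one:
        # The full dataset is sent as a single message.
        return [wrap(records)]
    # Otherwise each record (row) is sent as its own message.
    return [wrap(record) for record in records]


rows = [{"prediction": 1}, {"prediction": 0}]
per_row = build_payloads(
    rows, tags={"tagId": "data_tag_0"}, message_name="data_field"
)
as_one = build_payloads(rows, write_as_one=True)
print(per_row)
print(as_one)
```

With the settings from the config above, each row becomes one message containing a data_field entry and a tagId entry, while write_as_one produces a single message holding every row.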

NOTE that the above config is for inference. For other pipeline steps, the fields in the mqtt dictionary should be directly in output_data_specs, as below:

Output data specs for MQTT#
 1datastore_type: mqtt
 3  broker_address: mosquitto
 5  topic: sensor-3
 6  write_as_one: false
 7  write_index: true
 8  retain: false
 9  tags:
10    tagId: data_tag_0
11  message_name: data_field

Interaction with API#

The ApiDataLoader class inherits from the Source base class, so the API integration uses the same abstractions as the database integrations to load and write data. The Source base class has the abstract methods load and _compose_query, which each child class must implement. Apart from this, the load_from_config method allows users to load data directly from an API endpoint specified in a template.

Data is written with the ApiDataWriter class, which inherits from the Sink base class and implements the abstract method write to define how data is sent to the API. The arguments passed to the write method are the data and the API endpoint. The data can be a dataframe or JSON.

API Configuration#

# API template

Local data Configuration#

# Local template
Input data specs for data loading#
# Local loading config
query_template_path: 'dummy' # this field is expected as input but not used
query_type: csv # csv or excel
query_values:
 filepath_or_buffer: path/to/file.csv
 skiprows: 2

The local data loading configuration is a bit different from InfluxDB and Azure SQL. There is no query template as we load straight from a file. We can therefore set this field to any value. The query type can be set to ‘csv’ or ‘excel’, which uses Pandas read_csv or read_excel. Query values is a dictionary, where each key-value pair represents an argument to the underlying Pandas function. In the example above, filepath_or_buffer specifies the path to the data and skiprows specifies how many rows to skip at the beginning of the file. More arguments can be found for Pandas read_csv and read_excel below:

Read CSV: read_csv.

Read excel: read_excel.
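The way query_values maps onto the Pandas reader can be sketched as below. This is an illustration rather than OctaiPipe's code: it assumes pandas is installed, the readers lookup is hypothetical, and an in-memory buffer with made-up contents stands in for path/to/file.csv so the example runs without a file on disk.

```python
import io

import pandas as pd

# Stand-in for the file: two header lines to skip, then the actual CSV.
raw = (
    "exported by sensor-3\n"
    "generated 2022-01-01\n"
    "time,value\n"
    "1,0.5\n"
    "2,0.7\n"
)

# Each key-value pair is passed straight to the Pandas function as an argument.
query_values = {"filepath_or_buffer": io.StringIO(raw), "skiprows": 2}

# Hypothetical lookup from query_type to the Pandas reader.
readers = {"csv": pd.read_csv, "excel": pd.read_excel}

query_type = "csv"
df = readers[query_type](**query_values)
print(df)
```

Any other keyword argument accepted by read_csv or read_excel can be added to query_values in the same way.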

For data writing, the configs simply specify the file_path, where the extension determines how data is written. For example, ending the path with ‘.csv’ will make use of Pandas to_csv. Options and functions are below:

  • .csv uses Pandas to_csv

  • .xls or .xlsx uses Pandas to_excel

  • .json uses json package json.dump

  • .zip uses shutil.make_archive
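The extension-based choice in the list above can be expressed as a small lookup. The sketch is hypothetical and only returns the name of the function that would be used. The names WRITERS and pick_writer are invented for the example, and treating extensions case-insensitively is an assumption.

```python
from pathlib import Path

# Mapping from file extension to the function named in the list above.
WRITERS = {
    ".csv": "pandas.DataFrame.to_csv",
    ".xls": "pandas.DataFrame.to_excel",
    ".xlsx": "pandas.DataFrame.to_excel",
    ".json": "json.dump",
    ".zip": "shutil.make_archive",
}


def pick_writer(file_path):
    """Return the name of the writer function for the given output path."""
    # Lower-casing the suffix is an assumption, not documented behaviour.
    suffix = Path(file_path).suffix.lower()
    if suffix not in WRITERS:
        raise ValueError(f"Unsupported file extension: {suffix!r}")
    return WRITERS[suffix]


print(pick_writer("path/to/file.csv"))
print(pick_writer("path/to/results.xlsx"))
```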

The to_csv and to_excel functions can also be given arguments by including a dictionary called write_config of argument names and values. An example is below:

local data writing output data specs#
# Local writing config
file_path: path/to/file.csv
write_config:
 index: True
 sep: ';'

NOTE: When you use data from local file storage on your device, you need to make sure that OctaiPipe has read and write access to the files you want to use. To ensure this, run the following command on your device before you run a deployment:

sudo chown -R root:1080 {replace with folder path}

If you cannot change file permissions on your machine, you can run OctaiPipe with root images, see OctaiPipe Images.

Data Converters#

Converters are objects that transform data from the format the database API returns into the form required by the pipeline step.

Available Converters