Data Loading and Writing Utilities#

The database layer of OctaiPipe provides an interface for connecting to databases, loading data from storage, and writing data back to it. This is beneficial as it enables:

Storing large volumes of data - Databases can store larger volumes of data than local systems, so an interface for retrieving and writing data is crucial for solutions that run at scale.

Sharing data - In order to share data and results, a shared file system that can be reached by multiple users is needed. Being able to connect to a shared database enables this.

Interaction with the database#

For each data store type, there is a client, source (data loader), and a sink (data writer). A diagram showing how the client, source, and sink for InfluxDB interact with the database is shown below. The diagram shows how the InfluxDataLoader and InfluxDataWriter inherit from the base Source and Sink classes. They both also inherit from the InfluxClient, which connects to InfluxDB.

Diagram of the class structure for InfluxDB.

The client is set up to connect to the database and is unique for each database type. For example, the InfluxClient class sets up a connection to InfluxDB using connection credentials provided in configs or from environment variables.

Configuring data stores in OctaiPipe#

The configuration of data loading and writing in OctaiPipe has the same top-level components and structure for all data stores.

There are input_data_specs and output_data_specs for where to read and write the data respectively. Each of these contains a dictionary of device IDs (relating to the device ID in OctaiPipe Portal). For each device ID, there is a list of data specs. You can also provide a default spec that is used for all devices not specified.

Each spec contains the field datastore_type, which specifies where to read data from or write data to. The available types are:

  • influxdb

  • mqtt

  • sql

  • local

  • csv

The settings field contains the specific configurations for the datastore type. The configurations for each type are in the sections below.

The connection_params are optional and tell OctaiPipe how to connect to the datastore.

In the example below, we specify input_data_specs for device device_0 to read from influx and a local CSV. The default specs tell any other devices to read only from influx.

The output_data_specs tell device_0 to write to mqtt while all other devices will pick up the specs from default and write to influx.

# input config
input_data_specs:
  device_0:
    - datastore_type: influxdb
      settings:
        query_template: |
          from(bucket: [bucket])
            |> range(start: [start], stop: [stop])
            |> filter(fn:(r) => r._measurement == [measurement])
            |> filter(fn:(r) => [fields])
            |> drop(columns: ["_start", "_stop", "_batch",
                              "table", "result", "_measurement"])
        query_config:
          start: "2022-01-01T00:00:00.000Z"
          stop: "2022-01-02T00:00:00.000Z"
          bucket: bucket_1
          measurement: metric_1
      connection_params:
        url: "http://influxdb:8086"
        org: "my-org"
        token: "my-token"
    - datastore_type: local
      settings:
        query_config:
          filepath_or_buffer: ./path/to/csv_file.csv
  default:
    - datastore_type: influxdb
      settings:
        query_template: |
          from(bucket: [bucket])
            |> range(start: [start], stop: [stop])
            |> filter(fn:(r) => r._measurement == [measurement])
            |> filter(fn:(r) => [fields])
            |> drop(columns: ["_start", "_stop", "_batch",
                              "table", "result", "_measurement"])
        query_config:
          start: "2022-01-01T00:00:00.000Z"
          stop: "2022-01-02T00:00:00.000Z"
          bucket: bucket_1
          measurement: metric_1
      connection_params:
        url: "http://influxdb:8086"
        org: "my-org"
        token: "my-token"

# output config
output_data_specs:
  device_0:
    - datastore_type: mqtt
      settings:
        topic: some_topic
      connection_params:
        broker_address: mosquitto
        port: 1883
        username: mqtt-user-1
        password: alligator1
        client_id: device-1-publisher
        use_tls: false
  default:
    - datastore_type: influxdb
      settings:
        bucket: output-bucket
        measurement: my-measurement
      connection_params:
        url: "http://influxdb:8086"
        org: "my-org"
        token: "my-token"
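
The per-device lookup with a default fallback described in this section can be pictured with a small Python sketch. This is not OctaiPipe's code: the function name resolve_specs and the error raised when no default exists are hypothetical, and the specs are trimmed to the datastore_type field only.

```python
def resolve_specs(data_specs, device_id):
    """Return the spec list for device_id, falling back to 'default'.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    if device_id in data_specs:
        return data_specs[device_id]
    if "default" in data_specs:
        return data_specs["default"]
    raise KeyError(f"no specs for {device_id!r} and no default provided")


# Trimmed-down version of the input_data_specs shown in this section.
input_data_specs = {
    "device_0": [
        {"datastore_type": "influxdb"},
        {"datastore_type": "local"},
    ],
    "default": [{"datastore_type": "influxdb"}],
}

# device_0 has its own entry; device_7 is not listed and uses the default.
device_0_types = [s["datastore_type"] for s in resolve_specs(input_data_specs, "device_0")]
device_7_types = [s["datastore_type"] for s in resolve_specs(input_data_specs, "device_7")]
print(device_0_types)
print(device_7_types)
```

A device that is listed explicitly uses only its own list of specs in this sketch; whether OctaiPipe merges device-level specs with the default is not stated here, so no merging is attempted.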

InfluxDB Configuration#

The connection parameters for influx take the url, org and token for the client to connect. If influx is running on the same device and docker network as OctaiPipe, the influx url can just be the name of the container and port, e.g. http://influxdb:8086.

In settings, a query_template can be provided as a string, or query_template_path can be provided as a path to a file containing the query template. query_config can then be used to provide key-value pairs that replace the placeholders [key] in the template.

If you wish to provide a full query with no placeholders, set the full query in query_template and omit query_config.

# Influx template
from(bucket: [bucket])
  |> range(start: [start], stop: [stop])
  |> filter(fn:(r) => r._measurement == [measurement])
  |> filter(fn:(r) => [fields])
  |> drop(columns: ["_start", "_stop", "_batch",
                    "table", "result", "_measurement"])

# Influx load config
input_data_specs:
  default:
  - datastore_type: influxdb
    connection_params:
      url: "http://influxdb:8086"
      org: "my-org"
      token: "my-token"
    settings:
      query_template_path: ./configs/data/influx_query.txt
      query_config:
        start: "2022-01-01T00:00:00.000Z"
        stop: "2022-01-02T00:00:00.000Z"
        bucket: bucket_1
        measurement: metric_1
        tags:
          TAG1: some_tag

# Influx write config
output_data_specs:
  default:
  - datastore_type: influxdb
    settings:
      bucket: test-bucket
      measurement: live-model-predictions
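
The [key] placeholder substitution described in this section can be sketched in a few lines of Python. This is an illustration under stated assumptions, not OctaiPipe's implementation: the function render_query is hypothetical, values are inserted verbatim (any quoting OctaiPipe applies is not reproduced), and placeholders with no matching key are simply left in place.

```python
import re

# Matches [key] where key is a plain identifier. Bracketed lists such as
# ["_start", "_stop"] are not matched because they start with a quote.
PLACEHOLDER = re.compile(r"\[(\w+)\]")


def render_query(template, query_config):
    """Replace each [key] with query_config[key]; leave unknown keys as-is.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    def substitute(match):
        key = match.group(1)
        if key in query_config:
            return str(query_config[key])
        return match.group(0)

    return PLACEHOLDER.sub(substitute, template)


template = (
    "from(bucket: [bucket])\n"
    "  |> range(start: [start], stop: [stop])\n"
    '  |> drop(columns: ["_start", "_stop"])'
)
query_config = {
    "bucket": "bucket_1",
    "start": "2022-01-01T00:00:00.000Z",
    "stop": "2022-01-02T00:00:00.000Z",
}

rendered = render_query(template, query_config)
print(rendered)
```

Matching only identifier-style placeholders keeps literal Flux lists such as the drop columns untouched, which is why a plain search-and-replace on square brackets would not be enough.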

SQL Configuration#

SQL, like influx, takes a query template and fields to replace. If you wish to just specify a SQL query, just put the whole query as query_template and submit no query_config.

The connection_params consist of the ODBC connection string for the database.

-- SQL template
SELECT [cols] FROM [table] WHERE [conditions];

# SQL loading config
input_data_specs:
  default:
  - datastore_type: sql
    connection_params:
      connection_str: some_connection_str
    settings:
      query_template: |
        SELECT [cols] FROM [table] WHERE [conditions];
      query_config:
        table: my_table
        cols:
          - col1
          - col2
        conditions:
          - "col1=1"
          - "col2<4"

# SQL writing config
output_data_specs:
  default:
  - datastore_type: sql
    settings:
      data_type: dataframe
      db_table: output-table
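
The SQL loading config above passes lists for cols and conditions. How OctaiPipe joins list values into the query is not documented in this section, so the Python sketch below makes that an explicit, configurable assumption: lists are joined with ", " unless a different joiner is given per key (here " AND " for conditions). The function render_sql and its joiners parameter are hypothetical.

```python
def render_sql(template, query_config, joiners=None):
    """Fill [key] placeholders, joining list values into one string.

    Hypothetical helper for illustration; the join rules are assumptions,
    not documented OctaiPipe behaviour.
    """
    joiners = joiners or {}
    rendered = template
    for key, value in query_config.items():
        if isinstance(value, (list, tuple)):
            # Assumed default: comma-separated, overridable per key.
            value = joiners.get(key, ", ").join(str(item) for item in value)
        rendered = rendered.replace(f"[{key}]", str(value))
    return rendered


query_config = {
    "table": "my_table",
    "cols": ["col1", "col2"],
    "conditions": ["col1=1", "col2<4"],
}

sql = render_sql(
    "SELECT [cols] FROM [table] WHERE [conditions];",
    query_config,
    joiners={"conditions": " AND "},
)
print(sql)
```

Values are inserted as plain text in this sketch, so it is only suitable for trusted configuration values, never for untrusted user input.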

MQTT Configuration#

MQTT is a lightweight messaging protocol used to send messages through a message broker. A publisher adds messages with a specific topic to a queue on the broker and clients that subscribe to the same topic can read them. It is fast and lightweight, often used for real-time, IoT applications.

OctaiPipe implements MQTT using Eclipse’s Mosquitto MQTT broker. This can be installed on devices using the install_mosquitto_on_devices function. The function takes either device_ids or device_groups, a list of device IDs or device groups to start the broker on.

To use MQTT for data loading and writing, you can use OctaiPipe’s built-in MQTT data loading and writing functionalities.

MQTT Data Loading#

An example data loading configuration is shown below. The connection_params dictionary contains information given to the client on initialization, mainly used to connect to the broker. Topic is the only required argument, specifying which topic to subscribe to. The broker_address is the hostname of the device running the broker. If using MQTT with OctaiPipe edge deployments or Federated Learning, this should be set to mosquitto. The port is the port to connect on. This is 1883 by default when setting Mosquitto up with OctaiPipe. Username and password are the credentials used for the broker, if it is set up with authentication. Client ID is the name the client connects with. A default is assigned if not given. The user can also set use_tls to True in order to connect using TLS encryption.

For MQTT, datastore_type should be mqtt in lowercase. The query_template_path is not used, and can be left empty. The return type is specified by the query_type field and can be either dataframe to return a Pandas DataFrame or json to return a list of dictionaries.

The query_config dictionary configures data loading at runtime. The time to wait for messages when loading data is set with loop_seconds (defaults to 0). If a key in the retrieved message is the index of the data, this can be specified with index_key. If the topic needs to be set or changed when calling the load functionality, this can be done with topic. This can be used, for example, in FL to specify one topic for the input_data_specs and one for evaluation_data_specs. The add_tags argument is a list specifying any tags to add to the data when reading. For a dataframe, each tag is added as a new column, and for JSON it is added to each record dictionary. If messages are nested in a dictionary where the data needs to be accessed from a specific key, the message_name argument can be used to retrieve the data.

Input data specs for MQTT#
input_data_specs:
  device_0:
    - datastore_type: mqtt
      connection_params:
        broker_address: mosquitto
        port: 1883
        username: mqtt-user-1
        password: alligator1
        client_id: sensor-3-subscriber
        use_tls: false
      settings:
        query_type: dataframe # dataframe/json
        query_config:
          loop_seconds: 1
          index_key: time
          topic: sensor-3
          add_tags: []
          message_name: some-name
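
To make the role of message_name concrete, the Python sketch below unwraps nested messages into a flat list of records, similar in spirit to the json query_type. It is a rough illustration only: the function decode_messages is hypothetical, the payloads are assumed to be JSON-encoded, and handling of index_key and add_tags is left out.

```python
import json


def decode_messages(payloads, message_name=None):
    """Decode JSON payloads and pull the data out of message_name if set.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    records = []
    for payload in payloads:
        message = json.loads(payload)
        if message_name is not None:
            # The data sits under a specific key of the message dictionary.
            message = message[message_name]
        # A message may carry one record (dict) or several (list of dicts).
        if isinstance(message, list):
            records.extend(message)
        else:
            records.append(message)
    return records


# Example payloads as they might arrive from the broker (assumed format).
payloads = [
    b'{"some-name": {"time": 1, "value": 0.5}, "tagId": "data_tag_0"}',
    b'{"some-name": [{"time": 2, "value": 0.7}, {"time": 3, "value": 0.9}]}',
]

records = decode_messages(payloads, message_name="some-name")
print(records)
```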

MQTT Data Writing#

Below is an example of an MQTT data writing config output_data_specs. The connection_params are the same as for data loading, except the topic specifies where to publish messages to rather than what to subscribe to. Topic has to be specified either here or in the mqtt parameters below.

The datastore_type should be mqtt. The mqtt dictionary specifies the writing configurations at runtime. The topic is what topic messages should be published to, and will overwrite a topic specified in connection_params. To write a full dataset as one message, set write_as_one to True. Otherwise, each record (row) will be written as a single message. The write_index field specifies whether to add the index of a Pandas DataFrame as a field when writing. To save the last message sent to the queue, retain can be set to True. This can be useful when we want to add messages to be consumed by a subscriber that is yet to connect. A dictionary of tags can be added to send the message as a dictionary with the data in the field with name message_name (defaults to “data”) and additional key-value pairs for each tag.

Output data specs for MQTT#
output_data_specs:
  device_0:
  - datastore_type: mqtt
    connection_params:
      broker_address: mosquitto
      port: 1883
      username: mqtt-user-1
      password: alligator1
      client_id: sensor-3-publisher
      use_tls: false
    settings:
      topic: sensor-3
      write_as_one: false
      write_index: true
      retain: false
      tags:
        tagId: data_tag_0
      message_name: data_field
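
The effect of write_as_one, tags, and message_name on the published messages can be sketched as follows. This is an approximation for illustration, not OctaiPipe's writer: the function build_payloads is hypothetical, JSON encoding of the payload is an assumption, and write_index and retain are not modelled.

```python
import json


def build_payloads(records, write_as_one=False, tags=None, message_name="data"):
    """Turn records into JSON payloads, one per message to publish.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    # Either the whole dataset is one message, or each record is its own.
    bodies = [records] if write_as_one else list(records)
    payloads = []
    for body in bodies:
        if tags:
            # Wrap the data under message_name and add one key per tag.
            body = {message_name: body, **tags}
        payloads.append(json.dumps(body))
    return payloads


records = [{"time": 1, "value": 0.5}, {"time": 2, "value": 0.7}]

per_row = build_payloads(
    records, tags={"tagId": "data_tag_0"}, message_name="data_field"
)
as_one = build_payloads(records, write_as_one=True)

print(per_row)
print(as_one)
```

With tags set, each message becomes a dictionary that a subscriber can unwrap again by reading the field named by message_name.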

Local data Configuration#

Local data loading has no connection parameters. However, it is important to note that if you are running OctaiPipe with the default octaipipe user, you might need to give the octaipipe user on your machine read and write permissions to the files/folders you wish to read/write from.

Input data specs for data loading#
# Local loading config
input_data_specs:
  default:
    - datastore_type: local
      settings:
        query_type: csv # can be csv or excel
        query_config:
          filepath_or_buffer: path/to/file.csv
          skiprows: 2

The query type can be set to ‘csv’ or ‘excel’, which uses Pandas read_csv or read_excel. Query config is a dictionary, where each key-value pair represents an argument to the underlying Pandas function. In the example above, filepath_or_buffer specifies the path to the data and skiprows specifies how many rows to skip at the beginning of the file.

For Excel, the minimum arguments would be io for the filepath and sheet_name if the default first sheet should not be used.

More arguments can be found in the Pandas documentation for read_csv and read_excel.

For data writing, the configs simply specify the file_path, where the extension determines how data is written. For example, ending the path with ‘.csv’ will make use of Pandas to_csv. Options and functions are below:

  • .csv uses Pandas to_csv

  • .xls or .xlsx uses Pandas to_excel

  • .json uses json package json.dump

  • .zip uses shutil.make_archive
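
The extension-based choice of writer listed above amounts to a lookup on the file suffix. The Python sketch below only returns the name of the function from the list rather than calling it; writer_for and the error raised for unlisted extensions are hypothetical and say nothing about how OctaiPipe handles that case.

```python
from pathlib import Path

# Extension-to-writer mapping taken from the list in this section.
WRITERS = {
    ".csv": "pandas.DataFrame.to_csv",
    ".xls": "pandas.DataFrame.to_excel",
    ".xlsx": "pandas.DataFrame.to_excel",
    ".json": "json.dump",
    ".zip": "shutil.make_archive",
}


def writer_for(file_path):
    """Return the name of the writer selected by the file extension.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    suffix = Path(file_path).suffix
    try:
        return WRITERS[suffix]
    except KeyError:
        raise ValueError(f"unsupported extension: {suffix!r}") from None


csv_writer = writer_for("path/to/file.csv")
zip_writer = writer_for("outputs/archive.zip")
print(csv_writer)
print(zip_writer)
```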

The to_csv and to_excel functions can also be given arguments by including a dictionary called write_config of argument names and values. An example is below:

local data writing output data specs#
# Local writing config
output_data_specs:
  default:
    - datastore_type: local
      settings:
        file_path: path/to/file.csv
        write_config:
          index: true
          sep: ';'

NOTE: When you use data from local file storage on your device, make sure that OctaiPipe has read and write access to the files you want to access. To ensure this, run the following command on your device before you run a deployment:

sudo chown -R root:1080 {replace with folder path}

If you cannot change file permissions on your machine, you can run OctaiPipe with root images, see OctaiPipe Images.

CSV Configuration#

This section covers the configurations specific to CSV data loading and writing. This data store type is less flexible than local data loading and writing, but is useful for simple CSV files.

CSV Data Loading Configuration#

CSV data loading supports specifying delimiters, handling headers, setting index columns, and excluding specific columns. The configuration is based on the CsvDataLoader class.

Example of input data specifications for CSV:

CSV data input data specs#
input_data_specs:
  device_0:
    - datastore_type: csv
      settings:
        file_path: /data/sensor_measurements.csv
        delimiter: ';'  # Semicolon-separated file
        headers: true   # File has a header row
        index:          # Columns to use as index
          - timestamp
          - sensor_id
        exclude:        # Columns to exclude from processing
          - raw_data_hex
          - metadata_column
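
To show what the delimiter, headers, index, and exclude settings mean for the loaded data, here is a standard-library Python sketch. It does not reproduce the CsvDataLoader class: load_csv is a hypothetical function, it reads from a string instead of a file path, it returns a dictionary keyed by the index columns rather than a DataFrame, and all values stay as strings.

```python
import csv
import io


def load_csv(text, delimiter=",", headers=True, index=(), exclude=()):
    """Parse CSV text into {index_tuple: record} using the given options.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    rows = list(csv.reader(io.StringIO(text), delimiter=delimiter))
    if not rows:
        return {}
    if headers:
        names, rows = rows[0], rows[1:]
    else:
        # Without a header row, columns are named by position.
        names = [str(i) for i in range(len(rows[0]))]
    table = {}
    for row_number, row in enumerate(rows):
        record = dict(zip(names, row))
        # Index columns become the key; with no index, use the row number.
        key = tuple(record.pop(col) for col in index) if index else (row_number,)
        for col in exclude:
            record.pop(col, None)
        table[key] = record
    return table


text = (
    "timestamp;sensor_id;value;raw_data_hex\n"
    "2022-01-01T00:00:00Z;s1;0.5;ff\n"
    "2022-01-01T00:01:00Z;s1;0.7;fe\n"
)

table = load_csv(
    text,
    delimiter=";",
    index=("timestamp", "sensor_id"),
    exclude=("raw_data_hex",),
)
print(table)
```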

CSV Data Writing Configuration#

CSV data writing allows specifying file paths, writing mode, and delimiter. The configuration is based on the CsvDataWriter class.

Example of output data specifications for CSV:

CSV data output data specs#
output_data_specs:
  device_0:
    - datastore_type: csv
      settings:
        file_path: /outputs/processed_sensor_data.csv
        delimiter: ','     # CSV delimiter (comma by default)
        append: false      # Overwrite existing file

  device_1:
    - datastore_type: csv
      settings:
        file_path: /outputs/incremental_log.csv
        delimiter: ';'    # Semicolon-separated file
        append: true      # Append to existing file if it exists
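
The difference between append: true and append: false can be illustrated with Python's standard csv module. This sketch is not the CsvDataWriter class: write_csv is a hypothetical function, and writing the header only when the file is created is an assumption about sensible append behaviour rather than documented OctaiPipe behaviour.

```python
import csv
import os
import tempfile


def write_csv(file_path, fieldnames, rows, delimiter=",", append=False):
    """Write rows to file_path, appending or overwriting as requested.

    Hypothetical helper for illustration; not part of OctaiPipe.
    """
    file_exists = os.path.exists(file_path)
    mode = "a" if append else "w"
    with open(file_path, mode, newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames, delimiter=delimiter)
        # Assumption: skip the header when appending to an existing file.
        if not (append and file_exists):
            writer.writeheader()
        writer.writerows(rows)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "incremental_log.csv")

    # Two appends: the second one adds a row below the first.
    write_csv(path, ["t", "v"], [{"t": 1, "v": 0.5}], delimiter=";", append=True)
    write_csv(path, ["t", "v"], [{"t": 2, "v": 0.7}], delimiter=";", append=True)
    with open(path, newline="") as handle:
        appended = handle.read().splitlines()

    # Overwrite: earlier rows are discarded.
    write_csv(path, ["t", "v"], [{"t": 3, "v": 0.9}], delimiter=";", append=False)
    with open(path, newline="") as handle:
        overwritten = handle.read().splitlines()

print(appended)
print(overwritten)
```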

Key Configuration Options#

Input Configuration:

  • file_path: Path to the CSV file

  • delimiter: File separator (,, ;, \t, etc.). Defaults to comma

  • headers: Whether the file has a header row. Defaults to true

  • index: Columns to use as DataFrame index. Defaults to []

  • exclude: Columns to exclude from processing. Defaults to []

Output Configuration:

  • file_path: Destination path for the CSV file

  • delimiter: Column separator (defaults to comma)

  • append:

    • true: Append to existing file

    • false: Overwrite existing file (default behavior)

NOTE: As with the local data store type, when you use data from local file storage on your device, make sure that OctaiPipe has read and write access to the files you want to access. To ensure this, run the following command on your device before you run a deployment:

sudo chown -R root:1080 {replace with folder path}

If you cannot change file permissions on your machine, you can run OctaiPipe with root images, see OctaiPipe Images.