Federated EDA

To build production-grade data science pipelines, data scientists need as much information as possible about the dataset they are working with. However, it is not always possible to move data away from where it is generated.

OctaiPipe has developed a federated Exploratory Data Analysis framework that allows data scientists to understand their data without the privacy and cost problems associated with moving large volumes of raw data from edge devices.

The key features of Federated EDA allow you to:

  • Get general metadata about device datasets, such as column names, number of rows of data, or number of missing values.

  • Get column statistics for each device, such as mean, standard deviation, min, max.

  • Get aggregated column statistics across devices.

Running Federated EDA

The snippet below runs Federated EDA with OctaiPipe. It reads its settings from the file configs/federated_eda.yml; the next section describes how to configure this file.

from octaipipe.feda.feda import FederatedEDA

# Load the run configuration and start the federated EDA run
feda = FederatedEDA(config_path='./configs/federated_eda.yml')

feda.run()

Configuring Federated EDA

Below is an example configuration YAML file for federated EDA:

name: feda

infrastructure:
  device_ids: [feda-test-0, feda-test-1]
  device_group: []
  image_name: 'octaipipe.azurecr.io/octaipipe_core-all_data_loaders:latest'

input_data_specs:
  devices:
  - device: default
    datastore_type: influxdb
    query_type: dataframe
    query_template_path: ./configs/influx_query.txt
    query_values:
      start: "2024-04-11T16:00:00.000Z"
      stop: "2024-04-11T16:30:00.000Z"
      bucket: test-bucket
      measurement: feda-test
    data_converter: {}
  - device: feda-test-1
    datastore_type: influxdb
    query_type: dataframe
    query_template_path: ./configs/influx_query.txt
    query_values:
      start: "2024-04-11T16:00:00.000Z"
      stop: "2024-04-11T16:30:00.000Z"
      bucket: test-bucket
      measurement: feda-test
    data_converter: {}

feda_specs:
  ignore_columns:
  - b
  min_available_devices: 2
  device_data_level: dictionary # "dictionary" or "column_statistics"
  global_data_level: # "all" or None (leave empty)

run_specs: {}

Infrastructure

The infrastructure section is similar to that of the Federated Learning configuration. The device_ids field is a list of device IDs to include in the EDA run, and the device_group field is a list of device groups to include.

The image_name specifies which image to use to run federated EDA. Federated EDA is part of OctaiPipe Core and can therefore be run using an OctaiPipe Core image.

Input data specs

The input data specs are similar to those in the Data Loading and Writing Utilities. However, just like in Federated Learning, they contain a list in the devices field, where each device can have its own input data specs. Each entry in the list therefore needs a device field specifying which device the specs are for. If a device is not found in the list, the entry for the default device is used.
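The fallback to the default entry can be pictured with a short sketch. The helper name resolve_device_specs is hypothetical and not part of OctaiPipe; it only illustrates the lookup rule described above, assuming the devices list has already been parsed into Python dictionaries.

```python
from typing import Any, Dict, List


def resolve_device_specs(devices: List[Dict[str, Any]], device_id: str) -> Dict[str, Any]:
    """Return the entry for device_id, or the 'default' entry if none matches.

    Hypothetical helper for illustration only; OctaiPipe's internal
    lookup may be implemented differently.
    """
    by_name = {entry["device"]: entry for entry in devices}
    if device_id in by_name:
        return by_name[device_id]
    if "default" in by_name:
        return by_name["default"]
    raise KeyError(f"No specs for {device_id!r} and no 'default' entry")


# Trimmed-down version of the devices list from the example configuration
devices_section = [
    {"device": "default", "datastore_type": "influxdb"},
    {"device": "feda-test-1", "datastore_type": "influxdb"},
]

# feda-test-0 has no entry of its own, so it picks up the default specs
specs_for_0 = resolve_device_specs(devices_section, "feda-test-0")
# feda-test-1 is listed explicitly, so its own entry is used
specs_for_1 = resolve_device_specs(devices_section, "feda-test-1")
print(specs_for_0["device"], specs_for_1["device"])
```

In the example configuration, this is why feda-test-0 does not need an entry of its own.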

FEDA specs

This is the core configuration section of federated EDA. This section allows the user to specify what information is pulled from devices and at what level (device or global).

The ignore_columns field is a list of columns that should be ignored when running EDA on each device.

The min_available_devices field specifies the minimum number of devices that need to be available to run EDA. If too few devices are available, EDA will fail. The server also waits for this many devices before starting aggregation. Therefore, if data from all devices is necessary, set this to the total number of devices in the run.

The device_data_level defines what data should be collected from devices. This can take values dictionary and column_statistics. If set to dictionary, more general metadata (similar to a data dictionary) about devices will be gathered, such as column names, number of rows of data, and number of missing values in the dataset. If set to column_statistics, the following statistics will be calculated for each device:

  • Mean

  • Standard deviation

  • Min

  • Max

  • Median

  • Mode

  • Number of unique values

Note that when column_statistics are calculated, the data dictionary will be returned as well, but not vice versa.
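To get a feel for what these statistics look like, the sketch below computes them locally with Pandas for a single dataframe. It is not OctaiPipe's implementation: the function name column_statistics, the row labels, the restriction to numeric columns, and the use of the sample standard deviation are all assumptions made for illustration.

```python
import pandas as pd


def column_statistics(df: pd.DataFrame, ignore_columns=()) -> pd.DataFrame:
    """Compute the listed statistics for each numeric column (illustrative only)."""
    # Drop ignored columns, then keep numeric columns (an assumption of this sketch)
    numeric = df.drop(columns=list(ignore_columns), errors="ignore").select_dtypes("number")
    stats = {}
    for name, col in numeric.items():
        modes = col.mode()  # may contain several values; take the first one
        stats[name] = {
            "mean": col.mean(),
            "std": col.std(),  # sample standard deviation (ddof=1)
            "min": col.min(),
            "max": col.max(),
            "median": col.median(),
            "mode": modes.iloc[0] if not modes.empty else None,
            "n_unique": col.nunique(),
        }
    # One column per data column, one row per statistic
    return pd.DataFrame(stats)


# Toy stand-in for one device's data; column "b" is ignored as in the example config
device_df = pd.DataFrame({"a": [1.0, 2.0, 2.0, None], "b": [10, 20, 30, 40]})
device_stats = column_statistics(device_df, ignore_columns=["b"])
print(device_stats)
```

Missing values are skipped by the Pandas reductions used here, so the statistics for column "a" are based on its three non-missing rows.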

The global_data_level field controls whether the column statistics calculated on the devices are saved per device in addition to being aggregated. If set to all, the statistics from individual devices are not saved; they are only aggregated into global statistics. If left empty (None) while device_data_level is column_statistics, both global and device-level statistics are saved. If device_data_level is set to dictionary, global_data_level has to be left empty, as no column statistics are calculated.
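The rules for these fields can be checked before starting a run. The sketch below is a hypothetical pre-flight check, not an OctaiPipe function: check_feda_specs and its messages are made up for illustration, and it assumes the feda_specs section has been loaded into a dictionary (an empty YAML value loads as None).

```python
from typing import Any, Dict, List

VALID_DEVICE_LEVELS = ("dictionary", "column_statistics")
VALID_GLOBAL_LEVELS = ("all", None)


def check_feda_specs(feda_specs: Dict[str, Any], n_devices: int) -> List[str]:
    """Return a list of problems found in feda_specs (empty if none).

    Hypothetical helper; it only encodes the rules stated in the text.
    """
    problems = []
    device_level = feda_specs.get("device_data_level")
    global_level = feda_specs.get("global_data_level")
    min_devices = feda_specs.get("min_available_devices")

    if device_level not in VALID_DEVICE_LEVELS:
        problems.append("device_data_level must be 'dictionary' or 'column_statistics'")
    if global_level not in VALID_GLOBAL_LEVELS:
        problems.append("global_data_level must be 'all' or left empty")
    if device_level == "dictionary" and global_level is not None:
        problems.append("global_data_level must be empty when device_data_level is 'dictionary'")
    if min_devices is not None and min_devices > n_devices:
        problems.append("min_available_devices exceeds the number of devices in the run")
    return problems


# Matches the example configuration: no problems expected
ok = check_feda_specs(
    {"device_data_level": "dictionary", "global_data_level": None, "min_available_devices": 2},
    n_devices=2,
)
# Aggregation requested without column statistics, and too many devices required
bad = check_feda_specs(
    {"device_data_level": "dictionary", "global_data_level": "all", "min_available_devices": 3},
    n_devices=2,
)
print(ok)
print(bad)
```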

If global_data_level is all or None and device_data_level is column_statistics, the following global statistics will be recorded:

  • Weighted mean

  • Pooled standard deviation

  • Global minimum

  • Global maximum

  • Average missing value count
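The sketch below shows one common way to compute such aggregates from per-device summaries alone, without access to the raw data. It uses the textbook pooled standard deviation, sqrt(sum((n_i - 1) * s_i^2) / sum(n_i - 1)); whether OctaiPipe uses exactly this formula is not stated here, so treat the function and the key names (count, n_missing and so on) as assumptions.

```python
import math
from typing import Dict, List


def aggregate(device_stats: List[Dict[str, float]]) -> Dict[str, float]:
    """Combine per-device summaries for one column into global statistics."""
    total_n = sum(d["count"] for d in device_stats)
    # Mean of device means, weighted by each device's number of rows
    weighted_mean = sum(d["count"] * d["mean"] for d in device_stats) / total_n
    # Textbook pooled variance: device variances weighted by degrees of freedom
    dof = sum(d["count"] - 1 for d in device_stats)
    pooled_var = sum((d["count"] - 1) * d["std"] ** 2 for d in device_stats) / dof
    return {
        "weighted_mean": weighted_mean,
        "pooled_std": math.sqrt(pooled_var),
        "global_min": min(d["min"] for d in device_stats),
        "global_max": max(d["max"] for d in device_stats),
        "avg_missing": sum(d["n_missing"] for d in device_stats) / len(device_stats),
    }


# Made-up summaries for one column on two devices
summaries = [
    {"count": 10, "mean": 2.0, "std": 1.0, "min": 0.0, "max": 4.0, "n_missing": 1},
    {"count": 30, "mean": 4.0, "std": 2.0, "min": 1.0, "max": 9.0, "n_missing": 3},
]
result = aggregate(summaries)
print(result)
```

Note that the pooled standard deviation in this form measures the spread within devices; it does not account for differences between the device means.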

Run specs

The run specs should be included but left as an empty dictionary {}.

Viewing results of FEDA

To view the results of Federated EDA, find the Dataset ID that is printed by the Federated EDA run. If you cannot find it there, you can also retrieve it from the federated EDA object using feda._dataset_id. Then use the federated EDA client as below:

from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
# Fetch the stored results for a single federated EDA run
feda_client.get_dataset_by_id(dataset_id)

It is also possible to get all previous runs with the following code:

from octaipipe_core.client.feda_client import FEDAClient

feda_client = FEDAClient()
# Fetch the results of all previous runs
feda_client.get_dataset()

The snippets below turn the output for the different configurations into Pandas dataframes that are easier to read:

Top-level data dictionary

from typing import List

import pandas as pd
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
record: dict = feda_client.get_dataset_by_id(dataset_id)

devices: List[dict] = list(record['devices'])

# Merge each device's metadata and device-level statistics into one row per device
device_rows: dict = {device['name']: dict(**device['metadata'], **device['deviceStatistics']) for device in devices}

pd.DataFrame.from_dict(device_rows, orient='index')

Global column statistics

import pandas as pd
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
record: dict = feda_client.get_dataset_by_id(dataset_id)

# One entry per data column: its type plus its global statistics
columns: dict = {col['name']: dict(**{'type': col['type']}, **col['statistics']) for col in record['columns']}

pd.DataFrame.from_dict(columns, orient='columns')

Device column statistics

This creates a dictionary where each key is a device ID and the value is a Pandas dataframe with column statistics similar to the output from the global column statistics.

import pandas as pd
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
record: dict = feda_client.get_dataset_by_id(dataset_id)

devices = list(record['devices'])
dev_out = dict()
for device in devices:
    # One entry per data column: its type plus its statistics on this device
    dev_columns: dict = {col['name']: dict(**{'type': col['type']}, **col['statistics']) for col in device['columns']}
    dev_out[device['name']] = pd.DataFrame.from_dict(dev_columns, orient='columns')

# Look up the dataframe for a single device
dev_out["replace_with_device_id"]