Federated EDA#
To build production-grade data science pipelines, data scientists need as much information as possible about the dataset they are working with. However, it is not always possible to move data away from where it is generated.
OctaiPipe has developed a federated Exploratory Data Analysis framework that allows data scientists to understand their data without the privacy and cost problems associated with moving large volumes of raw data from edge devices.
The key features of Federated EDA allow you to:
Get general metadata about device datasets, such as column names, number of rows of data, or number of missing values.
Get column statistics for each device, such as mean, standard deviation, min, max.
Get aggregated column statistics across devices.
Running Federated EDA#
The code snippet below shows the code needed to run Federated EDA with OctaiPipe.
The code uses the file configs/federated_eda.yml. How to configure this file is
specified in the next section.
from octaipipe.feda.feda import FederatedEDA

feda = FederatedEDA(config_path='./configs/federated_eda.yml')

feda.run()
Configuring Federated EDA#
Below is an example configuration YAML file for federated EDA:
name: feda

infrastructure:
  device_ids: [feda-test-0, feda-test-1]
  device_group: []
  image_name: 'octaipipe.azurecr.io/octaipipe_core-all_data_loaders:latest'

input_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/influx_query.txt
      query_values:
        start: "2024-04-11T16:00:00.000Z"
        stop: "2024-04-11T16:30:00.000Z"
        bucket: test-bucket
        measurement: feda-test
      data_converter: {}
    - device: feda-test-1
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/influx_query.txt
      query_values:
        start: "2024-04-11T16:00:00.000Z"
        stop: "2024-04-11T16:30:00.000Z"
        bucket: test-bucket
        measurement: feda-test
      data_converter: {}

feda_specs:
  ignore_columns:
    - b
  min_available_devices: 2
  device_data_level: dictionary # dictionary or "column_statistics"
  global_data_level: # "all", or None (leave empty)

run_specs: {}
Infrastructure#
The infrastructure section is similar to that of the Federated Learning configuration.
The device_ids field is a list of device IDs to include in the EDA run. The
device_group field is a list of device groups to include.
The image_name field specifies which image to use to run federated EDA. Federated EDA
is part of OctaiPipe Core and can therefore be run using an OctaiPipe Core image.
Input Data specs#
The input data specs are similar to those in the Data Loading and Writing Utilities.
However, just like in Federated Learning, they contain a list in the devices field, where
each device can have its own specific input_data_specs. Each entry in the list therefore
also needs a device field specifying which device the specs are for.
If a device is not found in the list, the default device will be used.
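The fallback rule above can be sketched in plain Python. This is an illustration only, not OctaiPipe's internal implementation: the helper name resolve_device_specs is hypothetical, and the list is a trimmed-down copy of the devices field from the example configuration.

```python
from typing import Dict, List


def resolve_device_specs(devices: List[Dict], device_id: str) -> Dict:
    """Return the specs for device_id, falling back to the 'default' entry.

    Hypothetical helper for illustration; not part of the OctaiPipe API.
    """
    by_name = {entry["device"]: entry for entry in devices}
    if device_id in by_name:
        return by_name[device_id]
    if "default" in by_name:
        return by_name["default"]
    raise KeyError(f"No specs for {device_id!r} and no 'default' entry")


# Trimmed-down version of the `devices` list from the example config
devices = [
    {"device": "default", "datastore_type": "influxdb"},
    {"device": "feda-test-1", "datastore_type": "influxdb"},
]

specs_0 = resolve_device_specs(devices, "feda-test-0")  # not listed, so default is used
specs_1 = resolve_device_specs(devices, "feda-test-1")  # listed explicitly
print(specs_0["device"], specs_1["device"])
```

In the example configuration, feda-test-0 has no entry of its own, so it would pick up the default specs, while feda-test-1 uses its own entry.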
FEDA specs#
This is the core configuration section of federated EDA. This section allows the user to specify what information is pulled from devices and at what level (device or global).
The ignore_columns field is a list of columns that should be ignored when running
EDA on each device.
The min_available_devices field specifies the minimum number of devices that need
to be available to run EDA. If not enough devices are available, EDA will fail. The
server will also wait for this many devices before starting aggregation. Therefore, if
data from all devices is necessary, set this to the number of total devices in the run.
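As a rough pre-flight check for this setting, the sketch below compares min_available_devices with the devices listed in the configuration. The function name waits_for_all_devices is hypothetical, and the check assumes devices are listed only through device_ids (devices added through device_group are not counted).

```python
from typing import List


def waits_for_all_devices(device_ids: List[str], min_available_devices: int) -> bool:
    """Return True if aggregation only starts once every listed device is available.

    Hypothetical helper; assumes the run's devices come from device_ids only.
    """
    if min_available_devices < 1:
        raise ValueError("min_available_devices must be at least 1")
    if min_available_devices > len(device_ids):
        # The threshold could never be reached, so the EDA run would fail
        raise ValueError(
            f"min_available_devices={min_available_devices} exceeds "
            f"the {len(device_ids)} listed devices"
        )
    return min_available_devices == len(device_ids)


device_ids = ["feda-test-0", "feda-test-1"]
waits_for_all = waits_for_all_devices(device_ids, min_available_devices=2)
print(waits_for_all)
```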
The device_data_level field defines what data should be collected from devices. This can
take the values dictionary and column_statistics. If set to dictionary,
more general metadata (similar to a data dictionary) about devices will be gathered,
such as column names, number of rows of data, and number of missing values in
the dataset. If set to column_statistics, the following statistics will be calculated
for each device:
Mean
Standard deviation
Min
Max
Median
Mode
Number of unique values
Note that when column_statistics are calculated, the data dictionary will be returned
as well, but not vice versa.
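To make the per-device statistics listed above concrete, here is a minimal sketch using only the Python standard library. It is not OctaiPipe's implementation: the function names are hypothetical, missing values are assumed to be represented as None, and the standard deviation is assumed to be the sample (n - 1) version, which the source does not specify.

```python
import statistics
from typing import Dict, List, Optional


def column_statistics(values: List[Optional[float]]) -> Dict[str, float]:
    """Compute the listed statistics for one column, skipping missing values.

    Needs at least two non-missing values, because of the sample std.
    """
    present = [v for v in values if v is not None]
    return {
        "mean": statistics.mean(present),
        "std": statistics.stdev(present),  # sample std (n - 1), an assumption
        "min": min(present),
        "max": max(present),
        "median": statistics.median(present),
        "mode": statistics.mode(present),
        "n_unique": len(set(present)),
    }


def device_statistics(
    data: Dict[str, List[Optional[float]]], ignore_columns: List[str]
) -> Dict[str, Dict[str, float]]:
    """Apply column_statistics to every column not listed in ignore_columns."""
    return {
        name: column_statistics(column)
        for name, column in data.items()
        if name not in ignore_columns
    }


# Toy device dataset; column "b" is ignored as in the example config
data = {"a": [1.0, 2.0, 2.0, None, 5.0], "b": [0.0, 0.0, 0.0, 0.0, 0.0]}
stats = device_statistics(data, ignore_columns=["b"])
print(stats["a"])
```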
The global_data_level field controls how the column_statistics calculated on
the devices are aggregated and saved. If set to all, the statistics from individual
devices will not be saved, only aggregated to show global statistics. If device_data_level
is set to dictionary, global_data_level has to be set to None (leave the field empty), as
no column statistics are calculated. If device_data_level is column_statistics and
global_data_level is None, both global and device-level statistics will be saved.
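The valid combinations of device_data_level and global_data_level described above can be summarised in a small check. This is a sketch of the documented rules, not code from OctaiPipe; the function name describe_feda_levels and the returned descriptions are hypothetical. An empty YAML field is assumed to load as None.

```python
from typing import Optional

VALID_DEVICE_LEVELS = {"dictionary", "column_statistics"}
VALID_GLOBAL_LEVELS = {None, "all"}


def describe_feda_levels(device_data_level: str, global_data_level: Optional[str]) -> str:
    """Validate the two level settings and describe what would be saved."""
    if device_data_level not in VALID_DEVICE_LEVELS:
        raise ValueError(f"Unknown device_data_level: {device_data_level!r}")
    if global_data_level not in VALID_GLOBAL_LEVELS:
        raise ValueError(f"Unknown global_data_level: {global_data_level!r}")

    if device_data_level == "dictionary":
        # No column statistics are calculated, so nothing can be aggregated
        if global_data_level is not None:
            raise ValueError(
                "global_data_level must be left empty when "
                "device_data_level is 'dictionary'"
            )
        return "device data dictionary only"
    if global_data_level == "all":
        return "global statistics only"
    return "device and global statistics"


print(describe_feda_levels("dictionary", None))
print(describe_feda_levels("column_statistics", "all"))
print(describe_feda_levels("column_statistics", None))
```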
If global_data_level is all or None and device_data_level is column_statistics,
the following global statistics will be recorded:
Weighted mean
Pooled standard deviation
Global minimum
Global maximum
Average missing value count
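The first two global statistics can be computed from per-device summaries alone, without moving raw data. The sketch below uses the standard textbook definitions of a weighted mean and a pooled standard deviation; the source does not state the exact formulas OctaiPipe uses, so treat the formulas, the function names, and the (row count, mean, sample standard deviation) summary layout as assumptions.

```python
import math
from typing import List, Tuple

# (row count, mean, sample standard deviation) reported by one device
DeviceSummary = Tuple[int, float, float]


def weighted_mean(summaries: List[DeviceSummary]) -> float:
    """Mean of device means, weighted by each device's row count."""
    total_rows = sum(n for n, _, _ in summaries)
    return sum(n * mean for n, mean, _ in summaries) / total_rows


def pooled_std(summaries: List[DeviceSummary]) -> float:
    """Pooled standard deviation: sqrt(sum((n_i - 1) * s_i^2) / sum(n_i - 1))."""
    numerator = sum((n - 1) * std ** 2 for n, _, std in summaries)
    denominator = sum(n - 1 for n, _, _ in summaries)
    return math.sqrt(numerator / denominator)


# Two hypothetical devices with different amounts of data
summaries = [(100, 10.0, 2.0), (300, 14.0, 4.0)]
print(weighted_mean(summaries))
print(pooled_std(summaries))
```

Note that a pooled standard deviation defined this way describes the spread within devices and does not account for differences between device means.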
Run specs#
The run specs should be included but left as an empty dictionary {}.
Viewing results of FEDA#
To view the results of Federated EDA, find the Dataset ID that is printed
by the Federated EDA run. If you cannot find it elsewhere, you can also retrieve it
from the federated EDA object using feda._dataset_id. Then use the federated
EDA client as below:
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
feda_client.get_dataset_by_id(dataset_id)
It is also possible to get all previous runs with the following code:
from octaipipe_core.client.feda_client import FEDAClient

feda_client = FEDAClient()
feda_client.get_dataset()
To make the output easier to digest for different configurations, you can use the following snippets:
Top-level data dictionary#
from typing import Dict, List

import pandas as pd
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
record: dict = feda_client.get_dataset_by_id(dataset_id)

# One entry per device in the FEDA record
devices: List[dict] = [dev for dev in record['devices']]

# Merge each device's metadata and device statistics into one flat row
device_rows: Dict[str, dict] = {device['name']: dict(**device['metadata'], **device['deviceStatistics']) for device in devices}

# One row per device
pd.DataFrame.from_dict(device_rows, orient='index')
Global column statistics#
import pandas as pd
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
record: dict = feda_client.get_dataset_by_id(dataset_id)

# Map each column name to its type plus its global statistics
columns: dict = {col['name']: dict(**{'type': col['type']}, **col['statistics']) for col in record['columns']}

# One DataFrame column per dataset column, one row per statistic
pd.DataFrame.from_dict(columns, orient='columns')
Device column statistics#
This creates a dictionary where each key is a device ID and the value is a Pandas dataframe with column statistics similar to the output from the global column statistics.
import pandas as pd
from octaipipe_core.client.feda_client import FEDAClient

dataset_id: str = "replace_with_id"
feda_client = FEDAClient()
record: dict = feda_client.get_dataset_by_id(dataset_id)

devices = [dev for dev in record['devices']]
dev_out = dict()
for device in devices:
    # Map each column name to its type plus its statistics on this device
    dev_columns: dict = {col['name']: dict(**{'type': col['type']}, **col['statistics']) for col in device['columns']}
    dev_out[device['name']] = pd.DataFrame.from_dict(dev_columns, orient='columns')

# Look up the statistics DataFrame for a single device
dev_out["replace_with_device_id"]