Federated Learning#

Federated learning (FL) is a machine-learning paradigm introduced by Google in 2016. It mitigates data-privacy concerns because the training data never leaves the devices that hold it. Traditionally, training a machine-learning model meant collecting all available data in one place and training the model centrally. With federated learning, edge devices instead train models on their local data and upload these local models, not the data, to a server. The FL server aggregates the local models into a global model. The figure below illustrates the high-level comparison between the two approaches.

Figure: High-level comparison of FL and centralised ML.

Typical FL training involves the following steps.

  1. A machine-learning process triggers deployment of the FL server.

  2. The FL server randomly selects available devices (FL clients) and asks them to train on their own local data, each producing its own local model. Each local model starts from the global model sent by the FL server.

  3. Once training is completed, the selected FL clients push their local models to the FL server.

  4. Once it has received the required number of local models, the FL server aggregates them into a new version of the global model. The aggregation rule is set by the user-defined strategy.

  5. The FL server starts another round of training by again randomly selecting available devices and sending them the latest global model.

  6. The FL training process terminates after a set number of training rounds.
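The steps above can be sketched as a toy simulation in plain Python. This is a minimal illustration, not OctaiPipe's implementation: it assumes a FedAvg-style, sample-weighted average as the aggregation rule (the family OctaiPipe's strategies extend), reduces "training" to nudging weights toward each client's data mean, and uses hypothetical names (local_train, fedavg, run_fl).

```python
import random


def local_train(global_weights, local_data, lr=0.5):
    """Stand-in for local training: move each weight toward the local data mean."""
    target = sum(local_data) / len(local_data)
    return [w + lr * (target - w) for w in global_weights]


def fedavg(updates):
    """Average (weights, num_samples) pairs, weighting each client by its sample count."""
    total = sum(n for _, n in updates)
    dim = len(updates[0][0])
    return [sum(w[i] * n for w, n in updates) / total for i in range(dim)]


def run_fl(clients, num_rounds=10, fraction_fit=1.0, min_fit_clients=2, seed=0):
    """Toy FL loop over a dict mapping client_id -> list of local data values."""
    rng = random.Random(seed)
    global_weights = [0.0, 0.0]  # initial global model
    for _ in range(num_rounds):  # step 6: stop after a fixed number of rounds
        # steps 2 and 5: randomly select clients, never fewer than min_fit_clients
        k = max(int(len(clients) * fraction_fit), min_fit_clients)
        selected = rng.sample(sorted(clients), k)
        # steps 2-3: each selected client trains from the current global model
        updates = [(local_train(global_weights, clients[c]), len(clients[c])) for c in selected]
        # step 4: aggregate the local models into a new global model
        global_weights = fedavg(updates)
    return global_weights


clients = {"FL-01": [1.0, 2.0], "FL-02": [3.0], "FL-03": [4.0, 5.0, 6.0], "FL-04": [2.0]}
print(run_fl(clients))
```

With fraction_fit=1.0 every client takes part in each round, and the global weights approach 23/7, the mean of all seven data values pooled together, even though no raw data ever leaves a client.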

Figure: The FL process in OctaiPipe.

The process above highlights the key components of FL:

  • FL infrastructure, including devices and FL server.

  • The device selection algorithm.

  • The model aggregation algorithm.

  • The neural network model for this experiment.

Because FL can be treated as one part of a machine-learning pipeline, it is a natural fit for OctaiPipe.

As with other OctaiPipe machine-learning steps such as AutoML, FL is configured with a YAML file that defines the FL infrastructure and its processes. OctaiPipe refers to device selection and model aggregation together as the “strategy”, which can be defined through a Python interface, typically from a Jupyter notebook in OctaiLab. Similarly, you can use a predefined neural network model provided by OctaiPipe or define your own in Python.

Because FL is built into OctaiPipe, you can run it as one of many steps in an OctaiPipe pipeline. This guide explains how to perform FL with OctaiPipe: it starts with the prerequisites, then walks through the required steps in detail, and ends with a programmer’s guide to the related libraries.

Prerequisites#

The following steps must be completed before performing federated learning:

  • Collect and register device information with OctaiPipe.

  • Prepare the YAML configuration for FL and the related pipeline.

Steps and Results#

Performing FL with OctaiPipe involves several processes that together produce a machine-learning model. Everything begins with registering the devices. Next, you write a definition file, which is used to deploy the respective container images. Once the container image is deployed to each device, you can start the federated learning task with a single command. Information about each process is stored in the database and is available as a summary report in OctaiPipe. When the learning process completes, a machine-learning model is created. The following sections describe these steps in detail.

Device Registration#

This is the first step of any FL experiment using Octaipipe. Please refer to Register a Device for more details.

FL Server Deployment#

Once the devices are registered in OctaiPipe, OctaiPipe uses kubectl under the hood to deploy a Kubernetes (k8s) server. This server plays an important role: it deploys the container images and the FL server, which aggregates models from the selected edge devices. OctaiPipe creates this server for you; all you need to do is prepare a YAML configuration file and call the run() method of OctaiFL to deploy it.

Define Pipeline Steps for FL#

After the devices have been registered and initialised, the next step is to provide the FL specification. An FL specification consists of its infrastructure, the usual OctaiPipe configuration for input data, evaluation data and model training, and the FL-specific aggregation strategy. We demonstrate how to perform federated learning in OctaiPipe, beginning with the YAML file below.

```yaml
name: federated_learning
description: Federated Learning Demo

infrastructure:
  device_ids: [FL-01, FL-02, FL-03, FL-04]
  device_group: group_1
  image_name: octaipipe.azurecr.io/octaipipe-all_data_loaders:latest
  server_image: octaipipe.azurecr.io/fl_server:latest
  env:
    ENV_VAR_0: value_0
    ENV_VAR_1: value_1

strategy:
  fraction_fit: 1.0
  fraction_evaluate: 1.0
  min_fit_clients: 2
  min_evaluate_clients: 2
  min_available_clients: 2
  evaluate_metrics_aggregation_fn: weighted_average
  num_rounds: 10
  initial_model: None
  save_best_model: true

input_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_def.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-01
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_1.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-02
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_2.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-03
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_3.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-04
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_4.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
  data_converter: {}

evaluation_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_def.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-01
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_1.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-02
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_2.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-03
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_3.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-04
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_4.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
  data_converter: {}

model_specs:
  type: base_torch
  load_existing: false
  name: test_torch
  model_load_specs:
    version: '000'
  model_params:
    loss_fn: mse
    scaling: standard
    metric: rmse
    epochs: 10
    batch_size: 32

run_specs:
  target_label: RUL
  cycle_id: "Machine number"
  backend: pytorch
```

As the YAML above shows, a typical FL configuration consists of these blocks:

  • infrastructure

  • strategy

  • input_data_specs

  • evaluation_data_specs

  • model_specs

  • run_specs

Note

The output_data_specs block is not needed for federated learning pipelines and is ignored if included in the YAML configuration.

As with other pipeline steps, input_data_specs and evaluation_data_specs describe the data for the FL experiment. OctaiPipe lets you specify the data settings, including query values, for each device individually through the devices list in these blocks, alongside an entry for device: default. Because these data blocks work as in other pipeline steps, the following sections focus on infrastructure, model_specs, run_specs and strategy.
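To make the per-device layout concrete, the sketch below looks up the data spec for one device from a devices list like the one in input_data_specs or evaluation_data_specs. It rests on an assumption this guide does not confirm: that the entry with device: default applies to any device without its own entry. resolve_device_spec is a hypothetical helper, not an OctaiPipe function.

```python
def resolve_device_spec(devices, device_id):
    """Return the data spec for device_id from a `devices` list.

    ASSUMPTION: the entry with device: default applies to any device
    that has no entry of its own.
    """
    by_name = {entry["device"]: entry for entry in devices}
    if device_id in by_name:
        return by_name[device_id]
    if "default" in by_name:
        return by_name["default"]
    raise KeyError(f"No data spec for device {device_id!r} and no default entry")


# Trimmed-down version of the input_data_specs devices list above
devices = [
    {"device": "default", "query_template_path": "./configs/data/influx_query_def.txt"},
    {"device": "FL-01", "query_template_path": "./configs/data/influx_query_1.txt"},
]
print(resolve_device_spec(devices, "FL-01")["query_template_path"])  # → ./configs/data/influx_query_1.txt
print(resolve_device_spec(devices, "FL-05")["query_template_path"])  # → ./configs/data/influx_query_def.txt
```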

infrastructure#

From a high-level machine-learning life-cycle point of view, centralised ML and FL are the same, so their YAML configurations are largely the same too. The main difference is an additional infrastructure block, not required for typical machine learning, that describes the devices and servers used for FL. An example of this block is shown below.

```yaml
infrastructure:
  device_ids: [FL-01, FL-02, FL-03, FL-04]
  device_group: group_1
  image_name: octaipipe.azurecr.io/octaipipe-all_data_loaders:latest
  server_image: octaipipe.azurecr.io/fl_server:latest
  env:
    ENV_VAR_0: value_0
    ENV_VAR_1: value_1
```

| Key | Value description | Note | Layer |
|---|---|---|---|
| infrastructure | Defines that the following block belongs to the infrastructure | required field | 1 |
| device_ids | The FL clients of this experiment: a list of edge device IDs | required field | 2 |
| device_group | List of device groups to run FL on (added to the device ID list) | required field | 2 |
| image_name | Name of the OctaiPipe container image to use | required field | 2 |
| server_image | Name of the FL server container image to use | required field | 2 |
| env | Dictionary of environment variable names and values to set on the device for the experiment | required field | 2 |
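Because every infrastructure key in the table is marked as required, a quick pre-flight check on the parsed configuration can catch omissions before deployment. The sketch below assumes the YAML has already been loaded into a Python dict; missing_infrastructure_keys is a hypothetical helper, not OctaiPipe's own validator.

```python
# Keys the table above marks as required inside the infrastructure block
REQUIRED_INFRASTRUCTURE_KEYS = ("device_ids", "device_group", "image_name", "server_image", "env")


def missing_infrastructure_keys(config):
    """Return the required infrastructure keys absent from a parsed FL config dict."""
    infra = config.get("infrastructure")
    if infra is None:
        return ["infrastructure"]
    return [key for key in REQUIRED_INFRASTRUCTURE_KEYS if key not in infra]


config = {
    "infrastructure": {
        "device_ids": ["FL-01", "FL-02", "FL-03", "FL-04"],
        "device_group": "group_1",
        "image_name": "octaipipe.azurecr.io/octaipipe-all_data_loaders:latest",
        "server_image": "octaipipe.azurecr.io/fl_server:latest",
    }
}
print(missing_infrastructure_keys(config))  # → ['env']
```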

model_specs#

A typical model_specs block for an FL experiment is shown below:

```yaml
model_specs:
  type: base_pytorch
  load_existing: false
  name: test_torch
  model_load_specs:
    version: 3
  model_params:
    loss_fn: mse
    scaling: standard
    metric: rmse
    epochs: 10
    batch_size: 32
  custom_model:
    file_path: ../../src/octaipipe/models/fl_aquarium/base_pytorch.py
```

The difference from other types of ML experiment is that you can point to your own model definition by setting custom_model.file_path within this block.
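Since custom_model.file_path points at a Python file, it can help to confirm that the file imports cleanly and defines the class you expect before starting an experiment. The sketch below uses the standard importlib machinery; load_class_from_file is a hypothetical helper, and this is not necessarily how OctaiPipe itself loads the file. The class name CustomModel follows the example in the Custom PyTorch model section.

```python
import importlib.util
from pathlib import Path


def load_class_from_file(file_path, class_name):
    """Import the module at file_path and return the class named class_name."""
    path = Path(file_path)
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot import {file_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file, so its imports must resolve
    return getattr(module, class_name)  # AttributeError if the class is missing


# Example (hypothetical path):
# model_cls = load_class_from_file("my_models/custom_model.py", "CustomModel")
```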

run_specs#

The run_specs block defines the target label, the cycle ID and the FL framework (backend) of the experiment. An example is shown below:

```yaml
run_specs:
  target_label: RUL
  cycle_id: "Machine number"
  backend: pytorch
```

| Key | Value description | Note | Layer |
|---|---|---|---|
| target_label | The name of the label to use as the target for training the model | required | 1 |
| backend | The framework to use; see the documentation on frameworks. If not provided and an FL model built into OctaiPipe is used, the backend can be inferred. Currently supported: ‘pytorch’, ‘sklearn’, ‘xgboost’ | optional | 1 |
| cycle_id | The name of the column that identifies an operating cycle. If provided, the validation split is grouped on this column | optional | 1 |
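The cycle_id option matters for time-series data such as the remaining-useful-life (RUL) example above, where rows from the same machine are strongly correlated: if one machine's rows landed in both the training and validation sets, validation scores would be optimistic. The sketch below illustrates what a split grouped on a column means; grouped_train_val_split and its val_fraction default are hypothetical, and OctaiPipe's own split may differ.

```python
import random


def grouped_train_val_split(rows, cycle_id, val_fraction=0.2, seed=0):
    """Split rows so that all rows sharing a cycle_id value land on the same side."""
    cycles = sorted({row[cycle_id] for row in rows})
    rng = random.Random(seed)
    rng.shuffle(cycles)
    # hold out whole cycles (at least one) for validation
    n_val = max(1, round(len(cycles) * val_fraction))
    val_cycles = set(cycles[:n_val])
    train = [r for r in rows if r[cycle_id] not in val_cycles]
    val = [r for r in rows if r[cycle_id] in val_cycles]
    return train, val


rows = [{"Machine number": m, "RUL": rul} for m in (1, 2, 3, 4, 5) for rul in (3, 2, 1)]
train, val = grouped_train_val_split(rows, "Machine number")
print(len(train), len(val))  # → 12 3
```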

strategy#

OctaiPipe supports two federated learning (FL) strategies tailored to different types of machine-learning models: one for PyTorch and scikit-learn models, and another for XGBoost models. Both extend Flower’s built-in flwr.server.strategy.FedAvg strategy. See Strategy for more details.

Each strategy sets defaults chosen for compatibility and good performance with its model type.

Flower’s built-in FedAvg strategy options are described in [their documentation](https://flower.ai/docs/framework/ref-api/flwr.server.strategy.FedAvg.html#fedavg). Of these, only the following are supported in OctaiPipe:

For PyTorch and scikit-learn Models:#

  • fraction_fit (float, optional): The fraction of clients to use during training. If the specified minimum number of clients (min_fit_clients) is greater than the available fraction (fraction_fit * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.

  • fraction_evaluate (float, optional): The fraction of clients to use during validation. If the specified minimum number of clients (min_evaluate_clients) is greater than the available fraction (fraction_evaluate * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.

  • min_fit_clients (int, optional): The minimum number of clients to use during training. The default value is 2.

  • min_evaluate_clients (int, optional): The minimum number of clients to use during validation. The default value is 2.

  • min_available_clients (int, optional): The minimum total number of clients that must be available in the system. The default value is 2.

  • evaluate_metrics_aggregation_fn (str): Maps to a function for aggregating metrics during validation. Defaults to weighted average. Additional evaluation aggregation functions can be provided in the custom StrategyConfig class.

  • num_rounds (int, optional): The number of rounds to train for. The default value is 10.

  • round_timeout (float, optional): Time to wait before aggregation proceeds, regardless of how many clients have sent model updates. Defaults to None, which means no timeout.
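The interplay between fraction_fit and min_fit_clients can be sketched as follows. This is modelled on how we understand Flower's FedAvg and its default client manager to behave, not copied from OctaiPipe; num_fit_clients and sample_fit_clients are illustrative names, and fraction_evaluate with min_evaluate_clients follow the same pattern for validation.

```python
import random


def num_fit_clients(num_available, fraction_fit=1.0, min_fit_clients=2):
    """Clients to sample for training: the fraction, but never fewer than the minimum."""
    return max(int(num_available * fraction_fit), min_fit_clients)


def sample_fit_clients(available, fraction_fit=1.0, min_fit_clients=2,
                       min_available_clients=2, seed=None):
    """Randomly pick clients for one training round, or [] if too few are available."""
    k = num_fit_clients(len(available), fraction_fit, min_fit_clients)
    if len(available) < max(k, min_available_clients):
        return []  # not enough clients: a real server would wait or skip the round here
    return random.Random(seed).sample(list(available), k)


print(num_fit_clients(4, fraction_fit=0.25))  # → 2 (int(1.0) = 1, raised to the minimum of 2)
print(num_fit_clients(10, fraction_fit=0.5))  # → 5
print(sample_fit_clients(["FL-01", "FL-02", "FL-03", "FL-04"], fraction_fit=0.5, seed=1))
```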

Additional Options for PyTorch and scikit-learn:

  • initial_model (str, optional): To initialise the global model from an existing model, specify that model’s model_id here. Defaults to None.

  • save_best_model (bool, optional): If true, saves the model from the best communication round (based on the metric specified in model_specs); otherwise, saves the model from the last round. Defaults to False.

Default options:

  • fraction_fit and fraction_evaluate: 1.0, so all clients (100%) participate in training and evaluation.

  • min_fit_clients and min_evaluate_clients: 2, the minimum number of clients required for training and for evaluation, with a total minimum of 2 available clients (min_available_clients).

  • evaluate_metrics_aggregation_fn: ‘weighted_average’, a weighted average of the evaluation metrics.

  • num_rounds: 10 training rounds.

  • round_timeout: None, meaning no timeout; the server always waits for enough clients before aggregating.

  • initial_model: None. Can be specified; otherwise, the global model is initialised with the model weights of a randomly selected client.
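The default weighted_average aggregation can be illustrated with a short function in the shape Flower uses for evaluate_metrics_aggregation_fn: a list of (num_examples, metrics_dict) pairs in, one metrics dict out. This is a sketch of the technique; OctaiPipe's own weighted_average may differ in details such as which metrics it aggregates.

```python
def weighted_average(metrics):
    """Aggregate per-client metrics, weighting each client by its number of examples.

    metrics: list of (num_examples, {"metric_name": value}) tuples.
    """
    total = sum(n for n, _ in metrics)
    names = metrics[0][1].keys()
    return {name: sum(n * m[name] for n, m in metrics) / total for name in names}


# A client with 300 examples counts three times as much as one with 100
print(weighted_average([(100, {"rmse": 20.0}), (300, {"rmse": 40.0})]))  # → {'rmse': 35.0}
```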

For XGBoost Models:#

  • fraction_fit (float, optional): The fraction of clients to use during training. If the specified minimum number of clients (min_fit_clients) is greater than the available fraction (fraction_fit * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.

  • fraction_evaluate (float, optional): The fraction of clients to use during validation. If the specified minimum number of clients (min_evaluate_clients) is greater than the available fraction (fraction_evaluate * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.

  • min_fit_clients (int, optional): The minimum number of clients to use during training. The default value is 2.

  • min_evaluate_clients (int, optional): The minimum number of clients to use during validation. The default value is 2.

  • min_available_clients (int, optional): The minimum total number of clients that must be available in the system. The default value is 2.

  • evaluate_metrics_aggregation_fn (str): Maps to a function for aggregating metrics during validation. Defaults to weighted average. Additional evaluation aggregation functions can be provided in the custom StrategyConfig class.

  • num_rounds (int, optional): The number of rounds to train for. The default value is 10.

Additional Options for XGBoost:

  • num_local_rounds (int, optional): The number of local rounds of training to perform on the device. The default value is 1.

  • normalized_learning_rate (bool, optional): Whether to normalize the learning rate based on the number of samples each client contributes. The default value is False.

Default strategy options:

  • fraction_fit and fraction_evaluate: 1.0, so all clients (100%) participate in training and evaluation.

  • min_fit_clients and min_evaluate_clients: 2, the minimum number of clients required for training and for evaluation, with a total minimum of 2 available clients (min_available_clients).

  • evaluate_metrics_aggregation_fn: ‘weighted_average’, a weighted average of the evaluation metrics.

  • num_rounds: 10 training rounds.

  • num_local_rounds: 1, so each client performs one round of local training before aggregation.

  • normalized_learning_rate: False. Disabled by default; can be enabled to adjust the learning rate based on the number of samples each client contributes.

See Customise Strategy Parameters for how to customise a strategy.

Custom PyTorch model#

Once the FL infrastructure and its strategy are defined, the next step is to define the neural network model. OctaiPipe currently supports PyTorch for custom neural networks: you define your own model in Python by subclassing BasePytorch, as in the example below.

```python
import torch.nn as nn
import torch.nn.functional as F
from octaipipe.model_classes.fl_aquarium.base_pytorch import BasePytorch


class CustomModel(BasePytorch):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _build_model(self, input_shape, output_shape):
        '''Builds model when class is initialized.

        Args:
            input_shape: number of columns in X.
            output_shape: number of columns in y.
        '''
        # Two hidden layers, each twice as wide as the input, with dropout
        self.dense1 = nn.Linear(input_shape, input_shape * 2)
        self.drop1 = nn.Dropout(p=0.5)
        self.dense2 = nn.Linear(input_shape * 2, input_shape * 2)
        self.drop2 = nn.Dropout(p=0.5)
        # Output layer: one unit per target column
        self.dense3 = nn.Linear(input_shape * 2, output_shape)

    def forward(self, x):
        '''Defines how data is forward propagated'''
        x = F.relu(self.dense1(x))
        x = self.drop1(x)
        x = F.relu(self.dense2(x))
        x = self.drop2(x)
        # Linear output (no activation), suitable for regression targets such as RUL
        x = self.dense3(x)
        return x
```
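In FL, each selected client sends its full set of model weights to the server every round, so model size translates directly into communication cost. The arithmetic below counts the trainable parameters of the CustomModel example (each nn.Linear(n_in, n_out) has n_in * n_out weights plus n_out biases; Dropout has none). The input_shape of 24 in the usage line is an arbitrary example value, not taken from this guide.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of an nn.Linear(n_in, n_out) layer with bias enabled."""
    return n_in * n_out + n_out


def custom_model_params(input_shape, output_shape):
    """Trainable parameters of the CustomModel example (Dropout layers add none)."""
    hidden = input_shape * 2
    return (dense_params(input_shape, hidden)      # dense1
            + dense_params(hidden, hidden)         # dense2
            + dense_params(hidden, output_shape))  # dense3


print(custom_model_params(24, 1))  # → 3601 (1200 + 2352 + 49)
```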

Deploy and perform FL#

Once everything is prepared, start the FL experiment with the following command:

```shell
octaifl start -c <pipeline step definition file>
```

Alternatively, you can start it from Python:

```python
from octaipipe.federated_learning.run_fl import OctaiFL

# Path to the FL pipeline step definition (YAML) file
fl_yml = 'path to definition file'
octai_fl = OctaiFL(fl_yml)
octai_fl.run()
```

The OctaiFL infrastructure is torn down automatically when the FL experiment terminates. You can also tear it down before the experiment finishes, either with the CLI:

```shell
octaifl down --deployment-id <deployment-id>
```

or from Python:

```python
octaifl_teardown(deployment_id='<deployment-id>')
```

Summary Report and Detailed Experiment Info#

Once an FL experiment has started, you will likely want to follow its progress. OctaiPipe offers two APIs for this through the experiments submodule: a summary report with the latest information, and detailed information.

```python
from octaipipe import experiments

experiments.get_experiment_by_id('2ecab1ab')
```

Given an experiment ID, experiments.get_experiment_by_id() returns detailed information about that experiment.

  • Detailed experiment info covers the experiment’s whole lifespan: an experiment with ten communication rounds has ten detailed experiment records.

  • Detailed experiment info includes the following fields:

      ◦ experimentId
      ◦ experimentDescription
      ◦ date
      ◦ userId
      ◦ communicationRound
      ◦ currentStatus
      ◦ createDatetime
      ◦ experimentStatus
      ◦ flConfigFile
      ◦ flServer
      ◦ flStrategy

Machine Learning Model#

Once the experiment is complete and the model trained, you will likely want to use the model for inference. OctaiPipe provides a models submodule for this: you can retrieve a model by its experimentId or modelId. The example below retrieves the models of an experiment. Once you have the model information, you can use it to perform inference; we suggest identifying the model by its name and version number when doing so.

```python
from octaipipe import models

models.find_models_by_experiment_id('2ecab1ab')
```

Generally, each communication round creates an updated model. OctaiPipe also provides the EvaluationClient API to retrieve evaluation information for these models. The example below gets the evaluation information for a model by its modelId.

```python
import pandas as pd
from octaipipe_core.client.evaluation_client import EvaluationClient

ev_client = EvaluationClient()
# Evaluation records for the model with this modelId
evaluation = ev_client.get_evaluation_model_id('16826f33')
# Tabulate the records, ordered by communication round
pd.DataFrame(evaluation).sort_values('communicationRound')
```
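The evaluation records show which communication round produced the best model, which is the idea behind the save_best_model strategy option. The sketch below assumes, without confirmation from this guide, that each record is a dict carrying communicationRound and a key named after the metric in model_specs (here rmse); best_round is a hypothetical helper, and the numbers in the usage are made up for illustration.

```python
def best_round(evaluations, metric="rmse", lower_is_better=True):
    """Return the evaluation record with the best value of `metric`.

    ASSUMPTION: each record is a dict with 'communicationRound' and a key
    named after the metric configured in model_specs.
    """
    pick = min if lower_is_better else max
    return pick(evaluations, key=lambda record: record[metric])


# Made-up records for illustration only, not real results
evaluations = [
    {"communicationRound": 1, "rmse": 52.0},
    {"communicationRound": 2, "rmse": 41.5},
    {"communicationRound": 3, "rmse": 43.0},
]
print(best_round(evaluations)["communicationRound"])  # → 2
```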

Federated Learning Details#

If you want to understand the FL libraries that Octaipipe provides, please refer to the following sections for more detail.