Federated Learning
Federated learning (FL) is a machine-learning paradigm introduced by Google in 2016 that helps keep training data private. Traditionally, training a machine-learning model meant collecting all available data in one place and training it centrally. With federated learning, edge devices instead train a model on their own local data and upload only that local model to the server; the raw data never leaves the device. The FL server then aggregates these local models into a global model. The figure below illustrates the high-level comparison between these two approaches.
Typical FL training involves the following steps.
A machine-learning process triggers deployment of the FL server.
The FL server randomly selects available devices (FL clients) and asks them to train a model on their own local data. Each local model starts from the global model sent by the FL server.
Once training is complete, the selected FL clients push their local models to the FL server.
Once it has received the required number of local models, the FL server aggregates them into a new version of the global model. The aggregation rule is determined by the strategy defined by the user.
The FL server starts another round of training by randomly selecting available devices and sending them the latest global model.
The FL training process terminates after a set number of rounds.
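The training loop described above can be simulated in a few lines. This is a toy sketch, not OctaiPipe's implementation: it assumes each "model" is a plain list of floats, uses a made-up local update (nudging each weight toward the mean of the client's data), and aggregates with plain FedAvg, i.e. an average of the client models weighted by each client's number of samples. The names `local_train`, `fed_avg` and `run_fl` are hypothetical.

```python
import random
from typing import Dict, List

# Toy simulation of the FL loop: models are lists of floats and each
# client's local data is a list of numbers. Not OctaiPipe's implementation.


def local_train(global_weights: List[float], local_data: List[float],
                lr: float = 0.5) -> List[float]:
    """Hypothetical local update: start from the global model and move
    each weight toward the mean of this client's data."""
    target = sum(local_data) / len(local_data)
    return [w + lr * (target - w) for w in global_weights]


def fed_avg(updates: List[List[float]], num_samples: List[int]) -> List[float]:
    """FedAvg: average the client models, weighting each by its sample count."""
    total = sum(num_samples)
    return [
        sum(update[i] * n for update, n in zip(updates, num_samples)) / total
        for i in range(len(updates[0]))
    ]


def run_fl(client_data: Dict[str, List[float]], num_rounds: int = 10,
           fraction_fit: float = 1.0, min_fit_clients: int = 2,
           seed: int = 0) -> List[float]:
    rng = random.Random(seed)
    global_weights = [0.0]  # initial global model
    clients = sorted(client_data)
    for _ in range(num_rounds):
        # Randomly select clients (fraction_fit, floored at min_fit_clients)
        k = min(len(clients), max(int(len(clients) * fraction_fit), min_fit_clients))
        selected = rng.sample(clients, k)
        # Each selected client trains starting from the current global model
        updates = [local_train(global_weights, client_data[c]) for c in selected]
        # The server aggregates the local models into a new global model
        global_weights = fed_avg(updates, [len(client_data[c]) for c in selected])
    # Training stops after num_rounds rounds
    return global_weights


weights = run_fl({"FL-01": [1.0, 1.0], "FL-02": [3.0, 3.0, 3.0, 3.0]})
print(weights)  # close to 7/3, the sample-weighted mean of the client targets
```

Weighting by sample count means that FL-02, which holds twice as much data, pulls the global model twice as hard as FL-01.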
The process above highlights the key components of FL:
FL infrastructure, including devices and FL server.
The device selection algorithm.
The model aggregation algorithm.
The neural network model for this experiment.
Because FL training is itself a machine-learning process, it fits naturally as a step in an OctaiPipe pipeline.
As with other OctaiPipe machine-learning steps such as AutoML, FL in OctaiPipe is configured with a YAML file that defines the FL infrastructure and its processes. Device selection and model aggregation are together referred to as the “strategy” in OctaiPipe, which can be defined through a Pythonic interface, typically from a Jupyter notebook in OctaiLab. Likewise, you can use a predefined neural network model provided by OctaiPipe or define your own in Python code.
Because FL is built into OctaiPipe, you can run it as one of many steps in an OctaiPipe pipeline. This user manual explains how to perform FL with OctaiPipe: it starts with the prerequisites, then discusses the required steps in detail, and ends with a programmer's guide to the related libraries.
Prerequisites
The following steps must be completed before performing federated learning:
Collect and register device information with OctaiPipe.
Prepare the YAML configuration for FL and the related pipeline.
Steps and Results
Performing FL with OctaiPipe involves several essential processes, which together produce a machine-learning model. Everything begins with registering the devices. Next, a definition file is required before the respective container images are deployed. Once the container image has been deployed to each device, you can start the federated learning task with a single command. Information about each process is stored in the database and surfaced as the summary report in OctaiPipe. As with any learning step, a machine-learning model is created when the learning process completes. The following sections describe each of these in detail.
Device Registration
This is the first step of any FL experiment using Octaipipe. Please refer to Register a Device for more details.
FL Server Deployment
Once devices are registered in OctaiPipe, OctaiPipe uses kubectl under the hood to deploy a Kubernetes (k8s) server. This server plays an important role: it deploys the container images and the FL server, which aggregates the models from the selected edge devices. OctaiPipe creates this server for you; all you need to do is prepare a YAML configuration file and launch it with the octaifl CLI or the run() method of OctaiFL (see Deploy and perform FL).
Define Pipeline Steps for FL
After the devices have been properly registered and initialised, the next step is to provide the FL specification. An FL specification consists of its infrastructure, the usual OctaiPipe configurations for input data, evaluation data and training, and the FL-specific aggregation strategy. We demonstrate how to perform federated learning in OctaiPipe, beginning with the YAML below.
name: federated_learning
description: Federated Learning Demo

infrastructure:
  device_ids: [FL-01, FL-02, FL-03, FL-04]
  device_group: group_1
  image_name: octaipipe.azurecr.io/octaipipe-all_data_loaders:latest
  server_image: octaipipe.azurecr.io/fl_server:latest
  env:
    ENV_VAR_0: value_0
    ENV_VAR_1: value_1

strategy:
  fraction_fit: 1.0
  fraction_evaluate: 1.0
  min_fit_clients: 2
  min_evaluate_clients: 2
  min_available_clients: 2
  evaluate_metrics_aggregation_fn: weighted_average
  num_rounds: 10
  initial_model: None
  save_best_model: true

input_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_def.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-01
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_1.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-02
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_2.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-03
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_3.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-04
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_4.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
  data_converter: {}

evaluation_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_def.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-01
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_1.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-02
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_2.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-03
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_3.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
    - device: FL-04
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query_eval_4.txt
      query_values:
        start: "2022-11-10T00:00:00.000Z"
        stop: "2022-11-11T00:00:00.000Z"
        bucket: cmapss-bucket
        measurement: sensors-raw
        tags: {}
  data_converter: {}

model_specs:
  type: base_torch
  load_existing: false
  name: test_torch
  model_load_specs:
    version: '000'
  model_params:
    loss_fn: mse
    scaling: standard
    metric: rmse
    epochs: 10
    batch_size: 32

run_specs:
  target_label: RUL
  cycle_id: "Machine number"
  backend: pytorch
As the YAML above shows, a typical FL configuration consists of these blocks, in addition to name and description:
infrastructure
strategy
input_data_specs
evaluation_data_specs
model_specs
run_specs
Note: The output_data_specs block does not need to be set for Federated Learning pipelines and has no effect if it is included in the YAML configuration.
As with other pipeline steps, input_data_specs and evaluation_data_specs describe the data for the FL experiment. OctaiPipe lets you specify query values for each device individually by adding one entry per device under devices, as in the example above. Since these data blocks work as they do elsewhere in OctaiPipe, the following sections focus on the infrastructure, model_specs, run_specs and strategy blocks.
infrastructure
From a high-level machine-learning life-cycle point of view, centralised ML and FL are the same, so their YAML configurations are also largely the same. The main difference is an additional block describing the infrastructure, which typical machine learning does not need; the FL-specific strategy block is described separately below. An example infrastructure block is shown below.
infrastructure:
  device_ids: [FL-01, FL-02, FL-03, FL-04]
  device_group: group_1
  image_name: octaipipe.azurecr.io/octaipipe-all_data_loaders:latest
  server_image: octaipipe.azurecr.io/fl_server:latest
  env:
    ENV_VAR_0: value_0
    ENV_VAR_1: value_1
Key | Value Description | Note | Layer
---|---|---|---
infrastructure | Defines that the following block belongs to the infrastructure. | required field | 1
device_ids | The FL clients of this experiment: a list of edge device IDs. | required field | 2
device_group | | required field | 2
image_name | The container image deployed to each FL client device. | required field | 2
server_image | The container image for the FL server. | required field | 2
env | Environment variables, given as name-value pairs. | required field | 2
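Because every field in the table above is required, a configuration can be sanity-checked before deployment. The sketch below is a hypothetical pre-flight check, not part of OctaiPipe: it assumes the YAML has already been parsed into a Python dictionary (for example with a YAML loader) and reports missing required fields and obviously wrong types.

```python
from typing import Any, Dict, List

# Required fields of the infrastructure block, taken from the table above.
REQUIRED_INFRA_KEYS = ("device_ids", "device_group", "image_name", "server_image", "env")


def check_infrastructure(config: Dict[str, Any]) -> List[str]:
    """Hypothetical check: return a list of problems (empty if none)."""
    infra = config.get("infrastructure")
    if not isinstance(infra, dict):
        return ["missing 'infrastructure' block"]
    problems = [f"missing required field '{key}'"
                for key in REQUIRED_INFRA_KEYS if key not in infra]
    # device_ids is a list of edge device IDs
    device_ids = infra.get("device_ids")
    if device_ids is not None and not isinstance(device_ids, list):
        problems.append("'device_ids' must be a list of device IDs")
    # env maps variable names to values
    env = infra.get("env")
    if env is not None and not isinstance(env, dict):
        problems.append("'env' must be a mapping of variable names to values")
    return problems


config = {
    "infrastructure": {
        "device_ids": ["FL-01", "FL-02", "FL-03", "FL-04"],
        "device_group": "group_1",
        "image_name": "octaipipe.azurecr.io/octaipipe-all_data_loaders:latest",
        "server_image": "octaipipe.azurecr.io/fl_server:latest",
        "env": {"ENV_VAR_0": "value_0", "ENV_VAR_1": "value_1"},
    }
}
print(check_infrastructure(config))
```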
model_specs
A typical model_specs block for an FL experiment is shown below:
model_specs:
  type: base_pytorch
  load_existing: false
  name: test_torch
  model_load_specs:
    version: 3
  model_params:
    loss_fn: mse
    scaling: standard
    metric: rmse
    epochs: 10
    batch_size: 32
  custom_model:
    file_path: ../../src/octaipipe/models/fl_aquarium/base_pytorch.py
Unlike other types of ML experiment, an FL experiment requires you to assign a custom_model file_path within this block.
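How OctaiPipe loads the file named by custom_model.file_path is not described here. As a general illustration, the sketch below shows the standard-library way a Python file path can be turned into an importable module whose classes can then be looked up by name; load_module_from_path is a hypothetical helper, not an OctaiPipe API.

```python
import importlib.util
from pathlib import Path
from types import ModuleType


def load_module_from_path(file_path: str, module_name: str = "custom_model") -> ModuleType:
    """Hypothetical helper: import a Python source file given its path."""
    path = Path(file_path)
    spec = importlib.util.spec_from_file_location(module_name, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file, defining its classes
    return module


# Usage (hypothetical path): look up the model class defined in the file
# module = load_module_from_path("./my_model.py")
# model_cls = getattr(module, "CustomModel")
```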
run_specs
This block defines the target label, the cycle ID and the FL framework (backend) of the experiment. An example is shown below:
run_specs:
  target_label: RUL
  cycle_id: "Machine number"
  backend: pytorch
Key | Value Description | Note | Layer
---|---|---|---
target_label | The name of the label to use as the target for training the model. | required | 1
backend | The framework to use. See the documentation on frameworks. If not provided and an FL model built into OctaiPipe is used, the backend can be inferred. Currently: ‘pytorch’, ‘sklearn’, ‘xgboost’. | optional | 1
cycle_id | The name of the column that identifies an operating cycle. If provided, the validation set is split grouped on this column. | optional | 1
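Splitting "grouped on" cycle_id means that all rows from one operating cycle (here, one machine) land on the same side of the split, so the validation set never contains cycles the model has partly seen during training. The sketch below illustrates a group-wise split in plain Python; OctaiPipe's actual split ratio and method are not documented here, so grouped_split, val_fraction and seed are hypothetical.

```python
import random
from typing import Any, Dict, List, Sequence, Tuple


def grouped_split(rows: Sequence[Dict[str, Any]], cycle_id: str,
                  val_fraction: float = 0.2,
                  seed: int = 0) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Hypothetical group-wise train/validation split on the cycle_id column."""
    groups = sorted({row[cycle_id] for row in rows})
    random.Random(seed).shuffle(groups)
    # Hold out whole groups, at least one
    n_val = max(1, round(len(groups) * val_fraction))
    val_groups = set(groups[:n_val])
    train = [row for row in rows if row[cycle_id] not in val_groups]
    val = [row for row in rows if row[cycle_id] in val_groups]
    return train, val


# Five machines with three rows each
rows = [{"Machine number": m, "RUL": r} for m in range(1, 6) for r in range(3)]
train, val = grouped_split(rows, "Machine number")
print(len(train), len(val))
```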
strategy
OctaiPipe supports two federated learning (FL) strategies tailored to different types of machine-learning models: one for PyTorch and scikit-learn models, and another for XGBoost models. Both extend Flower's built-in flwr.server.strategy.FedAvg strategy. See Strategy for more details.
These strategies set specific defaults to ensure compatibility and good performance with their respective model types.
Flower describes all built-in FedAvg options in [their documentation](https://flower.ai/docs/framework/ref-api/flwr.server.strategy.FedAvg.html#fedavg). Of these, only the following are supported in OctaiPipe:
For PyTorch and scikit-learn Models:
fraction_fit (float, optional): The fraction of clients to use during training. If the specified minimum number of clients (min_fit_clients) is greater than the available fraction (fraction_fit * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.
fraction_evaluate (float, optional): The fraction of clients to use during validation. If the specified minimum number of clients (min_evaluate_clients) is greater than the available fraction (fraction_evaluate * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.
min_fit_clients (int, optional): The minimum number of clients to use during training. The default value is 2.
min_evaluate_clients (int, optional): The minimum number of clients to use during validation. The default value is 2.
min_available_clients (int, optional): The minimum total number of clients that must be available in the system. The default value is 2.
evaluate_metrics_aggregation_fn (str): Maps to a function for aggregating metrics during validation. Defaults to weighted average. Additional evaluation aggregation functions can be provided in the custom StrategyConfig class.
num_rounds (int, optional): The number of rounds to train for. The default value is 10.
round_timeout (float, optional): How long to wait before aggregation goes ahead regardless of the number of clients that have sent model updates. Defaults to None, which means no timeout.
Additional Options for PyTorch and scikit-learn:
initial_model (str, optional): To initialise the global model from an existing model, specify that model's model_id here. Defaults to None.
save_best_model (bool, optional): Whether to save the model from the last or best (based on metric specified in model_specs) communication round. Defaults to False.
Default options:
fraction_fit and fraction_evaluate: 1.0 (100% of clients participate in training and evaluation)
min_fit_clients and min_evaluate_clients: 2 (minimum number of clients required for training and for evaluation), with a total minimum of 2 available clients (min_available_clients)
evaluate_metrics_aggregation_fn: ‘weighted_average’ (evaluation metrics are aggregated by weighted average)
num_rounds: 10 (number of training rounds)
round_timeout: None (no timeout; always wait for enough clients before aggregating)
initial_model: None (can be specified; otherwise the global model is initialised with random client model weights)
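The interaction between fraction_fit and min_fit_clients described above can be written as a one-line rule. The sketch below mirrors our reading of how Flower's FedAvg sizes a training round (its num_fit_clients method; num_evaluation_clients does the same with the evaluate options): take the fraction of available clients, rounded down, but never fewer than the minimum. num_clients_to_sample is a hypothetical name; consult Flower's source for the authoritative logic.

```python
from typing import Tuple


def num_clients_to_sample(num_available: int, fraction: float,
                          min_clients: int,
                          min_available_clients: int) -> Tuple[int, int]:
    """Return (sample_size, min_available_clients) for one round.

    The fraction of available clients is rounded down, then floored at
    min_clients; min_available_clients is how many clients must be
    connected before sampling.
    """
    sample_size = int(num_available * fraction)
    return max(sample_size, min_clients), min_available_clients


# Four registered devices, as in the example YAML
print(num_clients_to_sample(4, 1.0, 2, 2))   # all four devices train
print(num_clients_to_sample(4, 0.25, 2, 2))  # 1 device by fraction, raised to 2
```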
For XGBoost Models:
fraction_fit (float, optional): The fraction of clients to use during training. If the specified minimum number of clients (min_fit_clients) is greater than the available fraction (fraction_fit * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.
fraction_evaluate (float, optional): The fraction of clients to use during validation. If the specified minimum number of clients (min_evaluate_clients) is greater than the available fraction (fraction_evaluate * number of clients), then the minimum number of clients will still be sampled. The default value is 1.0.
min_fit_clients (int, optional): The minimum number of clients to use during training. The default value is 2.
min_evaluate_clients (int, optional): The minimum number of clients to use during validation. The default value is 2.
min_available_clients (int, optional): The minimum total number of clients that must be available in the system. The default value is 2.
evaluate_metrics_aggregation_fn (str): Maps to a function for aggregating metrics during validation. Defaults to weighted average. Additional evaluation aggregation functions can be provided in the custom StrategyConfig class.
num_rounds (int, optional): The number of rounds to train for. The default value is 10.
Additional Options for XGBoost:
num_local_rounds (int, optional): The number of local rounds of training to perform on the device. The default value is 1.
normalized_learning_rate (bool, optional): Whether to normalize the learning rate based on the number of samples each client contributes. The default value is False.
Default strategy options:
fraction_fit and fraction_evaluate: 1.0 (100% of clients participate in training and evaluation)
min_fit_clients and min_evaluate_clients: 2 (minimum number of clients required for training and for evaluation), with a total minimum of 2 available clients (min_available_clients)
evaluate_metrics_aggregation_fn: ‘weighted_average’ (evaluation metrics are aggregated by weighted average)
num_rounds: 10 (number of training rounds)
num_local_rounds: 1 (each client performs one round of local training before aggregation)
normalized_learning_rate: False (disabled by default; can be enabled to adjust the learning rate based on the number of samples each client contributes)
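Both strategies default to ‘weighted_average’ for evaluate_metrics_aggregation_fn. OctaiPipe's own function is not shown here; the sketch below follows the pattern used in Flower's examples, where each client reports a pair of (number of evaluation examples, metrics dictionary) and each metric is averaged with weights proportional to the example counts. The handling of metrics missing from some clients is our own choice.

```python
from typing import Dict, List, Tuple

Metrics = Dict[str, float]


def weighted_average(metrics: List[Tuple[int, Metrics]]) -> Metrics:
    """Average each metric across clients, weighted by number of examples."""
    keys = set().union(*(m.keys() for _, m in metrics))
    result: Metrics = {}
    for key in sorted(keys):
        # Only clients that reported this metric contribute to it
        pairs = [(n, m[key]) for n, m in metrics if key in m]
        total = sum(n for n, _ in pairs)
        if total > 0:
            result[key] = sum(n * value for n, value in pairs) / total
    return result


# Two clients: 100 and 300 evaluation rows
print(weighted_average([(100, {"rmse": 10.0}), (300, {"rmse": 20.0})]))
```

Note that a weighted average of per-client RMSE values is not the same as the RMSE over the pooled data; the pooled value would be the square root of the weighted average of per-client MSE.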
See Customise Strategy Parameters for how to customise a strategy.
Custom PyTorch model
Once the FL infrastructure and its strategy are defined, the next step is to define the neural network model. OctaiPipe currently supports PyTorch for defining neural networks. You can write your own PyTorch model in code by subclassing BasePytorch, building the layers in _build_model() and defining the forward pass in forward(), as in the example below:
from octaipipe.model_classes.fl_aquarium.base_pytorch import BasePytorch
import torch.nn as nn
import torch.nn.functional as F


class CustomModel(BasePytorch):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def _build_model(self, input_shape, output_shape):
        '''Builds model when class is initialized.

        Args:
            input_shape: number of columns in X.
            output_shape: number of columns in y.
        '''
        self.dense1 = nn.Linear(input_shape, input_shape * 2)
        self.drop1 = nn.Dropout(p=0.5)
        self.dense2 = nn.Linear(input_shape * 2, input_shape * 2)
        self.drop2 = nn.Dropout(p=0.5)
        self.dense3 = nn.Linear(input_shape * 2, output_shape)

    def forward(self, x):
        '''Defines how data is forward propagated'''
        x = F.relu(self.dense1(x))
        x = self.drop1(x)
        x = F.relu(self.dense2(x))
        x = self.drop2(x)
        x = self.dense3(x)
        return x
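Because every selected client sends its full set of weights to the FL server each round, it helps to know how large CustomModel is. Each nn.Linear(in_features, out_features) layer holds in_features × out_features weights plus out_features biases, and Dropout layers have no parameters, so the count follows directly from input_shape and output_shape. The 14 input features in the example call are a hypothetical choice.

```python
def custom_model_param_count(input_shape: int, output_shape: int) -> int:
    """Count trainable parameters of CustomModel's three Linear layers."""
    hidden = input_shape * 2
    dense1 = input_shape * hidden + hidden    # weights + biases
    dense2 = hidden * hidden + hidden
    dense3 = hidden * output_shape + output_shape
    return dense1 + dense2 + dense3           # Dropout adds nothing


n_params = custom_model_param_count(14, 1)
print(n_params)      # 420 + 812 + 29
print(n_params * 4)  # bytes per client update if sent as float32
```

With PyTorch installed, the same number can be cross-checked on an instantiated model with sum(p.numel() for p in model.parameters()). Since dense2 grows quadratically with input_shape, wide inputs increase the per-round upload noticeably.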
Deploy and perform FL
Once everything is prepared, start the FL experiment with the following command:
octaifl start -c <pipeline step definition file>
Or, you may use the code-based method as shown below.
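from octaipipe.federated_learning.run_fl import OctaiFL

# Path to the FL pipeline step definition (YAML) file
fl_config_path = 'path to definition file'
octai_fl = OctaiFL(fl_config_path)
octai_fl.run()  # deploys the FL infrastructure and starts the experiment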
The OctaiFL infrastructure is torn down automatically when the FL experiment terminates. You can also tear it down before the experiment finishes, using the CLI:
octaifl down --deployment-id <deployment-id>
Or, in Python:
octaifl_teardown(deployment_id='<deployment-id>')
Summary Report and Detailed Experiment Info
Once an FL experiment starts, you will likely want to follow its progress. OctaiPipe offers two APIs to retrieve this information via the experiments submodule: summary reports with the latest information, or detailed information.
from octaipipe import experiments

experiments.get_experiment_by_id('2ecab1ab')
Once you know an experiment's experiment_id, experiments.get_experiment_by_id() returns detailed information about that experiment.
Detailed experiment info covers the experiment's whole lifespan, with one record per communication round: an experiment with ten communication rounds yields ten detailed experiment records.
Detailed experiment info provides the following fields:
experimentId
experimentDescription
date
userId
communicationRound
currentStatus
createDatetime
experimentStatus
flConfigFile
flServer
flStrategy
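Since there is one record per communication round, the most recent state of an experiment is the record with the highest communicationRound. The return type of get_experiment_by_id is not specified above, so the sketch below assumes the records are available as a list of dictionaries keyed by the fields listed; latest_round and the sample status values are hypothetical.

```python
from typing import Any, Dict, List


def latest_round(records: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Return the record for the most recent communication round."""
    if not records:
        raise ValueError("no experiment records")
    return max(records, key=lambda record: record["communicationRound"])


# Hypothetical records for a three-round experiment
records = [
    {"experimentId": "2ecab1ab", "communicationRound": 1, "currentStatus": "done"},
    {"experimentId": "2ecab1ab", "communicationRound": 3, "currentStatus": "running"},
    {"experimentId": "2ecab1ab", "communicationRound": 2, "currentStatus": "done"},
]
print(latest_round(records)["communicationRound"])
```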
Machine Learning Model
Once the experiment is complete, you will likely want to use the trained model for inference. OctaiPipe provides a models submodule from which you can retrieve models by experimentId or modelId. The example code below retrieves the models of an experiment; the retrieved information can then be used to perform model inference. We suggest using the model name and its version number when performing inference.
from octaipipe import models

models.find_models_by_experiment_id('2ecab1ab')
Generally, each communication round creates an updated model. OctaiPipe also provides the EvaluationClient API to retrieve evaluation information for these models. The example below gets the evaluation information for a model by its modelId and sorts it by communication round.
import pandas as pd
from octaipipe_core.client.evaluation_client import EvaluationClient

ev_client = EvaluationClient()
evaluation = ev_client.get_evaluation_model_id('16826f33')
pd.DataFrame(evaluation).sort_values('communicationRound')
Federated Learning Details
If you want to understand the FL libraries that Octaipipe provides, please refer to the following sections for more detail.