Federated PyTorch#
Base Class#
- class octaipipe.model_classes.fl_aquarium.base_pytorch.BasePytorch(loss_fn: str = None, scaling: str = None, metric: str = None, epochs: int = 5, batch_size: int = 32, input_shape: int = None, output_shape: int = None, differential_privacy: dict = {}, **kwargs)#
Guide#
Federated learning can be run with a wide array of different models and model setups. As there are far too many models and deep learning architectures out there to include in OctaiPipe itself, we have instead added support for custom FL models. This guide explains the Base PyTorch model in OctaiPipe, and details how to build on it to make a custom model.
This guide together with the guide on configuring the FL Train Step can be used to set up and configure or customize federated learning in OctaiPipe.
The following is a step-by-step guide to building a custom model that is compatible with the FL pipeline in OctaiPipe, using the Base PyTorch model as a template to build from.
1. Using a Base Model#
Each model implemented for FL in OctaiPipe should inherit from a base model that is already implemented in OctaiPipe. In this example, we inherit from the Base PyTorch model.
The base PyTorch model implements the following methods:
- __init__: the initialization of the model
- _build_model: setup of the architecture for the model
- setup_loaders: a method for setting up dataloaders
- forward: the process for forward propagation through the network
- train_model: method that runs training on data
- test_model: method that runs testing on the trained model
Any of these methods can be overridden by simply implementing it in the child class. A custom architecture, for example, can be implemented by specifying a custom _build_model method, as sketched below.
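As a minimal sketch of what such an override might look like (the import path follows the class reference at the top of this page; the layer sizes are placeholders rather than OctaiPipe defaults):
import torch
import torch.nn as nn

from octaipipe.model_classes.fl_aquarium.base_pytorch import BasePytorch


class CustomModel(BasePytorch):
    def _build_model(self):
        # Placeholder architecture; training, testing and data loading
        # are inherited unchanged from BasePytorch
        self.in_layer = nn.Linear(20, 64)
        self.out_layer = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.in_layer(x))
        return self.out_layer(x)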
As a word of caution, it is useful to go through the dependencies in the base class when implementing methods, as some methods rely on other methods in the class. For example, the __init__ method calls the _build_model method, so changing either could have effects on the other.
It is also worth mentioning here that the base PyTorch model implements either z-scoring or min-max scaling, depending on the arguments given to __init__. In order to get appropriate results during inference, input data needs to be scaled accordingly. If no scaling method is selected, features are not scaled.
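For reference, and independent of OctaiPipe's internals, the two scaling schemes correspond to the usual definitions, sketched here with NumPy:
import numpy as np


def z_score(x: np.ndarray) -> np.ndarray:
    # Standardize each feature to zero mean and unit variance
    return (x - x.mean(axis=0)) / x.std(axis=0)


def min_max(x: np.ndarray) -> np.ndarray:
    # Rescale each feature to the [0, 1] range
    return (x - x.min(axis=0)) / (x.max(axis=0) - x.min(axis=0))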
Next we will go through the methods in the base class in more detail in order to see how they work and how we might implement custom versions.
2. Building our own custom methods#
The __init__ method in the current implementation sets up the model class with settings such as the input shape, the number of epochs and the evaluation metric. The key thing to note about the __init__ method is that any arguments that the user wants to hand to the model class should go here. In the FL config file, the model_params field in the model_specs is handed to the __init__ method as a dictionary. If arguments need to be handed to other methods, the arguments from __init__ can be saved as class attributes. It is important that any custom __init__ method initializes the parent classes with super().__init__().
def __init__(self, metric_function):
    # Initialize BasePytorch first; it calls _build_model internally
    super().__init__()
    self._metric_function = metric_function
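With this __init__, the model_params field of model_specs in the FL config (shown in full in section 3 below) is what supplies the metric function:
model_params:
  metric_function: custom_metric_function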
The _build_model method specifies the components of the model. It is here that the specific layers of the model are defined, so the PyTorch documentation is the most useful reference for defining this method. Likewise, the forward method specifies the path the data takes through the components defined in the _build_model method. Implementing these two methods together builds the architecture of the network.
import torch.nn as nn
import torch.nn.functional as F


def _build_model(self):
    # Define the layers of the network
    self.in_layer = nn.Linear(20, 40)
    self.dropout = nn.Dropout(p=0.2)
    self.out_layer = nn.Linear(40, 1)

def forward(self, x):
    # Pass the data through the layers defined in _build_model
    x = F.relu(self.in_layer(x))
    x = self.dropout(x)
    x = self.out_layer(x)
    return x
The setup_loaders method is named for its functionality and sets up the PyTorch dataloaders for the model. It loads the training and testing data and runs the scaling on the features before making dataloaders for train, validation and test. Customizing this method gives the user the ability to define their own data generators and to implement their own preprocessing steps. The PyTorch documentation has more information on dataloaders.
The key to the setup_loaders method is that it takes the FL step as an input. This means that all methods in OctaiPipe's PipelineStep and FL Train Step are available within the setup_loaders method. The FL step has the training data configurations stored in self._sources and the evaluation data configs stored in self._eval_sources, which can be used with the _load_data method to get data from a datasource. Thus, the user can define their own data ingestion functions within a custom setup_loaders. It is worth noting that the _load_data method uses self._sources to load data. To get evaluation data, assign self._eval_sources to self._sources and use _load_data.
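As a rough sketch of that swap (the exact signature of _load_data may differ between OctaiPipe versions, so treat this as illustrative):
def _load_eval_data(train_step):
    # Temporarily point the step's sources at the evaluation configs
    original_sources = train_step._sources
    train_step._sources = train_step._eval_sources
    eval_data = train_step._load_data()  # signature assumed; verify locally
    train_step._sources = original_sources  # restore the training sources
    return eval_data

A full custom setup_loaders, here using the step's load_datasets helper, might then look like this: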
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset


def setup_loaders(self, train_step, target_label):
    X_train, X_test, y_train, y_test = train_step.load_datasets(target_label)

    def make_loader(X, y):
        # Convert dataframes to float tensors and wrap them in a DataLoader
        X = X.to_numpy(dtype=np.float64)
        y = y.to_numpy(dtype=np.float64)
        tensor_x = torch.Tensor(X)
        tensor_y = torch.Tensor(y)
        data = TensorDataset(tensor_x, tensor_y)
        loader = DataLoader(data, batch_size=16, shuffle=False)
        return loader

    # Hold out 10% of the training data for validation
    train_idx = X_train.sample(frac=0.9).index
    X_val = X_train.drop(train_idx)
    y_val = y_train.drop(train_idx)
    X_train = X_train.loc[train_idx]
    y_train = y_train.loc[train_idx]

    trainloader = make_loader(X_train, y_train)
    valloader = make_loader(X_val, y_val)
    testloader = make_loader(X_test, y_test)
    return trainloader, valloader, testloader
The train_model and test_model methods implement the training and testing of the model by running forward and backward propagation. This is where the loss function and optimizers are implemented. For FL, the train_model method is called in the FL client's fit method and the test_model method is called in the evaluate method. This process is what runs federated learning.
def train_model(self, trainloader, valloader):
    opt = torch.optim.SGD(self.parameters(), momentum=.9, lr=.00001)
    criterion = nn.L1Loss()
    for epoch in range(5):
        train_loss = 0.0
        for X_, y_ in trainloader:
            X_, y_ = X_.to('cpu'), y_.to('cpu')
            opt.zero_grad()
            y_ = y_[:, None]  # add a feature dimension to match predictions
            loss = criterion(self(X_), y_)
            train_loss += loss.item()  # .item() avoids retaining the graph
            loss.backward()
            opt.step()
        val_loss, val_score = self.test_model(valloader)
        print(f'Epoch {epoch}. Train loss: {train_loss}. '
              f'Validation loss: {val_loss}. Validation score: {val_score}')
def test_model(self, testloader):
    loss = 0.0
    criterion = nn.L1Loss()
    predictions = []
    y_true = []
    with torch.no_grad():  # no gradients needed during evaluation
        for X_, y_ in testloader:
            X_, y_ = X_.to('cpu'), y_.to('cpu')
            y_pred = self(X_)
            y_ = y_[:, None]
            loss += criterion(y_pred, y_).item()
            predictions.append(y_pred.data)
            y_true.append(y_.data)
    score = self._metric_function(y_true, predictions)
    return loss, score
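For illustration, a compatible metric function could be a hypothetical helper like the following; it receives the lists of per-batch tensors that test_model collects:
import torch


def custom_metric_function(y_true, predictions):
    # Concatenate the per-batch tensors collected in test_model,
    # then compute mean absolute error as an example score
    y_true = torch.cat(y_true)
    predictions = torch.cat(predictions)
    return torch.mean(torch.abs(y_true - predictions)).item()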
Note
If you want to use the FedProx aggregation strategy with your PyTorch model, you need to add a clause to the train_model method that includes a proximal term in the loss function during training, as described in the paper Federated Optimization in Heterogeneous Networks. The loss function modification would look like this:
import torch
import torch.nn as nn


def train_model(self, trainloader, valloader, proximal_mu: float | None):
    # Snapshot the global weights as a list; a bare .parameters()
    # generator would be exhausted after the first batch of the loop
    global_params = [param.detach().clone() for param in self.parameters()]
    opt = torch.optim.SGD(self.parameters(), momentum=.9, lr=.00001)
    criterion = nn.L1Loss()
    for epoch in range(5):
        train_loss = 0.0
        for X_, y_ in trainloader:
            X_, y_ = X_.to('cpu'), y_.to('cpu')
            opt.zero_grad()
            y_ = y_[:, None]
            proximal_term = 0.0
            loss = criterion(self(X_), y_)
            if proximal_mu is not None:
                # L2 distance between local and global weights
                for local_weights, global_weights in zip(self.parameters(), global_params):
                    proximal_term += (local_weights - global_weights).norm(2)
                loss += (proximal_mu / 2) * proximal_term
            train_loss += loss.item()
            loss.backward()
            opt.step()
        val_loss, val_score = self.test_model(valloader)
        print(f'Epoch {epoch}. Train loss: {train_loss}. '
              f'Validation loss: {val_loss}. Validation score: {val_score}')
The proximal_mu value will be passed to the train_model function automatically if using OctaiPipe's PyTorch client and the FedProx strategy.
3. Using a custom model for FL in OctaiPipe#
To use a custom model for FL in OctaiPipe, the user needs to assign the model a unique
model type and point to the Python file with the model class in the model_specs
in the FL configs when running OctaiFL. An example is provided below:
model_specs:
  type: improved_pytorch_model
  load_existing: false
  name: degradation_regressor_for_windmill
  model_load_specs:
    version: '000'
    from_blob: false
  model_params:
    metric_function: custom_metric_function
  custom_model:
    file_path: path/to/model_class.py
Here, we have defined a type that does not already exist in OctaiPipe. If this is done, and there is not already a model with that type in the database, OctaiFL expects the user to provide custom_model specs. Here, the file_path to the model class is provided. If the model already exists, a new record will be created with an incremented version number. If no model exists, the version number will be assigned as 1.
In model_load_specs, from_blob is set to False. This variable defines whether a new version of a model that already exists on a device should be pulled from blob storage. In other words, if this is false and we have already pulled a model with the same name, the model will be taken from local storage on the device. To force pulling a new model from blob, set this to True.
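For example, to force a fresh pull of an existing model from blob storage, the same block would read:
model_load_specs:
  version: '000'
  from_blob: true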