Federated Learning - Anomaly Detection#
This tutorial goes through how to train a time-series anomaly detection model using federated learning. The dataset used is the Water Quality dataset from the GECCO Challenge 2018, which has been used for benchmarking time-series anomaly detection in, for example, Lai et al. (2021).
Two models will be trained: one using the built-in OctaiPipe base PyTorch model and one using a custom model architecture.
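The guide goes through the following steps: registering devices, loading the data and splitting it among devices, setting up the FL config, training the base PyTorch FL model, defining a custom model architecture, and training the custom FL model.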
Register devices#
If you haven’t already, register your edge devices/computers via the OctaiPipe web portal. Search “register devices” in the documentation. This example uses InfluxDB as a data source, but you can configure any data source you prefer.
Load Anomaly Detection data and split among devices#
In this step we load the dataset, split it in two and distribute the halves across the two devices to simulate a federated context. In a real-world setting, the data would already be stored on each device separately. We also shift the timestamps to start on January 1st 2023.
The dataset is available from the GECCO challenge 2018. It is also available in the Tutorials folder in the OctaiPipe Jupyter instance. If you are running this from the Tutorials folder it should run without any further edits.
The dataset consists of 9 features, such as pH level and temperature, as well as a target variable coding whether a significant event is happening or not. Our goal is to define a model that detects anomalies in water quality.
We load both the train and test data here separately and export them to the devices.
[ ]:
from octaipipe.helper_funcs.example_utils import load_gecco
# Load dataset
trainpath: str = './waterDataTraining.RDS'
data_train = load_gecco(trainpath)
# Split into two datasets
data_train.set_index('_time', inplace=True)
data_train_0 = data_train.iloc[:int(data_train.shape[0]/2),:].copy()
data_train_1 = data_train.iloc[int(data_train.shape[0]/2):,:].copy()
[ ]:
data_train_1.head(3)
[ ]:
data_train_1.tail(3)
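The timestamp shift mentioned above happens inside load_gecco, whose internals are not shown in this tutorial. Purely as an illustration, a shift of that kind could look like the sketch below. shift_to_start is a hypothetical helper, not part of OctaiPipe, and it works on plain datetime objects rather than a DataFrame index.

```python
from datetime import datetime, timezone


def shift_to_start(timestamps, new_start):
    """Shift all timestamps so that the earliest one equals new_start.

    The spacing between timestamps is preserved.
    """
    if not timestamps:
        return []
    offset = new_start - min(timestamps)
    return [ts + offset for ts in timestamps]


# Toy example: three readings one minute apart (dates are arbitrary).
original = [
    datetime(2020, 6, 1, 0, 0, tzinfo=timezone.utc),
    datetime(2020, 6, 1, 0, 1, tzinfo=timezone.utc),
    datetime(2020, 6, 1, 0, 2, tzinfo=timezone.utc),
]
new_start = datetime(2023, 1, 1, tzinfo=timezone.utc)
shifted = shift_to_start(original, new_start)
print(shifted[0].isoformat())
```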
[ ]:
from octaipipe.helper_funcs.example_utils import write_data_to_devices
bucket: str = "test-bucket"
measurement: str = "water-quality"
write_data_to_devices(devices='demo-device-0',
                      data=data_train_0,
                      bucket=bucket,
                      measurement=measurement)
write_data_to_devices(devices='demo-device-1',
                      data=data_train_1,
                      bucket=bucket,
                      measurement=measurement)
[ ]:
from octaipipe.helper_funcs.example_utils import load_gecco
# Load dataset
testpath: str = './waterDataTesting.RDS'
data_test = load_gecco(testpath)
# Split into two datasets
data_test.set_index('_time', inplace=True)
data_test_0 = data_test.iloc[:int(data_test.shape[0]/2),:].copy()
data_test_1 = data_test.iloc[int(data_test.shape[0]/2):,:].copy()
[ ]:
data_test_1.head(2)
[ ]:
data_test_0.tail(2)
[ ]:
from octaipipe.helper_funcs.example_utils import write_data_to_devices
bucket: str = "test-bucket"
measurement: str = "water-quality"
write_data_to_devices(devices='demo-device-0',
                      data=data_test_0,
                      bucket=bucket,
                      measurement=measurement)
write_data_to_devices(devices='demo-device-1',
                      data=data_test_1,
                      bucket=bucket,
                      measurement=measurement)
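The start and stop values used in the data specs later in this guide need to cover the rows written to each device. In Flux, range() includes the start time and excludes the stop time, so stop should fall just after the last timestamp. A minimal sketch of deriving such a window follows; query_window is a hypothetical helper and the one-second padding is an arbitrary choice.

```python
from datetime import datetime, timedelta, timezone


def query_window(timestamps, pad=timedelta(seconds=1)):
    """Return (start, stop) strings that cover all timestamps.

    start is the earliest timestamp. stop is the latest timestamp plus
    a small pad, because Flux range() treats stop as exclusive.
    Sub-second precision is dropped by the format string.
    """
    fmt = '%Y-%m-%dT%H:%M:%S.000Z'
    start = min(timestamps)
    stop = max(timestamps) + pad
    return start.strftime(fmt), stop.strftime(fmt)


# Toy timestamps standing in for the index of one device's data.
times = [
    datetime(2023, 1, 1, 0, 0, tzinfo=timezone.utc),
    datetime(2023, 2, 19, 0, 0, tzinfo=timezone.utc),
]
start, stop = query_window(times)
print(start, stop)
```

In the notebook, passing list(data_train_0.index) instead of the toy list should give the window for demo-device-0, assuming the index holds datetime values in UTC.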
Set up FL config#
Next we will set up the Federated Learning configuration file. For federated learning, as with any feature of OctaiPipe, we declare two main things: (1) what to run and (2) how to run it. In this case, federated learning is the what and the configuration file is the how. The OctaiPipe documentation goes through FL in greater detail, but we will still go through how to fill out the config step-by-step in this guide.
The FL config file is a YAML file consisting of the following main fields: name, infrastructure, input_data_specs, evaluation_data_specs, model_specs and run_specs.
The configuration file we will use for this example can be found in the configs folder in the Jupyter Tutorials folder. It is also pasted below.
Name#
In config: name
This is the name of the file, and can technically be anything, but for simplicity’s sake we have named the FL config federated_learning.
Infrastructure#
In config: infrastructure
This section specifies the overall infrastructure of the FL experiment, in particular:
device_ids: list of device IDs in registry of devices to run FL on
device_group: device group(s) to run FL on. This can be a str for one group or a list for many
image_name: the OctaiPipe image to run FL against. Defaults to latest torch image
server_image: the image to use for the FL server. Defaults to latest image
env: dictionary of environment variable names and values to set on devices
Input Data Specs#
In config: input_data_specs
These are the specifications for the input data of the FL model. For details on input specs in general, see the data layer documentation. The FL configurations differ from the regular OctaiPipe data specs in that they can be specified for each device individually. This allows for more heterogeneous datasets on the devices.
Under input_data_specs, the user can define different specs per device in the devices list. If a device is not found in the list, the data specs for the default device will be used.
If the same specs should be used for every device, data specs can be defined as for a regular OctaiStep.
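The lookup described above can be pictured with a small sketch. This is not OctaiPipe's code: resolve_data_specs is a hypothetical helper, and it assumes the fallback entry is one whose device field is set to default.

```python
def resolve_data_specs(specs, device_id):
    """Pick the data specs that apply to one device.

    Assumed behaviour, for illustration only:
    - without a 'devices' list, the same specs apply to every device;
    - otherwise the entry whose 'device' matches device_id is used,
      falling back to the entry named 'default'.
    """
    if 'devices' not in specs:
        return specs
    by_device = {entry['device']: entry for entry in specs['devices']}
    if device_id in by_device:
        return by_device[device_id]
    if 'default' in by_device:
        return by_device['default']
    raise KeyError(f'no specs for {device_id!r} and no default entry')


specs = {
    'devices': [
        {'device': 'demo-device-0', 'query_values': {'bucket': 'bucket-a'}},
        {'device': 'default', 'query_values': {'bucket': 'test-bucket'}},
    ]
}
# demo-device-1 is not listed, so the default entry is returned.
print(resolve_data_specs(specs, 'demo-device-1')['query_values'])
```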
Evaluation data specs#
In config: evaluation_data_specs
This works the same way as the input data specs but defines the portion of data on the device to use for evaluating the model each training round.
Model specs#
In config: model_specs
The specifications for the model to use. Details on FL models in OctaiPipe can be found in the custom model documentation.
The fields in model specs are as follows:
type: model type, e.g. base_pytorch for OctaiPipe base PyTorch model
name: name of model (how it will be saved in DB)
model_params: input parameters to initializing the model, e.g. batch_size
custom_model: whether to use a custom FL model, specified in Python file_path
file_path: local path to model file to use
Run specs#
In config: run_specs
The runtime specifications for the FL step.
target_label: the name(s) of the target variable. Can be string or list of strings for multiclass. Leave empty for unsupervised models
cycle_id: ID for sequenced data, if not None, splits data for train-val by cycles not rows
backend: the backend to use for FL, e.g. pytorch, sklearn, xgboost
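To make the cycle_id option more concrete, the sketch below contrasts a row-based train-validation split with a cycle-based one, where whole cycles are held out so that no cycle ends up in both sets. How OctaiPipe implements the split is not shown in this guide; both functions and the val_fraction parameter are hypothetical.

```python
def split_by_rows(rows, val_fraction=0.2):
    """Hold out the last val_fraction of rows for validation."""
    n_val = round(len(rows) * val_fraction)
    cut = len(rows) - n_val
    return rows[:cut], rows[cut:]


def split_by_cycles(rows, cycle_key, val_fraction=0.2):
    """Hold out whole cycles, so no cycle is split across train and val."""
    cycle_ids = []
    for row in rows:
        if row[cycle_key] not in cycle_ids:
            cycle_ids.append(row[cycle_key])  # keep first-seen order
    n_val = max(1, round(len(cycle_ids) * val_fraction))
    val_ids = set(cycle_ids[-n_val:])
    train = [r for r in rows if r[cycle_key] not in val_ids]
    val = [r for r in rows if r[cycle_key] in val_ids]
    return train, val


# Five cycles with three rows each.
rows = [{'cycle': c, 'value': v} for c in 'abcde' for v in range(3)]
train, val = split_by_cycles(rows, 'cycle')
print(sorted({r['cycle'] for r in val}), len(train), len(val))
```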
YAML config#
Below is a filled-out config that will work for this tutorial. Create a folder called ‘configs’ and save the below YAML file as ‘fl_config.yml’. You can choose any filename you’d like, just make sure to reference it correctly in the Jupyter cells below.
./configs/fl_config.yml
name: federated_learning

infrastructure:
  device_ids: [demo-device-0, demo-device-1]
  device_group:
  image_name: octaipipe.azurecr.io/octaipipe-all_data_loaders:saferai-197-sandbox-vms-in-docs
  server_image: octaipipe.azurecr.io/fl_server:latest
  env: {}

input_data_specs:
  devices:
  - data_converter: {}
    datastore_type: influxdb
    device: demo-device-0
    query_template_path: ./configs/influx_query.txt
    query_type: dataframe
    query_values:
      bucket: test-bucket
      measurement: water-quality
      start: '2023-01-01T00:00:00.000Z'
      stop: '2023-02-19T00:00:01.000Z'
      tags: {}
  - data_converter: {}
    datastore_type: influxdb
    device: demo-device-1
    query_template_path: ./configs/influx_query.txt
    query_type: dataframe
    query_values:
      bucket: test-bucket
      measurement: water-quality
      start: '2023-02-19T00:00:01.000Z'
      stop: '2023-04-08T08:00:00.000Z'
      tags: {}

evaluation_data_specs:
  devices:
  - data_converter: {}
    datastore_type: influxdb
    device: demo-device-0
    query_template_path: ./configs/influx_query.txt
    query_type: dataframe
    query_values:
      bucket: test-bucket
      measurement: water-quality
      start: '2023-04-08T08:00:00.000Z'
      stop: '2023-05-26T19:00:00.000Z'
      tags: {}
  - data_converter: {}
    datastore_type: influxdb
    device: demo-device-1
    query_template_path: ./configs/influx_query.txt
    query_type: dataframe
    query_values:
      bucket: test-bucket
      measurement: water-quality
      start: '2023-05-26T19:00:00.000Z'
      stop: '2023-07-14T06:00:00.000Z'
      tags: {}

model_specs:
  type: base_torch
  name: water_quality_ad
  model_params:
    loss_fn: cross_entropy
    scaling: standard
    metric: f1_score
    epochs: 10
    batch_size: 32

run_specs:
  target_label: event
  cycle_id:
  backend: pytorch
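A quick sanity check on a config like the one above can catch a missing section before anything is sent to the devices. The sketch below checks for the top-level fields named in this guide. Whether OctaiPipe strictly requires all of them is an assumption, and missing_fields is a hypothetical helper rather than an OctaiPipe function.

```python
REQUIRED_FIELDS = (
    'name', 'infrastructure', 'input_data_specs',
    'evaluation_data_specs', 'model_specs', 'run_specs',
)


def missing_fields(config):
    """Return the expected top-level fields that are absent from config."""
    return [field for field in REQUIRED_FIELDS if field not in config]


# In practice the dict would come from yaml.safe_load on the config file;
# a hand-written dict keeps this example self-contained.
config = {
    'name': 'federated_learning',
    'infrastructure': {'device_ids': ['demo-device-0', 'demo-device-1']},
    'input_data_specs': {},
    'model_specs': {'type': 'base_torch'},
    'run_specs': {'backend': 'pytorch'},
}
print(missing_fields(config))
```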
Additionally, create a file ./configs/influx_query.txt with the following contents:
from(bucket:[bucket])
    |> range(start: [start], stop: [stop])
    |> filter(fn:(r) => r._measurement == [measurement])
    |> filter(fn:(r) => [fields])
    |> keep(columns: ["_time", "_field", "_value"])
    |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
    |> drop(columns: ["_start", "_stop", "_batch",
                      "table", "result", "_measurement",
                      "_result", "id"])
Train base PyTorch FL model#
Next, we run FL training with the configured base OctaiPipe PyTorch model.
Running FL can be easily done with just a few commands. In this example, we also explicitly set the FL Strategy so that we start running training once two devices have connected to the server. This is already the default configuration but we set it here as an example.
[ ]:
import logging
from octaipipe.federated_learning.run_fl import OctaiFL
logging.basicConfig(level=logging.INFO, format='%(message)s')
config_path = './configs/fl_config.yml'
strategy = {'min_available_clients': 2}
fl = OctaiFL(config_path)
fl.strategy.set_config(strategy)
fl.run()
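For intuition about what happens in each communication round: the server collects model updates from the connected devices and combines them into a new global model. One widely used aggregation rule is federated averaging (FedAvg), which weights each device's parameters by its number of training samples. The sketch below illustrates that rule on plain lists. It is not OctaiPipe's implementation, and this tutorial does not state which aggregation rule OctaiPipe applies by default.

```python
def federated_average(client_weights, client_sizes):
    """Sample-weighted average of per-client parameter vectors."""
    total = sum(client_sizes)
    n_params = len(client_weights[0])
    averaged = [0.0] * n_params
    for weights, size in zip(client_weights, client_sizes):
        for i, w in enumerate(weights):
            averaged[i] += w * size / total
    return averaged


# Two devices; the second holds three times as much data,
# so its parameters count three times as much.
global_weights = federated_average(
    client_weights=[[1.0, 0.0], [3.0, 4.0]],
    client_sizes=[100, 300],
)
print(global_weights)
```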
We can view the results of FL by looking at three main registries: experiments, models, and evaluation.
Use the experiment ID from the FL logs to view the experiment and model record, and then use the model ID from the model record to get the evaluation data.
[ ]:
from octaipipe import experiments
# Replace with the experiment ID printed in your own FL logs
exps = experiments.get_experiment_by_id('35a57e30')
exps
[ ]:
from octaipipe import models
# Replace with the experiment ID printed in your own FL logs
mods = models.find_models_by_experiment_id('35a57e30')
mods
[ ]:
from octaipipe_core.client.evaluation_client import EvaluationClient
import pandas as pd
eval_client = EvaluationClient()
# Replace with the model ID from your own model record
evals = eval_client.get_evaluation_model_id('5b386bc2')
pd.DataFrame(evals).sort_values('communicationRound')
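Once the evaluation records are sorted by round, a natural next step is to find the round with the best score. The sketch below does this on a list of dictionaries. Only the communicationRound field appears in this tutorial; the f1_score key and all values are made up for illustration, so check the columns of your own evaluation records before reusing it.

```python
def best_round(records, metric_key):
    """Return the evaluation record with the highest value of metric_key."""
    scored = [r for r in records if r.get(metric_key) is not None]
    if not scored:
        raise ValueError(f'no record contains {metric_key!r}')
    return max(scored, key=lambda r: r[metric_key])


# Toy records: the metric key and the numbers are placeholders.
records = [
    {'communicationRound': 1, 'f1_score': 0.1},
    {'communicationRound': 2, 'f1_score': 0.3},
    {'communicationRound': 3, 'f1_score': 0.2},
]
best = best_round(records, 'f1_score')
print(best['communicationRound'])
```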
Define custom model architecture#
As opposed to many hats, machine learning models are seldom one-size-fits-all; users will often need to define their own neural network architectures and data processing utilities to fit their dataset or solution. To enable this, OctaiPipe allows users to define custom models.
A custom model is defined as a class in a Python file. For details, see the custom model documentation.
The custom model we define here implements a custom model architecture using the _build_model and forward methods. It also implements a custom data loading mechanism that adds lagged features and applies undersampling to make the dataset less imbalanced.
The contents of the custom model file can be found in the Tutorials folder in configs/dnn_ad_model.py.
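The two data processing ideas can be sketched in plain Python. The code in dnn_ad_model.py is not reproduced in this guide, so the functions below are hypothetical illustrations of lagged features and undersampling in general, not copies of that file.

```python
import random


def add_lagged_features(rows, num_lags):
    """Append the previous num_lags rows to each row's feature list.

    rows is a list of feature lists ordered by time. The first num_lags
    rows are dropped because they lack a full history.
    """
    lagged = []
    for t in range(num_lags, len(rows)):
        features = list(rows[t])
        for lag in range(1, num_lags + 1):
            features.extend(rows[t - lag])
        lagged.append(features)
    return lagged


def undersample(rows, labels, ratio=1.0, seed=0):
    """Keep all positive rows and at most ratio * n_positive negatives."""
    rng = random.Random(seed)
    pos = [i for i, y in enumerate(labels) if y == 1]
    neg = [i for i, y in enumerate(labels) if y != 1]
    n_keep = min(len(neg), int(ratio * len(pos)))
    keep = sorted(pos + rng.sample(neg, n_keep))  # preserve time order
    return [rows[i] for i in keep], [labels[i] for i in keep]


rows = [[float(t)] for t in range(10)]
labels = [0, 0, 0, 0, 1, 0, 0, 0, 0, 1]
lagged = add_lagged_features(rows, num_lags=3)
# Labels are trimmed to match the rows that survive the lagging step.
X, y = undersample(lagged, labels[3:], ratio=1.0)
print(lagged[0], sum(y), len(y))
```

With num_lags=3, each surviving row holds the current reading followed by the three previous ones, and undersampling leaves as many negative rows as positive ones.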
Train custom FL model#
To train the custom model we have built, we edit the FL config file in the model_specs section. The model_specs for the custom model are below:
model_specs:
  type: custom_ad_torch
  name: water_quality_ad_custom
  model_params:
    loss_fn: cross_entropy
    scaling: standard
    metric: f1_score
    epochs: 100
    batch_size: 32
    num_lags: 3
  custom_model:
    file_path: ./configs/dnn_ad_model.py
Note that we have changed the model type to our own custom model type. We have added the custom_model section to specify the local file_path of the Python file containing the model class. We have also added the extra num_lags argument in model_params, as this gets handed to the __init__() method of the model class.
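The way model_params reaches the model can be pictured as keyword-argument unpacking. The class below is a hypothetical stand-in, not the class in dnn_ad_model.py, and the exact call OctaiPipe makes is an assumption.

```python
class ToyModel:
    """Hypothetical stand-in for a custom model class."""

    def __init__(self, epochs=10, batch_size=32, num_lags=1, **other_params):
        self.epochs = epochs
        self.batch_size = batch_size
        self.num_lags = num_lags
        # Remaining entries, e.g. loss_fn, scaling and metric.
        self.other_params = other_params


model_params = {
    'loss_fn': 'cross_entropy',
    'scaling': 'standard',
    'metric': 'f1_score',
    'epochs': 100,
    'batch_size': 32,
    'num_lags': 3,
}
# Each key in model_params becomes a keyword argument of __init__.
model = ToyModel(**model_params)
print(model.num_lags, sorted(model.other_params))
```

An extra key such as num_lags therefore only has an effect if the model's __init__() accepts it.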
To run FL, we generate a new FL config file from the previous one and simply run OctaiFL as before.
[ ]:
import yaml

# Load the base config and swap in the custom model specs
with open('./configs/fl_config.yml', 'r') as f:
    config = yaml.safe_load(f)

model_specs = {
    'type': 'custom_ad_torch',
    'name': 'water_quality_ad_custom',
    'model_params': {
        'loss_fn': 'cross_entropy',
        'scaling': 'standard',
        'metric': 'f1_score',
        'epochs': 100,
        'batch_size': 32,
        'num_lags': 3
    },
    'custom_model': {
        'file_path': './configs/dnn_ad_model.py'
    }
}
config['model_specs'] = model_specs

# Save the modified config under a new name
with open('./configs/fl_config_custom_model.yml', 'w') as f:
    yaml.safe_dump(config, f)
[ ]:
from octaipipe.federated_learning.run_fl import OctaiFL

config_path: str = './configs/fl_config_custom_model.yml'
strategy = {'min_available_clients': 2}
fl = OctaiFL(config_path)
# Set the strategy the same way as in the base model run
fl.strategy.set_config(strategy)
fl.run()
Now, let’s again look at the experiment, model and evaluation in the registry.
[ ]:
from octaipipe import experiments
# Replace with the experiment ID printed in your own FL logs
exps = experiments.get_experiment_by_id('cc35e5a7')
exps
[ ]:
from octaipipe import models
# Replace with the experiment ID printed in your own FL logs
mods = models.find_models_by_experiment_id('cc35e5a7')
mods
[ ]:
from octaipipe_core.client.evaluation_client import EvaluationClient
import pandas as pd
eval_client = EvaluationClient()
# Replace with the model ID from your own model record
evals = eval_client.get_evaluation_model_id('c4292d2b')
pd.DataFrame(evals).sort_values('communicationRound')
Results differ slightly from run to run because of how the network is initialized, but this solution would normally result in an F1 score of around 0.03-0.04. Compared to the submissions to the GECCO Challenge 2018, this places somewhere around 10th. The aim here is not to win the competition, but rather to show how very simple adjustments to model architecture and data processing can improve model performance.
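As a reminder of what the metric measures, the F1 score is the harmonic mean of precision and recall for the positive (event) class. The sketch below computes it from scratch on toy labels. Whether the f1_score metric in OctaiPipe uses exactly this variant (positive class only, no averaging across classes) is not stated in this guide.

```python
def f1_score(y_true, y_pred):
    """F1 for the positive class (label 1): 2 * P * R / (P + R)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != 1 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p != 1)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


# Toy labels: 2 events in 10 samples.
y_true = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
y_pred = [0, 0, 0, 0, 0, 0, 1, 1, 1, 0]
score = f1_score(y_true, y_pred)
# Predicting "no event" everywhere is 80% accurate here, yet scores zero.
all_negative = f1_score(y_true, [0] * 10)
print(score, all_negative)
```

This is why F1 is a more informative metric than accuracy for rare-event data such as this dataset.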
If you want to see if you can do better, try to edit the architecture or data manipulation in configs/dnn_ad_model.py.
Hint: Swap out the undersampling for a focal loss criterion, or consider using convolutional layers instead of fully connected layers.
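For readers who want to try the focal loss idea, the sketch below shows the binary form of the loss for a single prediction. Focal loss scales the usual cross-entropy term by (1 - p_t) ** gamma, so well-classified examples contribute very little and the rare, hard examples dominate. The gamma and alpha defaults are values commonly quoted for focal loss and would need tuning for this dataset; a real model would use a batched, differentiable PyTorch version instead of this scalar function.

```python
import math


def binary_focal_loss(p, y, gamma=2.0, alpha=0.25):
    """Focal loss for one prediction.

    p: predicted probability of the positive class, 0 < p < 1
    y: true label, 0 or 1
    gamma: focusing parameter; gamma = 0 gives weighted cross-entropy
    alpha: weight of the positive class (negatives get 1 - alpha)
    """
    p_t = p if y == 1 else 1.0 - p
    alpha_t = alpha if y == 1 else 1.0 - alpha
    return -alpha_t * (1.0 - p_t) ** gamma * math.log(p_t)


easy = binary_focal_loss(p=0.05, y=0)  # confident and correct
hard = binary_focal_loss(p=0.05, y=1)  # confident and wrong
print(easy, hard)
```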