Federated Learning - Anomaly Detection#

This tutorial goes through how to train a time-series anomaly detection model using federated learning. The dataset used is the Water Quality dataset from the GECCO Challenge 2018, used for example by Lai et al. (2021) for time-series anomaly detection benchmarking.

Two models will be trained: one using the built-in OctaiPipe base PyTorch model and one using a custom model architecture.

The guide goes through the following steps:

  1. Register your devices to the device database

  2. Install infrastructure on your devices (including an influx instance)

  3. Load Anomaly Detection data and split among devices

  4. Set up FL config

  5. Train base PyTorch FL model

  6. Define custom model architecture

  7. Train custom FL model

Register devices to database#

For this guide, we assume that you have a basic understanding of device registration (see for example the end-to-end example).

In this example, we will use the two allocated sandbox Virtual Machines and split the dataset over these two devices. If you wish to use your own devices, follow the end-to-end example and device registration documentation.

The VMs used shut down at 8pm GMT every day but can be started at any time and will run until the clock next hits 8pm. Running the start_sandbox_vms function below returns the current device records in the database. To use the VMs in this tutorial, pull each device record using device_client.get_devices(<insert-device-id>) and update the devices, changing only influxOrg, influxOrgId, influxUrl, influxToken and grafanaUrl.

[ ]:
from octaipipe.helper_funcs.example_utils import start_sandbox_vms
from octaipipe.client.device_client import DeviceClient


# Initialize device client
dev_client = DeviceClient()

# Start sandbox VMs
device_records: list = start_sandbox_vms()

# Update device records for each sandbox VM
# credentials come from infrastructure install step below
for device in device_records:
    new_device_data = dict()
    new_device_data['deviceId'] = device['deviceId']
    new_device_data['influxOrg'] = 'testOrg'
    new_device_data['influxUrl'] = f'http://{device["ip"]}:8086/'
    new_device_data['influxToken'] = 'token123asd123asd'
    new_device_data['influxOrgId'] = 'temp'
    new_device_data['grafanaUrl'] = f'http://{device["ip"]}:3000/'

    dev_client.update_devices(new_device_data)

Install infrastructure on your devices#

This step uses the device client to install the relevant infrastructure on each device. This includes tools like curl, docker and docker-compose, as well as the Python packages needed outside the OctaiPipe docker images.

To show how to connect to devices and send custom commands, we also install InfluxDB using docker on the devices.

[ ]:
from octaipipe.client.device_client import DeviceClient


dev_client = DeviceClient()
device_list: list = ['octaipipe-sandbox-device-0',
                     'octaipipe-sandbox-device-1']

custom_cmd = '''
docker run -p 8086:8086 --name influxdb -v influxdb2:/var/lib/influxdb2 \
-e DOCKER_INFLUXDB_INIT_MODE=setup \
-e DOCKER_INFLUXDB_INIT_USERNAME=testUser \
-e DOCKER_INFLUXDB_INIT_PASSWORD=badpass123 \
-e DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=token123asd123asd \
-e DOCKER_INFLUXDB_INIT_ORG=testOrg \
-e DOCKER_INFLUXDB_INIT_BUCKET=test-bucket -d influxdb:latest
'''

for dev in device_list:
    dev_client.install_infrastructure(dev)
    dev_client.connect_device(dev)
    dev_client.execute_command(custom_cmd)

If you have already installed infrastructure on the devices and added them to the database, you can simply run the following command on the devices to start the Influx instances again.

[ ]:
from octaipipe.client.device_client import DeviceClient


dev_client = DeviceClient()
device_list: list = ['octaipipe-sandbox-device-0',
                     'octaipipe-sandbox-device-1']

custom_cmd = 'docker start influxdb'

for dev in device_list:
    dev_client.connect_device(dev)
    dev_client.execute_command(custom_cmd)

Load Anomaly Detection data and split among devices#

In this step we load the dataset, split it in two and distribute it across the two devices to simulate a federated context. In a real-world setting, the data would already be stored on each device separately. We also shift the timestamps so that the series starts on January 1st 2023.

The dataset is available from the GECCO Challenge 2018. It is also available in the Tutorials folder in the OctaiPipe Jupyter instance. If you are running this notebook from the Tutorials folder, it should run without any further edits.

The dataset consists of 9 features, such as pH level and temperature, as well as a target variable coding whether a significant event is happening. Our goal is to define a model that detects anomalies in water quality.

We load both the train and test data here separately and export them to the devices.

[ ]:
from octaipipe.helper_funcs.example_utils import load_gecco


# Load dataset
trainpath: str = './waterDataTraining.RDS'
data_train = load_gecco(trainpath)

# Split into two datasets
data_train.set_index('_time', inplace=True)
data_train_0 = data_train.iloc[:int(data_train.shape[0]/2),:].copy()
data_train_1 = data_train.iloc[int(data_train.shape[0]/2):,:].copy()
[ ]:
from octaipipe.helper_funcs.example_utils import write_data_to_devices


bucket: str = "test-bucket"
measurement: str = "water-quality"

write_data_to_devices(devices='octaipipe-sandbox-device-0',
                      data=data_train_0,
                      bucket=bucket,
                      measurement=measurement)

write_data_to_devices(devices='octaipipe-sandbox-device-1',
                      data=data_train_1,
                      bucket=bucket,
                      measurement=measurement)
[ ]:
from octaipipe.helper_funcs.example_utils import load_gecco


# Load dataset
testpath: str = './waterDataTesting.RDS'
data_test = load_gecco(testpath)

# Split into two datasets
data_test.set_index('_time', inplace=True)
data_test_0 = data_test.iloc[:int(data_test.shape[0]/2),:].copy()
data_test_1 = data_test.iloc[int(data_test.shape[0]/2):,:].copy()
[ ]:
from octaipipe.helper_funcs.example_utils import write_data_to_devices


bucket: str = "test-bucket"
measurement: str = "water-quality"

write_data_to_devices(devices='octaipipe-sandbox-device-0',
                      data=data_test_0,
                      bucket=bucket,
                      measurement=measurement)

write_data_to_devices(devices='octaipipe-sandbox-device-1',
                      data=data_test_1,
                      bucket=bucket,
                      measurement=measurement)

Set up FL config#

Next we will set up the Federated Learning configuration file. As with any feature of OctaiPipe, we declare two main things for federated learning: (1) what to run and (2) how to run it. In this case, federated learning is the what and the configuration file is the how. The OctaiPipe documentation covers FL in greater detail, but we will still walk through how to fill out the config step by step in this guide.

The FL config file is a YAML file consisting of six main fields: name, infrastructure, input_data_specs, evaluation_data_specs, model_specs and run_specs.

The configuration file we will use for this example can be found in the configs folder in the Jupyter Tutorials folder. It is also pasted below.

Name#

In config: name

This is the name of the config and can technically be anything, but for simplicity’s sake we have named the FL config federated_learning.

Infrastructure#

In config: infrastructure

This section specifies the overall infrastructure of the FL experiment, in particular:

  • device_ids: list of device IDs in the device registry to run FL on

  • device_group: device group(s) to run FL on. Can be a string for a single group or a list of several

  • image_name: the OctaiPipe image to run FL with. Defaults to the latest torch image

  • server_image: the image to use for the FL server. Defaults to the latest image

  • env: dictionary of environment variable names and values to set on the devices

Input Data Specs#

In config: input_data_specs

These are the specifications for the input data of the FL model. For details on input specs in general, see the data layer documentation. The FL configurations differ from the regular OctaiPipe data specs in that they can be specified for each device individually. This allows for more heterogeneous datasets on the devices.

Under input_data_specs, the user can define different specs per device in the devices list. If a device is not found in the list, the data specs for the default device are used.

If the same specs should be used for every device, data specs can be defined as for a regular OctaiStep.
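
As a sketch, assuming the flat form simply drops the devices list and places the usual data spec fields at the top level, shared specs for every device might look like this (the values mirror the per-device config below):

input_data_specs:
  datastore_type: influxdb
  query_type: dataframe
  query_template_path: ./configs/data/influx_query.txt
  query_values:
    start: "2023-01-01T10:49:00.000Z"
    stop: "2023-04-08T07:54:00.000Z"
    bucket: test-bucket
    measurement: water-quality
    tags: {}
  data_converter: {}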

Evaluation data specs#

In config: evaluation_data_specs

This works the same way as the input data specs but defines the portion of data on the device to use for evaluating the model each training round.

Model specs#

In config: model_specs

The specifications for the model to use. Details on FL models in OctaiPipe can be found in the custom model documentation.

The fields in model specs are as follows:

  • type: model type, e.g. base_torch for the OctaiPipe base PyTorch model

  • name: name of the model (how it will be saved in the DB)

  • model_params: input parameters for initializing the model, e.g. batch_size

  • custom_model: whether to use a custom FL model, specified in a Python file

    • file_path: local path to the model file to use

Run specs#

In config: run_specs

The runtime specifications for the FL step.

  • target_label: the name(s) of the target variable. Can be a string, or a list of strings for multiclass. Leave empty for unsupervised models

  • cycle_id: ID column for sequenced data; if not None, data is split for train-val by cycle rather than by row

  • backend: the backend to use for FL, e.g. pytorch, sklearn, xgboost

YAML config#

./configs/federated_learning_ad.yml

name: federated_learning

infrastructure:
  device_ids: [octaipipe-sandbox-device-0, octaipipe-sandbox-device-1]
  device_group:
  image_name: octaipipe.azurecr.io/master_torch:latest
  server_image: octaipipe.azurecr.io/fl_server:latest
  env: {}

input_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query.txt
      query_values:
        start: "2023-02-18T21:52:00.000Z"
        stop: "2023-04-08T07:54:00.000Z"
        bucket: test-bucket
        measurement: water-quality
        tags: {}
      data_converter: {}
    - device: octaipipe-sandbox-device-0
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query.txt
      query_values:
        start: "2023-01-01T10:49:00.000Z"
        stop: "2023-02-18T21:51:00.000Z"
        bucket: test-bucket
        measurement: water-quality
        tags: {}
      data_converter: {}

evaluation_data_specs:
  devices:
    - device: default
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query.txt
      query_values:
        start: "2023-05-26T18:58:00.000Z"
        stop: "2023-07-14T06:00:00.000Z"
        bucket: test-bucket
        measurement: water-quality
        tags: {}
      data_converter: {}
    - device: octaipipe-sandbox-device-0
      datastore_type: influxdb
      query_type: dataframe
      query_template_path: ./configs/data/influx_query.txt
      query_values:
        start: "2023-04-08T07:55:00.000Z"
        stop: "2023-05-26T18:57:00.000Z"
        bucket: test-bucket
        measurement: water-quality
        tags: {}
      data_converter: {}

model_specs:
  type: base_torch
  name: water_quality_ad
  model_params:
    loss_fn: cross_entropy
    scaling: standard
    metric: f1_score
    epochs: 10
    batch_size: 32

run_specs:
  target_label: event
  cycle_id:
  backend: pytorch

Train base PyTorch FL model#

Next, we run FL training with the configured base OctaiPipe PyTorch model.

Running FL can easily be done with just a few commands. In this example, we also explicitly set the FL strategy so that training starts once two devices have connected to the server. This is already the default configuration, but we set it here as an example.

[ ]:
from octaipipe.federated_learning.run_fl import OctaiFL


config_path = './configs/federated_learning_ad.yml'
strategy = {'min_available_clients': 2}

fl = OctaiFL(config_path)
fl.strategy.set_config(strategy)
fl.run()

We can view the results of FL by looking at three main registries: experiments, models, and evaluation.

Use the experiment ID from the FL logs to view the experiment and model record, and then use the model ID from the model record to get the evaluation data.

[ ]:
from octaipipe import experiments


exps = experiments.get_experiment_by_id('INSERT_EXP_ID')
exps
[ ]:
from octaipipe import models


mods = models.find_models_by_experiment_id('INSERT_EXP_ID')
mods
[ ]:
from octaipipe.client.evaluation_client import EvaluationClient
import pandas as pd


eval_client = EvaluationClient()
evals = eval_client.get_evaluation_model_id('INSERT_MODEL_ID')
pd.DataFrame(evals).sort_values('communicationRound')

Define custom model architecture#

As opposed to many hats, machine learning models are seldom one-size-fits-all; users will often need to define their own neural network architectures and data processing utilities to fit their dataset or solution. To enable this, OctaiPipe allows users to define custom models.

A custom model is defined as a class in a Python file. For details, see the custom model documentation.

The custom model we define here implements a custom architecture via the _build_model and forward methods. It also implements a custom data loading mechanism that adds lagged features and undersamples the majority class to make the dataset less imbalanced.

The contents of the custom model file can be found in the Tutorials folder in configs/dnn_ad_model.py.
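
The file is not reproduced in full here, but the sketch below illustrates the general shape of such a class in plain PyTorch and pandas. The class name, method signatures and the nn.Module base class are illustrative assumptions; the real class in configs/dnn_ad_model.py hooks into OctaiPipe's base PyTorch model, so refer to it and the custom model documentation for the actual interface.

import pandas as pd
import torch.nn as nn


class CustomAdTorch(nn.Module):
    # Illustrative sketch only: the real class in configs/dnn_ad_model.py
    # subclasses OctaiPipe's base PyTorch model rather than nn.Module
    def __init__(self, num_lags: int = 3, **kwargs):
        super().__init__()
        self._num_lags = num_lags
        self._model = None

    def _build_model(self, n_features: int, n_classes: int):
        # Fully connected net over the current features plus num_lags lagged copies
        in_dim = n_features * (self._num_lags + 1)
        self._model = nn.Sequential(
            nn.Linear(in_dim, 64),
            nn.ReLU(),
            nn.Linear(64, n_classes),
        )

    def forward(self, x):
        return self._model(x)

    def _add_lags(self, data: pd.DataFrame) -> pd.DataFrame:
        # Append num_lags shifted copies of every feature column
        frames = [data] + [data.shift(i).add_suffix(f'_lag{i}')
                           for i in range(1, self._num_lags + 1)]
        return pd.concat(frames, axis=1).dropna()

    def _undersample(self, data: pd.DataFrame, target: str) -> pd.DataFrame:
        # Keep every anomalous row and an equal-sized sample of normal rows
        pos = data[data[target] == 1]
        neg = data[data[target] == 0].sample(len(pos), random_state=0)
        return pd.concat([pos, neg]).sort_index()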

Train custom FL model#

To train the custom model we have built, we edit the FL config file in the model_specs section. The model_specs for the custom model are below:

model_specs:
  type: custom_ad_torch
  name: water_quality_ad_custom
  model_params:
    loss_fn: cross_entropy
    scaling: standard
    metric: f1_score
    epochs: 100
    batch_size: 32
    num_lags: 3
  custom_model:
    file_path: ./configs/dnn_ad_model.py

Note that we have changed the model type to our own custom type and added the custom_model section to specify the local file_path of the Python file containing the model class. We have also added the extra num_lags argument in model_params, as this gets handed to the __init__() method of the model class.

To run FL, we generate a new FL config file from the previous one and simply run OctaiFL as before.

[ ]:
import yaml


with open('./configs/federated_learning_ad.yml', 'r') as f:
    config = yaml.safe_load(f)

model_specs = {
    'type': 'custom_ad_torch',
    'name': 'water_quality_ad_custom',
    'model_params': {
        'loss_fn': 'cross_entropy',
        'scaling': 'standard',
        'metric': 'f1_score',
        'epochs': 100,
        'batch_size': 32,
        'num_lags': 3
    },
    'custom_model': {
        'file_path': './configs/dnn_ad_model.py'
    }
}

config['model_specs'] = model_specs

with open('./configs/federated_learning_ad_custom.yml', 'w') as f:
    yaml.safe_dump(config, f)
[ ]:
from octaipipe.federated_learning.run_fl import OctaiFL


config_path: str = './configs/federated_learning_ad_custom.yml'
strategy = {'min_available_clients': 2}

fl = OctaiFL(config_path)
fl.strategy.set_config(strategy)
fl.run()

Now, let’s again look at the experiment, model and evaluation in the registry.

[ ]:
from octaipipe import experiments


exps = experiments.get_experiment_by_id('INSERT_EXP_ID')
exps
[ ]:
from octaipipe import models


mods = models.find_models_by_experiment_id('INSERT_EXP_ID')
mods
[ ]:
from octaipipe.client.evaluation_client import EvaluationClient
import pandas as pd


eval_client = EvaluationClient()
evals = eval_client.get_evaluation_model_id('INSERT_MODEL_ID')
pd.DataFrame(evals).sort_values('communicationRound')

Results for this differ slightly from run to run depending on how the network initializes, but this solution would normally reach an F1 score of around 0.03-0.04. Compared to the submissions to the GECCO Challenge 2018, this places somewhere around 10th. The aim here is not to win the competition, but rather to show how very simple adjustments to model architecture and data processing can improve model performance.

If you want to see if you can do better, try to edit the architecture or data manipulation in configs/dnn_ad_model.py.

Hint: swap out the undersampling for a focal loss criterion (sketched below), or consider using convolutional layers instead of fully connected layers.
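
If you try the focal loss route, a minimal sketch of a binary focal loss (Lin et al., 2017) in plain PyTorch is shown below. The function name and the way a custom criterion gets wired into dnn_ad_model.py are assumptions for illustration, not an OctaiPipe API.

import torch
import torch.nn.functional as F


def focal_loss(logits, targets, gamma: float = 2.0, alpha: float = 0.25):
    # Per-sample cross entropy; targets are class indices (0 = normal, 1 = event)
    ce = F.cross_entropy(logits, targets, reduction='none')
    p_t = torch.exp(-ce)  # model's probability for the true class
    # Down-weight easy examples (gamma) and re-balance the classes (alpha)
    t = targets.float()
    alpha_t = alpha * t + (1 - alpha) * (1 - t)
    return (alpha_t * (1 - p_t) ** gamma * ce).mean()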
