Tutorial - Running FL XGBoost with OctaiPipe#

In this tutorial I will take you through a standard Federated Learning deployment using XGBoost tree models. The data used is a small subset of the Higgs dataset, which will be static (i.e. it will not update throughout training).

Step 1 - Defining devices#

You will need to define the devices you will use for this tutorial and their credentials, as we will be sending data to them via an SSH connection.

These details are not required by OctaiPipe, but to speed up the process of sending the train and test datasets to your devices for this tutorial, fill out the devices dictionary with the credentials of each device you’d like to use.

[!NOTE] If you’d like to add the data to the devices manually, you can download the sample Higgs dataset here, split it and add the two files /tmp/higgs_train_data.csv and /tmp/higgs_test_data.csv to each device; these will be used to train and evaluate the models. If you add the data yourself you can skip to Step 3 - Running FL.

The devices dictionary below should use the following format:

devices = {
    <device_id>: {
        'ip': <device's external IP>,
        'user': <user to log into device>,
        'password': <password for logging into device>
    }, ...
}
  • The device_id should correspond to the device ID on the devices page on the front end - it is the device ID given when registering the devices. (If you have not registered any devices yet, you will need to do so; check the documentation for instructions on how.)

  • The device’s ip can be retrieved by running curl ifconfig.me on the device.

  • The user and password are the details used to log into the device.

Device connections are checked in the cell below; if any are not able to connect, please investigate before proceeding.

[ ]:
# replace with real values (can define any number of devices)
devices = {
    'test-device-0': {
        'ip': 'XXX.XXX.XXX.XX',
        'user': 'octaipipe',
        'password': 'password'
    },
    'test-device-1': {
        'ip': 'XXX.XXX.XXX.XX',
        'user': 'octaipipe',
        'password': 'password'
    }
}

# Build reusable SSH/SCP commands for each device and check that it is reachable
for device, creds in devices.items():
    online = 'online'
    devices[device]['ssh_command'] = f"sshpass -p {creds['password']} ssh -T -o StrictHostKeyChecking=no {creds['user']}@{creds['ip']}"
    devices[device]['scp_command'] = f"sshpass -p {creds['password']} scp -o StrictHostKeyChecking=no"
    result = ! sshpass -p {creds['password']} ssh -o StrictHostKeyChecking=no {creds['user']}@{creds['ip']} "echo {online}"
    print(device, '\t', result)
print(f'\nIf any devices are not "{online}" - troubleshoot.')

Step 2 - Download and send data to devices#

Here the Higgs dataset is split into n chunks, where n is the number of devices defined above. The train and test data will be sent to each of the devices at the paths /tmp/higgs_train_data.csv and /tmp/higgs_test_data.csv respectively.

Check the output of the cells to ensure there are no issues downloading and sending the datasets.

[ ]:
# Download the dataset, split it and send to the devices
import pandas as pd
import numpy as np
import yaml

! mkdir -p datasets
! wget -c https://octaipipe.blob.core.windows.net/higgs-dataset/higgs_data.tar.gz -P /tmp/
! tar -xvf /tmp/higgs_data.tar.gz -C datasets

train_data = pd.read_csv('datasets/higgs_train_data.csv')
test_data = pd.read_csv('datasets/higgs_test_data.csv')

train_chunks = np.array_split(train_data, len(devices))
test_chunks = np.array_split(test_data, len(devices))
[ ]:
def send_df_as_csv_file(df, destination_file_path, device, creds):
    local_tmp_file = '/tmp/tmp.csv'
    df.to_csv(local_tmp_file, index=False)

    full_dest_path = f"{creds['user']}@{creds['ip']}:{destination_file_path}"

    result = ! {creds['scp_command']} {local_tmp_file} {full_dest_path}
    ! rm {local_tmp_file}
    if len(result) != 0:
        print(f'Potential issue sending file: {result}')
    else:
        print(f'Successfully sent file to {device}:{destination_file_path}.')

    return result


assert len(devices) == len(train_chunks) == len(test_chunks)
for idx, (device, creds) in enumerate(devices.items()):

    # Send train data
    train_destination_path = "/tmp/higgs_train_data.csv"
    result = send_df_as_csv_file(train_chunks[idx], train_destination_path, device, creds)

    # Send test data
    test_destination_path = "/tmp/higgs_test_data.csv"
    result = send_df_as_csv_file(test_chunks[idx], test_destination_path, device, creds)
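
As a quick sanity check, you can confirm that both files arrived on each device. The cell below is a rough sketch that lists the two CSVs over SSH, reusing the ssh_command built in Step 1.

[ ]:
# Confirm the train and test CSVs are present on each device
for device, creds in devices.items():
    result = ! {creds['ssh_command']} "ls -lh /tmp/higgs_train_data.csv /tmp/higgs_test_data.csv"
    print(device)
    print('\n'.join(result), '\n')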

Step 3 - Running FL#

Now it is time to start the FL deployment.

NOTE: before you run, make sure that the device_ids field in the infrastructure section of the config file is updated to be a list of the device IDs you want to take part in the experiment (a sketch of how to do this programmatically follows the displayed config below).#

  • There is an example_xgboost_federated_learning.yml file in configs/ to familiarise yourself with how the federated learning config looks.

  • There is also an xgboost_federated_learning.yml which will be used to run the experiment. Feel free to play with this file and try out different configurations of input/output specs, model params, and strategies.

[8]:
# Display federated learning config
with open("configs/example_xgboost_federated_learning.yml", 'r') as file:
    fl_config = yaml.safe_load(file)
print(yaml.dump(fl_config, sort_keys=False))
name: federated_learning
infrastructure:
  device_ids: []
  image_name: octaipipe.azurecr.io/octaipipe_lite-all_data_loaders:2.2.0
  server_image: octaipipe.azurecr.io/fl_server:2.2.0
input_data_specs:
  devices:
  - device: default
    datastore_type: csv
    settings:
      file_path: /tmp/higgs_train_data.csv
      delimiter: ','
      headers: true
      exclude: []
      index: []
evaluation_data_specs:
  devices:
  - device: default
    datastore_type: csv
    settings:
      file_path: /tmp/higgs_test_data.csv
      delimiter: ','
      headers: true
      exclude: []
      index: []
model_specs:
  load_existing: false
  name: xgboost_higgs_tutorial
  type: base_xgboost
  model_params:
    objective: binary:logistic
    eta: 0.15
    max_depth: 8
    eval_metric: auc
    nthread: 16
    num_parallel_tree: 1
    subsample: 1
    tree_method: hist
run_specs:
  backend: xgboost
  target_label: label
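
If the device_ids list in xgboost_federated_learning.yml is still empty, you can populate it from the devices dictionary defined in Step 1. The cell below is a minimal sketch that assumes the run config follows the same layout as the example shown above (an infrastructure section containing device_ids); editing the file by hand works just as well.

[ ]:
# Point the config's device_ids at the devices defined in Step 1
config_path = 'configs/xgboost_federated_learning.yml'

with open(config_path, 'r') as file:
    fl_run_config = yaml.safe_load(file)

fl_run_config['infrastructure']['device_ids'] = list(devices.keys())

with open(config_path, 'w') as file:
    yaml.dump(fl_run_config, file, sort_keys=False)

print(fl_run_config['infrastructure']['device_ids'])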
[ ]:
import logging
import os

os.environ['OCTAIPIPE_DEBUG'] = 'true'
logging.basicConfig(level=logging.INFO, format='%(message)s')

# For more verbose logs, uncomment the following line
# logging.basicConfig(level=logging.DEBUG, format='%(message)s')

Set up the OctaiFL context by passing the config file, name and description to OctaiFL.

[ ]:
from octaipipe.federated_learning.run_fl import OctaiFL

federated_learning_config = 'configs/xgboost_federated_learning.yml'

octaifl = OctaiFL(
    federated_learning_config,
    deployment_name='FL XGBoost tutorial deployment',
    deployment_description='Deployment part of FL XGBoost tutorial'
    )

You can check the current strategy by running the cell below and update it based on your requirements (see the docs for more on strategy settings).

[ ]:
octaifl.strategy.get_config()

FL XGBoost in OctaiPipe is able to perform multiple training rounds on the device before model aggregation. To set this, I will set the num_local_rounds option in the strategy to 2.

If I wanted to reduce the impact of imbalanced dataset sizes on the devices, I would set normalized_learning_rate to True in the same way (see the sketch after the next cell). In this tutorial, the datasets sent to devices are all the same size.

[ ]:
strategy = {
    'num_rounds': 20,
    'num_local_rounds': 2
}

octaifl.strategy.set_config(strategy)
octaifl.strategy.get_config()
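
For reference, normalized_learning_rate is set in the same way. The snippet below shows what that would look like; it is left commented out since the datasets in this tutorial are equally sized.

[ ]:
# Only relevant when device datasets are imbalanced in size
# strategy = {
#     'num_rounds': 20,
#     'num_local_rounds': 2,
#     'normalized_learning_rate': True
# }
# octaifl.strategy.set_config(strategy)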

Finally, we can run the experiment.

[ ]:
octaifl.run()

Checking the processes#

There are now two processes running: the server and the clients.

To dig deeper you can explore the FL server Kubernetes deployment by finding its details via kubectl -n colab get all.

You can also log into the device to get the client deployment logs by running docker logs -f <id of container>.
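
If the devices dictionary from Step 1 is still in scope, the rough sketch below lists the running containers on each device over SSH, which is a quick way to find the client container ID to pass to docker logs.

[ ]:
# List running containers on each device to find the FL client container ID
for device, creds in devices.items():
    print(f'--- {device} ---')
    result = ! {creds['ssh_command']} "docker ps"
    print('\n'.join(result))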