Tutorial - Running FL XGBoost with OctaiPipe#

In this tutorial I will take you through a standard Federated Learning deployment using XGBoost tree models. The data used is a small subset of the HIGGS dataset, which can be downloaded here. The data in this tutorial is static (i.e. it does not update throughout training).

We will go through the following steps:

  1. Download and split data

  2. Send data to devices

  3. Run federated learning

Note on transferring tutorial data to target devices: Step 2 is optional and automates data transfer to the target devices over SSH. If you have SSH access to the devices, you can use it to send the split data produced in Step 1 directly. If you prefer to add the data manually, or do not have SSH access, transfer the split data chunks to each device yourself: place the train and test chunks produced in Step 1 (found in the ./datasets/devices directory) at /tmp/higgs_train_data.csv and /tmp/higgs_test_data.csv on each device respectively, as sketched below.
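
For example, a manual transfer for device 0 might look like the following (a sketch only; substitute your own user and device IP):

scp datasets/devices/train_data_0.csv <user>@<device-ip>:/tmp/higgs_train_data.csv
scp datasets/devices/test_data_0.csv <user>@<device-ip>:/tmp/higgs_test_data.csv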

Step 1 - Download and split data#

Here the HIGGS dataset is split into n chunks, where n is the number of target devices.

[ ]:
# Update this with the number of target devices you'll be running on
number_of_devices: int = 0
[ ]:
# Download the dataset and split it into chunks, one per device
import pandas as pd
import numpy as np

assert number_of_devices, "Please update the number_of_devices variable with the number of devices you'll be running on"
! mkdir -p datasets/devices/
! wget -c https://octaipipe.blob.core.windows.net/higgs-dataset/higgs_data.tar.gz -P /tmp/
! tar -xvf /tmp/higgs_data.tar.gz -C datasets

train_data = pd.read_csv('datasets/higgs_train_data.csv')
test_data = pd.read_csv('datasets/higgs_test_data.csv')

train_chunks = np.array_split(train_data, number_of_devices)
test_chunks = np.array_split(test_data, number_of_devices)

# write all chunks to ./datasets/devices
for i in range(number_of_devices):
    train_chunks[i].to_csv(f'datasets/devices/train_data_{i}.csv', index=False)
    test_chunks[i].to_csv(f'datasets/devices/test_data_{i}.csv', index=False)

print('Data has been downloaded and split into chunks for each device.\nContents of ./datasets/devices/:')
! ls -ltr datasets/devices/
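
Optionally, you can sanity-check the split before moving on. The sketch below (assuming the variables from the previous cell are still in scope) confirms that the per-device train chunks add back up to the full training set:

[ ]:
# Optional sanity check: chunk row counts should sum to the full training set
chunk_rows = sum(
    len(pd.read_csv(f'datasets/devices/train_data_{i}.csv'))
    for i in range(number_of_devices)
)
assert chunk_rows == len(train_data), 'Chunks do not add up to the full training set'
print(f'{number_of_devices} train chunks, {chunk_rows} rows in total')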

Step 2 - Send data to devices#

This is only possible if you have SSH access to the target devices.

You will need to define the devices you will use for this tutorial, along with their credentials, as we will be sending data over an SSH connection.

These details are not required by OctaiPipe, but if you'd like to speed up sending the train and test datasets to your devices for this tutorial, fill out the devices dictionary with the credentials of each device you'd like to use.

The devices dictionary below should use the following format:

devices = {
    <device_id>: {
        'ip': <device's external IP>,
        'user': <user to log into device>,
        'password': <password for logging into device>
    }, ...
}
  • The device_id should correspond to the device ID on the devices page of the front end; it is the device ID given when registering the devices. (If you have not registered any devices yet, you will need to do so.)

  • The device’s ip can be retrieved by running curl ifconfig.me on the device.

  • The user and password are the details used to log into the device

Device connections are checked in the cell below; if any fail to connect, please investigate before proceeding.

For this you will need to install sshpass: apt-get -y install sshpass

[ ]:
# replace with real values (can define any number of devices)
devices = {
    'test-device-0': {
        'ip': 'XXX.XXX.XXX.XX',
        'user': 'octaipipe',
        'password': 'password'
    },
    'test-device-1': {
        'ip': 'XXX.XXX.XXX.XX',
        'user': 'octaipipe',
        'password': 'password'
    }
}

online = 'online'
for device, creds in devices.items():
    devices[device]['ssh_command'] = (
        f"sshpass -p {creds['password']} ssh -T -o StrictHostKeyChecking=no "
        f"{creds['user']}@{creds['ip']}"
    )
    devices[device]['scp_command'] = f"sshpass -p {creds['password']} scp -o StrictHostKeyChecking=no"
    # Run a simple echo over SSH to check the connection
    result = ! {devices[device]['ssh_command']} echo {online}
    print(device, '\t', result)
print(f'\nIf any devices are not "{online}" you must troubleshoot the connection before proceeding.')

The train and test data will be sent to each of the devices at the paths /tmp/higgs_train_data.csv and /tmp/higgs_test_data.csv respectively. Check the output of the cells to ensure there are no issues downloading and sending the datasets.

[ ]:
import os


def send_df_as_csv_file(device_index, creds):
    train_destination_path = '/tmp/higgs_train_data.csv'
    test_destination_path = '/tmp/higgs_test_data.csv'

    train_chunk = f'./datasets/devices/train_data_{device_index}.csv'
    test_chunk = f'./datasets/devices/test_data_{device_index}.csv'

    assert os.path.exists(train_chunk), f"File {train_chunk} does not exist. Ensure you have done Step 1"
    assert os.path.exists(test_chunk), f"File {test_chunk} does not exist. Ensure you have done Step 1"

    full_train_dest_path = f"{creds['user']}@{creds['ip']}:{train_destination_path}"
    full_test_dest_path = f"{creds['user']}@{creds['ip']}:{test_destination_path}"

    destinations = [full_train_dest_path, full_test_dest_path]
    files = [train_chunk, test_chunk]

    for file_path, destination in zip(files, destinations):
        result = ! {creds['scp_command']} {file_path} {destination}
        if len(result) != 0:
            print(f'Potential issue sending file: {result}')
        else:
            print(f'Successfully sent file to {destination}.')

for idx, (device, creds) in enumerate(devices.items()):
    print(f"Sending train and test files to {device}. May take time depending on your connection ...")
    send_df_as_csv_file(idx, creds)

Step 3 - Running FL#

Now the devices are ready to take part in an FL experiment.

NOTE: before you run, make sure that the devices section of the config file is updated to be a list of the device IDs you want to take part in the experiment (a sketch for doing this programmatically follows the list below).#

  • There is an example_xgboost_federated_learning.yml file in configs/ to help you familiarise yourself with how the federated learning config looks.

  • There is also an xgboost_federated_learning.yml, which will be used to run the experiment. Feel free to play with this file and try out different configurations of input/output specs, model params, and strategies.
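
As a minimal sketch of the note above (assuming the config stores the participating devices under a top-level devices key, as in the example file), you could update the list programmatically rather than editing the file by hand:

[ ]:
import yaml

# Sketch: overwrite the devices list in the FL config.
# Replace the IDs below with those of your registered devices.
config_path = 'configs/xgboost_federated_learning.yml'
with open(config_path, 'r') as file:
    fl_config = yaml.safe_load(file)

fl_config['devices'] = ['test-device-0', 'test-device-1']

with open(config_path, 'w') as file:
    yaml.safe_dump(fl_config, file, sort_keys=False)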

[ ]:
import yaml
# Display the example federated learning config
with open("configs/example_xgboost_federated_learning.yml", 'r') as file:
    example_fl_config = yaml.safe_load(file)
print(yaml.dump(example_fl_config, sort_keys=False))
[ ]:
import logging
import os

os.environ['OCTAIPIPE_DEBUG'] = 'true'
logging.basicConfig(level=logging.INFO, format='%(message)s')

# For more verbose logs, uncomment the following line
# logging.basicConfig(level=logging.DEBUG, format='%(message)s')

Set up the OctaiFL context by passing the config file, deployment name, and description to OctaiFL.

[3]:
from octaipipe.federated_learning.run_fl import OctaiFL

federated_learning_config = 'configs/xgboost_federated_learning.yml'

octaifl = OctaiFL(
    federated_learning_config,
    deployment_name='FL XGBoost tutorial deployment',
    deployment_description='Deployment part of FL XGBoost tutorial'
    )

You can check the current strategy by running the cell below and update it based on your requirements (see the docs for more on strategy settings).

[ ]:
octaifl.strategy.get_config()

FL XGBoost in OctaiPipe can perform multiple training rounds on each device before model aggregation. To set this, I will set the num_local_rounds option in the strategy to 2.

If I wanted to reduce the impact of imbalanced dataset sizes across the devices, I would set normalized_learning_rate to True in the same way. In this tutorial, the datasets sent to the devices are all the same size.

[ ]:
strategy = {
    'num_rounds': 20,
    'num_local_rounds': 2
}
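# To mitigate imbalanced dataset sizes you could also add
# 'normalized_learning_rate': True to the dict above; it is not needed
# here, as the tutorial chunks are all the same size.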

octaifl.strategy.set_config(strategy)
octaifl.strategy.get_config()

Finally, we can run the experiment:

[ ]:
octaifl.run()

Checking the processes#

There are now two processes running: the server and the clients.

To dig deeper, you can explore the FL server Kubernetes deployment by finding its details via kubectl -n colab get all

You can also log into a device to get the client deployment logs by running docker logs -f <id of container>
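
For example, on the device you might list the running containers to find the client's ID, then follow its logs (a sketch; container names depend on your deployment):

docker ps
docker logs -f <id of container>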