Tutorial - Running FL XGBoost with OctaiPipe#
In this tutorial I will take you through a standard Federated Learning deployment using XGBoost tree models. The data used is a small subset of the HIGGS dataset, which will be static (i.e. it will not update throughout training).
Step 1 - defining devices#
You will need to define the devices you will use for this tutorial, along with their credentials, as we will be sending data to them over SSH.
These details are not required by OctaiPipe itself, but filling out the devices dictionary with the credentials of each device you'd like to use will speed up sending the train and test datasets to your devices for this tutorial.
[!NOTE] If you'd like to add the data to the devices manually, you can download the sample higgs dataset here, split it, and add the two files /tmp/higgs_train_data.csv and /tmp/higgs_test_data.csv to each device that will be used to train and evaluate the models. If you add the data yourself, you can skip to Step 3 - Running FL.
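For the manual route, the sketch below shows one way to shard the data on a device. It is only a sketch: it assumes pandas is installed on the device and that you have already downloaded and extracted higgs_data.tar.gz (see Step 2 for the URL); SHARD and N_DEVICES are hypothetical values you choose per device.

import pandas as pd

# Hypothetical shard settings: give each device a different SHARD index
SHARD, N_DEVICES = 0, 2
train = pd.read_csv('datasets/higgs_train_data.csv')
test = pd.read_csv('datasets/higgs_test_data.csv')
# Take every N_DEVICES-th row, offset by SHARD, and write to the expected paths
train[SHARD::N_DEVICES].to_csv('/tmp/higgs_train_data.csv', index=False)
test[SHARD::N_DEVICES].to_csv('/tmp/higgs_test_data.csv', index=False)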
The devices dictionary below should use the following format:

devices = {
    <device_id>: {
        'ip': <device's external IP>,
        'user': <user to log into device>,
        'password': <password for logging into device>
    }, ...
}
- The device_id should correspond to the device ID shown on the devices page on the front end; it is the device ID given when registering the devices. (If you have not registered any devices yet, you will need to do so first; check the documentation for instructions.)
- The device's IP can be retrieved by running curl ifconfig.me on the device.
- The user and password are the credentials used to log into the device.
Device connections are checked in the cell below; if any device fails to connect, please investigate before proceeding.
[ ]:
# replace with real values (can define any number of devices)
devices = {
    'test-device-0': {
        'ip': 'XXX.XXX.XXX.XX',
        'user': 'octaipipe',
        'password': 'password'
    },
    'test-device-1': {
        'ip': 'XXX.XXX.XXX.XX',
        'user': 'octaipipe',
        'password': 'password'
    }
}

online = 'online'
for device, creds in devices.items():
    # Pre-build the ssh/scp commands used to reach this device later on
    devices[device]['ssh_command'] = f"sshpass -p {creds['password']} ssh -T -o StrictHostKeyChecking=no {creds['user']}@{creds['ip']}"
    devices[device]['scp_command'] = f"sshpass -p {creds['password']} scp -o StrictHostKeyChecking=no"
    # Connectivity check: a reachable device echoes back 'online'
    result = ! sshpass -p {creds['password']} ssh -o StrictHostKeyChecking=no {creds['user']}@{creds['ip']} "echo {online}"
    print(device, '\t', result)

print(f'\nIf any devices are not "{online}" - troubleshoot.')
Step 2 - Download and send data to devices#
Here the higgs dataset is split into n chunks, where n is the number of devices defined above. The train and test data will be sent to each device at the paths /tmp/higgs_train_data.csv and /tmp/higgs_test_data.csv respectively.
Check the output of the cells to ensure there are no issues downloading and sending the datasets.
[ ]:
# Download the dataset, split it and send to the devices
import pandas as pd
import numpy as np
import yaml
! mkdir -p datasets
! wget -c https://octaipipe.blob.core.windows.net/higgs-dataset/higgs_data.tar.gz -P /tmp/
! tar -xvf /tmp/higgs_data.tar.gz -C datasets
train_data = pd.read_csv('datasets/higgs_train_data.csv')
test_data = pd.read_csv('datasets/higgs_test_data.csv')
train_chunks = np.array_split(train_data, len(devices))
test_chunks = np.array_split(test_data, len(devices))
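Optionally, you can sanity-check the split before sending anything (a quick sketch):

[ ]:
# Expect one train and one test chunk per device, of roughly equal size
for i, (tr, te) in enumerate(zip(train_chunks, test_chunks)):
    print(f'chunk {i}: train={len(tr)} rows, test={len(te)} rows')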
[ ]:
def send_df_as_csv_file(df, destination_file_path, device, creds):
    """Write df to a temporary local CSV and scp it to the device."""
    local_tmp_file = '/tmp/tmp.csv'
    df.to_csv(local_tmp_file, index=False)
    full_dest_path = f"{creds['user']}@{creds['ip']}:{destination_file_path}"
    result = ! {creds['scp_command']} {local_tmp_file} {full_dest_path}
    ! rm {local_tmp_file}
    if len(result) != 0:
        print(f'Potential issue sending file: {result}')
    else:
        print(f'Successfully sent file to {device}:{destination_file_path}.')
    return result

assert len(devices) == len(train_chunks) == len(test_chunks)

for idx, (device, creds) in enumerate(devices.items()):
    # Send this device's train data
    train_destination_path = "/tmp/higgs_train_data.csv"
    result = send_df_as_csv_file(train_chunks[idx], train_destination_path, device, creds)
    # Send this device's test data
    test_destination_path = "/tmp/higgs_test_data.csv"
    result = send_df_as_csv_file(test_chunks[idx], test_destination_path, device, creds)
Step 3 - Running FL#
Now it is time to start the FL deployment.
NOTE: before you run, make sure that the devices section of the config file is updated to be a list of the device IDs you want to take part in the experiment.#
There is an example_xgboost_federated_learning.yml file in configs/ to familiarise yourself with how the federated learning config looks. There is also an xgboost_federated_learning.yml which will be used to run the experiment. Feel free to play with this file and try out different configurations of input/output specs, model params, and strategies.
[8]:
# Display the example federated learning config
with open("configs/example_xgboost_federated_learning.yml", 'r') as file:
    fl_config = yaml.safe_load(file)
print(yaml.dump(fl_config, sort_keys=False))
name: federated_learning
infrastructure:
  device_ids: []
  image_name: octaipipe.azurecr.io/octaipipe_lite-all_data_loaders:2.2.0
  server_image: octaipipe.azurecr.io/fl_server:2.2.0
input_data_specs:
  devices:
  - device: default
    datastore_type: csv
    settings:
      file_path: /tmp/higgs_train_data.csv
      delimiter: ','
      headers: true
      exclude: []
      index: []
evaluation_data_specs:
  devices:
  - device: default
    datastore_type: csv
    settings:
      file_path: /tmp/higgs_test_data.csv
      delimiter: ','
      headers: true
      exclude: []
      index: []
model_specs:
  load_existing: false
  name: xgboost_higgs_tutorial
  type: base_xgboost
  model_params:
    objective: binary:logistic
    eta: 0.15
    max_depth: 8
    eval_metric: auc
    nthread: 16
    num_parallel_tree: 1
    subsample: 1
    tree_method: hist
run_specs:
  backend: xgboost
  target_label: label
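As the note above says, infrastructure.device_ids must be filled in with the IDs of the devices taking part. A minimal sketch for doing this programmatically, assuming the config layout shown above and that the keys of the devices dictionary from Step 1 are your registered device IDs:

[ ]:
import yaml

config_path = 'configs/xgboost_federated_learning.yml'
with open(config_path, 'r') as file:
    cfg = yaml.safe_load(file)

# Assumption: the Step 1 device keys match the registered device IDs
cfg['infrastructure']['device_ids'] = list(devices.keys())

with open(config_path, 'w') as file:
    yaml.safe_dump(cfg, file, sort_keys=False)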
[ ]:
import logging
import os
os.environ['OCTAIPIPE_DEBUG'] = 'true'
logging.basicConfig(level=logging.INFO, format='%(message)s')
# For more verbose logs, uncomment the following line
# logging.basicConfig(level=logging.DEBUG, format='%(message)s')
Set up the OctaiFL context by passing the config file, deployment name, and description to OctaiFL.
[ ]:
from octaipipe.federated_learning.run_fl import OctaiFL

federated_learning_config = 'configs/xgboost_federated_learning.yml'

octaifl = OctaiFL(
    federated_learning_config,
    deployment_name='FL XGBoost tutorial deployment',
    deployment_description='Deployment part of FL XGBoost tutorial'
)
You can check the current strategy by running the cell below and update it based on your requirements (see the docs for more on strategy settings).
[ ]:
octaifl.strategy.get_config()
FL XGBoost in OctaiPipe is able to perform multiple training rounds on the device before model aggregation. To set this, I will set the num_local_rounds option in the strategy to 2. If I wanted to reduce the impact of imbalanced dataset sizes on the devices, I would set normalized_learning_rate to True in the same way; in this tutorial the datasets sent to the devices are all the same size, so it is left unset (a sketch follows the next cell).
[ ]:
strategy = {
    'num_rounds': 20,
    'num_local_rounds': 2
}
octaifl.strategy.set_config(strategy)
octaifl.strategy.get_config()
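If your devices did hold differently sized datasets, you could enable the normalised learning rate in the same call. A sketch (not needed here, since the chunks are equal in size):

[ ]:
strategy = {
    'num_rounds': 20,
    'num_local_rounds': 2,
    'normalized_learning_rate': True  # offsets imbalanced dataset sizes across devices
}
octaifl.strategy.set_config(strategy)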
Finally, we can run the experiment.
[ ]:
octaifl.run()
Checking the processes#
There are now two processes running, the server and the clients.
To dig deeper, you can explore the FL server Kubernetes deployment by finding its details via kubectl -n colab get all.
You can also log into a device and follow the client deployment logs by running docker logs -f <id of container>.
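For example (a sketch; the namespace comes from the command above, while pod and container names depend on your deployment):

# On the machine with kubectl access: inspect the FL server deployment
kubectl -n colab get all
# Follow the server pod's logs (replace <pod-name> with a pod listed above)
kubectl -n colab logs -f <pod-name>

# On the device: list running containers to find the FL client, then follow its logs
docker ps
docker logs -f <id of container>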