Python Interface#

The Python interface for OctaiPipe helps users keep track of and organize their work from a Jupyter notebook. It can also help development with example configurations and custom setups.

Your OctaiPipe installation includes a Jupyter notebook server image to use in the Kubeflow Developer Environment.

Each part of the interface sits within one of six OctaiPipe submodules. The submodules and their functions are explained below.

Deployments interface#

The deployments interface helps users keep track of both cloud and edge deployments. You can also find deployments by device ID, in case you forgot your deployment ID or simply want to see what has been deployed on a device. It is also possible to stop an edge deployment using the kill_deployment function.

octaipipe.deployments.find_all_deployments() List[dict]#

Calls the deployment API and returns all available deployments.

Returns:

all deployments in database

Return type:

response (List[dict])

octaipipe.deployments.find_all_deployments_by_status(status: str) List[dict]#

Returns all deployments with a specific status to the user.

Parameters:

status (str) –

status to return all deployments for. Can take the following:

  • “Running”

  • “Preparing”

  • “Deploying”

  • “Completed”

  • “Failed”

Returns:

all deployment records with specific status

Return type:

response (List[dict])

octaipipe.deployments.find_deployments_by_device_id(device_id: str) List[dict]#

Returns all deployment records for a specific device ID. This is only implemented for device deployments as cloud deployments do not have a device.

Parameters:

device_id (str) – device ID to return records for

Returns:

all deployment records for device

Return type:

response (List[dict])

octaipipe.deployments.new_edge_deployment(config_path: str) str#

Create new edge deployment

Parameters:

config_path (str) – path to edge deployment config

Returns:

ID of deployment

Return type:

deployment_id (str)

octaipipe.deployments.new_cloud_deployment(config_path: str) str#

Create new cloud deployment

Parameters:

config_path (str) – path to cloud deployment config

Returns:

ID of deployment

Return type:

deployment_id (str)

octaipipe.deployments.kill_deployment(deployment_id: str) str#

Terminates deployment for a specific deployment ID.

Parameters:

deployment_id (str) – ID of deployment to terminate

Returns:

ID of deployment that was terminated

Return type:

deployment_id (str)
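
The lookup and termination functions can be combined, for example to stop everything deployed on one device. The following is a sketch only: the helper name, the injected `find_fn` and `kill_fn` callables, and especially the record key `"deploymentId"` are assumptions made for illustration. Inspect a real record returned by `find_deployments_by_device_id` to confirm the field name.

```python
from typing import Callable, List


def stop_deployments_on_device(
    device_id: str,
    find_fn: Callable[[str], List[dict]],
    kill_fn: Callable[[str], str],
    id_key: str = "deploymentId",  # hypothetical field name; check a real record
) -> List[str]:
    """Look up every deployment record for a device and terminate each one."""
    stopped = []
    for record in find_fn(device_id):
        stopped.append(kill_fn(record[id_key]))
    return stopped


# Stand-ins for the OctaiPipe calls so the sketch runs offline.
_records = {"device_0": [{"deploymentId": "dep_a"}, {"deploymentId": "dep_b"}]}
stopped = stop_deployments_on_device(
    "device_0",
    find_fn=lambda device_id: _records.get(device_id, []),
    kill_fn=lambda deployment_id: deployment_id,
)
```

In a notebook, `find_fn` and `kill_fn` would be `octaipipe.deployments.find_deployments_by_device_id` and `octaipipe.deployments.kill_deployment`. It is probably sensible to filter out records that are no longer running before terminating them.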

Develop interface#

The develop interface helps users develop with OctaiPipe by giving access to example configuration files as well as example custom pipeline steps and custom models. Users can also view all custom pipeline steps and models in the database.

octaipipe.develop.validate_config_file(config_path: str) None#

Validates a config file based on schema of current version. If the validation fails, all validation errors are printed. If the validation passes, a success message is printed. For any other issues, the function returns the status code and the message from the OctaiPipe Portal backend.

Parameters:

config_path (str) – the filepath to read the config file from.

Returns:

None

octaipipe.develop.get_example_step_config(step: str) str#

Gets an example configuration file for a specified pipeline step. Saves file in ‘./configs/step_configs’. Note that running method multiple times overwrites file.

Parameters:

step (str) –

the pipeline step to get an example config for. This can take:

  • “preprocessing”

  • “feature_engineering”

  • “model_training”

  • “model_evaluation”

  • “data_drift”

  • “clustering”

  • “model_inference”

  • “automl”

Returns:

filepath that we save config example to

Return type:

filepath (str)

octaipipe.develop.get_example_cloud_deployment_config() str#

Gets an example configuration file for a generic cloud deployment. File is saved to ‘./configs/cloud_deployment_config.yml’. Because cloud deployments have many configuration options, it is also worth reading the OctaiPipe cloud deployment documentation: https://docs.octaipipe.ai/Cloud_Deployment.html

Returns:

filepath that we save config example to

Return type:

filepath (str)

octaipipe.develop.get_example_edge_deployment_config() str#

Gets example configuration file for deploying to the edge using Octaideploy. File is saved to ‘./configs/edge_deployment_config.yml’. Note that running method multiple times overwrites file.

Returns:

filepath which we save config example to

Return type:

filepath (str)

octaipipe.develop.get_example_fl_config() str#

Gets example configuration file for running federated learning. File is saved to ‘./configs/federated_learning_config.yml’. Note that running method multiple times overwrites file.

Returns:

filepath which we save config example to

Return type:

filepath (str)

octaipipe.develop.get_custom_model_template() str#

Gets a template .py file for a custom model. This is a skeleton of what a custom model might look like. Saved to ‘./example_files/custom_model_file.py’

Returns:

filepath which we save template to

Return type:

filepath (str)

octaipipe.develop.get_custom_step_template() str#

Gets a template .py file for a custom pipeline step. This is a skeleton of what a custom pipeline step might look like. Saved to ‘./example_files/custom_step_file.py’

Returns:

filepath which we save template to

Return type:

filepath (str)

octaipipe.develop.find_all_custom_models() List[dict]#

Calls the custom model API and returns all available records.

Returns:

all custom models in database

Return type:

response (List[dict])

octaipipe.develop.find_all_custom_steps() List[dict]#

Calls the custom pipeline step API and returns all available records.

Returns:

all custom pipeline steps in database

Return type:

response (List[dict])

octaipipe.develop.run_step_local(step_name: str, config_path: str, local_step_path: str = None) None#

This function helps users run native OctaiSteps and custom steps in the local workspace. This functionality is especially useful for local debugging of steps. See documentation of OctaiPipe custom steps at: https://docs.octaipipe.ai/usage/steps/custom_step.html

Parameters:
  • step_name (str) – The name of the pipeline step. This can be a native OctaiPipe step or a custom step.

  • config_path (str) – path to the configuration file for the pipeline step in relation to current working directory.

  • local_step_path (str, optional) – local path to step .py file if the step is a custom step not yet registered to the database.

Returns:

None

octaipipe.develop.new_custom_step(file_path: str, step_name: str) None#

Function to register new custom step to database as well as save the custom step Python file to cloud storage. The file from the current working directory is given by file_path along with the desired name of the step as step_name. The step name has to be unique. See documentation of custom steps at: https://docs.octaipipe.ai/usage/steps/custom_step.html

Parameters:
  • file_path (str) – file path to custom step Python file relative to current working directory

  • step_name (str) – desired name of custom step

Returns:

None

octaipipe.develop.update_custom_step(file_path: str, step_name: str) None#

Function to update a custom step. The file, located relative to the current working directory, is given by file_path, and the name of the step to update is given by step_name. The step name must belong to an already registered step. See documentation of custom steps at: https://docs.octaipipe.ai/usage/steps/custom_step.html

Parameters:
  • file_path (str) – file path to custom step Python file relative to current working directory

  • step_name (str) – name of custom step to update

Returns:

None

Devices interface#

The devices interface helps users manage devices by seeing devices in the database as well as adding or removing existing devices.

octaipipe.devices.find_all_devices() List[dict]#

Returns all devices stored in database as list of dictionaries, where each dictionary represents one device record.

Returns:

all device records in database

Return type:

List[dict]

octaipipe.devices.find_devices_by_group_name(group_name: str) List[str]#

Returns all devices in database matching the provided group_name. Each dictionary in returned list represents one device record.

Parameters:

group_name (str) – Name of group to get all device records for

Returns:

all device records matching group_name

Return type:

List[dict]

octaipipe.devices.find_all_device_groups() List[dict]#

Returns all device groups in database. Each dictionary in returned list represents one device group.

Returns:

all device groups in database

Return type:

List[dict]

octaipipe.devices.get_device_by_id(id: str) dict#

Returns device record from database by device ID.

Parameters:

id (str) – device ID for record to return

Returns:

device record matching ID

Return type:

device (dict)

octaipipe.devices.new_device(devices: Union[List[dict], dict, str]) str#

Adds new device record to database.

Parameters:

devices (Union[List[dict], dict, str]) –

device record(s) to add to database. Can be one record, a list of records, or a path to a CSV file with records. The following fields can be included in each dictionary:

  • “id”: str (required; created if not provided)

  • “description”: str

  • “architecture”: str

  • “os”: str

  • “status”: str

  • “influxUrl”: str

  • “influxToken”: str

  • “influxOrg”: str

  • “grafanaUrl”: str

  • “influxOrgId”: str

Returns:

path to zip file with docker compose files

Return type:

zip_file_path (str)
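
When device details live in a spreadsheet, one option is to load them into a list of dictionaries and check the column names against the documented fields before calling `new_device`. The sketch below uses only the standard library. The helper name and the sample rows are made up for illustration, and dropping empty cells is a choice made for this sketch rather than documented behaviour.

```python
import csv
import io
from typing import List

# Field names documented for device records.
DEVICE_FIELDS = {
    "id", "description", "architecture", "os", "status",
    "influxUrl", "influxToken", "influxOrg", "grafanaUrl", "influxOrgId",
}


def device_records_from_csv(handle) -> List[dict]:
    """Read device rows from a CSV file object, dropping empty cells."""
    records = []
    for row in csv.DictReader(handle):
        unknown = set(row) - DEVICE_FIELDS
        if unknown:
            names = sorted(map(str, unknown))
            raise ValueError(f"Unexpected device fields: {names}")
        records.append({key: value for key, value in row.items() if value})
    return records


# Illustrative CSV content; the second row leaves the description empty.
_csv_text = (
    "id,description,os\n"
    "device_0,press line sensor,linux\n"
    "device_1,,linux\n"
)
records = device_records_from_csv(io.StringIO(_csv_text))
```

The resulting list could then be passed as the `devices` argument of `octaipipe.devices.new_device`.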

octaipipe.devices.update_device(devices: Union[List[dict], dict, str]) list#

Updates device record(s)

Parameters:

devices (Union[List[dict], dict, str]) –

device record(s) to update in the database. Can be one record, a list of records, or a path to a CSV file with records. The following fields can be included in each dictionary:

  • “id”: str (required; created if not provided)

  • “description”: str

  • “architecture”: str

  • “os”: str

  • “status”: str

  • “influxUrl”: str

  • “influxToken”: str

  • “influxOrg”: str

  • “grafanaUrl”: str

  • “influxOrgId”: str

Returns:

device IDs that were updated

Return type:

list

octaipipe.devices.new_group(group: Union[List[dict], dict]) None#

Creates a new device group in the database.

Parameters:

group (Union[List[dict], dict]) –

device group(s) to be added to database. This can be a dictionary for creating a single group or a list of group records for creating multiple groups. Can take the following inputs:

  • “groupName”: str

  • “groupDescription”: str

Returns:

None

octaipipe.devices.delete_device(device_id: Union[List[str], str]) None#

Deletes device record from database given device ID provided.

Parameters:

device_id (Union[List[str], str]) – ID of device(s) to delete from database. Can be one device ID or list of device IDs.

Returns:

None

octaipipe.devices.delete_group(group_name: Union[List[str], str]) None#

Deletes device group record from database given group name provided.

Parameters:

group_name (Union[List[str], str]) – name of group(s) to delete from database. Can be a list of group names or a single group name as a str.

Returns:

None

octaipipe.devices.add_device_to_group(device_id: Union[List[str], str], group_name: str) None#

Adds device(s) to group in database. Can either supply one device as string or multiple devices as list of strings to add to single group.

Parameters:
  • device_id (Union[List[str], str]) – single device ID or list of device IDs to add to group.

  • group_name (str) – name of group to add device(s) to.

Returns:

None

octaipipe.devices.remove_device_from_group(device_id: Union[List[str], str], group_name: str) None#

Removes device(s) from group in database. Can either supply one device as string or multiple devices as list of strings to remove from single group.

Parameters:
  • device_id (Union[List[str], str]) – single device ID or list of device IDs to remove from group.

  • group_name (str) – name of group to remove device(s) from.

Returns:

None

Experiments interface#

The experiments interface helps users organize their experiments. You can see experiments in the database and get specific records, which are also linked to the models they produced. It is also possible to create new experiment records using this submodule.

octaipipe.experiments.find_all_experiments() List[dict]#

Calls the experiment API and returns all available experiments.

Returns:

all experiments in database

Return type:

response (List[dict])

octaipipe.experiments.get_experiment_by_id(id: str) dict#

Gets experiment matching provided id.

Parameters:

id (str) – experiment ID to return record for

Returns:

record matching ID

Return type:

response (dict)

octaipipe.experiments.get_experiment_by_model_id(model_id: str) dict#

Gets experiment matching model ID provided.

Parameters:

model_id (str) – model ID to return experiment record for

Returns:

record matching model ID

Return type:

response (dict)

octaipipe.experiments.new_fl_experiment(experiment: dict) None#

Adds new FL experiment record to database.

Parameters:

experiment (dict) –

record to add to database. Record can consist of the following fields:

  • “experimentId” (str) - required (created if not provided)

  • “experimentDescription” (str)

  • “date” (str)

  • “userId” (str)

  • “communicationRound” (int)

  • “currentStatus” (str)

  • “createDatetime” (str)

  • “experimentStatus” (int)

  • “flConfigFile” (str) - required

  • “flServer” (str)

  • “flStrategy” (str)

Returns:

None

octaipipe.experiments.new_experiment_from_previous(prev_exp_id: str, new_exp: dict) None#

Creates new experiment from existing record. Pulls previous record and replaces fields provided by user in new_exp to create new record.

Parameters:
  • prev_exp_id (str) – experiment ID of previous experiment.

  • new_exp (dict) –

    new experiment record. No fields are required as missing fields are taken from previous record and ID is created if not provided. Can take the following fields:

    • “experimentId” (str)

    • “experimentDescription” (str)

    • “date” (str)

    • “userId” (str)

    • “communicationRound” (int)

    • “currentStatus” (str)

    • “createDatetime” (str)

    • “experimentStatus” (int)

    • “flConfigFile” (str)

    • “flServer” (str)

    • “flStrategy” (str)

Returns:

None
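
The description of `new_experiment_from_previous` suggests an overlay: fields in `new_exp` replace those of the previous record, and everything else is inherited. The local sketch below previews that merge without touching the database. It is an interpretation of the description, not the library's implementation, and the sample values such as `"fedavg"` are made up.

```python
def preview_new_experiment(prev_record: dict, new_exp: dict) -> dict:
    """Overlay user-supplied fields on a copy of the previous record."""
    merged = {**prev_record, **new_exp}
    if "experimentId" not in new_exp:
        # The docs say an ID is created when none is given, so the previous
        # ID is dropped here rather than reused.
        merged.pop("experimentId", None)
    return merged


# Illustrative previous record; values are invented for the example.
previous = {
    "experimentId": "exp_1",
    "experimentDescription": "baseline",
    "flStrategy": "fedavg",
    "communicationRound": 5,
}
preview = preview_new_experiment(previous, {"communicationRound": 10})
```

Here `preview` keeps the description and strategy of the previous record, takes the new `communicationRound`, and has no `experimentId` until one is created.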

Explainability interface#

The explainability interface helps users retrieve metrics and centroids from a k-FED model to help explain the model as well as the clusters defined.

octaipipe.explainability.get_explainability_record_by_model_id(model_id: str) dict#

Gets full database record with metrics and centroid information for a k-FED model. This does not return the metrics or centroids themselves, but the IDs required to pull that information.

Parameters:

model_id (str) – ID of model to get explainability record for

Returns:

explainability record as dictionary

Return type:

dict

octaipipe.explainability.get_global_metrics_by_model_id(model_id: str) DataFrame#

Gets global metrics for a k-FED model. This contains:

Parameters:

model_id (str) – ID of model to get global metrics for

Returns:

data frame with global metrics

Return type:

metrics (pd.DataFrame)

octaipipe.explainability.get_local_metrics_by_model_id(model_id: str) DataFrame#

Gets local metrics for all devices which have used a k-FED model for evaluation. This includes:

  • device ID (if exists)

  • adjusted Rand score (optional)

  • adjusted mutual info score (optional)

  • homogeneity score (optional)

  • completeness score (optional)

  • v-measure score (optional)

  • mean wcss

  • global cluster proportions (dict dumped to str)

Parameters:

model_id (str) – ID of model to get local metrics for

Returns:

data frame with local metrics

Return type:

metric_df (pd.DataFrame)
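
The global cluster proportions arrive as a dictionary dumped to a string, so they need parsing before use. The documentation does not say whether the string is JSON or a Python literal, so the sketch below tries JSON first and falls back to `ast.literal_eval`. The helper name and the sample strings are assumptions for illustration.

```python
import ast
import json


def parse_cluster_proportions(raw: str) -> dict:
    """Turn a stringified dict of cluster proportions back into a dict."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        # Not valid JSON, so try a Python literal such as "{0: 0.75}".
        parsed = ast.literal_eval(raw)
    if not isinstance(parsed, dict):
        raise ValueError(f"Expected a dict, got {type(parsed).__name__}")
    return parsed


# Illustrative inputs covering both possible encodings.
from_json = parse_cluster_proportions('{"0": 0.75, "1": 0.25}')
from_literal = parse_cluster_proportions("{0: 0.75, 1: 0.25}")
```

With pandas, the helper could be applied to the relevant column of the returned data frame using `Series.apply`.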

octaipipe.explainability.get_global_cluster_proportions_by_model_id(model_id: str) DataFrame#

Get the proportions of cluster assignment on edge devices for global clusters. This is part of local metrics record, but this function returns only the global cluster proportions in a separate data frame.

Parameters:

model_id (str) – ID of model to get global cluster proportions for

Returns:

data frame with global cluster proportions

Return type:

metric_df (pd.DataFrame)

octaipipe.explainability.get_global_centroids_by_model_id(model_id: str) str#

Gets the global centroids for a k-FED model. Retrieves the NPY file from blob storage and saves it to ~/model_metrics/{model_id}/global_centroids/centroid_{global_centroid_id}.npy. This can be loaded using np.load().

Parameters:

model_id (str) – ID of model to get global centroid of

Returns:

path to which global centroid is saved

Return type:

global_centroid_path (str)

octaipipe.explainability.get_local_centroids_by_model_id(model_id: str) str#

Gets the local centroids for a k-FED model. Retrieves the NPY files for all devices' local centroids from blob storage and saves them to ~/model_metrics/{model_id}/local_centroids/centroid_{local_centroid_id}.npy. These can be loaded using np.load().

Parameters:

model_id (str) – ID of model to get local centroids of

Returns:

directory in which centroids have been saved

Return type:

centroid_directory (str)

octaipipe.explainability.get_local_centroids_by_model_id_device(model_id: str) tuple#

Gets the local centroids for a k-FED model. Retrieves the NPY files for all devices' local centroids from blob storage and saves them to ~/model_metrics/{model_id}/local_centroids/centroid_{device_id}.npy. These can be loaded using np.load().

Parameters:

model_id (str) – ID of model to get local centroids of

Returns:

tuple with centroid directory (str) and list of device IDs for which centroids were retrieved

Return type:

tuple (tuple)

octaipipe.explainability.get_local_centroid_distances_by_model_id(model_id: str) DataFrame#

For each device included in a k-FED model, this computes the Euclidean distance between a local centroid and the closest global centroid. Also shows the average distance for all centroids for a device. Note that multiple local centroids for a device might belong to the same global centroid. The table simply shows the distance to whatever is the closest global centroid.

Parameters:

model_id (str) – ID of model to compute values for

Returns:

DF with distances

Return type:

df_out (pd.DataFrame)
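
The quantity described for `get_local_centroid_distances_by_model_id` can be illustrated with plain Python. This is a sketch of the calculation only, assuming centroids are given as sequences of coordinates. It is not the function's implementation, and the centroid values are made up.

```python
import math
from typing import List, Sequence


def closest_global_distances(
    local_centroids: Sequence[Sequence[float]],
    global_centroids: Sequence[Sequence[float]],
) -> List[float]:
    """Distance from each local centroid to its nearest global centroid."""
    return [
        min(math.dist(local, candidate) for candidate in global_centroids)
        for local in local_centroids
    ]


# Made-up 2-D centroids for one device; real ones come from the NPY files.
device_centroids = [(0.0, 0.0), (3.0, 4.0), (10.0, 0.0)]
global_centroids = [(0.0, 0.0), (10.0, 3.0)]

distances = closest_global_distances(device_centroids, global_centroids)
mean_distance = sum(distances) / len(distances)
```

In this example the first two local centroids are both closest to the same global centroid, which matches the note that several local centroids on a device may belong to one global centroid.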

octaipipe.explainability.add_labels_for_model(model_id: str, labels: dict) dict#

Adds labels to a model record that can be used for example in inference to get text label outputs instead of label encoded numbers.

Parameters:
  • model_id (str) – model to add labels for

  • labels (dict) – Dictionary of labels for a model, can include all or some of the relevant labels, e.g.: {“0”: “normal_state”, “1”: “failure”}

Returns:

Records successfully posted to backend

Return type:

labels (dict)
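
To illustrate what such a label dictionary enables, the sketch below maps label-encoded predictions to text labels. The helper name is hypothetical and this is not how OctaiPipe inference is implemented. It only shows the lookup, assuming string keys as in the example above and falling back to the encoded number when no label is set.

```python
from typing import Iterable, List


def decode_predictions(predictions: Iterable[int], labels: dict) -> List[str]:
    """Replace each encoded prediction with its text label when one exists."""
    # Label keys are strings, so each prediction is converted before lookup.
    return [labels.get(str(value), str(value)) for value in predictions]


labels = {"0": "normal_state", "1": "failure"}
decoded = decode_predictions([0, 1, 1, 2], labels)
```

The value `2` has no label in this example, so it is returned as the string `"2"`.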

octaipipe.explainability.update_labels_for_model(model_id: str, labels: dict) dict#

Updates existing labels for a model record that can be used for example in inference to get text label outputs instead of label encoded numbers.

Parameters:
  • model_id (str) – model to update labels for

  • labels (dict) – Dictionary of labels for a model, can include all or some of the relevant labels, e.g.: {“0”: “normal_state”, “1”: “failure”}

Returns:

Records successfully posted to backend

Return type:

labels (dict)

octaipipe.explainability.get_labels_for_model(model_id: str) dict#

Gets all labels set for model.

Parameters:

model_id (str) – model to get labels for

Returns:

labels set for model as {key: value}

Return type:

labels (dict)

octaipipe.explainability.delete_labels_for_model(model_id: str) None#

Deletes all labels set for model.

Parameters:

model_id (str) – model to delete labels for

Returns:

None

Models interface#

The models interface helps users organize their models. You can see models in the database and get specific records. It is also possible to create new model records using this submodule.

octaipipe.models.find_all_models() list#

Calls the model API and returns all available models.

Returns:

all models in database

Return type:

list (dict)

octaipipe.models.find_models_by_name(name: str) list#

Calls the model API and returns models with name ‘name’.

Parameters:

name (str) – name of model in records to return

Returns:

models in database matching name ‘name’

Return type:

list (dict)

octaipipe.models.find_models_by_experiment_id(experiment_id: str) list#

Calls the model API and returns models with experiment id ‘experiment_id’.

Parameters:

experiment_id (str) – experiment id in records to return

Returns:

models in database matching experiment id ‘experiment_id’

Return type:

list (dict)

octaipipe.models.get_model_by_name_and_version(name: str, version: str)#

Calls the model API and returns model with name ‘name’ and version ‘version’. Should only result in one record returned, else raises LookupError.

Parameters:
  • name (str) – name of model in records to return

  • version (str) – version of model in records to return

Returns:

model in database matching name ‘name’ and version ‘version’

Return type:

response (dict)

octaipipe.models.get_model_by_id(id: str) dict#

Calls the model API and returns model with id ‘id’.

Parameters:

id (str) – id of model to return

Returns:

model in database matching id ‘id’

Return type:

response (dict)

octaipipe.models.new_model(model: dict)#

Adds model to the database.

Parameters:

model (dict) –

model record to add to database. The following fields can be included in dictionary:

  • “modelId”: str (required; created if not provided)

  • “modelName”: str (required)

  • “version”: str (required)

  • “framework”: str

  • “modelType”: str

  • “solutionType”: str

  • “nClasses”: int

  • “targetLabel”: str

  • “columnNames”: str

  • “savingTime”: str

  • “blobName”: str

  • “trainingQuery”: str

  • “configFields”: str

  • “experimentId”: str

  • “numRounds”: int

  • “fedLearnStrategy”: str

  • “modelDescription”: str

  • “fedLearnSavingTime”: 0

  • “automlConfig”: str

  • “metric”: str

  • “score”: float

  • “kubeflowExperiment”: str

  • “kubernetesNamespace”: str

Returns:

None

octaipipe.models.download_model_file(model_id: str, as_onnx: bool = False) str#

Downloads the model file for the model with ID model_id and places it in the folder ./model_files under the current working directory. The “as_onnx” argument specifies whether to download the ONNX file or the original model file (e.g. an sklearn model as a joblib file). All models except PyTorch models are saved as joblib files in their original state; PyTorch models are saved as .npz weights. Model files are named by their model ID, with an “onnx” suffix for ONNX files.

Parameters:
  • model_id (str) – ID of model in database.

  • as_onnx (bool, optional) – whether to download ONNX version or original model file.

Returns:

filepath to model.

Return type:

filepath (str)