Model Factory with Python Multithreading

Horizontal

This accelerator shows a simple example of how to use the Python threading library to build a model factory.

Build with Free Trial

It is well-known that only one thread can execute Python code at once in CPython (even though certain performance-oriented libraries might overcome this limitation) because of the Global Interpreter Lock (GIL). Despite this disadvantage, multithreading is still an appropriate approach if you want to run multiple I/O-bound tasks simultaneously.

The DataRobot platform makes it possible to create model factories. A model factory is a system or set of procedures that automatically generate predictive models with little to no human intervention. More details can be found here. The third party frameworks from the Python ecosystem can also be used for model factories’ building (for example, Dask). One of the best Dask practices is not to overuse Dask when its distributed parallelism is not really needed, especially if you don’t use large amount of data.

The application of model factories improves the throughput of DataRobot cluster over models’ training phase leveraging better usage of the DataRobot modeling workers. That allows decreasing the training time of the models increasing the efficiency of data science teams who need to train tens and hundreds of different models. Usually the performance gain can reach 2-3 times for the training time in comparison to the sequential project training.

This accelerator shows a simple example of how to use the Python threading library to build a model factory.

Setup

Import dependencies

In [1]:

import concurrent.futures as f
import datetime

import datarobot as dr
from datarobot import AUTOPILOT_MODE

print(dr.__version__)

3.1.1

Set the number of pool workers and the model target.

In [2]:

THREAD_POOL_WORKERS = 5
TARGET = "SalePrice"

Connect to DataRobot

Read more about different options for connecting to DataRobot from the client.

In [3]:
dr.Client(config_path="drconfig.yaml")
Out [3]:
<datarobot.rest.RESTClientObject at 0x103479c90>

Create a dataset in the AI Catalog

In [4]:
training_dataset_file_path = "https://s3.amazonaws.com/datarobot_public_datasets/ai_accelerators/house_train_dataset.csv"
training_dataset = dr.Dataset.create_from_url(training_dataset_file_path)

Create a DataRobot project

In [5]:

project = dr.Project.create_from_dataset(
    training_dataset.id, project_name="Sequential Project"
)

Modeling

Start Autopilot for one project

In [6]:

project.analyze_and_model(target=TARGET, worker_count=-1, mode=AUTOPILOT_MODE.QUICK)
Out [6]:

Project(Sequential Project)
In [7]:

project.wait_for_autopilot(check_interval=60)
In progress: 8, queued: 0 (waited: 0s)
In progress: 8, queued: 0 (waited: 1s)
In progress: 8, queued: 0 (waited: 2s)
In progress: 8, queued: 0 (waited: 3s)
In progress: 8, queued: 0 (waited: 5s)
In progress: 8, queued: 0 (waited: 7s)
In progress: 8, queued: 0 (waited: 11s)
In progress: 8, queued: 0 (waited: 18s)
In progress: 8, queued: 0 (waited: 31s)
In progress: 0, queued: 0 (waited: 58s)
In progress: 0, queued: 0 (waited: 110s)
In progress: 1, queued: 0 (waited: 170s)
In progress: 0, queued: 0 (waited: 231s)
In progress: 0, queued: 0 (waited: 292s)

Start Autopilot for one project with advanced options

You can decrease training time if there is no need to prepare model for the deployment and train blenders. It can be useful during the ML experimentation phase.

In [8]:

advanced_options = dr.AdvancedOptions(
    prepare_model_for_deployment=False, blend_best_models=False
)
In [9]:

project = dr.Project.create_from_dataset(
    training_dataset.id, project_name="Sequential Project (advanced options)"
)
In [10]:

project.analyze_and_model(
    target=TARGET,
    worker_count=-1,
    mode=AUTOPILOT_MODE.QUICK,
    advanced_options=advanced_options,
)
Out [10]:

Project(Sequential Project (advanced options))
In [11]:

project.wait_for_autopilot(check_interval=60)
In progress: 8, queued: 0 (waited: 0s)
In progress: 8, queued: 0 (waited: 1s)
In progress: 8, queued: 0 (waited: 2s)
In progress: 8, queued: 0 (waited: 3s)
In progress: 8, queued: 0 (waited: 4s)
In progress: 8, queued: 0 (waited: 7s)
In progress: 8, queued: 0 (waited: 11s)
In progress: 8, queued: 0 (waited: 18s)
In progress: 8, queued: 0 (waited: 31s)
In progress: 2, queued: 0 (waited: 58s)
In progress: 2, queued: 0 (waited: 110s)
In progress: 0, queued: 0 (waited: 170s)

You can see that the training time decreased from 292s to 170s (42% gain).

Modeling five projects in parallel

Create a list with five DataRobot projects that will be trained in parallel.

In [12]:

project_list = []
for n in range(1, 6):
    project_name = f"Parallel Project - {n}"
    project = dr.Project.create_from_dataset(
        training_dataset.id, project_name=project_name
    )
    project_list.append(project)
print(project_list)
[Project(Parallel Project - 1), Project(Parallel Project - 2), Project(Parallel Project - 3), Project(Parallel Project - 4), Project(Parallel Project - 5)]

This function kicks off an independent training process for every project (5 projects created in this example) in each thread.

In [13]:
def thread_function(project, start_time):
    print(f"Start training of project '{project.project_name}'...\n")
    project.analyze_and_model(
        target=TARGET, worker_count=-1, mode=AUTOPILOT_MODE.QUICK, max_wait=14400
    )
    project.wait_for_autopilot(check_interval=60)

    return datetime.datetime.now() - start_time

Submit tasks to executor

The ThreadPoolExecutor subclass with the predefined number of threads will be used to submit tasks for the asynchronous execution. The context manager should be used for the correct resources’ management.

In [14]:

with f.ThreadPoolExecutor(max_workers=THREAD_POOL_WORKERS) as executor:
    allFutures = {
        executor.submit(thread_function, pr, datetime.datetime.now()): pr
        for pr in project_list
    }

    for future in f.as_completed(allFutures):
        pr = allFutures[future]
        try:
            elapsed_time = future.result()
        except Exception as exc:
            print(
                f"Training of project '{pr.project_name}' generated an exception: {exc}"
            )
        else:
            print(f"Training of project '{pr.project_name}' finished in {elapsed_time}")
Start training of project 'Parallel Project - 1'...

Start training of project 'Parallel Project - 2'...

Start training of project 'Parallel Project - 3'...

Start training of project 'Parallel Project - 4'...

Start training of project 'Parallel Project - 5'...

In progress: 8, queued: 0 (waited: 0s)
In progress: 8, queued: 0 (waited: 0s)
In progress: 4, queued: 4 (waited: 0s)
In progress: 8, queued: 0 (waited: 1s)
In progress: 8, queued: 0 (waited: 1s)
In progress: 4, queued: 4 (waited: 1s)
In progress: 8, queued: 0 (waited: 2s)
In progress: 8, queued: 0 (waited: 2s)
In progress: 4, queued: 4 (waited: 2s)
In progress: 8, queued: 0 (waited: 3s)
In progress: 8, queued: 0 (waited: 3s)
In progress: 4, queued: 4 (waited: 3s)
In progress: 8, queued: 0 (waited: 4s)
In progress: 8, queued: 0 (waited: 5s)
In progress: 4, queued: 4 (waited: 4s)
In progress: 0, queued: 8 (waited: 0s)
In progress: 0, queued: 8 (waited: 0s)
In progress: 0, queued: 8 (waited: 1s)
In progress: 0, queued: 8 (waited: 1s)
In progress: 8, queued: 0 (waited: 7s)
In progress: 8, queued: 0 (waited: 7s)
In progress: 4, queued: 4 (waited: 7s)
In progress: 0, queued: 8 (waited: 2s)
In progress: 0, queued: 8 (waited: 2s)
In progress: 0, queued: 8 (waited: 3s)
In progress: 0, queued: 8 (waited: 3s)
In progress: 0, queued: 8 (waited: 4s)
In progress: 0, queued: 8 (waited: 4s)
In progress: 8, queued: 0 (waited: 11s)
In progress: 8, queued: 0 (waited: 11s)
In progress: 4, queued: 4 (waited: 10s)
In progress: 0, queued: 8 (waited: 7s)
In progress: 0, queued: 8 (waited: 7s)
In progress: 0, queued: 8 (waited: 11s)
In progress: 0, queued: 8 (waited: 11s)
In progress: 8, queued: 0 (waited: 18s)
In progress: 8, queued: 0 (waited: 18s)
In progress: 4, queued: 4 (waited: 18s)
In progress: 0, queued: 8 (waited: 18s)
In progress: 0, queued: 8 (waited: 18s)
In progress: 8, queued: 0 (waited: 31s)
In progress: 8, queued: 0 (waited: 31s)
In progress: 4, queued: 4 (waited: 31s)
In progress: 0, queued: 8 (waited: 31s)
In progress: 0, queued: 8 (waited: 31s)
In progress: 1, queued: 0 (waited: 57s)
In progress: 1, queued: 0 (waited: 58s)
In progress: 5, queued: 0 (waited: 57s)
In progress: 8, queued: 0 (waited: 57s)
In progress: 5, queued: 3 (waited: 57s)
In progress: 6, queued: 10 (waited: 109s)
In progress: 0, queued: 16 (waited: 110s)
In progress: 0, queued: 0 (waited: 109s)
In progress: 3, queued: 0 (waited: 109s)
In progress: 1, queued: 0 (waited: 109s)
In progress: 6, queued: 0 (waited: 170s)
In progress: 0, queued: 0 (waited: 170s)
In progress: 10, queued: 6 (waited: 170s)
In progress: 0, queued: 16 (waited: 170s)
In progress: 0, queued: 16 (waited: 170s)
In progress: 0, queued: 0 (waited: 231s)
In progress: 0, queued: 0 (waited: 231s)
In progress: 0, queued: 0 (waited: 231s)
In progress: 2, queued: 0 (waited: 231s)
In progress: 14, queued: 0 (waited: 231s)
In progress: 0, queued: 0 (waited: 292s)
In progress: 0, queued: 0 (waited: 292s)
In progress: 0, queued: 0 (waited: 291s)
In progress: 0, queued: 0 (waited: 291s)
In progress: 0, queued: 0 (waited: 292s)
In progress: 1, queued: 0 (waited: 352s)
In progress: 1, queued: 0 (waited: 352s)
In progress: 1, queued: 0 (waited: 352s)
In progress: 1, queued: 0 (waited: 352s)
In progress: 1, queued: 0 (waited: 352s)
In progress: 0, queued: 0 (waited: 413s)
In progress: 0, queued: 0 (waited: 413s)
In progress: 0, queued: 0 (waited: 413s)
In progress: 0, queued: 0 (waited: 413s)
In progress: 0, queued: 0 (waited: 413s)
In progress: 0, queued: 0 (waited: 474s)
In progress: 0, queued: 0 (waited: 474s)
In progress: 0, queued: 0 (waited: 474s)
Training of project 'Parallel Project - 2' finished in 0:08:38.625724
Training of project 'Parallel Project - 3' finished in 0:08:38.687918
Training of project 'Parallel Project - 4' finished in 0:08:38.785352
In progress: 0, queued: 0 (waited: 474s)
In progress: 0, queued: 0 (waited: 474s)
Training of project 'Parallel Project - 1' finished in 0:08:43.753684
Training of project 'Parallel Project - 5' finished in 0:08:43.793173

The training time for a multithreaded approach will depend on multiple factors (CPU/RAM load, network bandwidth, etc.) and will vary for the different runs. The average training time is 8min 40s.

Conclusion

Three experiments are performed in this AI Accelerator:

  • Training one project (training time: 292s)
  • Training one project with advanced options (training time: 170s)
  • Training five projects in parallel (training time: 520s)

Training five projects sequentially would take 1460s, while training five projects in parallel took 520s (64% gain i.e. 2.8 times faster). Combining parallel training with advanced project options can also decrease overall training time.

Taking into account the above mentioned numbers, you can conclude that building model factories using multithreaded approach can be really helpful during the ML experimentation phase especially if there is a need to train models for the use cases with thousands of SKUs. The main advantage of the presented approach is an absence of the third party libraries, the full process is based on the Python threading library.

Get Started with Free Trial

Experience new features and capabilities previously only available in our full AI Platform product.

Get Started with Python Multithreading

Explore more Industry Agnostic AI Accelerators

Explore more AI Accelerators