Parallel backend customization API

Users can provide their own implementation of a parallel processing backend in addition to the 'loky', 'threading' and 'multiprocessing' backends provided by default.

A new backend is defined by providing a backend factory, which is then registered with the joblib.register_parallel_backend() function by passing a name and the factory.

For instance, to register a custom backend named 'custom' with a factory MyCustomBackend, you can call:

# Register the backend so it can be used with parallel_config
register_parallel_backend('custom', MyCustomBackend)

This backend can then be used within the parallel_config() context manager, as:

from joblib import Parallel, delayed, parallel_config

with parallel_config("custom"):
    res = Parallel(2)(delayed(id)(i) for i in range(10))

Minimal backend factory specification

The backend factory can be any callable that returns an instance of ParallelBackendBase. The complete specification of the backend factory can be found in the source code of the default backends. Here, we provide the minimal details needed to implement a custom backend factory; the next section describes various possible customizations.

The key methods for the backend factory are:

  • configure: This method is called when creating a particular instance of the Parallel object. It can allocate the resources needed for the parallel computation and return the effective number of jobs that can be run in parallel.

  • effective_n_jobs: This method is called to determine the number of jobs that can be run in parallel. It can be used to limit the number of jobs depending on the specified n_jobs and the system it runs on.

  • terminate: This method is called when the parallel computation is finished. It can be used to clean up the resources allocated by the backend.

  • submit: This method is called to launch the computation for a given task in the Parallel call. The task is a single callable that takes no arguments, and the method should return a future-like object that allows tracking the task’s progress. The callback argument passed to this method is a callable that should be called when the task is completed, typically through the add_done_callback method of the future-like object.

  • retrieve_result_callback: This method is called within the callback function passed to submit. It is called with the same arguments that the backend provides to the callback – typically the future-like object. It should retrieve and return the result of the function executed in parallel.

Below is a minimal example of a custom backend factory that uses a ThreadPoolExecutor to run the tasks in parallel:

from concurrent.futures import ThreadPoolExecutor

from joblib import ParallelBackendBase
from joblib import register_parallel_backend


class MyCustomBackend(ParallelBackendBase):

    supports_retrieve_callback = True

    def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
        """Configure the backend for a specific instance of Parallel."""
        self.n_jobs = n_jobs

        n_jobs = self.effective_n_jobs(n_jobs)
        self._executor = ThreadPoolExecutor(n_jobs)

        # Return the effective number of jobs
        return n_jobs

    def terminate(self):
        """Clean-up the resources associated with the backend."""
        self._executor.shutdown()
        self._executor = None

    def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs that can be run in parallel."""
        return n_jobs

    def submit(self, func, callback):
        """Schedule a function to be run and return a future-like object.

        This method should return a future-like object that allows tracking
        the progress of the task.

        If ``supports_retrieve_callback`` is False, the return value of this
        method is passed to ``retrieve_result`` instead of calling
        ``retrieve_result_callback``.

        Parameters
        ----------
        func: callable
            The function to be run in parallel.

        callback: callable
            A callable that will be called when the task is completed. This callable
            is a wrapper around ``retrieve_result_callback``. This should be added
            to the future-like object returned by this method, so that the callback
            is called when the task is completed.

            For future-like backends, this can be achieved with something like
            ``future.add_done_callback(callback)``.

        Returns
        -------
        future: future-like
            A future-like object to track the execution of the submitted function.
        """
        future = self._executor.submit(func)
        future.add_done_callback(callback)
        return future

    def retrieve_result_callback(self, future):
        """Called within the callback function passed to `submit`.

        This method can customise how the result of the function is retrieved
        from the future-like object.

        Parameters
        ----------
        future: future-like
            The future-like object returned by the `submit` method.

        Returns
        -------
        result: object
            The result of the function executed in parallel.
        """
        return future.result()

Extra customizations

The backend API offers several hooks that can be used to customize its behavior.

Passing extra arguments to the backend

It is possible to register a backend class that has some mandatory constructor parameters such as the network address and connection credentials for a remote cluster computing service:

class MyCustomBackend(ParallelBackendBase):

    def __init__(self, endpoint, api_key, nesting_level=0):
        super().__init__(nesting_level=nesting_level)
        self.endpoint = endpoint
        self.api_key = api_key

    ...
    # Do something with self.endpoint and self.api_key somewhere in
    # one of the methods of the class

register_parallel_backend('custom', MyCustomBackend)

The connection parameters can then be passed to the parallel_config() context manager:

with parallel_config(backend='custom', endpoint='http://compute',
                     api_key='42'):
    Parallel()(delayed(some_function)(i) for i in range(10))

Using the context manager can be helpful when using a third-party library that uses joblib.Parallel internally while not exposing the backend argument in its own API.

Cancelling tasks

If the backend allows cancelling tasks, the abort_everything method can be implemented to abort all the tasks that are currently running as soon as one of them raises an exception. This can be useful to avoid wasting computational resources when the Parallel call is bound to fail anyway.

This method has an extra parameter ensure_ready that informs the backend whether the error occurred during a single call to Parallel or inside a context manager block. In the case of a single call (ensure_ready=False), there is no need to re-spawn workers for future calls, while in the case of a context manager (ensure_ready=True), one could call configure to re-allocate computational resources.

def abort_everything(self, ensure_ready=True):
    """Abort any running tasks

    This is called when an exception has been raised when executing a task
    and all the remaining tasks will be ignored and can therefore be
    aborted to spare computation resources.

    If ensure_ready is True, the backend should be left in an operating
    state as future tasks might be re-submitted via that same backend
    instance.

    If ensure_ready is False, the implementer of this method can decide
    to leave the backend in a closed / terminated state as no new tasks
    are expected to be submitted to this backend.

    Setting ensure_ready to False is an optimization that can be leveraged
    when aborting tasks via killing processes from a local process pool
    managed by the backend itself: if we expect no new tasks, there is no
    point in re-creating new workers.
    """
    pass
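As an illustration, here is a sketch of this method for the ThreadPoolExecutor-based backend from the minimal example. The joblib base class is omitted so the snippet stays self-contained; note that cancel_futures requires Python 3.9+ and that a thread pool can only drop queued tasks, not interrupt tasks that have already started:

```python
from concurrent.futures import ThreadPoolExecutor


# Sketch of abort_everything for a ThreadPoolExecutor-based backend.
# A real backend would inherit from ParallelBackendBase.
class AbortableThreadBackend:
    def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
        self.n_jobs = n_jobs
        self._executor = ThreadPoolExecutor(n_jobs)
        return n_jobs

    def abort_everything(self, ensure_ready=True):
        # cancel_futures=True drops tasks still waiting in the queue;
        # tasks already running in a thread cannot be interrupted.
        self._executor.shutdown(wait=False, cancel_futures=True)
        if ensure_ready:
            # The same backend instance may receive new tasks, so
            # re-spawn the workers by re-configuring the backend.
            self.configure(n_jobs=self.n_jobs)
```

With ensure_ready=False, the executor is simply left in its shut-down state, matching the optimization described in the docstring above.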

Setting up Nested Parallelism

The backend can also provide a get_nested_backend method that is used to set up the default backend for nested parallel calls. By default, a thread-based backend is used for the first level of nesting, falling back to a sequential backend beyond that to avoid spawning too many threads on the host.

def get_nested_backend(self):
    """Backend instance to be used by nested Parallel calls.

    By default a thread-based backend is used for the first level of
    nesting. Beyond, switch to sequential backend to avoid spawning too
    many threads on the host.
    """
    nesting_level = getattr(self, "nesting_level", 0) + 1
    if nesting_level > 1:
        return SequentialBackend(nesting_level=nesting_level), None
    return ThreadingBackend(nesting_level=nesting_level), None

Another form of nested parallelism that needs to be controlled is the number of threads used by third-party C-level threadpools, e.g. OpenMP, MKL, or BLAS. In joblib, this is controlled with the inner_max_num_threads argument that can be provided to the backend in the parallel_config() context manager. To support this argument, the backend should set the supports_inner_max_num_threads class attribute to True and accept the argument in its constructor so it can be set up in the workers. The self._prepare_worker_env(n_jobs) helper returns environment variables that can be used to apply this limit in the workers.
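To illustrate the wiring, here is a sketch of a backend honoring inner_max_num_threads. The class name LimitedThreadsBackend is hypothetical, the joblib base class is omitted to keep the snippet self-contained, and a thread-based executor is used only so the example is runnable as-is (with threads the environment variables affect the whole process, whereas a process-based backend would confine them to its workers):

```python
import os
from concurrent.futures import ThreadPoolExecutor


# Hypothetical backend sketch; a real backend would inherit from
# ParallelBackendBase and could call self._prepare_worker_env(n_jobs)
# instead of building the environment dict by hand.
class LimitedThreadsBackend:
    # Advertise support so parallel_config forwards inner_max_num_threads.
    supports_inner_max_num_threads = True

    def __init__(self, inner_max_num_threads=None):
        self.inner_max_num_threads = inner_max_num_threads

    def configure(self, n_jobs=1, parallel=None, **backend_kwargs):
        env = {}
        if self.inner_max_num_threads is not None:
            # _prepare_worker_env would also cover MKL_NUM_THREADS,
            # OPENBLAS_NUM_THREADS, etc.
            env["OMP_NUM_THREADS"] = str(self.inner_max_num_threads)
        # The initializer applies the variables in each worker before
        # any task runs.
        self._executor = ThreadPoolExecutor(
            n_jobs, initializer=os.environ.update, initargs=(env,)
        )
        return n_jobs
```

A user could then write `parallel_config(backend='custom', inner_max_num_threads=2)` to cap the C-level threadpools inside each worker.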

Third-party backend registration

To be used seamlessly within parallel_config(), external packages need to register their parallel backends so they are identified by joblib. Otherwise, users have to explicitly import the package that performs the registration, as in:

>>> import joblib
>>> with joblib.parallel_config(backend='custom'):
...     ...  # this fails
KeyError: 'custom'

# Import library to register external backend
>>> import my_custom_backend_library
>>> with joblib.parallel_config(backend='custom'):
...     ... # this works

This can be confusing for users. To resolve this, external packages can safely register their backends directly within the joblib codebase by creating a small function that registers their backend, and including this function within the joblib.parallel.EXTERNAL_BACKENDS dictionary:

def _register_custom():
    try:
        from my_custom_backend_library import MyCustomBackend
        register_parallel_backend('custom', MyCustomBackend)
    except ImportError:
        raise ImportError("an informative error message")

EXTERNAL_BACKENDS['custom'] = _register_custom

Such a registration function is subject to community review, but it can reduce confusion for users who would otherwise have to rely on the side effects of importing an external package.