Embarrassingly parallel for loops¶
Common usage¶
Joblib provides a simple helper class to write parallel for loops using multiprocessing. The core idea is to write the code to be executed as a generator expression, and convert it to parallel computing:
>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
can be spread over 2 CPUs using the following:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
The output can be a generator that yields the results as soon as they’re available, even if the subsequent tasks aren’t completed yet. The order of the outputs always matches the order in which the inputs were submitted:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> parallel = Parallel(n_jobs=2, return_as="generator")
>>> output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
>>> print(type(output_generator))
<class 'generator'>
>>> print(next(output_generator))
0.0
>>> print(next(output_generator))
1.0
>>> print(list(output_generator))
[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
This generator enables reducing the memory footprint of joblib.Parallel calls in case the results can benefit from on-the-fly aggregation, as illustrated in Returning a generator in joblib.Parallel.
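For instance, a running reduction over the generator keeps only one result in memory at a time instead of materializing the full output list. A minimal sketch reusing the example above:

from math import sqrt
from joblib import Parallel, delayed

# Sum the results as they arrive: only the running total and the result
# currently being yielded are held in memory, not the whole output list.
parallel = Parallel(n_jobs=2, return_as="generator")
output_generator = parallel(delayed(sqrt)(i ** 2) for i in range(10))
running_total = sum(output_generator)  # 45.0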
Future releases are planned to also support returning a generator that yields the results in the order of completion rather than the order of submission, by using return_as="generator_unordered" instead of return_as="generator". In this case the order of the outputs will depend on the concurrency of workers and will not be guaranteed to be deterministic, meaning the results can be yielded in a different order every time the code is executed.
Thread-based parallelism vs process-based parallelism¶
By default joblib.Parallel uses the 'loky' backend module to start separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but can induce a significant overhead as the input and output data need to be serialized in a queue for communication with the worker processes (see Serialization & Processes).
When you know that the function you are calling is based on a compiled extension that releases the Python Global Interpreter Lock (GIL) during most of its computation, it is more efficient to use threads instead of Python processes as concurrent workers. For instance, this is the case if you write the CPU-intensive part of your code inside a with nogil block of a Cython function.
To hint that your code can efficiently use threads, just pass prefer="threads" as a parameter of the joblib.Parallel constructor. In this case joblib will automatically use the "threading" backend instead of the default "loky" backend:
>>> Parallel(n_jobs=2, prefer="threads")(
... delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
The parallel_config() context manager helps select a specific backend implementation or set the default number of jobs:
>>> from joblib import parallel_config
>>> with parallel_config(backend='threading', n_jobs=2):
... Parallel()(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
The latter is especially useful when calling a library that uses joblib.Parallel internally without exposing backend selection as part of its public API.
Note that the prefer="threads" option was introduced in joblib 0.12. In prior versions, the same effect could be achieved by hardcoding a specific backend implementation such as backend="threading" in the call to joblib.Parallel, but this is now considered a bad pattern (when done in a library) as it does not make it possible to override that choice with the parallel_config() context manager.
The loky backend may not always be available
Some rare systems do not support multiprocessing (for instance Pyodide). In this case the loky backend is not available and the default backend falls back to threading.
In addition to the builtin joblib backends, there are several cluster-specific backends you can use:
Dask backend for Dask clusters (see Using Dask for single-machine parallel computing for an example),
Ray backend for Ray clusters,
Joblib Apache Spark Backend to distribute joblib tasks on a Spark cluster.
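As an illustration of the Dask option above, here is a minimal sketch. It assumes the distributed package is installed and uses a local in-process Dask client; a real deployment would instead pass the address of the cluster scheduler to Client.

from math import sqrt
from dask.distributed import Client
from joblib import Parallel, delayed, parallel_config

# A local, in-process client for illustration only; point Client(...) at a
# scheduler address to use an actual cluster.
client = Client(processes=False)

# joblib discovers the 'dask' backend lazily once distributed is installed
# and a client is active.
with parallel_config(backend="dask"):
    results = Parallel()(delayed(sqrt)(i ** 2) for i in range(10))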
Serialization & Processes¶
To share function definitions across multiple Python processes, it is necessary to rely on a serialization protocol. The standard protocol in Python is pickle but its default implementation in the standard library has several limitations. For instance, it cannot serialize functions which are defined interactively or in the __main__ module.
To avoid this limitation, the loky backend now relies on cloudpickle to serialize Python objects. cloudpickle is an alternative implementation of the pickle protocol which allows the serialization of a greater number of objects, in particular interactively defined functions. So for most usages, the loky backend should work seamlessly.
The main drawback of cloudpickle is that it can be slower than the pickle module in the standard library. In particular, this is critical for large Python dictionaries or lists, where the serialization time can be up to 100 times slower. There are two ways to alter the serialization process for joblib to mitigate this issue:
- If you are on a UNIX system, you can switch back to the old multiprocessing backend. With this backend, interactively defined functions can be shared with the worker processes using the fast pickle. The main issue with this solution is that using fork to start the processes breaks the standard POSIX semantics and can have weird interactions with third-party libraries such as numpy and openblas.
- If you wish to use the loky backend with a different serialization library, you can set the LOKY_PICKLER=mod_pickle environment variable to use mod_pickle as the serialization library for loky. The module mod_pickle passed as an argument should be importable as import mod_pickle and should contain a Pickler object, which will be used to serialize the objects. It can be set to LOKY_PICKLER=pickle to use the pickling module from the stdlib. The main drawback with LOKY_PICKLER=pickle is that interactively defined functions will not be serializable anymore. To cope with this, you can use this solution together with the joblib.wrap_non_picklable_objects() wrapper, which can be used as a decorator to locally enable using cloudpickle for specific objects (as sketched below). This way, you can have fast pickling of all Python objects and locally enable slow pickling for interactive functions. An example is given in loky_wrapper.
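For illustration, here is a sketch of the decorator-based approach mentioned above. The function name is hypothetical, and the snippet assumes LOKY_PICKLER=pickle has been set in the environment so that only the wrapped function goes through cloudpickle:

from joblib import Parallel, delayed, wrap_non_picklable_objects

# Hypothetical interactively defined function: wrapping it re-enables
# cloudpickle for this object only, while everything else keeps using
# the fast stdlib pickler selected with LOKY_PICKLER=pickle.
@delayed
@wrap_non_picklable_objects
def interactive_square(i):
    return i ** 2

results = Parallel(n_jobs=2)(interactive_square(i) for i in range(10))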
Reusing a pool of workers¶
Some algorithms require making several consecutive calls to a parallel function interleaved with processing of the intermediate results. Calling joblib.Parallel several times in a loop is sub-optimal because it will create and destroy a pool of workers (threads or processes) several times, which can cause a significant overhead.

For this case it is more efficient to use the context manager API of the joblib.Parallel class to reuse the same pool of workers for several calls to the joblib.Parallel object:
>>> with Parallel(n_jobs=2) as parallel:
... accumulator = 0.
... n_iter = 0
... while accumulator < 1000:
... results = parallel(delayed(sqrt)(accumulator + i ** 2)
... for i in range(5))
... accumulator += sum(results) # synchronization barrier
... n_iter += 1
...
>>> (accumulator, n_iter)
(1136.596..., 14)
Note that the 'loky' backend now used by default for process-based parallelism automatically tries to maintain and reuse a pool of workers by itself, even for calls without the context manager.
Avoiding over-subscription of CPU resources¶
The computation parallelism relies on the usage of multiple CPUs to perform the operation simultaneously. When using more processes than the number of CPUs on a machine, the performance of each process is degraded as there is less computational power available for each process. Moreover, when many processes are running, the time taken by the OS scheduler to switch between them can further hinder the performance of the computation. It is generally better to avoid using significantly more processes or threads than the number of CPUs on a machine.
Some third-party libraries – e.g. the BLAS runtime used by numpy – internally manage a thread-pool to perform their computations. The default behavior is generally to use a number of threads equal to the number of CPUs available. When these libraries are used with joblib.Parallel, each worker will spawn its own thread-pool, resulting in a massive over-subscription of resources that can slow down the computation compared to a sequential one. To cope with this problem, joblib tells supported third-party libraries to use a limited number of threads in workers managed by the 'loky' backend: by default each worker process will have environment variables set to allow a maximum of cpu_count() // n_jobs threads, so that the total number of threads used by all the workers does not exceed the number of CPUs of the host.
This behavior can be overridden by setting the proper environment variables to the desired number of threads (see the sketch after the list below). This override is supported for the following libraries:
OpenMP with the environment variable 'OMP_NUM_THREADS',
OpenBLAS with the environment variable 'OPENBLAS_NUM_THREADS',
MKL with the environment variable 'MKL_NUM_THREADS',
Accelerate with the environment variable 'VECLIB_MAXIMUM_THREADS',
Numexpr with the environment variable 'NUMEXPR_NUM_THREADS'.
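A minimal sketch of such an override, assuming the limit is exported from within the program before any workers are created (the value of 4 threads and the sqrt task are purely illustrative):

import os

# Exporting the variable before the workers are created makes joblib keep
# the user-chosen value instead of computing cpu_count() // n_jobs.
os.environ["OMP_NUM_THREADS"] = "4"

from math import sqrt
from joblib import Parallel, delayed

results = Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))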
Since joblib 0.14, it is also possible to programmatically override the default number of threads using the inner_max_num_threads argument of the parallel_config() function as follows:
from joblib import Parallel, delayed, parallel_config

with parallel_config(backend="loky", inner_max_num_threads=2):
    results = Parallel(n_jobs=4)(delayed(func)(x, y) for x, y in data)
In this example, 4 Python worker processes will be allowed to use 2 threads each, meaning that this program will be able to use up to 8 CPUs concurrently.
Custom backend API¶
New in version 0.10.
Users can provide their own implementation of a parallel processing backend in addition to the 'loky', 'threading' and 'multiprocessing' backends provided by default. A backend is registered with the joblib.register_parallel_backend() function by passing a name and a backend factory.

The backend factory can be any callable that returns an instance of ParallelBackendBase. Please refer to the default backends source code as a reference if you want to implement your own custom backend.
Note that it is possible to register a backend class that has some mandatory constructor parameters such as the network address and connection credentials for a remote cluster computing service:
class MyCustomBackend(ParallelBackendBase):

    def __init__(self, endpoint, api_key):
        self.endpoint = endpoint
        self.api_key = api_key
        ...
        # Do something with self.endpoint and self.api_key somewhere in
        # one of the methods of the class

register_parallel_backend('custom', MyCustomBackend)
The connection parameters can then be passed to the parallel_config() context manager:
with parallel_config(backend='custom', endpoint='http://compute',
                     api_key='42'):
    Parallel()(delayed(some_function)(i) for i in range(10))
Using the context manager can be helpful when using a third-party library that uses joblib.Parallel internally while not exposing the backend argument in its own API.
One caveat is that external packages that register new parallel backends must be imported explicitly for their backends to be identified by joblib:
>>> import joblib
>>> with joblib.parallel_config(backend='custom'):
... ... # this fails
KeyError: 'custom'
# Import library to register external backend
>>> import my_custom_backend_library
>>> with joblib.parallel_config(backend='custom'):
... ... # this works
This can be confusing for users. To resolve this, external packages can safely register their backends directly within the joblib codebase by creating a small function that registers their backend, and including this function within the joblib.parallel.EXTERNAL_BACKENDS dictionary:
def _register_custom():
    try:
        import my_custom_library
    except ImportError:
        raise ImportError("an informative error message")

EXTERNAL_BACKENDS['custom'] = _register_custom
This is subject to community review, but can reduce the confusion for users when relying on side effects of external package imports.
Old multiprocessing backend¶
Prior to version 0.12, joblib used the 'multiprocessing' backend as the default backend instead of 'loky'.
This backend creates an instance of multiprocessing.Pool that forks the Python interpreter in multiple processes to execute each of the items of the list. The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.
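To make this concrete, here is a sketch of what delayed produces (the exact repr of the function object may vary slightly across Python versions):

>>> from math import sqrt
>>> from joblib import delayed
>>> delayed(sqrt)(16)  # packs the call into a (function, args, kwargs) tuple
(<built-in function sqrt>, (16,), {})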
Warning
Under Windows, the use of multiprocessing.Pool requires protecting the main loop of code to avoid recursive spawning of subprocesses when using joblib.Parallel. In other words, you should be writing code like this when using the 'multiprocessing' backend:
import ....

def function1(...):
    ...

def function2(...):
    ...

...

if __name__ == '__main__':
    # do stuff with imports and functions defined above
    ...
No code should run outside of the "if __name__ == '__main__'" blocks, only imports and definitions.

The 'loky' backend used by default in joblib 0.12 and later does not impose this anymore.
Bad interaction of multiprocessing and third-party libraries¶
Using the 'multiprocessing' backend can cause a crash when using third party libraries that manage their own native thread-pool if the library is first used in the main process and subsequently called again in a worker process (inside the joblib.Parallel call).
Joblib version 0.12 and later are no longer subject to this problem thanks to the use of loky as the new default backend for process-based parallelism.
Prior to Python 3.4 the 'multiprocessing' backend of joblib can only use the fork strategy to create worker processes under non-Windows systems. This can cause some third-party libraries to crash or freeze. Such libraries include Apple vecLib / Accelerate (used by NumPy under OSX), some old versions of OpenBLAS (prior to 0.2.10) or the OpenMP runtime implementation from GCC which is used internally by third-party libraries such as XGBoost, spaCy, OpenCV…

The best way to avoid this problem is to use the 'loky' backend instead of the multiprocessing backend. Prior to joblib 0.12, it is also possible to get joblib.Parallel configured to use the 'forkserver' start method on Python 3.4 and later. The start method has to be configured by setting the JOBLIB_START_METHOD environment variable to 'forkserver' instead of the default 'fork' start method. However, the user should be aware that using the 'forkserver' method prevents joblib.Parallel from calling functions interactively defined in a shell session.
You can read more on this topic in the multiprocessing documentation.
Under Windows the fork
system call does not exist at all so this problem
does not exist (but multiprocessing has more overhead).
Parallel reference documentation¶
- class joblib.Parallel(n_jobs=default(None), backend=default(None), return_as='list', verbose=default(0), timeout=None, pre_dispatch='2 * n_jobs', batch_size='auto', temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None))
Helper class for readable parallel mapping.
Read more in the User Guide.
- Parameters
- n_jobs: int, default=None
The maximum number of concurrently running jobs, such as the number of Python worker processes when backend="loky" or the size of the thread-pool when backend="threading". This argument is converted to an integer, rounded below for float. If -1 is given, joblib tries to use all CPUs. The number of CPUs n_cpus is obtained with cpu_count(). For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance, using n_jobs=-2 will result in all CPUs but one being used. This argument can also go above n_cpus, which will cause oversubscription. In some cases, slight oversubscription can be beneficial, e.g., for tasks with large I/O operations. If 1 is given, no parallel computing code is used at all, and the behavior amounts to a simple python for loop. This mode is not compatible with timeout. None is a marker for ‘unset’ that will be interpreted as n_jobs=1 unless the call is performed under a parallel_config() context manager that sets another value for n_jobs. If n_jobs = 0 then a ValueError is raised.
- backend: str, ParallelBackendBase instance or None, default=’loky’
Specify the parallelization backend implementation. Supported backends are:
“loky” used by default, can induce some communication and memory overhead when exchanging input and output data with the worker Python processes. On some rare systems (such as Pyodide), the loky backend may not be available.
“multiprocessing” previous process-based backend based on multiprocessing.Pool. Less robust than loky.
“threading” is a very low-overhead backend but it suffers from the Python Global Interpreter Lock if the called function relies a lot on Python objects. “threading” is mostly useful when the execution bottleneck is a compiled extension that explicitly releases the GIL (for instance a Cython loop wrapped in a “with nogil” block or an expensive call to a library such as NumPy).
finally, you can register backends by calling register_parallel_backend(). This will allow you to implement a backend of your liking.
It is not recommended to hard-code the backend name in a call to Parallel in a library. Instead it is recommended to set soft hints (prefer) or hard constraints (require) so as to make it possible for library users to change the backend from the outside using the parallel_config() context manager.
- return_as: str in {‘list’, ‘generator’, ‘generator_unordered’}, default=’list’
If ‘list’, calls to this instance will return a list, only when all results have been processed and retrieved. If ‘generator’, it will return a generator that yields the results as soon as they are available, in the order the tasks have been submitted with. If ‘generator_unordered’, the generator will immediately yield available results independently of the submission order. The output order is not deterministic in this case because it depends on the concurrency of the workers.
- prefer: str in {‘processes’, ‘threads’} or None, default=None
Soft hint to choose the default backend if no specific backend was selected with the parallel_config() context manager. The default process-based backend is ‘loky’ and the default thread-based backend is ‘threading’. Ignored if the backend parameter is specified.
- require: ‘sharedmem’ or None, default=None
Hard constraint to select the backend. If set to ‘sharedmem’, the selected backend will be single-host and thread-based even if the user asked for a non-thread based backend with parallel_config().
- verbose: int, default=0
The verbosity level: if non zero, progress messages are printed. Above 50, the output is sent to stdout. The frequency of the messages increases with the verbosity level. If it is more than 10, all iterations are reported.
- timeout: float or None, default=None
Timeout limit for each task to complete. If any task takes longer, a TimeOutError will be raised. Only applied when n_jobs != 1.
- pre_dispatch: {‘all’, integer, or expression, as in ‘3*n_jobs’}, default=’2*n_jobs’
The number of batches (of tasks) to be pre-dispatched. Default is ‘2*n_jobs’. When batch_size=”auto” this is a reasonable default and the workers should never starve. Note that only basic arithmetic is allowed here and no modules can be used in this expression.
- batch_size: int or ‘auto’, default=’auto’
The number of atomic tasks to dispatch at once to each worker. When individual evaluations are very fast, dispatching calls to workers can be slower than sequential computation because of the overhead. Batching fast computations together can mitigate this. The 'auto' strategy keeps track of the time it takes for a batch to complete, and dynamically adjusts the batch size to keep the time on the order of half a second, using a heuristic. The initial batch size is 1. batch_size="auto" with backend="threading" will dispatch batches of a single task at a time as the threading backend has very little overhead and using larger batch size has not proved to bring any gain in that case.
- temp_folder: str or None, default=None
Folder to be used by the pool for memmapping large arrays for sharing memory with worker processes. If None, this will try in order:
a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
/dev/shm if the folder exists and is writable: this is a RAM disk filesystem available by default on modern Linux distributions,
the default system temporary folder that can be overridden with TMP, TMPDIR or TEMP environment variables, typically /tmp under Unix operating systems.
Only active when backend="loky" or "multiprocessing".
- max_nbytes: int, str, or None, optional, default=’1M’
Threshold on the size of arrays passed to the workers that triggers automated memory mapping in temp_folder. Can be an int in Bytes, or a human-readable string, e.g., ‘1M’ for 1 megabyte. Use None to disable memmapping of large arrays. Only active when backend="loky" or "multiprocessing".
- mmap_mode: {None, ‘r+’, ‘r’, ‘w+’, ‘c’}, default=’r’
Memmapping mode for numpy arrays passed to workers. None will disable memmapping, other modes defined in the numpy.memmap doc: https://numpy.org/doc/stable/reference/generated/numpy.memmap.html Also, see ‘max_nbytes’ parameter documentation for more details.
Notes
This object uses workers to compute in parallel the application of a function to many different arguments. The main functionality it brings in addition to using the raw multiprocessing or concurrent.futures API is (see examples for details):
More readable code, in particular since it avoids constructing lists of arguments.
- Easier debugging:
informative tracebacks even when the error happens on the client side
using ‘n_jobs=1’ makes it possible to turn off parallel computing for debugging without changing the codepath
early capture of pickling errors
An optional progress meter.
Interruption of multiprocesses jobs with ‘Ctrl-C’
Flexible pickling control for the communication to and from the worker processes.
Ability to use shared memory efficiently with worker processes for large numpy-based datastructures.
Note that the intended usage is to run one call at a time. Multiple calls to the same Parallel object will result in a RuntimeError.
Examples
A simple example:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Reshaping the output when the function has several return values:
>>> from math import modf
>>> from joblib import Parallel, delayed
>>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
>>> res, i = zip(*r)
>>> res
(0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
>>> i
(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
The progress meter: the higher the value of verbose, the more messages:
>>> from time import sleep
>>> from joblib import Parallel, delayed
>>> r = Parallel(n_jobs=2, verbose=10)(
...     delayed(sleep)(.2) for _ in range(10))
[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.6s
[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.8s
[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 1.4s finished
Traceback example, note how the line of the error is indicated as well as the values of the parameter passed to the function that triggered the exception, even though the traceback happens in the child process:
>>> from heapq import nlargest
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(
... delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
...
-----------------------------------------------------------------------
Sub-process traceback:
-----------------------------------------------------------------------
TypeError                                    Mon Nov 12 11:37:46 2012
PID: 12934                             Python 2.7.3: /usr/bin/python
........................................................................
/usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
    419         if n >= size:
    420             return sorted(iterable, key=key, reverse=True)[:n]
    421
    422         # When key is none, use simpler decoration
    423         if key is None:
--> 424             it = izip(iterable, count(0,-1))          # decorate
    425             result = _nlargest(n, it)
    426             return map(itemgetter(0), result)         # undecorate
    427
    428         # General case, slowest method
 TypeError: izip argument #1 must support iteration
_______________________________________________________________________
Using pre_dispatch in a producer/consumer situation, where the data is generated on the fly. Note how the producer is first called 3 times before the parallel loop is initiated, and then called to generate new data on the fly:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> def producer():
...     for i in range(6):
...         print('Produced %s' % i)
...         yield i
>>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
...     delayed(sqrt)(i) for i in producer())
Produced 0
Produced 1
Produced 2
[Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
Produced 3
[Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
Produced 4
[Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
Produced 5
[Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
- dispatch_next()
Dispatch more data for parallel processing
This method is meant to be called concurrently by the multiprocessing callback. We rely on the thread-safety of dispatch_one_batch to protect against concurrent consumption of the unprotected iterator.
- dispatch_one_batch(iterator)
Prefetch the tasks for the next batch and dispatch them.
The effective size of the batch is computed here. If there are no more jobs to dispatch, return False, else return True.
The iterator consumption and dispatching is protected by the same lock so calling this function should be thread safe.
- format(obj, indent=0)
Return the formatted representation of the object.
- print_progress()
Display the progress of the parallel execution only a fraction of the time, controlled by self.verbose.
- joblib.delayed(function)¶
Decorator used to capture the arguments of a function.
- joblib.parallel_config(backend=default(None), *, n_jobs=default(None), verbose=default(0), temp_folder=default(None), max_nbytes=default('1M'), mmap_mode=default('r'), prefer=default(None), require=default(None), inner_max_num_threads=None, **backend_params)
Set the default backend or configuration for Parallel.

This is an alternative to directly passing keyword arguments to the Parallel class constructor. It is particularly useful when calling into library code that uses joblib internally but does not expose the various parallel configuration arguments in its own API.
- Parameters
- backend: str or ParallelBackendBase instance, default=None
If backend is a string it must match a previously registered implementation using the register_parallel_backend() function.
By default the following backends are available:
‘loky’: single-host, process-based parallelism (used by default),
‘threading’: single-host, thread-based parallelism,
‘multiprocessing’: legacy single-host, process-based parallelism.
‘loky’ is recommended to run functions that manipulate Python objects. ‘threading’ is a low-overhead alternative that is most efficient for functions that release the Global Interpreter Lock: e.g. I/O-bound code or CPU-bound code in a few calls to native code that explicitly releases the GIL. Note that on some rare systems (such as pyodide), multiprocessing and loky may not be available, in which case joblib defaults to threading.
In addition, if the dask and distributed Python packages are installed, it is possible to use the ‘dask’ backend for better scheduling of nested parallel calls without over-subscription and potentially distribute parallel calls over a networked cluster of several hosts.
It is also possible to use the distributed ‘ray’ backend for distributing the workload to a cluster of nodes. See more details in the Examples section below.
Alternatively the backend can be passed directly as an instance.
- n_jobs: int, default=None
The maximum number of concurrently running jobs, such as the number of Python worker processes when backend="loky" or the size of the thread-pool when backend="threading". This argument is converted to an integer, rounded below for float. If -1 is given, joblib tries to use all CPUs. The number of CPUs n_cpus is obtained with cpu_count(). For n_jobs below -1, (n_cpus + 1 + n_jobs) are used. For instance, using n_jobs=-2 will result in all CPUs but one being used. This argument can also go above n_cpus, which will cause oversubscription. In some cases, slight oversubscription can be beneficial, e.g., for tasks with large I/O operations. If 1 is given, no parallel computing code is used at all, and the behavior amounts to a simple python for loop. This mode is not compatible with timeout. None is a marker for ‘unset’ that will be interpreted as n_jobs=1 unless the call is performed under a parallel_config() context manager that sets another value for n_jobs. If n_jobs = 0 then a ValueError is raised.
- verbose: int, default=0
The verbosity level: if non zero, progress messages are printed. Above 50, the output is sent to stdout. The frequency of the messages increases with the verbosity level. If it is more than 10, all iterations are reported.
- temp_folder: str or None, default=None
Folder to be used by the pool for memmapping large arrays for sharing memory with worker processes. If None, this will try in order:
a folder pointed by the JOBLIB_TEMP_FOLDER environment variable,
/dev/shm if the folder exists and is writable: this is a RAM disk filesystem available by default on modern Linux distributions,
the default system temporary folder that can be overridden with TMP, TMPDIR or TEMP environment variables, typically /tmp under Unix operating systems.
- max_nbytes: int, str, or None, optional, default=’1M’
Threshold on the size of arrays passed to the workers that triggers automated memory mapping in temp_folder. Can be an int in Bytes, or a human-readable string, e.g., ‘1M’ for 1 megabyte. Use None to disable memmapping of large arrays.
- mmap_mode: {None, ‘r+’, ‘r’, ‘w+’, ‘c’}, default=’r’
Memmapping mode for numpy arrays passed to workers. None will disable memmapping, other modes defined in the numpy.memmap doc: https://numpy.org/doc/stable/reference/generated/numpy.memmap.html Also, see ‘max_nbytes’ parameter documentation for more details.
- prefer: str in {‘processes’, ‘threads’} or None, default=None
Soft hint to choose the default backend. The default process-based backend is ‘loky’ and the default thread-based backend is ‘threading’. Ignored if the backend parameter is specified.
- require: ‘sharedmem’ or None, default=None
Hard constraint to select the backend. If set to ‘sharedmem’, the selected backend will be single-host and thread-based.
- inner_max_num_threads: int, default=None
If not None, overwrites the limit set on the number of threads usable in some third-party library threadpools like OpenBLAS, MKL or OpenMP. This is only used with the loky backend.
- backend_params: dict
Additional parameters to pass to the backend constructor when backend is a string.
Notes
Joblib tries to limit the oversubscription by limiting the number of threads usable in some third-party library threadpools like OpenBLAS, MKL or OpenMP. The default limit in each worker is set to max(cpu_count() // effective_n_jobs, 1) but this limit can be overwritten with the inner_max_num_threads argument which will be used to set this limit in the child processes.
New in version 1.3.
Examples
>>> from operator import neg
>>> with parallel_config(backend='threading'):
...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
...
[-1, -2, -3, -4, -5]
To use the ‘ray’ joblib backend add the following lines:
>>> from ray.util.joblib import register_ray
>>> register_ray()
>>> with parallel_config(backend="ray"):
...     print(Parallel()(delayed(neg)(i + 1) for i in range(5)))
[-1, -2, -3, -4, -5]
- joblib.wrap_non_picklable_objects(obj, keep_wrapper=True)¶
Wrapper for non-picklable objects to use cloudpickle to serialize them.
Note that this wrapper tends to slow down the serialization process as it is done with cloudpickle which is typically slower compared to pickle. The proper way to solve serialization issues is to avoid defining functions and objects in the main scripts and to implement __reduce__ functions for complex classes.
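As an illustration of the __reduce__ approach mentioned above, here is a hypothetical class whose heavy state is rebuilt from a small recipe instead of being pickled directly (class and attribute names are made up for the example):

import pickle

class HeavyModel:
    """Hypothetical wrapper around expensive, hard-to-pickle state."""

    def __init__(self, config_path):
        self.config_path = config_path
        self.state = self._load(config_path)  # placeholder for real loading

    @staticmethod
    def _load(config_path):
        return {"loaded_from": config_path}

    def __reduce__(self):
        # Pickle a (callable, args) recipe instead of the heavy state:
        # unpickling calls HeavyModel(config_path) again in the worker.
        return (HeavyModel, (self.config_path,))

model = HeavyModel("config.yaml")
clone = pickle.loads(pickle.dumps(model))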
- joblib.register_parallel_backend(name, factory, make_default=False)¶
Register a new Parallel backend factory.
The new backend can then be selected by passing its name as the backend argument to the Parallel class. Moreover, the default backend can be overwritten globally by setting make_default=True.
The factory can be any callable that takes no argument and returns an instance of ParallelBackendBase.
Warning: this function is experimental and subject to change in a future version of joblib.
New in version 0.10.
- class joblib.parallel.ParallelBackendBase(nesting_level=None, inner_max_num_threads=None, **kwargs)¶
Helper abc which defines all methods a ParallelBackend must implement
- class joblib.parallel.AutoBatchingMixin(**kwargs)¶
A helper class for automagically batching jobs.