Checkpoint using joblib.Memory and joblib.Parallel¶
This example illustrates how to cache intermediate computing results using joblib.Memory within joblib.Parallel.
Embed caching within parallel processing¶
It is possible to cache a computationally expensive function that is executed during parallel processing. costly_compute emulates such a time-consuming function.
import time

def costly_compute(data, column):
    """Emulate a costly function by sleeping and returning a column."""
    time.sleep(2)
    # data is a 2D NumPy array: data[column] would select a row,
    # so the column is selected explicitly.
    return data[:, column]

def data_processing_mean(data, column):
    """Compute the mean of a column."""
    return costly_compute(data, column).mean()
Create some data. The random seed is fixed to generate deterministic data across Python sessions. Note that this is not necessary for this specific example, since the memory cache is cleared at the end of the session.
import numpy as np
rng = np.random.RandomState(42)
data = rng.randn(int(1e4), 4)
First, the processing can be run sequentially, without caching or parallel processing.
Sequential processing
Elapsed time for the entire processing: 8.01 s
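The sequential timing reported above can be reproduced with a plain loop over the columns. The following is a minimal sketch, not the original script: it redefines the two helpers so that it runs on its own, and the `delay` parameter is a hypothetical addition that shortens the sleep for quick experimentation (the example itself sleeps for 2 seconds per call).

```python
import time

import numpy as np


def costly_compute(data, column, delay=0.1):
    """Emulate a costly function by sleeping and returning a column.

    `delay` is a hypothetical parameter added for this sketch; the
    example above uses a fixed 2-second sleep.
    """
    time.sleep(delay)
    return data[:, column]  # explicit column selection on a 2D array


def data_processing_mean(data, column, delay=0.1):
    """Compute the mean of a column."""
    return costly_compute(data, column, delay=delay).mean()


rng = np.random.RandomState(42)
data = rng.randn(int(1e4), 4)

start = time.time()
# One call per column, executed one after the other.
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
stop = time.time()

print('\nSequential processing')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

Because the calls run back to back, the elapsed time is close to the number of columns multiplied by the per-call delay.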
costly_compute is expensive to compute and is used as an intermediate step in data_processing_mean. It is therefore worth storing the intermediate results from costly_compute using joblib.Memory.
from joblib import Memory
location = './cachedir'
memory = Memory(location, verbose=0)
costly_compute_cached = memory.cache(costly_compute)
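To see what memory.cache changes, the sketch below counts how often the wrapped function body actually runs. It is an illustration under assumptions: the `calls` list, the `slow_square` function and the temporary cache directory are hypothetical names introduced here and are not part of the example.

```python
import shutil
import tempfile

from joblib import Memory

# Hypothetical scratch directory so the sketch does not touch './cachedir'.
location = tempfile.mkdtemp()
memory = Memory(location, verbose=0)

calls = []  # records every real execution of the function body


def slow_square(x):
    """Stand-in for an expensive function (hypothetical)."""
    calls.append(x)
    return x * x


slow_square_cached = memory.cache(slow_square)

first = slow_square_cached(3)   # cache miss: the body runs
second = slow_square_cached(3)  # cache hit: the result is read from disk
third = slow_square_cached(4)   # new argument: the body runs again

print(first, second, third)
print('Number of real executions:', len(calls))

# Remove the scratch directory created for this sketch.
shutil.rmtree(location, ignore_errors=True)
```

The cache is keyed on the function and its arguments, so only a repeated call with the same argument skips the computation.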
Now, we define data_processing_mean_using_cache, which benefits from the cache by calling costly_compute_cached.
def data_processing_mean_using_cache(data, column):
    """Compute the mean of a column."""
    return costly_compute_cached(data, column).mean()
Then, we execute the same processing in parallel while caching the intermediate results.
from joblib import Parallel, delayed

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nFirst round - caching the data')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
First round - caching the data
Elapsed time for the entire processing: 4.50 s
With 2 workers, the parallel processing is roughly twice as fast as the sequential case (4.50 s versus 8.01 s). When the same processing is executed again, the intermediate results obtained by calling costly_compute_cached are loaded from the cache instead of executing the function.
Second round - reloading from the cache
Elapsed time for the entire processing: 0.02 s
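The second round simply repeats the same Parallel call. A hedged sketch of both rounds follows; it is not the original script. To stay quick and self-checking it makes several assumptions: a 0.2-second sleep instead of 2 seconds, a temporary cache directory, a hypothetical `calls` counter and `run_round` helper, and `prefer='threads'` so that the counter is shared between workers (the example above uses joblib's default backend).

```python
import shutil
import tempfile
import time

import numpy as np
from joblib import Memory, Parallel, delayed

location = tempfile.mkdtemp()  # hypothetical scratch cache directory
memory = Memory(location, verbose=0)
calls = []  # hypothetical counter of real executions


def costly_compute(data, column):
    """Emulate a costly function (shortened sleep for this sketch)."""
    calls.append(column)
    time.sleep(0.2)
    return data[:, column]  # explicit column selection on a 2D array


costly_compute_cached = memory.cache(costly_compute)


def data_processing_mean_using_cache(data, column):
    """Compute the mean of a column."""
    return costly_compute_cached(data, column).mean()


def run_round(label):
    """Run the parallel processing once and report the elapsed time."""
    start = time.time()
    results = Parallel(n_jobs=2, prefer='threads')(
        delayed(data_processing_mean_using_cache)(data, col)
        for col in range(data.shape[1]))
    elapsed = time.time() - start
    print('\n' + label)
    print('Elapsed time for the entire processing: {:.2f} s'
          .format(elapsed))
    return results, elapsed


rng = np.random.RandomState(42)
data = rng.randn(int(1e4), 4)

first_results, first_elapsed = run_round('First round - caching the data')
calls_after_first = len(calls)
second_results, second_elapsed = run_round(
    'Second round - reloading from the cache')

# Remove the scratch directory created for this sketch.
shutil.rmtree(location, ignore_errors=True)
```

If the cache works as described, the counter does not grow during the second round, and both rounds return the same means.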
Reuse intermediate checkpoints¶
Having cached the intermediate results of the costly_compute_cached function, we can reuse them by calling the function again. We define a new processing step that takes the maximum of the array returned by costly_compute_cached instead of the mean.
def data_processing_max_using_cache(data, column):
    """Compute the max of a column."""
    return costly_compute_cached(data, column).max()

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_max_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('\nReusing intermediate checkpoints')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
Reusing intermediate checkpoints
Elapsed time for the entire processing: 0.02 s
The processing time now essentially corresponds to the execution of the max function, because the internal call to costly_compute_cached reloads the results from the cache.
Clean-up the cache folder¶
memory.clear(warn=False)
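Clearing the cache means that the next call has to recompute its result. The sketch below illustrates this with a hypothetical counted function (`double`, `executions`) and a temporary directory; it relies only on Memory.cache and Memory.clear as used above.

```python
import shutil
import tempfile

from joblib import Memory

location = tempfile.mkdtemp()  # hypothetical scratch cache directory
memory = Memory(location, verbose=0)
executions = []  # records every real execution of the function body


def double(x):
    """Hypothetical cheap function standing in for a costly one."""
    executions.append(x)
    return 2 * x


double_cached = memory.cache(double)

double_cached(21)  # computed and stored
double_cached(21)  # served from the cache
runs_before_clear = len(executions)

memory.clear(warn=False)  # erase the stored results

value = double_cached(21)  # the cache is empty again, so the body reruns
runs_after_clear = len(executions)

print(runs_before_clear, runs_after_clear, value)

# Remove the scratch directory created for this sketch.
shutil.rmtree(location, ignore_errors=True)
```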
Total running time of the script: ( 0 minutes 12.566 seconds)