NumPy memmap in joblib.Parallel

This example illustrates some features enabled by using a memory map (numpy.memmap) within joblib.Parallel. First, we show that dumping a large data array before passing it to joblib.Parallel speeds up computation. Then, we show how to give the workers write access to the original data.

Speed up processing of a large data array

We create a large data array and compute the average over several of its slices.

import numpy as np

data = np.random.random((int(1e7),))
window_size = int(5e5)
slices = [slice(start, start + window_size)
          for start in range(0, data.size - window_size, int(1e5))]
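As a quick sanity check on the slicing arithmetic, the sketch below rebuilds the slice list without allocating the array. The names data_size and step are introduced here for illustration only; data_size stands in for data.size.

```python
# Illustrative only: data_size stands in for data.size from the example.
data_size = int(1e7)
window_size = int(5e5)
step = int(1e5)

slices = [slice(start, start + window_size)
          for start in range(0, data_size - window_size, step)]

n_slices = len(slices)
last = slices[-1]
print(n_slices)               # number of windows
print(last.start, last.stop)  # bounds of the final window
```

This gives 95 overlapping windows, each sharing 80% of its elements with its neighbour. Because range() excludes its stop value, the last window ends at index 9,900,000 and the final 100,000 elements are not covered.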

The slow_mean function introduces a time.sleep() call to simulate a more expensive computation for which parallel computing is beneficial. Parallelism may not pay off for very fast operations because of the extra overhead (worker creation, communication, etc.).

import time


def slow_mean(data, sl):
    """Simulate a time consuming processing."""
    time.sleep(0.01)
    return data[sl].mean()

First, we time the sequential computation on our problem.

tic = time.time()
results = [slow_mean(data, sl) for sl in slices]
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
Elapsed time computing the average of couple of slices 0.98 s

joblib.Parallel is used to compute in parallel the average of all slices using 2 workers.

from joblib import Parallel, delayed


tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s'
      .format(toc - tic))
Elapsed time computing the average of couple of slices 0.70 s
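A rough back-of-the-envelope estimate helps put these two timings in context. The sketch below assumes that the sleep dominates each call and ignores the cost of computing the mean itself. The measured values are copied from the outputs above, and all variable names are illustrative.

```python
import math

# Number of slices, derived from the same range() as in the example.
n_tasks = len(range(0, int(1e7) - int(5e5), int(1e5)))
sleep_per_call = 0.01  # seconds, from slow_mean
n_workers = 2

# Lower bounds on the run time if only the sleep mattered.
sequential_floor = n_tasks * sleep_per_call
parallel_floor = math.ceil(n_tasks / n_workers) * sleep_per_call

# Timings reported by the two runs above.
measured_sequential = 0.98
measured_parallel = 0.70

print('sequential floor: {:.2f} s'.format(sequential_floor))
print('parallel floor:   {:.2f} s'.format(parallel_floor))
print('parallel time above the floor: {:.2f} s'
      .format(measured_parallel - parallel_floor))
```

Under these assumptions the sequential run sits close to its floor of about 0.95 s, while the parallel run spends roughly 0.2 s above its floor of about 0.48 s. That gap is the overhead of running in parallel.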

Parallel processing is already faster than sequential processing. It is also possible to remove a bit of overhead by dumping the data array to a memmap and passing the memmap to joblib.Parallel.

import os
from joblib import dump, load

folder = './joblib_memmap'
# Create the folder if it does not exist yet.
os.makedirs(folder, exist_ok=True)

data_filename_memmap = os.path.join(folder, 'data_memmap')
dump(data, data_filename_memmap)
data = load(data_filename_memmap, mmap_mode='r')

tic = time.time()
results = Parallel(n_jobs=2)(delayed(slow_mean)(data, sl) for sl in slices)
toc = time.time()
print('\nElapsed time computing the average of couple of slices {:.2f} s\n'
      .format(toc - tic))
Elapsed time computing the average of couple of slices 0.51 s

Therefore, dumping a large data array ahead of calling joblib.Parallel can speed up the processing by removing some overhead.
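To see what load returns when mmap_mode is set, here is a minimal sketch on a tiny array. The temporary directory and the file name small_memmap are hypothetical and chosen only for this illustration.

```python
import os
import shutil
import tempfile

import numpy as np
from joblib import dump, load

# Hypothetical scratch location, separate from the example's folder.
tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, 'small_memmap')

small = np.arange(10, dtype=np.float64)
dump(small, path)
mapped = load(path, mmap_mode='r')

is_memmap = isinstance(mapped, np.memmap)
is_writeable = mapped.flags.writeable
same_values = bool(np.array_equal(mapped, small))
print(type(mapped).__name__, is_writeable, same_values)

# Release the mapping before removing the file.
del mapped
shutil.rmtree(tmp_dir, ignore_errors=True)
```

The loaded object is backed by the file on disk rather than by a private in-memory copy, and mode 'r' makes it read-only, so a worker cannot modify the data by accident.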

Writable memmap for shared memory joblib.Parallel

slow_mean_write_output computes the mean for some given slices, as in the previous example. However, the resulting mean is written directly into the output array.

def slow_mean_write_output(data, sl, output, idx):
    """Simulate a time consuming processing."""
    time.sleep(0.005)
    res_ = data[sl].mean()
    print("[Worker %d] Mean for slice %d is %f" % (os.getpid(), idx, res_))
    output[idx] = res_

Prepare the folder where the memmap will be dumped.

output_filename_memmap = os.path.join(folder, 'output_memmap')

Pre-allocate a writable shared memory map as a container for the results of the parallel computation.

output = np.memmap(output_filename_memmap, dtype=data.dtype,
                   shape=len(slices), mode='w+')
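The following minimal sketch, independent of joblib, shows how values written to a 'w+' memmap end up in the backing file and can be read back through a second, read-only mapping. The temporary directory and the file name demo_output are hypothetical.

```python
import os
import shutil
import tempfile

import numpy as np

tmp_dir = tempfile.mkdtemp()
path = os.path.join(tmp_dir, 'demo_output')

# 'w+' creates (or overwrites) the file and maps it for reading and writing.
writer = np.memmap(path, dtype=np.float64, shape=(4,), mode='w+')
writer[:] = [0.1, 0.2, 0.3, 0.4]
writer.flush()  # push pending changes to the file

# A second, read-only mapping of the same file sees the written values.
reader = np.memmap(path, dtype=np.float64, shape=(4,), mode='r')
round_trip_ok = bool(np.array_equal(reader, writer))
file_size = os.path.getsize(path)  # 4 float64 values
print(round_trip_ok, file_size)

# Release both mappings before removing the file.
del writer, reader
shutil.rmtree(tmp_dir, ignore_errors=True)
```

Because the buffer lives in a file rather than in one process's private memory, several processes can map the same file, which is what lets the workers write their results into output.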

data is replaced by its memory-mapped version. Note that the buffer has already been dumped in the previous section.

data = load(data_filename_memmap, mmap_mode='r')

Fork the worker processes to perform the computation concurrently.

Parallel(n_jobs=2)(delayed(slow_mean_write_output)(data, sl, output, idx)
                   for idx, sl in enumerate(slices))
[None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None]

Compare the results from the output buffer with the expected results.

print("\nExpected means computed in the parent process:\n {}"
      .format(np.array(results)))
print("\nActual means computed by the worker processes:\n {}"
      .format(output))
Expected means computed in the parent process:
 [0.49969502 0.49974854 0.50006537 0.50030753 0.50034505 0.50032336
 0.50026086 0.50000087 0.49978799 0.50023447 0.50046521 0.50030388
 0.50032568 0.50052593 0.50016337 0.49984107 0.49982123 0.50004543
 0.49999345 0.50012406 0.50011315 0.4998125  0.49963104 0.49912847
 0.49933424 0.49929975 0.49953367 0.4996653  0.49990625 0.49990497
 0.50004955 0.50006521 0.49987123 0.49984162 0.49968302 0.49971152
 0.49955959 0.49967502 0.49959222 0.49956587 0.49931289 0.49907112
 0.4991957  0.49949689 0.4995176  0.49971149 0.50065802 0.50013745
 0.49988925 0.49990504 0.49997196 0.49962077 0.4999344  0.50014626
 0.50017754 0.50008372 0.50020196 0.50053593 0.50053088 0.50044691
 0.50058719 0.5004136  0.50003649 0.49986364 0.49968287 0.49960646
 0.49962993 0.49994235 0.50032503 0.50047262 0.50043531 0.50049752
 0.50024755 0.49985755 0.49994162 0.49989015 0.50007751 0.50036545
 0.50082722 0.50085856 0.50094555 0.50060472 0.50034287 0.50012409
 0.50018633 0.50000964 0.49995803 0.49989549 0.50004147 0.50007115
 0.50013099 0.50032093 0.50069431 0.50017957 0.50015598]

Actual means computed by the worker processes:
 [0.49969502 0.49974854 0.50006537 0.50030753 0.50034505 0.50032336
 0.50026086 0.50000087 0.49978799 0.50023447 0.50046521 0.50030388
 0.50032568 0.50052593 0.50016337 0.49984107 0.49982123 0.50004543
 0.49999345 0.50012406 0.50011315 0.4998125  0.49963104 0.49912847
 0.49933424 0.49929975 0.49953367 0.4996653  0.49990625 0.49990497
 0.50004955 0.50006521 0.49987123 0.49984162 0.49968302 0.49971152
 0.49955959 0.49967502 0.49959222 0.49956587 0.49931289 0.49907112
 0.4991957  0.49949689 0.4995176  0.49971149 0.50065802 0.50013745
 0.49988925 0.49990504 0.49997196 0.49962077 0.4999344  0.50014626
 0.50017754 0.50008372 0.50020196 0.50053593 0.50053088 0.50044691
 0.50058719 0.5004136  0.50003649 0.49986364 0.49968287 0.49960646
 0.49962993 0.49994235 0.50032503 0.50047262 0.50043531 0.50049752
 0.50024755 0.49985755 0.49994162 0.49989015 0.50007751 0.50036545
 0.50082722 0.50085856 0.50094555 0.50060472 0.50034287 0.50012409
 0.50018633 0.50000964 0.49995803 0.49989549 0.50004147 0.50007115
 0.50013099 0.50032093 0.50069431 0.50017957 0.50015598]
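Comparing two printed arrays by eye is error-prone, so the same check can be made programmatically. The sketch below is a scaled-down, self-contained variant of the example. The function write_mean, the array sizes, and the file name out_memmap are assumptions made for illustration.

```python
import os
import shutil
import tempfile

import numpy as np
from joblib import Parallel, delayed


def write_mean(data, sl, output, idx):
    """Write the mean of one slice into the shared output buffer."""
    output[idx] = data[sl].mean()


tmp_dir = tempfile.mkdtemp()
rng = np.random.default_rng(0)
small_data = rng.random(1000)
small_slices = [slice(start, start + 100) for start in range(0, 900, 100)]

# Writable memmap shared between the parent and the workers.
out = np.memmap(os.path.join(tmp_dir, 'out_memmap'), dtype=small_data.dtype,
                shape=len(small_slices), mode='w+')

Parallel(n_jobs=2)(delayed(write_mean)(small_data, sl, out, idx)
                   for idx, sl in enumerate(small_slices))

# Reference values computed in the parent process.
expected = np.array([small_data[sl].mean() for sl in small_slices])
np.testing.assert_allclose(out, expected)
max_abs_diff = float(np.max(np.abs(np.asarray(out) - expected)))
print('largest absolute difference: {:.1e}'.format(max_abs_diff))

# Release the mapping before removing the file.
del out
shutil.rmtree(tmp_dir, ignore_errors=True)
```

np.testing.assert_allclose raises an AssertionError if any pair of values differs beyond its tolerance, which makes the comparison suitable for an automated test.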

Clean up the memmap

Remove the memmaps that we created. This might fail on Windows due to file permissions.

import shutil

try:
    shutil.rmtree(folder)
except OSError:  # e.g. a permission error on Windows
    print('Could not clean up automatically.')

Total running time of the script: (0 minutes 2.631 seconds)