parfeval

Execute function asynchronously on parallel pool worker

Description

F = parfeval(p,fcn,numout,in1,in2,...) requests asynchronous execution of the function fcn on a worker contained in the parallel pool p, expecting numout output arguments and supplying as input arguments in1,in2,.... The asynchronous evaluation of fcn does not block MATLAB. F is a parallel.FevalFuture object, from which the results can be obtained when the worker has completed evaluating fcn. The evaluation of fcn always proceeds unless you explicitly cancel execution by calling cancel(F). To request multiple function evaluations, you must call parfeval multiple times. (However, parfevalOnAll can run the same function on all workers.)
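As a minimal sketch of this syntax, the following requests two outputs from max on an explicitly supplied pool and collects them with fetchOutputs. The choice of @max, the input vector, and the variable names are illustrative assumptions, not part of the original page.

```matlab
% Get the current pool; gcp starts one if your preferences allow automatic creation.
p = gcp();

% Request 2 outputs from max([3 9 4]): the maximum value and its index.
F = parfeval(p, @max, 2, [3 9 4]);

% fetchOutputs blocks until the worker finishes, then returns both outputs.
[maxValue, maxIndex] = fetchOutputs(F);
fprintf('max = %d at index %d\n', maxValue, maxIndex);   % prints "max = 9 at index 2"
```

Here numout is 2 because the code asks max for both of its outputs. Requesting more outputs than fcn can return results in an error when the future is evaluated.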

F = parfeval(fcn,numout,in1,in2,...) requests asynchronous execution on the current parallel pool. If no pool exists, it starts a new parallel pool, unless your parallel preferences disable automatic creation of pools.
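If you want to know beforehand whether a call will start a new pool, one possible check is to query the current pool with gcp('nocreate') first. This is a sketch with hypothetical variable names, and the messages are illustrative only.

```matlab
% Query the current pool without creating one.
pool = gcp('nocreate');
if isempty(pool)
    disp('No pool is open; parfeval will try to start one.');
else
    fprintf('Using existing pool with %d workers.\n', pool.NumWorkers);
end

% Submit work to whichever pool is current (or newly started).
F = parfeval(@plus, 1, 2, 3);
result = fetchOutputs(F);   % 2 + 3 = 5
```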

Examples

When you use parfeval or parfevalOnAll to run computations in the background, you create objects called futures. You can use the State property of a future to find out whether it is running, queued or finished. You can also use the FevalQueue property of a parallel pool to access running and queued futures. To cancel futures, you can use the cancel function. In this example, you:

  • Use cancel to cancel futures directly.

  • Check completion errors on completed futures.

  • Use the FevalQueue property to access futures.

Add Work to Queue

Create a parallel pool p with two workers.

p = parpool(2);
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 2).

When you use parfeval to run computations in the background, the function creates and adds a future for each computation to the pool queue. Tasks remain in the queue until a worker becomes idle. When a worker becomes idle, it starts to compute a task if the queue is not empty. When a worker completes a task, the task is removed from the queue and the worker becomes idle.

Use parfeval to create an array of futures f by instructing workers to execute the function pause. Use an argument of 1 for the third future, and an argument of Inf for all other futures.

for n = 1:5
    if n == 3
        f(n) = parfeval(@pause,0,1);
    else
        f(n) = parfeval(@pause,0,Inf);
    end
end

Each use of parfeval returns a future object that represents the execution of a function on a worker. Except for the third future, every future will take an infinite amount of time to compute. The future created by parfeval(@pause,0,Inf) is an extreme case of a future which can slow down a queue.

Cancel Futures Directly

You can use the State property to obtain the status of futures. Construct a cell array of the state of each future in f.

{f.State}
ans = 1×5 cell
    {'running'}    {'running'}    {'queued'}    {'queued'}    {'queued'}

Every task except for the third pauses forever.

Cancel the second future directly with cancel.

cancel(f(2));
{f.State}
ans = 1×5 cell
    {'running'}    {'finished'}    {'running'}    {'queued'}    {'queued'}

After you cancel the second future, the third future runs. Wait until the third future completes, then examine the states again.

wait(f(3));
{f.State}
ans = 1×5 cell
    {'running'}    {'finished'}    {'finished'}    {'running'}    {'queued'}

The third future now has the state 'finished'.

Check Completion Errors

When a future completes, its State property becomes 'finished'. To distinguish between futures that were cancelled and futures that completed normally, use the Error property.

fprintf("f(2): %s\n", f(2).Error.message)
f(2): Execution of the future was cancelled.
fprintf("f(3): %s\n", f(3).Error.message)
f(3): 

The second future was cancelled, as the message property indicates. The third future completed without error, and therefore does not have an error message.
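The same check can be applied to futures that fail on the worker. The following sketch is separate from the running example: the array g, the error identifier, and the messages are hypothetical, and it assumes that the Error property is empty for a future that completed normally.

```matlab
% Hypothetical futures: one succeeds, one throws on the worker, one is cancelled.
g(1) = parfeval(@() 1 + 1, 1);
g(2) = parfeval(@() error('demo:fail', 'Demo failure'), 0);
g(3) = parfeval(@pause, 0, Inf);
cancel(g(3));

% wait does not rethrow worker errors; it only waits for the 'finished' state.
wait(g(1:2));

for k = 1:numel(g)
    if isempty(g(k).Error)
        fprintf('g(%d) completed normally.\n', k);
    else
        fprintf('g(%d) did not complete: %s\n', k, g(k).Error.message);
    end
end
```

Checking Error before calling fetchOutputs avoids a thrown error on the client, because fetchOutputs reports an error when a future failed or was cancelled.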

Cancel Futures in Pool Queue

You can use the FevalQueue property to access the futures in the pool queue.

p.FevalQueue
ans = 
 FevalQueue with properties: 

        Number Queued: 1
       Number Running: 2

The queue has two properties: RunningFutures and QueuedFutures. The RunningFutures property is an array of futures corresponding to tasks that are currently running.

disp(p.FevalQueue.RunningFutures)
 1x2 FevalFuture array:
 
         ID              State  FinishDateTime  Function  Error
       --------------------------------------------------------
    1    12            running                    @pause       
    2    15            running                    @pause       

The QueuedFutures property is an array of futures corresponding to tasks that are currently queued and not running.

disp(p.FevalQueue.QueuedFutures)
 FevalFuture with properties: 

                   ID: 16
             Function: @pause
       CreateDateTime: 15-Jul-2020 17:29:37
        StartDateTime: 
     Running Duration: 0 days 0h 0m 0s
                State: queued
                Error: none

You can cancel a single future or an array of futures. Cancel all the futures in QueuedFutures.

cancel(p.FevalQueue.QueuedFutures);
{f.State}
ans = 1×5 cell
    {'running'}    {'finished'}    {'finished'}    {'running'}    {'finished'}

The order of the futures in RunningFutures and QueuedFutures is determined by the pool queue and does not depend on the order of the futures in f. Each future has a unique ID property for the lifetime of the client. Check the ID property of each of the futures in f.

disp(f)
 1x5 FevalFuture array:
 
         ID              State        FinishDateTime  Function  Error
       --------------------------------------------------------------
    1    12            running                          @pause       
    2    13  finished (unread)  15-Jul-2020 17:29:37    @pause  Error
    3    14  finished (unread)  15-Jul-2020 17:29:39    @pause       
    4    15            running                          @pause       
    5    16  finished (unread)  15-Jul-2020 17:29:39    @pause  Error

Compare the result against the ID property of each of the RunningFutures.

for j = 1:length(p.FevalQueue.RunningFutures)
    rf = p.FevalQueue.RunningFutures(j);
    fprintf("p.FevalQueue.RunningFutures(%i): ID = %i\n", j, rf.ID)
end
p.FevalQueue.RunningFutures(1): ID = 12
p.FevalQueue.RunningFutures(2): ID = 15

Here, RunningFutures is an array containing f(1) and f(4). If you cancel RunningFutures(2), you cancel the fourth future f(4).

Sometimes, futures are not available in the workspace, for example, if you execute the same piece of code twice before it finishes, or if you use parfeval in a function. You can cancel futures that are not available in the workspace.

Clear f from the workspace.

clear f

You can use RunningFutures and QueuedFutures to access futures that have not yet completed. Use RunningFutures to cancel the future that was previously stored in f(4).

rf2 = p.FevalQueue.RunningFutures(2);
cancel(rf2)
rf2.State
ans = 
'finished'

To cancel all the futures still in the queue, use the following code.

cancel([p.FevalQueue.RunningFutures p.FevalQueue.QueuedFutures])

Use parfeval to request asynchronous execution of a function on a worker.

For example, submit a single request to the parallel pool. Retrieve the outputs by using fetchOutputs.

f = parfeval(@magic,1,10);
value = fetchOutputs(f);

You can also submit a vector of multiple future requests in a for-loop and collect the results as they become available. For efficiency, preallocate an array of future objects beforehand.

f(1:10) = parallel.FevalFuture;
for idx = 1:10
    f(idx) = parfeval(@magic,1,idx);
end

Retrieve the individual future outputs as they become available by using fetchNext.

magicResults = cell(1,10);
for idx = 1:10
    [completedIdx,value] = fetchNext(f);
    magicResults{completedIdx} = value;
    fprintf('Got result with index: %d.\n', completedIdx);
end
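fetchNext also accepts a timeout in seconds as a second input. If no result arrives in time, its outputs are empty, which lets the client do other work between polls. The following is a sketch under assumed values: the variable names, the number of tasks, and the 5-second timeout are illustrative.

```matlab
% Submit a few small tasks (illustrative workload).
numTasks = 4;
for idx = 1:numTasks
    futures(idx) = parfeval(@magic, 1, idx + 2);
end

collected = cell(1, numTasks);
numCollected = 0;
timeoutSeconds = 5;   % assumed value; tune for your workload

while numCollected < numTasks
    [completedIdx, value] = fetchNext(futures, timeoutSeconds);
    if isempty(completedIdx)
        % Nothing finished within the timeout; the client could do other work here.
        fprintf('Still waiting...\n');
        continue
    end
    collected{completedIdx} = value;
    numCollected = numCollected + 1;
end
```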

This example shows how to perform a parallel parameter sweep with parfeval and send results back during computations with a DataQueue object. parfeval does not block MATLAB, so you can continue working while computations take place.

The example performs a parameter sweep on the Lorenz system of ordinary differential equations, on the parameters σ and ρ, and shows the chaotic nature of this system.

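\[
\begin{aligned}
\frac{dx}{dt} &= \sigma (y - x) \\
\frac{dy}{dt} &= x (\rho - z) - y \\
\frac{dz}{dt} &= x y - \beta z
\end{aligned}
\]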

Create Parameter Grid

Define the range of parameters that you want to explore in the parameter sweep.

gridSize = 40;
sigma = linspace(5, 45, gridSize);
rho = linspace(50, 100, gridSize);
beta = 8/3;

Create a 2-D grid of parameters by using the meshgrid function.

[rho,sigma] = meshgrid(rho,sigma);

Create a figure object, and set 'Visible' to true so that it opens in a new window, outside of the live script. To visualize the results of the parameter sweep, create a surface plot. Note that initializing the Z component of the surface with NaN creates an empty plot.

figure('Visible',true);
surface = surf(rho,sigma,NaN(size(sigma)));
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')

Set Up Parallel Environment

Create a pool of parallel workers by using the parpool function.

parpool;
Starting parallel pool (parpool) using the 'local' profile ...
Connected to the parallel pool (number of workers: 6).

To send data from the workers, create a DataQueue object. Set up a function that updates the surface plot each time a worker sends data by using the afterEach function. The updatePlot function is a supporting function defined at the end of the example.

Q = parallel.pool.DataQueue;
afterEach(Q,@(data) updatePlot(surface,data));
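Taken in isolation, the DataQueue pattern can be sketched as follows: the client registers a callback with afterEach, and a worker calls send on the same queue. The queue name, the callback, and the value 42 are hypothetical and chosen only for illustration.

```matlab
% Create a queue on the client and register a callback for incoming data.
demoQueue = parallel.pool.DataQueue;
afterEach(demoQueue, @(data) fprintf('Client received: %d\n', data));

% The worker receives the queue as an input argument and sends one value back.
F = parfeval(@(q) send(q, 42), 0, demoQueue);
wait(F);

% The callback runs on the client once MATLAB processes the queued message.
```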

Perform Parallel Parameter Sweep

After you define the parameters, you can perform the parallel parameter sweep.

parfeval works more efficiently when you distribute the workload. To distribute the workload, group the parameters to explore into partitions. For this example, split into uniform partitions of size step by using the colon operator (:). The resulting array partitions contains the boundaries of the partitions. Note that you must add the end point of the last partition.

step = 100;
partitions = [1:step:numel(sigma), numel(sigma)+1]
partitions = 1×17

           1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
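To see how the boundaries map to index ranges, consider a much smaller case. The sizes below are illustrative and are not the ones used in the sweep.

```matlab
numItems = 10;   % illustrative size
step = 4;
partitions = [1:step:numItems, numItems+1];   % boundaries: [1 5 9 11]

for ii = 1:numel(partitions)-1
    first = partitions(ii);
    last = partitions(ii+1) - 1;   % each partition stops just before the next boundary
    fprintf('Partition %d covers indices %d to %d\n', ii, first, last);
end
% Partition 1 covers indices 1 to 4
% Partition 2 covers indices 5 to 8
% Partition 3 covers indices 9 to 10
```

Appending numItems+1 as the final boundary is what allows the last partition to be shorter than step without going past the end of the data.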

For best performance, try to split into partitions that are:

  • Large enough that the computation time is large compared to the overhead of scheduling the partition.

  • Small enough that there are enough partitions to keep all workers busy.

To represent function executions on parallel workers and hold their results, use future objects.

f(1:numel(partitions)-1) = parallel.FevalFuture;

Offload computations to parallel workers by using the parfeval function. parameterSweep is a helper function defined at the end of this script that solves the Lorenz system on a partition of the parameters to explore. It has one output argument, so you must specify 1 as the number of outputs in parfeval.

for ii = 1:numel(partitions)-1
    f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
end

parfeval does not block MATLAB, so you can continue working while computations take place. The workers compute in parallel and send intermediate results through the DataQueue as soon as they become available.

If you want to block MATLAB until parfeval completes, use the wait function on the future objects. Using the wait function is useful when subsequent code depends on the completion of parfeval.

wait(f);
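If you do not want to block indefinitely, wait also accepts a state and a timeout in seconds, and returns a logical value that indicates whether the futures reached that state in time. The following sketch is separate from the parameter sweep: the future name, the 60-second pause, and the 1-second timeout are assumptions for illustration.

```matlab
% A future that stands in for a long computation.
slowFuture = parfeval(@pause, 0, 60);

% Wait at most 1 second for the future to finish.
finishedInTime = wait(slowFuture, 'finished', 1);

if ~finishedInTime
    % Give up on the computation instead of blocking the client.
    cancel(slowFuture);
end
```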

After parfeval finishes the computations, wait finishes and you can execute more code. For example, plot the contour of the resulting surface. Use the fetchOutputs function to retrieve the results stored in the future objects.

results = reshape(fetchOutputs(f),gridSize,[]);
contourf(rho,sigma,results)
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')

If your parameter sweep needs more computational resources and you have access to a cluster, you can scale up your parfeval computations. For more information, see Scale Up from Desktop to Cluster.

Define Helper Functions

Define a helper function that solves the Lorenz system on a partition of the parameters to explore. Send intermediate results to the MATLAB client by using the send function on the DataQueue object.

function results = parameterSweep(first,last,sigma,rho,beta,Q)
    results = zeros(last-first,1);
    for ii = first:last-1
        lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
        [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
        result = a(end,3);
        send(Q,[ii,result]);
        results(ii-first+1) = result;
    end
end

Define another helper function that updates the surface plot when new data arrives.

function updatePlot(surface,data)
    surface.ZData(data(1)) = data(2);
    drawnow('limitrate');
end

You can perform asynchronous computations on workers using parfeval and leave the user interface responsive. Use afterEach to update the user interface when intermediate computations are ready. Use afterAll to update the user interface when all the computations are ready.

Create a simple user interface using a waitbar.

h = waitbar(0, 'Waiting...');

Use parfeval to carry out time-consuming computations in the workers, for example, eigenvalues of random matrices. The computations happen asynchronously and the user interface updates during computation. With default preferences, parfeval creates a parpool automatically if there is not one already created.

for idx = 1:100
    f(idx) = parfeval(@(n) real(eig(randn(n))), 1, 5e2); 
end

Compute the largest value in each of the computations when they become ready using afterEach. Update the proportion of finished futures in the waitbar when each of them completes using afterEach.

maxFuture = afterEach(f, @max, 1);
updateWaitbarFuture = afterEach(f, @(~) waitbar(sum(strcmp('finished', {f.State}))/numel(f), h), 1);

Close the waitbar when all the computations are done. Use afterAll on updateWaitbarFuture to continue automatically with a close operation. afterAll obtains the figure handle from updateWaitbarFuture and executes its function on it.

closeWaitbarFuture = afterAll(updateWaitbarFuture, @(h) delete(h), 0);

Show a histogram after all the maximum values are computed. Use afterAll on maxFuture to continue the operation automatically. afterAll obtains the maximum values from maxFuture and calls histogram on them.

showsHistogramFuture = afterAll(maxFuture, @histogram, 0);
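The difference between afterEach and afterAll is easier to see with small, deterministic values. In this sketch, which uses hypothetical names and squares of the numbers 1 to 4, afterEach calls its function once per future, while afterAll calls its function once on the combined outputs of all futures.

```matlab
% Four futures that compute 1, 4, 9, and 16.
for k = 1:4
    squares(k) = parfeval(@(x) x^2, 1, k);
end

% afterEach: the function runs once per completed future.
plusOne = afterEach(squares, @(v) v + 1, 1);

% afterAll: the function runs once, on the outputs of all futures together.
total = afterAll(squares, @sum, 1);

plusOneValues = fetchOutputs(plusOne);   % one value per original future
totalValue = fetchOutputs(total);        % 1 + 4 + 9 + 16 = 30
```

Both functions return a future themselves, so the follow-up computations can be chained or fetched in the same way as the futures returned by parfeval.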

Input Arguments

p: Parallel pool of workers, specified as a parallel.Pool object. You can create a parallel pool by using the parpool function.

Data Types: parallel.Pool

fcn: Function to execute on a worker, specified as a function handle.

Example: fcn = @sum

Data Types: function_handle

numout: Number of output arguments that are expected from fcn.

Data Types: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

in1,in2,...: Function arguments to pass to fcn, specified as a comma-separated list of variables or expressions.

Output Arguments

F: Future object, returned as a parallel.FevalFuture, that represents the execution of fcn on a parallel worker and holds its results. Use fetchOutputs or fetchNext to collect the results.

Introduced in R2013b