Execute function asynchronously on parallel pool worker
requests asynchronous execution of the function F
= parfeval(p
,fcn
,numout
,in1,in2,...
)fcn
on a worker contained
in the parallel pool p
, expecting numout
output
arguments and supplying as input arguments in1,in2,...
. The
asynchronous evaluation of fcn
does not block MATLAB.
F
is a parallel.FevalFuture object, from which the results can be obtained when the
worker has completed evaluating fcn
. The evaluation of
fcn
always proceeds unless you explicitly cancel execution by calling
cancel(F)
. To request multiple function evaluations, you must call
parfeval
multiple times. (However, parfevalOnAll
can
run the same function on all workers.)
requests asynchronous execution on the current parallel pool. If no pool exists, it starts a
new parallel pool, unless your parallel preferences disable automatic creation of
pools.F
= parfeval(fcn
,numout
,in1,in2,...
)
parfeval
FuturesWhen you use parfeval
or parfevalOnAll
to run computations in the background, you create objects called futures. You can use the State
property of a future to find out whether it is running, queued or finished. You can also use the FevalQueue
property of a parallel pool to access running and queued futures. To cancel futures, you can use the cancel
function. In this example, you:
Use cancel
to cancel futures directly.
Check completion errors on completed futures.
Use the FevalQueue
property to access futures.
Add Work to Queue
Create a parallel pool p
with two workers.
p = parpool(2);
Starting parallel pool (parpool) using the 'local' profile ... Connected to the parallel pool (number of workers: 2).
When you use parfeval
to run computations in the background, the function creates and adds a future for each computation to the pool queue. Tasks remain in the queue until a worker becomes idle. When a worker becomes idle, it starts to compute a task if the queue is not empty. When a worker completes a task, the task is removed from the queue and the worker becomes idle.
Use parfeval
to create an array of futures f
by instructing workers to execute the function pause
. Use an argument of 1
for the third future, and an argument of Inf
for all other futures.
for n = 1:5 if n == 3 f(n) = parfeval(@pause,0,1); else f(n) = parfeval(@pause,0,Inf); end end
Each use of parfeval
returns a future object that represents the execution of a function on a worker. Except for the third future, every future will take an infinite amount of time to compute. The future created by parfeval(@pause,0,Inf)
is an extreme case of a future which can slow down a queue.
Cancel Futures Directly
You can use the State
property to obtain the status of futures. Construct a cell array of the state of each future in f.
{f.State}
ans = 1×5 cell
{'running'} {'running'} {'queued'} {'queued'} {'queued'}
Every task except for the third pauses forever.
Cancel the second future directly with cancel
.
cancel(f(2)); {f.State}
ans = 1×5 cell
{'running'} {'finished'} {'running'} {'queued'} {'queued'}
After you cancel the second future, the third future runs. Wait until the third future completes, then examine the states again.
wait(f(3)); {f.State}
ans = 1×5 cell
{'running'} {'finished'} {'finished'} {'running'} {'queued'}
The third future now has the state 'finished'
.
Check Completion Errors
When a future completes, its State
property becomes 'finished'
. To distinguish between futures which are cancelled and complete normally, use the Error
property.
fprintf("f(2): %s\n", f(2).Error.message)
f(2): Execution of the future was cancelled.
fprintf("f(3): %s\n", f(3).Error.message)
f(3):
The code cancels the second future, as the message property indicates. The second future was cancelled, as stated in the message
property. The third future completes without error, and therefore does not have an error message.
Cancel Futures in Pool Queue
You can use the FevalQueue
property to access the futures in the pool queue.
p.FevalQueue
ans = FevalQueue with properties: Number Queued: 1 Number Running: 2
The queue has two properties: RunningFutures
and QueuedFutures
. The RunningFutures
property is an array of futures corresponding to tasks that are currently running.
disp(p.FevalQueue.RunningFutures)
1x2 FevalFuture array: ID State FinishDateTime Function Error -------------------------------------------------------- 1 12 running @pause 2 15 running @pause
The QueuedFutures
property is an array of futures corresponding to tasks that are currently queued and not running.
disp(p.FevalQueue.QueuedFutures)
FevalFuture with properties: ID: 16 Function: @pause CreateDateTime: 15-Jul-2020 17:29:37 StartDateTime: Running Duration: 0 days 0h 0m 0s State: queued Error: none
You can cancel a single future or an array of futures. Cancel all the futures in QueuedFutures
.
cancel(p.FevalQueue.QueuedFutures); {f.State}
ans = 1×5 cell
{'running'} {'finished'} {'finished'} {'running'} {'finished'}
RunningFutures
and QueuedFutures
are sorted from newest to oldest, regardless of whether f
is in order from newest to oldest. Each future has a unique ID
property for the lifetime of the client. Check the ID
property of each of the futures in f
.
disp(f)
1x5 FevalFuture array: ID State FinishDateTime Function Error -------------------------------------------------------------- 1 12 running @pause 2 13 finished (unread) 15-Jul-2020 17:29:37 @pause Error 3 14 finished (unread) 15-Jul-2020 17:29:39 @pause 4 15 running @pause 5 16 finished (unread) 15-Jul-2020 17:29:39 @pause Error
Compare the result against the ID
property of each of the RunningFutures
.
for j = 1:length(p.FevalQueue.RunningFutures) rf = p.FevalQueue.RunningFutures(j); fprintf("p.FevalQueue.RunningFutures(%i): ID = %i\n", j, rf.ID) end
p.FevalQueue.RunningFutures(1): ID = 12 p.FevalQueue.RunningFutures(2): ID = 15
Here, RunningFutures
is an array containing f(1)
and f(4)
. If you cancel RunningFutures(2)
, you cancel the fourth future f(4)
.
Sometimes, futures are not available in the workspace, for example, if you execute the same piece of code twice before it finishes, or if you use parfeval
in a function. You can cancel futures that are not available in the workspace.
Clear f
from the workspace.
clear f
You can use RunningFutures
and QueuedFutures
to access futures that have not yet completed. Use RunningFutures
to cancel f(4)
.
rf2 = p.FevalQueue.RunningFutures(2); cancel(rf2) rf2.State
ans = 'finished'
To cancel all the futures still in the queue, use the following code.
cancel([p.FevalQueue.RunningFutures p.FevalQueue.QueuedFutures])
Use parfeval
to request asynchronous execution
of a function on a worker.
For example, submit a single request to the parallel pool. Retrieve the outputs by
using fetchOutputs
.
f = parfeval(@magic,1,10); value = fetchOutputs(f);
You can also submit a vector of multiple future requests in a
for
-loop and collect the results as they become available. For
efficiency, preallocate an array of future objects before.
f(1:10) = parallel.FevalFuture; for idx = 1:10 f(idx) = parfeval(@magic,1,idx); end
Retrieve the individual future outputs as they become available by using
fetchNext
.
magicResults = cell(1,10); for idx = 1:10 [completedIdx,value] = fetchNext(f); magicResults{completedIdx} = value; fprintf('Got result with index: %d.\n', completedIdx); end
parfeval
This example shows how to perform a parallel parameter sweep with parfeval
and send results back during computations with a DataQueue
object. parfeval
does not block MATLAB, so you can continue working while computations take place.
The example performs a parameter sweep on the Lorenz system of ordinary differential equations, on the parameters and , and shows the chaotic nature of this system.
Create Parameter Grid
Define the range of parameters that you want to explore in the parameter sweep.
gridSize = 40; sigma = linspace(5, 45, gridSize); rho = linspace(50, 100, gridSize); beta = 8/3;
Create a 2-D grid of parameters by using the meshgrid
function.
[rho,sigma] = meshgrid(rho,sigma);
Create a figure object, and set 'Visible'
to true
so that it opens in a new window, outside of the live script. To visualize the results of the parameter sweep, create a surface plot. Note that initializing the Z
component of the surface with NaN
creates an empty plot.
figure('Visible',true); surface = surf(rho,sigma,NaN(size(sigma))); xlabel('\rho','Interpreter','Tex') ylabel('\sigma','Interpreter','Tex')
Set Up Parallel Environment
Create a pool of parallel workers by using the parpool
function.
parpool;
Starting parallel pool (parpool) using the 'local' profile ... Connected to the parallel pool (number of workers: 6).
To send data from the workers, create a DataQueue
object. Set up a function that updates the surface plot each time a worker sends data by using the afterEach
function. The updatePlot
function is a supporting function defined at the end of the example.
Q = parallel.pool.DataQueue; afterEach(Q,@(data) updatePlot(surface,data));
Perform Parallel Parameter Sweep
After you define the parameters, you can perform the parallel parameter sweep.
parfeval
works more efficiently when you distribute the workload. To distribute the workload, group the parameters to explore into partitions. For this example, split into uniform partitions of size step
by using the colon operator (:
). The resulting array partitions
contains the boundaries of the partitions. Note that you must add the end point of the last partition.
step = 100; partitions = [1:step:numel(sigma), numel(sigma)+1]
partitions = 1×17
1 101 201 301 401 501 601 701 801 901 1001 1101 1201 1301 1401 1501 1601
For best performance, try to split into partitions that are:
Large enough that the computation time is large compared to the overhead of scheduling the partition.
Small enough that there are enough partitions to keep all workers busy.
To represent function executions on parallel workers and hold their results, use future objects.
f(1:numel(partitions)-1) = parallel.FevalFuture;
Offload computations to parallel workers by using the parfeval
function. parameterSweep
is a helper function defined at the end of this script that solves the Lorenz system on a partition of the parameters to explore. It has one output argument, so you must specify 1
as the number of outputs in parfeval
.
for ii = 1:numel(partitions)-1 f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q); end
parfeval
does not block MATLAB, so you can continue working while computations take place. The workers compute in parallel and send intermediate results through the DataQueue
as soon as they become available.
If you want to block MATLAB until parfeval
completes, use the wait
function on the future objects. Using the wait
function is useful when subsequent code depends on the completion of parfeval
.
wait(f);
After parfeval
finishes the computations, wait
finishes and you can execute more code. For example, plot the contour of the resulting surface. Use the fetchOutputs
function to retrieve the results stored in the future objects.
results = reshape(fetchOutputs(f),gridSize,[]); contourf(rho,sigma,results) xlabel('\rho','Interpreter','Tex') ylabel('\sigma','Interpreter','Tex')
If your parameter sweep needs more computational resources and you have access to a cluster, you can scale up your parfeval
computations. For more information, see Scale Up from Desktop to Cluster.
Define Helper Functions
Define a helper function that solves the Lorenz system on a partition of the parameters to explore. Send intermediate results to the MATLAB client by using the send
function on the DataQueue
object.
function results = parameterSweep(first,last,sigma,rho,beta,Q) results = zeros(last-first,1); for ii = first:last-1 lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)]; [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]); result = a(end,3); send(Q,[ii,result]); results(ii-first+1) = result; end end
Define another helper function that updates the surface plot when new data arrives.
function updatePlot(surface,data) surface.ZData(data(1)) = data(2); drawnow('limitrate'); end
afterEach
and afterAll
You can perform asynchronous computations on workers using parfeval
and leave the user interface responsive. Use afterEach
to update the user interface when intermediate computations are ready. Use afterAll
to update the user interface when all the computations are ready.
Create a simple user interface using a waitbar
.
h = waitbar(0, 'Waiting...');
Use parfeval
to carry out time-consuming computations in the workers, for example, eigenvalues of random matrices. The computations happen asynchronously and the user interface updates during computation. With default preferences, parfeval
creates a parpool
automatically if there is not one already created.
for idx = 1:100 f(idx) = parfeval(@(n) real(eig(randn(n))), 1, 5e2); end
Compute the largest value in each of the computations when they become ready using afterEach
. Update the proportion of finished futures in the waitbar when each of them completes using afterEach
.
maxFuture = afterEach(f, @max, 1);
updateWaitbarFuture = afterEach(f, @(~) waitbar(sum(strcmp('finished', {f.State}))/numel(f), h), 1);
Close the waitbar when all the computations are done. Use afterAll
on updateWaitbarFuture
to continue automatically with a close operation. afterAll
obtains the figure handle from updateWaitbarFuture
and executes its function on it.
closeWaitbarFuture = afterAll(updateWaitbarFuture, @(h) delete(h), 0);
Show a histogram after all the maximum values are computed. Use afterAll
on maxFuture
to continue the operation automatically. afterAll
obtains the maximum values from maxFuture
and calls histogram
on them.
showsHistogramFuture = afterAll(maxFuture, @histogram, 0);
p
— Parallel poolparallel.Pool
objectParallel pool of workers, specified as a parallel.Pool
object. You
can create a parallel pool by using the parpool
function.
Data Types: parallel.Pool
fcn
— Function to executeFunction to execute on a worker, specified as a function handle.
Example: fcn = @sum
Data Types: function_handle
numout
— Number of output argumentsNumber of output arguments that are expected from fcn
.
Data Types: single
| double
| int8
| int16
| int32
| int64
| uint8
| uint16
| uint32
| uint64
in1,in2,...
— Function argumentsFunction arguments to pass to fcn
, specified as a
comma-separated list of variables or expressions.
F
— Futureparallel.FevalFuture
Future object, returned as a parallel.FevalFuture
, that represents
the execution of fcn
on a parallel worker and holds its results.
Use fetchOutputs
or fetchNext
to collect the results.
afterAll
| afterEach
| cancel
| fetchNext
| fetchOutputs
| isequal
| parallel.FevalFuture | parallel.pool.Constant
| parfevalOnAll
| parpool
| ticBytes
| tocBytes
| wait
You have a modified version of this example. Do you want to open this example with your edits?