Parallel processing of data stream
I am trying to process big data in parallel using Parallel Computing Toolbox. The idea is quite simple. First, I would have a function running in the main process (client) that reads the data batch by batch and puts the batches into a queue (let's call it the input queue). Next, I would create a bunch of workers that load data from this queue, process it, and send the results to a different, output queue. Finally, I would have a final worker that reads the data from this output queue, serializes it, and most probably saves it (or whatever).
The problem I am having with this approach is the following. Ideally I would have a single input queue. However, in order for a worker to be able to poll a queue, the queue has to be created on that worker. OK, so let's make separate queues for the individual workers. But now the lengths of these queues are invisible to the main process, which I need for two reasons. First, I would send each batch to the emptiest queue to equalize the load across workers. Second, I would like to monitor the lengths of the queues to avoid overfilling them (like having a fixed-length buffer).
Can you recommend a pattern to deal with such problems? I believe it is a fairly typical problem that many people actually solve. Thanks.
Accepted Answer
More Answers (2)
Thomas Falch
on 15 May 2025
Starting in R2025a you can use the new "any-destination" PollableDataQueue to achieve this more easily. Unlike the old PollableDataQueue, an any-destination PollableDataQueue can be created on any worker or the client, and can be used to send data to any client or worker. You can create one like this:
queue = parallel.pool.PollableDataQueue(Destination="any");
You can find an example of a workflow very similar to yours here: Perform Data Acquisition and Processing on Pool Workers - MATLAB & Simulink
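A minimal sketch of how this could address both of the original concerns (a single shared input queue, with the client using QueueLength for backpressure), assuming R2025a+; `loadBatch` and `processBatch` are hypothetical stand-ins for your own batch loader and processing function:

```matlab
% Sketch only (assumes R2025a+). Both queues are shared and visible to the
% client, so QueueLength can be used for load monitoring and backpressure.
inQ  = parallel.pool.PollableDataQueue(Destination="any");  % shared input queue
outQ = parallel.pool.PollableDataQueue(Destination="any");  % shared output queue

pool = parpool("Processes");
maxBuffered = 10;   % hypothetical fixed-length buffer limit
numBatches  = 100;  % hypothetical batch count

% Start one polling loop per worker.
for w = 1:pool.NumWorkers
    parfeval(pool, @workerLoop, 0, inQ, outQ);
end

% Client: feed batches, waiting whenever the shared queue is full.
for k = 1:numBatches
    while inQ.QueueLength >= maxBuffered
        pause(0.01);                 % wait for workers to drain the queue
    end
    send(inQ, loadBatch(k));         % hypothetical loader
end
for w = 1:pool.NumWorkers
    send(inQ, []);                   % empty sentinel: tell each worker to stop
end

function workerLoop(inQ, outQ)
    while true
        [batch, ok] = poll(inQ, 1);  % wait up to 1 s for the next batch
        if ~ok, continue; end
        if isempty(batch), break; end
        send(outQ, processBatch(batch));  % hypothetical processing function
    end
end
```

A final consumer (on the client or a dedicated worker) would poll outQ the same way and serialize the results; the linked example shows the full pattern.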
Strider
on 28 Mar 2024
I do not see a need for multiple queues, or for having the client load everything serially. I would let the workers do the loading and saving. If you have to send results back to the client to save, as in this case, then the code below should work for you.
I have never needed to worry about queue length. I suppose you could add some kind of check in mySavefunc, or simply limit memory use by reducing the number of pool workers if running out of memory is a concern.
saveQue = parallel.pool.DataQueue;
files = dir('*.csv'); % assuming some pattern you know
fn = fullfile({files.folder},{files.name});
afterEach(saveQue, @mySavefunc); % callback runs on the client for each result
parfor k = 1:numel(fn)
% load it in a worker
rawData = myLoadfun(fn{k});
% do the work
processedData = myProcessfun(rawData);
% send back to client
send(saveQue,processedData);
end
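The three helper functions are the asker's own; as a rough, hypothetical sketch of what they might look like (the CSV format and the processing step are assumptions):

```matlab
function rawData = myLoadfun(filename)
    % Hypothetical loader: read one CSV batch as a table.
    rawData = readtable(filename);
end

function processedData = myProcessfun(rawData)
    % Hypothetical processing step: summarize the numeric columns.
    processedData = varfun(@mean, rawData, "InputVariables", @isnumeric);
end

function mySavefunc(processedData)
    % Runs on the client each time a worker sends a result.
    % A persistent counter gives each result a unique output file.
    persistent n
    if isempty(n), n = 0; end
    n = n + 1;
    writetable(processedData, sprintf("result_%04d.csv", n));
end
```

Note that mySavefunc runs serially on the client, so it also acts as the single serialization point the original question asked for.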