Error with multiprocessing and callbacks

Zachary on 12 Sep 2023
Answered: Alan on 3 Oct 2023
Hello,
I have a somewhat complex program that I have been trying to debug, but I cannot find the source of this error. I am collecting data from several IMU sensors in real time, using a callback function that fires whenever data is available. When certain conditions are met, I process a chunk of data at a time. This chunk processing involves several integrations, which were causing problems with timing and with collecting data packets, so I moved the chunk processing to a separate thread. This part all works.
Where the program breaks down is when I try to graph data after processing the chunk. I have included excerpts of the code below; I trimmed out the extraneous code and kept only what seems relevant to the problem based on my testing.
Top-level function below. This starts the IMUs, creates the figure for plotting data, and registers the callback function for when data is available. This all worked fine before I switched to multiprocessing, and also works fine if I comment out the line to create the figure.
function [dataPlot1] = mainMTwRTdataViewerCOPY
close all
%% Launching activex server
%setting up IMU devices for data collection
% create figure for showing data
[t, dataPlot1, dataPlot2, linePlot1, linePlot2] = createFigForDisplay;
%setting up variables for data
%starting IMU data collection
% register onLiveDataAvailable event
h.registerevent({'onLiveDataAvailable',@handleData});
h.setCallbackOption(h.XsComCallbackOptions_XSC_LivePacket, h.XsComCallbackOptions_XSC_None);
input('\n Calibration started. \n');
stopAll;
Callback function below. This stores the incoming data in a time series and checks the values to see if the conditions exist to process a chunk of data. If the conditions are right, it first fetches the output of the previous chunk and stores that in a timeseries (this is where the error occurs). It then starts the next chunk processing.
function handleData(varargin)
% callback function for event: onLiveDataAvailable
dataPacket = varargin{3}{2};
deviceFound = varargin{3}{1};
iDev = find(cellfun(@(x) x==deviceFound, devicesUsed));
%if data packet exists and has appropriate fields, record the data into the device array
if dataPacket && ~isempty(iDev)
%store data from incoming data packet
% collect 5 seconds of stationary data for calibration
if packetCounter(iDev) == selectedUpdateRate*5
%process calibration data, then process first chunk
fevalFuture{iDev} = parfeval(backgroundPool,@chunkProcess,2,seconds(time{iDev}(zupt_index{iDev}(1):zupt_index{iDev}(2))-time{iDev}(1)),accTimeSer{iDev}(:,zupt_index{iDev}(1):zupt_index{iDev}(2)),zuptTimeSer{iDev}(zupt_index{iDev}(1):zupt_index{iDev}(2)),[0;0;0]);
end
%after 5 seconds, start real-time processing
if packetCounter(iDev) > selectedUpdateRate*5
%process packet data, check for conditions to process chunk
if ... %if conditions are right to process a chunk...
if strcmp(fevalFuture{iDev}.State, 'finished') %first get data from first processed chunk
[samp_vel,samp_pos] = fetchOutputs(fevalFuture{iDev});
vel{iDev} = [vel{iDev} samp_vel];
pos{iDev} = [pos{iDev} samp_pos];
% draw
dataPlot1{iDev} = pos{iDev}(max(1,end-2000):end);
dataPlot2{iDev} = oriTimeSer{iDev}(max(1,end-2000):end);
t{iDev} = seconds(time{iDev}(max(1,end-2000):end)-time{iDev}(1));
% plot last ~2000 data points (~15s @ 120Hz)
% set(get(linePlot1{iDev}(1),'parent'),'xlim',[t{iDev}(1) t{iDev}(end)]);
% for i=1:3
% set(linePlot1{iDev}(i),'xData',t{iDev},'ydata',dataPlot1{iDev}(i,:));
% set(linePlot2{iDev}(i),'xData',t{iDev},'ydata',dataPlot2{iDev}(i,:));
% end
% drawnow % need to call this to make sure plot is constantly updating
else
error('Data chunk not finished processing')
end
%then start next chunk processing
fevalFuture{iDev} = parfeval(backgroundPool,@chunkProcess,2,seconds(time{iDev}(zupt_index{iDev}(1):zupt_index{iDev}(2))-time{iDev}(1)),accTimeSer{iDev}(:,zupt_index{iDev}(1):zupt_index{iDev}(2)),zuptTimeSer{iDev}(zupt_index{iDev}(1):zupt_index{iDev}(2)),pos{iDev}(:,end));
end
end
h.dataPacketHandled(deviceFound, dataPacket);
end
end
The function that creates the figure is below. The error only occurs when the lines setting linePlot1{iDev} = ... and linePlot2{iDev} = ... are uncommented. If I comment out those lines, the code works, but obviously I am not able to plot anything.
function [t, dataPlot1, dataPlot2, linePlot1, linePlot2] = createFigForDisplay
[dataPlot1{1:4}] = deal([]);
[dataPlot2{1:4}] = deal([]);
[linePlot1{1:4}] = deal([]);
[linePlot2{1:4}] = deal([]);
[t{1:4}] = deal([]);
figure('name','Joint pos and orientation')
hold on;
iPlot = 0;
loc = {'Pelvis','Thigh','Shank','Foot'};
for iDev = 1:4
iPlot = iPlot+1;
ax = subplot(2,2,iPlot);
yyaxis left
linePlot1{iDev} = plot(ax, 0,[NaN NaN NaN]);
ylabel('pos (m)')
yyaxis right
linePlot2{iDev} = plot(ax, 0,[NaN NaN NaN]);
ylabel('euler (deg)')
title([loc{iDev} ' Data']), xlabel('seconds')
legend(ax,'Dx','Dy','Dz','Rx','Ry','Rz');
end
end
The function that gets called in parallel to process the chunk, chunkProcess, has every single line commented out, but still produces an error. I put a breakpoint on the line that fetches the output from the parallel process and looked at the FevalFuture object. The following error is contained in the struct:
K>> fevalFuture{iDev}.Error
ans =
ParallelException with properties:
identifier: 'parallel:threadpool:DisallowedClass'
message: 'Use of class or function Line is not supported on a thread-based worker.'
cause: {}
remotecause: {[1×1 MException]}
stack: [0×1 struct]
Correction: []
As I mentioned above, I commented out all of the code in the function that is called in a separate thread and still see this error. Is there something I am missing, or is it simply not possible to have a figure open while also running parallel threads? I am very inexperienced with parallel computing, so it is also possible that I made some basic oversight.
Thanks in advance for any insight, and if there are any removed portions of the code you would like to see, please let me know.

Answers (1)

Alan on 3 Oct 2023
I understand that you are trying to process data from sensors and plot it, and that you are seeing an error when you parallelize the data processing and plotting.
The plot() function that you are using returns objects of type matlab.graphics.chart.primitive.Line, which unfortunately are not supported on thread-based workers such as those in backgroundPool. For a list of functions that are supported in thread-based environments, please check out the following documentation: https://www.mathworks.com/help/matlab/referencelist.html?type=function&capability=threadsupport
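This may also explain why the error appears even when chunkProcess is empty. As far as I know, a handle to a nested function carries the parent function's workspace with it, so if chunkProcess is nested inside the top-level function (an assumption, since the full file is not shown), the Line objects stored in linePlot1 and linePlot2 would be sent to the worker together with the handle. Below is a minimal sketch of one way around this: the worker function is a local function that receives only numeric data, and all plotting stays on the client. The names demoBackgroundPlot and integrateChunk are hypothetical stand-ins, not part of the original code.

```matlab
function demoBackgroundPlot
    % Graphics objects are created and updated only on the client.
    fig = figure('Name', 'Background processing demo');
    ax = axes(fig);
    hLine = plot(ax, NaN, NaN);

    % Only numeric arrays are passed to the worker.
    t = linspace(0, 1, 200);
    acc = sin(2*pi*t);

    % integrateChunk is a local function, so its handle does not carry
    % the workspace of demoBackgroundPlot (and therefore not hLine).
    f = parfeval(backgroundPool, @integrateChunk, 1, t, acc);

    % afterEach runs the update on the client once the future finishes.
    afterEach(f, @(vel) set(hLine, 'XData', t, 'YData', vel), 0);
end

function vel = integrateChunk(t, acc)
    % Hypothetical stand-in for chunkProcess: integrate acceleration once.
    vel = cumtrapz(t, acc);
end
```

In the original callback, the existing check on fevalFuture{iDev}.State followed by fetchOutputs should work the same way, provided the function handed to parfeval does not share a workspace with the figure handles.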
While thread-based environments are better from a memory-usage standpoint, process-based environments provide support for a wider range of functions. Please check out the following guidelines while selecting between the different environments for parallelization: https://www.mathworks.com/help/parallel-computing/choose-between-thread-based-and-process-based-environments.html
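If the processing function itself needs something that thread-based workers do not support, a process-based pool is one option. The following is a rough sketch, assuming Parallel Computing Toolbox is installed and that the 'Processes' profile name is available (it should be in R2023a; I believe older releases call this profile 'local'). Here cumsum only stands in for the real chunk processing, and the plots should still be updated on the client.

```matlab
% Reuse an existing pool if there is one, otherwise start one process worker.
pool = gcp('nocreate');
if isempty(pool)
    pool = parpool('Processes', 1);
end

% Pass numeric data only; cumsum is a placeholder for the real processing.
f = parfeval(pool, @cumsum, 1, [1 2 3]);
result = fetchOutputs(f);   % blocks until the worker has finished
```

Process workers take longer to start and copy their inputs, so for small chunks at a high packet rate the thread-based pool may still be the better fit once the graphics handles are kept out of the worker call.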

Release

R2023a
