package com.mathworks.toolbox.distcomp.pmode.parfor;

import com.mathworks.mvm.exec.FutureFevalResult;
import com.mathworks.mvm.exec.MatlabFevalRequest;
import com.mathworks.toolbox.distcomp.pmode.SendingWriterManager;
import com.mathworks.toolbox.distcomp.pmode.SessionConstants;
import com.mathworks.toolbox.distcomp.pmode.SessionProfilingListener;
import com.mathworks.toolbox.distcomp.pmode.io.CommunicationGroup;
import com.mathworks.toolbox.distcomp.pmode.shared.Instance;
import com.mathworks.toolbox.distcomp.util.ByteBufferHandle;
import com.mathworks.toolbox.distcomp.util.FutureWaiter;
import com.mathworks.toolbox.distcomp.util.MatlabRefStore;
import com.mathworks.toolbox.parallel.pctutil.concurrent.NamedThreadFactory;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/parfor/ParforExecutorImpl.class */
public class ParforExecutorImpl implements ParforExecutor {
    private static final String CLASS;
    private static long sSeparateCompletedMessageThreshold;
    private Future<?> fIntervalProcessing;
    private static final long INVALID_LOOP_ID = -1;
    private final CommunicationGroup fCommGroup;
    private final SessionProfilingListener fProfileListener;
    private final SendingWriterManager fSendingWriterManager;
    static final /* synthetic */ boolean $assertionsDisabled;
    private State fState = State.IDLE;
    private long fLoopId = -1;
    private long fNextValidLoopId = -1;
    private final BlockingQueue<ParforInterval> fOutstandingIntervals = new LinkedBlockingQueue();
    private final Queue<ParforBroadcastDataMessage> fBroadcastData = new ArrayDeque();
    private final ReentrantLock fLock = new ReentrantLock();
    private final ExecutorService fExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.createDaemonThreadFactory(CLASS + " fExecutor-", Log.LOGGER));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/parfor/ParforExecutorImpl$IntervalProcessingInitialAction.class */
    public enum IntervalProcessingInitialAction {
        SEND_BROADCAST_DATA_TO_MATLAB,
        TAKE_NO_ACTION
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/parfor/ParforExecutorImpl$ParforExecutionCancelledException.class */
    public static final class ParforExecutionCancelledException extends Exception {
        private static final long serialVersionUID = 2654406696298487909L;

        private ParforExecutionCancelledException() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/parfor/ParforExecutorImpl$ParforInterval.class */
    public static final class ParforInterval {
        private final Instance fSrc;
        private final ParforIntervalMessage fMessage;

        private ParforInterval(ParforIntervalMessage parforIntervalMessage, Instance instance) {
            this.fSrc = instance;
            this.fMessage = parforIntervalMessage;
        }

        Instance getSource() {
            return this.fSrc;
        }

        long getMessageSequenceNumber() {
            return this.fMessage.getSequenceNumber();
        }

        long getParforID() {
            return this.fMessage.getParforID();
        }

        int getTag() {
            return this.fMessage.getTag();
        }

        Object[] getArgsForMATLAB() {
            Object[] allData = this.fMessage.getAllData();
            Object[] objArr = new Object[3];
            objArr[0] = allData[0];
            objArr[1] = allData[1];
            objArr[2] = Boolean.valueOf(getIntervalType() == IntervalType.FINAL);
            return objArr;
        }

        public String toString() {
            return "ParforInterval[parforID=" + this.fMessage.getParforID() + ", tag=" + this.fMessage.getTag() + ", sequence=" + this.fMessage.getSequenceNumber() + "]";
        }

        IntervalType getIntervalType() {
            return this.fMessage.getIntervalType();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/parfor/ParforExecutorImpl$State.class */
    public enum State {
        IDLE,
        RECEIVING_BROADCAST,
        EXECUTING_INTERVALS
    }

    private static void logAndAssert(boolean z, String str) {
        if (z) {
            return;
        }
        Log.LOGGER.warning(CLASS + "assertion about to fail: " + str);
        if (!$assertionsDisabled) {
            throw new AssertionError(str);
        }
    }

    public static long setSeparateCompletedMessageThreshold(long j) {
        long j2 = sSeparateCompletedMessageThreshold;
        sSeparateCompletedMessageThreshold = j;
        return j2;
    }

    public ParforExecutorImpl(CommunicationGroup communicationGroup, SessionProfilingListener sessionProfilingListener) {
        this.fCommGroup = communicationGroup;
        this.fProfileListener = sessionProfilingListener;
        this.fSendingWriterManager = new SendingWriterManager(this.fCommGroup);
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.Dispatcher
    public void dispatch(ParforMessage parforMessage, Instance instance) {
        Log.LOGGER.fine(CLASS + " dispatching: " + parforMessage + " from state: " + this.fState + " for loop ID: " + this.fLoopId);
        if (parforMessage instanceof ParforBroadcastDataMessage) {
            handleBroadcastData((ParforBroadcastDataMessage) parforMessage, instance);
            return;
        }
        if (parforMessage instanceof ParforIntervalMessage) {
            handleInterval((ParforIntervalMessage) parforMessage, instance);
        } else if (parforMessage instanceof ParforInterruptMessage) {
            handleInterrupt((ParforInterruptMessage) parforMessage, instance);
        } else {
            Log.LOGGER.warning(CLASS + " failed to dispatch.");
        }
    }

    private List<ParforInterval> interruptAndCleanup() {
        if (!$assertionsDisabled && !this.fLock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        ArrayList arrayList = new ArrayList();
        if (this.fState == State.EXECUTING_INTERVALS || this.fState == State.RECEIVING_BROADCAST) {
            Log.LOGGER.info(CLASS + " actually processing interrupt for loop ID " + this.fLoopId);
            this.fBroadcastData.clear();
            this.fOutstandingIntervals.drainTo(arrayList);
            this.fIntervalProcessing.cancel(true);
        } else {
            Log.LOGGER.info(CLASS + " no action required for interrupt for loop ID " + this.fLoopId + " because executor in state " + this.fState);
        }
        return arrayList;
    }

    private void handleInterrupt(ParforInterruptMessage parforInterruptMessage, Instance instance) {
        long parforID = parforInterruptMessage.getParforID();
        ArrayList arrayList = new ArrayList();
        this.fLock.lock();
        try {
            if (this.fLoopId == parforID) {
                Log.LOGGER.fine(CLASS + ".handleInterrupt() actually interrupting loop ID: " + parforID);
                arrayList.addAll(interruptAndCleanup());
            } else {
                Log.LOGGER.fine(CLASS + ".handleInterrupt() ignoring interrupt for loop ID: " + parforID);
            }
            if (parforID >= this.fNextValidLoopId) {
                Log.LOGGER.fine(CLASS + ".handleInterrupt() about to reset state after loop ID: " + parforID);
                this.fState = State.IDLE;
                this.fLoopId = -1L;
                this.fNextValidLoopId = Math.max(1 + parforID, this.fNextValidLoopId);
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                returnResultOrProblem((ParforInterval) it.next(), null, new ParforExecutionCancelledException());
            }
            this.fCommGroup.returnTo(instance, new ParforInterruptAcknowledgement(parforInterruptMessage.getSequenceNumber(), parforID));
        } finally {
            this.fLock.unlock();
        }
    }

    private void moveToActiveStateFromIdle(State state, long j) {
        if (!$assertionsDisabled && !this.fLock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        logAndAssert(this.fState == State.IDLE, "Expected state to be idle when moving to active state");
        logAndAssert(j >= this.fNextValidLoopId, "loopId " + j + " expected to be >= " + this.fNextValidLoopId + " when moving to active state");
        this.fState = state;
        this.fLoopId = j;
        this.fNextValidLoopId = j;
    }

    private void handleInterval(ParforIntervalMessage parforIntervalMessage, Instance instance) {
        long parforID = parforIntervalMessage.getParforID();
        this.fLock.lock();
        try {
            if (parforID != this.fLoopId && parforID >= this.fNextValidLoopId) {
                moveToActiveStateFromIdle(State.EXECUTING_INTERVALS, parforID);
                scheduleIntervalProcessing(this.fLoopId, IntervalProcessingInitialAction.TAKE_NO_ACTION);
            }
            ParforInterval parforInterval = new ParforInterval(parforIntervalMessage, instance);
            if (parforID == this.fLoopId) {
                this.fOutstandingIntervals.add(parforInterval);
            } else {
                Log.LOGGER.warning(CLASS + " ignoring interval and sending dummy response for loop ID: " + parforID + " because executing loop ID: " + this.fLoopId);
                returnResultOrProblem(parforInterval, null, new Exception("Ignoring interval for loop ID: " + parforID + " because executing loop ID: " + this.fLoopId));
            }
        } finally {
            this.fLock.unlock();
        }
    }

    private void scheduleIntervalProcessing(final long j, final IntervalProcessingInitialAction intervalProcessingInitialAction) {
        if (!$assertionsDisabled && !this.fLock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        final Object broadcastDataArg = intervalProcessingInitialAction == IntervalProcessingInitialAction.SEND_BROADCAST_DATA_TO_MATLAB ? getBroadcastDataArg() : null;
        Log.LOGGER.info(CLASS + " scheduling interval processing for loop ID: " + j);
        this.fIntervalProcessing = this.fExecutor.submit(new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.parfor.ParforExecutorImpl.1
            @Override // java.lang.Runnable
            public void run() {
                Log.LOGGER.fine(ParforExecutorImpl.CLASS + " About to process intervals for loop ID: " + j);
                try {
                    try {
                        try {
                            if (intervalProcessingInitialAction == IntervalProcessingInitialAction.SEND_BROADCAST_DATA_TO_MATLAB) {
                                Log.LOGGER.fine(ParforExecutorImpl.CLASS + " About to send broadcast data into MATLAB for loop ID: " + j);
                                FutureWaiter.WaitResult sendInitDataToMatlab = ParforExecutorImpl.this.sendInitDataToMatlab(broadcastDataArg);
                                if (!sendInitDataToMatlab.isSuccessful()) {
                                    Log.LOGGER.log(DistcompLevel.ONE, ParforExecutorImpl.CLASS + " Failed to send broadcast data into MATLAB for loop ID: " + j, (Throwable) sendInitDataToMatlab.getCaughtException());
                                    if (sendInitDataToMatlab.isInterrupted()) {
                                        Log.LOGGER.info(ParforExecutorImpl.CLASS + " Finished processing intervals for loop ID: " + j + ". Move to idle? false");
                                        if (0 != 0) {
                                            ParforExecutorImpl.this.moveToIdleState();
                                            return;
                                        }
                                        return;
                                    }
                                }
                            }
                            boolean processIntervals = ParforExecutorImpl.this.processIntervals(j);
                            Log.LOGGER.fine(ParforExecutorImpl.CLASS + " finished processIntervals(" + j + ") with result allOK: " + processIntervals);
                            Log.LOGGER.info(ParforExecutorImpl.CLASS + " Finished processing intervals for loop ID: " + j + ". Move to idle? " + processIntervals);
                            if (processIntervals) {
                                ParforExecutorImpl.this.moveToIdleState();
                            }
                        } catch (Throwable th) {
                            Log.LOGGER.log(DistcompLevel.ONE, ParforExecutorImpl.CLASS + " caught exception while processing for loop ID: " + j, th);
                            Log.LOGGER.info(ParforExecutorImpl.CLASS + " Finished processing intervals for loop ID: " + j + ". Move to idle? false");
                            if (0 != 0) {
                                ParforExecutorImpl.this.moveToIdleState();
                            }
                        }
                    } catch (InterruptedException e) {
                        Log.LOGGER.log(DistcompLevel.ONE, ParforExecutorImpl.CLASS + " caught InterruptedException while processing for loop ID: " + j, (Throwable) e);
                        Thread.currentThread().interrupt();
                        Log.LOGGER.info(ParforExecutorImpl.CLASS + " Finished processing intervals for loop ID: " + j + ". Move to idle? false");
                        if (0 != 0) {
                            ParforExecutorImpl.this.moveToIdleState();
                        }
                    }
                } catch (Throwable th2) {
                    Log.LOGGER.info(ParforExecutorImpl.CLASS + " Finished processing intervals for loop ID: " + j + ". Move to idle? false");
                    if (0 != 0) {
                        ParforExecutorImpl.this.moveToIdleState();
                    }
                    throw th2;
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean processIntervals(long j) throws InterruptedException {
        boolean z = false;
        boolean z2 = true;
        while (!z) {
            ParforInterval poll = this.fOutstandingIntervals.poll(5L, TimeUnit.SECONDS);
            if (poll == null) {
                Log.LOGGER.finest(CLASS + " no interval received for loop ID: " + j);
            } else {
                logAndAssert(poll.getParforID() == j, "Incorrect loop ID! Got: " + poll.getParforID() + ", expecting: " + j);
                z = poll.getIntervalType() == IntervalType.FINAL;
                Log.LOGGER.finer(CLASS + " processing interval for loop ID: " + j + ". FINAL? " + z);
                if (!executeIntervalSynchronously(poll)) {
                    Log.LOGGER.info(CLASS + ".executeIntervalSynchronously() returned false , so aborting processIntervals() loop for loop ID: " + j);
                    z = true;
                    z2 = false;
                }
            }
        }
        return z2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void moveToIdleState() {
        this.fLock.lock();
        try {
            Log.LOGGER.fine(CLASS + ".moveToIdleState() from state: " + this.fState);
            logAndAssert(this.fState != State.IDLE, "moveToIdleState() called when already IDLE");
            this.fState = State.IDLE;
            this.fNextValidLoopId = Math.max(this.fNextValidLoopId, 1 + this.fLoopId);
            this.fLoopId = -1L;
            Log.LOGGER.fine(CLASS + ".moveToIdleState() set fNextValidLoopId to: " + this.fNextValidLoopId);
        } finally {
            this.fLock.unlock();
        }
    }

    private Object getBroadcastDataArg() {
        ArrayList arrayList = new ArrayList();
        this.fLock.lock();
        try {
            arrayList.addAll(this.fBroadcastData);
            this.fBroadcastData.clear();
            if (arrayList.isEmpty()) {
                Log.LOGGER.fine(CLASS + " getBroadcastDataArg - no broadcast data.");
                return null;
            }
            ArrayList arrayList2 = new ArrayList();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                for (Object obj : ((ParforBroadcastDataMessage) it.next()).getAllData()) {
                    if (!$assertionsDisabled && !(obj instanceof ByteBufferHandle)) {
                        throw new AssertionError();
                    }
                    arrayList2.add((ByteBufferHandle) obj);
                }
            }
            Log.LOGGER.fine(CLASS + " getBroadcastDataArg - " + arrayList2.size() + " buffers");
            return arrayList2.toArray(new ByteBufferHandle[arrayList2.size()]);
        } finally {
            this.fLock.unlock();
        }
    }

    private long notifyFevalStarting(String str) {
        long nextMatlabInvocationId = MatlabRefStore.getNextMatlabInvocationId();
        if (this.fProfileListener != null) {
            this.fProfileListener.matlabEvent(SessionProfilingListener.MatlabEventType.FEVAL_STARTED, nextMatlabInvocationId, str);
        }
        return nextMatlabInvocationId;
    }

    private void notifyFevalCompleted(String str, long j) {
        if (this.fProfileListener != null) {
            this.fProfileListener.matlabEvent(SessionProfilingListener.MatlabEventType.FEVAL_COMPLETED, j, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public FutureWaiter.WaitResult<Object> sendInitDataToMatlab(Object obj) {
        Log.LOGGER.fine(CLASS + " about to send init data to MATLAB for loop ID: " + this.fLoopId);
        long notifyFevalStarting = notifyFevalStarting(SessionConstants.sPARFOR_FUNCTION);
        FutureFevalResult submit = MatlabRefStore.getMVMRef().getExecutor().submit(new MatlabFevalRequest(SessionConstants.sPARFOR_FUNCTION, 2, this.fSendingWriterManager.getSendingWriter(), this.fSendingWriterManager.getSendingWriter(), new Object[]{obj, null, false}));
        try {
            FutureWaiter.WaitResult<Object> waitForFuture = FutureWaiter.waitForFuture(submit);
            Log.LOGGER.fine(CLASS + " finished sending init data to MATLAB: " + waitForFuture);
            if (waitForFuture.isInterrupted()) {
                Log.LOGGER.fine(CLASS + " sending init data to MATLAB was interrupted, so cancelling the underlying MATLAB future.");
                submit.cancel(true);
            }
            return waitForFuture;
        } finally {
            this.fSendingWriterManager.disable();
            notifyFevalCompleted(SessionConstants.sPARFOR_FUNCTION, notifyFevalStarting);
        }
    }

    private boolean executeIntervalSynchronously(ParforInterval parforInterval) {
        Log.LOGGER.fine(CLASS + " about to execute interval " + parforInterval);
        this.fSendingWriterManager.enable(parforInterval.getMessageSequenceNumber(), parforInterval.getSource());
        long notifyFevalStarting = notifyFevalStarting(SessionConstants.sPARFOR_FUNCTION);
        FutureFevalResult submit = MatlabRefStore.getMVMRef().getExecutor().submit(new MatlabFevalRequest(SessionConstants.sPARFOR_FUNCTION, 2, this.fSendingWriterManager.getSendingWriter(), this.fSendingWriterManager.getSendingWriter(), parforInterval.getArgsForMATLAB()));
        boolean z = false;
        Object obj = null;
        Exception exc = null;
        try {
            FutureWaiter.WaitResult waitForFuture = FutureWaiter.waitForFuture(submit);
            this.fSendingWriterManager.disable();
            Log.LOGGER.fine(CLASS + " completed execution with result: " + waitForFuture);
            if (waitForFuture.isSuccessful()) {
                obj = waitForFuture.getResult();
                z = true;
            } else {
                exc = waitForFuture.getCaughtException();
                if (waitForFuture.isInterrupted()) {
                    Log.LOGGER.fine(CLASS + " execution was interrupted, so cancelling the underlying MATLAB future.");
                    submit.cancel(true);
                }
            }
            this.fSendingWriterManager.disable();
            notifyFevalCompleted(SessionConstants.sPARFOR_FUNCTION, notifyFevalStarting);
        } catch (Exception e) {
            exc = e;
            this.fSendingWriterManager.disable();
            notifyFevalCompleted(SessionConstants.sPARFOR_FUNCTION, notifyFevalStarting);
        } catch (Throwable th) {
            this.fSendingWriterManager.disable();
            notifyFevalCompleted(SessionConstants.sPARFOR_FUNCTION, notifyFevalStarting);
            throw th;
        }
        returnResultOrProblem(parforInterval, obj, exc);
        return z;
    }

    private long estimateResultMessageSize(Object obj) {
        long j = -1;
        if (obj instanceof Object[]) {
            Object[] objArr = (Object[]) obj;
            if (objArr.length >= 2 && (objArr[1] instanceof ByteBufferHandle[])) {
                j = 0;
                for (ByteBufferHandle byteBufferHandle : (ByteBufferHandle[]) objArr[1]) {
                    if (byteBufferHandle.get() != null) {
                        j += r0.capacity();
                    }
                }
            }
        } else if (obj == null) {
            j = 0;
        }
        if ($assertionsDisabled || j != -1) {
            return j;
        }
        throw new AssertionError("Couldn't calculate message size for result");
    }

    private void returnResultOrProblem(ParforInterval parforInterval, Object obj, Exception exc) {
        long estimateResultMessageSize = estimateResultMessageSize(obj);
        boolean z = estimateResultMessageSize >= sSeparateCompletedMessageThreshold;
        Log.LOGGER.finer(CLASS + ".returnResultOrProblem estimated return bytes as: " + estimateResultMessageSize + " sending separate 'complete' message? " + z);
        if (z) {
            ParforIntervalCompleteMessage parforIntervalCompleteMessage = new ParforIntervalCompleteMessage(parforInterval.getParforID(), parforInterval.getMessageSequenceNumber(), exc);
            this.fCommGroup.returnTo(parforInterval.getSource(), parforIntervalCompleteMessage);
            parforIntervalCompleteMessage.dispose();
        }
        ParforIntervalResultMessage parforIntervalResultMessage = new ParforIntervalResultMessage(parforInterval.getParforID(), parforInterval.getTag(), z, 2, obj, exc, parforInterval.getMessageSequenceNumber());
        this.fCommGroup.returnTo(parforInterval.getSource(), parforIntervalResultMessage);
        parforIntervalResultMessage.dispose();
    }

    private void handleBroadcastData(ParforBroadcastDataMessage parforBroadcastDataMessage, Instance instance) {
        long parforID = parforBroadcastDataMessage.getParforID();
        this.fLock.lock();
        try {
            if (parforID != this.fLoopId && parforID >= this.fNextValidLoopId) {
                moveToActiveStateFromIdle(State.RECEIVING_BROADCAST, parforID);
            }
            if (parforID == this.fLoopId) {
                logAndAssert(this.fState == State.RECEIVING_BROADCAST, "handleBroadcastData() expected to be in state RECEIVING_BROADCAST, not " + this.fState);
                this.fBroadcastData.add(parforBroadcastDataMessage);
                if (parforBroadcastDataMessage.isFinalBroadcastMessage()) {
                    Log.LOGGER.fine(CLASS + " got final broadcast message for loop ID: " + parforID);
                    this.fState = State.EXECUTING_INTERVALS;
                    scheduleIntervalProcessing(this.fLoopId, IntervalProcessingInitialAction.SEND_BROADCAST_DATA_TO_MATLAB);
                } else {
                    Log.LOGGER.fine(CLASS + " got non-final broadcast message for loop ID: " + parforID);
                }
            } else {
                Log.LOGGER.warning(CLASS + " ignoring broadcast message for loop ID: " + parforID + " because expecting messages for loop ID: " + this.fLoopId);
            }
            this.fCommGroup.returnTo(instance, new ParforBroadcastResultMessage(parforID, parforBroadcastDataMessage.getSequenceNumber()));
            this.fLock.unlock();
        } catch (Throwable th) {
            this.fLock.unlock();
            throw th;
        }
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.DispatchDefinition
    public Class<ParforMessage> getRootMessageClass() {
        return ParforMessage.class;
    }

    static {
        $assertionsDisabled = !ParforExecutorImpl.class.desiredAssertionStatus();
        CLASS = ParforExecutorImpl.class.getSimpleName();
        sSeparateCompletedMessageThreshold = 1048576L;
    }
}
