package com.mathworks.toolbox.distcomp.pmode.io;

import com.mathworks.toolbox.distcomp.pmode.PackageInfo;
import com.mathworks.toolbox.distcomp.pmode.SessionProfilingListener;
import com.mathworks.toolbox.distcomp.pmode.io.RateLimiterSenderBlocker;
import com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel;
import com.mathworks.toolbox.distcomp.pmode.peermessaging.ReconnectionFailedException;
import com.mathworks.toolbox.distcomp.pmode.peermessaging.Reconnector;
import com.mathworks.toolbox.distcomp.pmode.shared.BufferTransferrable;
import com.mathworks.toolbox.distcomp.pmode.shared.ChannelDispatcher;
import com.mathworks.toolbox.distcomp.pmode.shared.Connection;
import com.mathworks.toolbox.distcomp.pmode.shared.Instance;
import com.mathworks.toolbox.distcomp.pmode.shared.Message;
import com.mathworks.toolbox.distcomp.pmode.shared.MessageInfo;
import com.mathworks.toolbox.distcomp.pmode.shared.PmodeSerializable;
import com.mathworks.toolbox.distcomp.util.concurrent.SequentialExecutor;
import com.mathworks.toolbox.parallel.pctutil.concurrent.NamedThreadFactory;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import java.io.EOFException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Selector;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/ReliableTransmissionChannel.class */
public class ReliableTransmissionChannel implements TransmissionChannel {
    // Initial delay and period (ms) of the heartbeat task scheduled in startReliabilityServices().
    private final long fHeartbeatIntervalMillis;
    // Initial delay and fixed delay (ms) of the maintain() task; also scales the
    // "reconnected recently" window computed in shouldReconnect().
    private final long fReconnectionCheckIntervalMillis;
    // Initial delay and period (ms) of the resendOld() task; also the age threshold
    // passed to hasOldMessages() in resendOld().
    private final long fResendCheckIntervalMillis;
    // Assigned only in startReliabilityServices(); not set before that call.
    private volatile ScheduledExecutorService fReliabilityMaintenanceScheduledExecutorService;
    // Messages added by enqueueMessageForSending() and dropped when an AckMessage arrives.
    private final SentButUnacknowledgedBuffer fSentButUnacknowledged;
    // Updated from incoming RateLimiterMessages; consulted before resending.
    private final ReliableFlowController fFlowController;
    // Used by reconnect() to obtain a replacement Connection.
    private final Reconnector fReconnector;
    // The wrapped channel that performs the actual I/O.
    private final UnreliableTransmissionChannel fUnreliableTransmissionChannel;
    // May be null; logReliabilityEvent() checks before use.
    private final SessionProfilingListener fSessionProfilingListener;
    // "<SimpleClassName>-<i> ", built in the constructor and prepended to log messages.
    private final String fLogPrefix;
    // Decompiler rendering of the compiler-generated assertion switch; set in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Set in handleSelect()'s IOException handler; cleared by reconnect().
    private boolean fCaughtExceptionDuringSelect = false;
    // Assigned in handleSelect()'s EOFException handler; when true, shouldReconnect() returns false.
    private boolean fCaughtEOFException = false;
    // Set by reconnect(), cleared by registerWithSelector(); handleSelect() returns false while it is set.
    private volatile boolean fShouldReRegisterFlag = false;
    // Set by maintain() when reconnection throws; shouldReconnect() then always returns false.
    private boolean fReconnectionHasFailed = false;
    // Cleared by reconnect(); set again in shouldReconnect() once the channel reports received data.
    private boolean fHasReceivedRecentlySinceLastReconnection = true;
    // Held by maintain(), resendOld(), reconnect() and registerWithSelector(); handleSelect() only tryLock()s it.
    private final Lock fReconnectingLock = new ReentrantLock();
    // Next sequence number to assign; read and incremented under fEnqueueLock.
    private long fSequenceNumberForSending = 0;
    private final Object fEnqueueLock = new Object();
    // Sequence number of the next in-order message; checked and incremented under fExpectedSequenceNumberLock.
    private volatile long fExpectedSequenceNumber = 0;
    private final Object fExpectedSequenceNumberLock = new Object();
    // Exposed via getRateLimiterListener(); removed from the underlying channel in close().
    private final RateLimiterListener fRateLimiterListener = new ControlSenderRateLimiterListener();
    // Wall-clock time (System.currentTimeMillis()) of construction or of the last successful reconnect().
    private final AtomicLong fLastReconnectionTime = new AtomicLong(System.currentTimeMillis());
    // Set by resend(); consumed (reset to false) by shouldAddWriteInterestOp().
    private final AtomicBoolean fShouldAddWriteInterestOp = new AtomicBoolean(false);
    // Set by maintain() after a successful reconnect; cleared by resendOld() once the unacknowledged buffer is empty.
    private boolean fShouldResendOld = false;

    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/ReliableTransmissionChannel$ControlSenderRateLimiterListener.class */
    /**
     * Rate-limiter callback that forwards limit transitions to the peer by
     * enqueueing a {@code RateLimiterMessage} on the underlying channel.
     */
    private class ControlSenderRateLimiterListener implements RateLimiterListener {
        private ControlSenderRateLimiterListener() {
        }

        /** Enqueues a {@code RateLimiterMessage} constructed with {@code true}. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.RateLimiterListener
        public void onGoingOverLimit() {
            sendRateLimiterMessage(true);
        }

        /** Enqueues a {@code RateLimiterMessage} constructed with {@code false}. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.RateLimiterListener
        public void onGoingBackUnderLimit() {
            sendRateLimiterMessage(false);
        }

        /**
         * Wraps a {@code RateLimiterMessage(z)} in a rate-limiter-bypassing holder and
         * hands it to the underlying channel. An {@link IOException} is logged and
         * otherwise ignored.
         */
        private void sendRateLimiterMessage(boolean z) {
            final ReliableTransmissionChannel owner = ReliableTransmissionChannel.this;
            try {
                final RateLimiterMessage notification = new RateLimiterMessage(z);
                final UnreliableTransmissionChannel.RateLimiterBypassingMessageHolder holder =
                        new UnreliableTransmissionChannel.RateLimiterBypassingMessageHolder(
                                Messages.createControlHeaderPayloadForSend(notification));
                final boolean verbose = Log.LOGGER.isLoggable(DistcompLevel.TWO);
                if (verbose) {
                    Log.LOGGER.log(DistcompLevel.TWO, owner.fLogPrefix + "Enqueueing: " + notification);
                }
                owner.fUnreliableTransmissionChannel.enqueueRateLimiterMessage(holder);
            } catch (IOException ioFailure) {
                Log.LOGGER.log(DistcompLevel.ONE, "Caught exception while trying to send rate limiter message: " + ioFailure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/ReliableTransmissionChannel$ReliableFlowController.class */
    /**
     * Flow controller backed by a {@code RateLimiterSenderBlocker}: sending blocks
     * on the blocker, receiving is always allowed.
     */
    public static class ReliableFlowController implements NetworkFlowController {
        private final RateLimiterSenderBlocker iRateLimiterSenderBlocker;

        /**
         * @param i forwarded unchanged as the second argument of the {@code RateLimiterSenderBlocker}
         */
        private ReliableFlowController(int i) {
            final RateLimiterSenderBlocker.RandomBackoffTimeoutGenerator backoff =
                    new RateLimiterSenderBlocker.RandomBackoffTimeoutGenerator(500L);
            this.iRateLimiterSenderBlocker = new RateLimiterSenderBlocker(backoff, i);
        }

        /**
         * Waits on the sender blocker; the message holder argument is not inspected.
         *
         * @throws InterruptedException if the wait is interrupted
         */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.NetworkFlowController
        public void blockUntilIsAllowedToSend(UnreliableTransmissionChannel.MessageHolder messageHolder) throws InterruptedException {
            final RateLimiterSenderBlocker blocker = this.iRateLimiterSenderBlocker;
            blocker.waitForCanSend();
        }

        /** @return always {@code true} */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.NetworkFlowController
        public boolean isAllowedToReceive() {
            return true;
        }

        /** Forwards the flag to the sender blocker. */
        /* JADX INFO: Access modifiers changed from: private */
        public void update(boolean z) {
            final RateLimiterSenderBlocker blocker = this.iRateLimiterSenderBlocker;
            blocker.update(z);
        }

        /** @return the sender blocker's {@code isBlocked()} result */
        /* JADX INFO: Access modifiers changed from: private */
        public boolean isBlocked() {
            final boolean blocked = this.iRateLimiterSenderBlocker.isBlocked();
            return blocked;
        }

        /** Forwards the reconnection reset to the sender blocker. */
        /* JADX INFO: Access modifiers changed from: private */
        public void resetOnReconnection() {
            final RateLimiterSenderBlocker blocker = this.iRateLimiterSenderBlocker;
            blocker.resetOnReconnection();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/ReliableTransmissionChannel$ReliableMessageHandler.class */
    /**
     * Message handler for the reliable protocol: applies acks and rate-limiter
     * updates from control messages, and routes {@code AckableMessage}s through
     * {@link ReliableTransmissionChannel#sendAcknowledgementAndDispatch}.
     *
     * <p>The decompiler's hand-written {@code $assertionsDisabled} field and static
     * initializer have been replaced by ordinary {@code assert} statements. The
     * compiler generates equivalent plumbing itself, and a static initializer is
     * not permitted in an inner class before Java 16.
     */
    public class ReliableMessageHandler implements MessageHandler {

        private ReliableMessageHandler() {
        }

        /**
         * Handles a control message: an {@code AckMessage} drops acknowledged entries
         * from the unacknowledged buffer, a {@code RateLimiterMessage} updates the flow
         * controller. Any other type is only logged (at level FIVE).
         */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.MessageHandler
        public void handleControlMessage(PmodeSerializable pmodeSerializable) {
            if (pmodeSerializable instanceof AckMessage) {
                ReliableTransmissionChannel.this.fSentButUnacknowledged.dropAcknowledged(((AckMessage) pmodeSerializable).getSequenceNumber());
            } else if (pmodeSerializable instanceof RateLimiterMessage) {
                ReliableTransmissionChannel.this.fFlowController.update(((RateLimiterMessage) pmodeSerializable).shouldPauseSending());
            }
            if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                Log.LOGGER.log(DistcompLevel.FIVE, ReliableTransmissionChannel.this.fLogPrefix + "Got:" + pmodeSerializable + " from: " + ReliableTransmissionChannel.this.fUnreliableTransmissionChannel.getRemoteProcess());
            }
        }

        /**
         * Handles a payload message. Only {@code AckableMessage}s are expected here;
         * anything else is logged, and control messages or plain {@code Message}s
         * additionally fail an assertion when assertions are enabled.
         *
         * @param pmodeSerializable the received object
         * @param channelDispatcher dispatcher that receives the unwrapped payload
         * @param j                 passed through to {@code sendAcknowledgementAndDispatch}
         */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.MessageHandler
        public void handleMessage(PmodeSerializable pmodeSerializable, ChannelDispatcher<Message> channelDispatcher, long j) {
            if (pmodeSerializable instanceof AckableMessage) {
                ReliableTransmissionChannel.this.sendAcknowledgementAndDispatch((AckableMessage) pmodeSerializable, channelDispatcher, j);
                return;
            }
            if ((pmodeSerializable instanceof HeartbeatMessage) || (pmodeSerializable instanceof AckMessage) || (pmodeSerializable instanceof RateLimiterMessage)) {
                Log.LOGGER.log(DistcompLevel.ONE, ReliableTransmissionChannel.this.fLogPrefix + "Got control message in handleMessage. This should never happen" + pmodeSerializable.getClass().getName());
                // Protocol violation: fail fast in assertion-enabled (test) runs only.
                assert false;
                return;
            }
            if (!(pmodeSerializable instanceof Message)) {
                Log.LOGGER.log(DistcompLevel.ONE, ReliableTransmissionChannel.this.fLogPrefix + "Received a message that was NOT and instanceof Message. This was of class " + pmodeSerializable.getClass().getName());
                return;
            }
            Log.LOGGER.log(DistcompLevel.ONE, ReliableTransmissionChannel.this.fLogPrefix + "Got plain message in reliable mode. This should never happen");
            // Protocol violation: fail fast in assertion-enabled (test) runs only.
            assert false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReliableTransmissionChannel(UnreliableTransmissionChannel unreliableTransmissionChannel, Reconnector reconnector, long j, long j2, long j3, SessionProfilingListener sessionProfilingListener, int i) {
        this.fLogPrefix = getClass().getSimpleName() + "-" + i + " ";
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Constructing for: " + unreliableTransmissionChannel);
        }
        this.fUnreliableTransmissionChannel = unreliableTransmissionChannel;
        this.fSentButUnacknowledged = new SentButUnacknowledgedBuffer();
        this.fFlowController = new ReliableFlowController(i);
        this.fReconnector = reconnector;
        this.fResendCheckIntervalMillis = j;
        this.fHeartbeatIntervalMillis = j2;
        this.fReconnectionCheckIntervalMillis = j3;
        this.fSessionProfilingListener = sessionProfilingListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the single-threaded maintenance scheduler and schedules three
     * recurring tasks on it: {@code maintain()} (fixed delay), heartbeat sending
     * (fixed rate) and {@code resendOld()} (fixed rate).
     */
    public void startReliabilityServices() {
        Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Connection tells us to be reliable, starting reliability services");
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                NamedThreadFactory.createDaemonThreadFactory("TransmissionChannelReliability-", PackageInfo.LOGGER));
        this.fReliabilityMaintenanceScheduledExecutorService = scheduler;

        final Runnable reconnectionCheck = new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.io.ReliableTransmissionChannel.1
            @Override // java.lang.Runnable
            public void run() {
                ReliableTransmissionChannel.this.maintain();
            }
        };
        scheduler.scheduleWithFixedDelay(reconnectionCheck, this.fReconnectionCheckIntervalMillis, this.fReconnectionCheckIntervalMillis, TimeUnit.MILLISECONDS);

        final Runnable heartbeat = new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.io.ReliableTransmissionChannel.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ReliableTransmissionChannel.this.sendHeartbeat();
                } catch (IOException e) {
                    Log.LOGGER.log(DistcompLevel.ONE, ReliableTransmissionChannel.this.fLogPrefix + "Exception while sending heartbeat", (Throwable) e);
                }
            }
        };
        scheduler.scheduleAtFixedRate(heartbeat, this.fHeartbeatIntervalMillis, this.fHeartbeatIntervalMillis, TimeUnit.MILLISECONDS);

        final Runnable resendCheck = new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.io.ReliableTransmissionChannel.3
            @Override // java.lang.Runnable
            public void run() {
                ReliableTransmissionChannel.this.resendOld();
            }
        };
        scheduler.scheduleAtFixedRate(resendCheck, this.fResendCheckIntervalMillis, this.fResendCheckIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Periodic maintenance step: under the reconnecting lock, checks whether a
     * reconnection is needed and, if so, reconnects and re-enqueues all
     * unacknowledged messages.
     *
     * <p>A {@code ReconnectionFailedException} is recorded in
     * {@code fReconnectionHasFailed} and not propagated. Any other throwable is
     * logged and rethrown. The lock is released on every path.
     */
    public void maintain() {
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Checking if need to reconnect to " + this.fUnreliableTransmissionChannel.getRemoteProcess());
        }
        this.fReconnectingLock.lock();
        try {
            if (shouldReconnect()) {
                logReliabilityEvent(SessionProfilingListener.ReliabilityEventType.RECONNECTION_STARTED, -1L);
                reconnect();
                logReliabilityEvent(SessionProfilingListener.ReliabilityEventType.RECONNECTION_COMPLETED, -1L);
                // From now on resendOld() keeps retrying until everything is acknowledged.
                this.fShouldResendOld = true;
                resendAll();
            }
        } catch (ReconnectionFailedException reconnectionFailure) {
            this.fReconnectionHasFailed = true;
            logReliabilityEvent(SessionProfilingListener.ReliabilityEventType.RECONNECTION_FAILED, -1L);
        } catch (Throwable unexpected) {
            Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Throwable while checking for reconnection or during reconnection. This may cause the reconnection to fail and/or prevent future reconnections from happening: ", unexpected);
            throw unexpected;
        } finally {
            this.fReconnectingLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Enqueues a new {@code HeartbeatMessage} on the underlying channel, wrapped in
     * a rate-limiter-bypassing holder.
     *
     * @throws IOException if building the control payload fails
     */
    public void sendHeartbeat() throws IOException {
        final HeartbeatMessage heartbeat = new HeartbeatMessage();
        final UnreliableTransmissionChannel.RateLimiterBypassingMessageHolder holder =
                new UnreliableTransmissionChannel.RateLimiterBypassingMessageHolder(
                        Messages.createControlHeaderPayloadForSend(heartbeat));
        this.fUnreliableTransmissionChannel.enqueueHeartbeat(holder);
    }

    /**
     * Decides whether {@code maintain()} should reconnect now.
     *
     * <p>A reconnect is wanted when no EOF has been recorded and either a select
     * exception was caught, or nothing was received since the last check and the
     * last reconnection is not recent (3 check intervals if the channel has ever
     * received, otherwise 6).
     *
     * @return {@code true} if a reconnection should be attempted
     * @throws ReconnectionFailedException if a reconnect would be wanted for
     *         inactivity alone but nothing has been received since the previous
     *         reconnection; the selection key is cancelled and the maintenance
     *         scheduler shut down first
     */
    private boolean shouldReconnect() throws ReconnectionFailedException {
        if (this.fReconnectionHasFailed) {
            Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Should not reconnect, because earlier attempt failed");
            return false;
        }

        final boolean receivedSinceLastCheck = this.fUnreliableTransmissionChannel.getAndResetHasReceived();
        this.fHasReceivedRecentlySinceLastReconnection |= receivedSinceLastCheck;

        final long elapsedSinceReconnection = System.currentTimeMillis() - this.fLastReconnectionTime.get();
        final long graceIntervals = this.fUnreliableTransmissionChannel.getHasEverReceived() ? 3L : 6L;
        final boolean reconnectedRecently = elapsedSinceReconnection < this.fReconnectionCheckIntervalMillis * graceIntervals;

        final boolean caughtSelectException = this.fCaughtExceptionDuringSelect;
        final boolean caughtEof = this.fCaughtEOFException;
        final boolean looksAlive = receivedSinceLastCheck || reconnectedRecently;
        final boolean reconnectWanted = !caughtEof && (caughtSelectException || !looksAlive);

        if (reconnectWanted && !caughtSelectException && !this.fHasReceivedRecentlySinceLastReconnection) {
            if (Log.LOGGER.isLoggable(DistcompLevel.ONE)) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Should not reconnect, and will close connection because we have not received anything singe last reconnection, probably because the previous reconnection was not detected on the remote host : " + getLogString(false, false, false, false));
            }
            this.fUnreliableTransmissionChannel.cancelSelectionKey();
            this.fReliabilityMaintenanceScheduledExecutorService.shutdown();
            throw new ReconnectionFailedException();
        }

        if (!reconnectWanted) {
            if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Should not reconnect because " + getLogString(receivedSinceLastCheck, reconnectedRecently, caughtSelectException, caughtEof));
            }
            return false;
        }

        if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
            Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Should reconnect because " + getLogString(receivedSinceLastCheck, reconnectedRecently, caughtSelectException, false) + " the broken connection is: " + this.fUnreliableTransmissionChannel.getConnectionString());
        }
        return true;
    }

    /**
     * Formats the reconnection-decision inputs for logging.
     *
     * @param z  value reported as {@code hasReceivedRecently}
     * @param z2 value reported as {@code hasReconnectedRecently}
     * @param z3 value reported as {@code hasCaughtException}
     * @param z4 value reported as {@code hasCaughtEOFException}
     * @return the four flags plus the current {@code fHasReceivedRecentlySinceLastReconnection}
     */
    private String getLogString(boolean z, boolean z2, boolean z3, boolean z4) {
        final StringBuilder text = new StringBuilder();
        text.append("hasReceivedRecently: ").append(z);
        text.append(" hasReconnectedRecently: ").append(z2);
        text.append(" hasCaughtException: ").append(z3);
        text.append(" hasCaughtEOFException: ").append(z4);
        text.append(" hasReceivedRecentlySinceLastReconnection: ").append(this.fHasReceivedRecentlySinceLastReconnection);
        return text.toString();
    }

    /**
     * Re-enqueues the given messages on the underlying channel, skipping any
     * whose sequence number the channel already reports on its send queue.
     *
     * <p>The list is walked from last to first. Each message is enqueued with the
     * third argument {@code true} (presumably a front-of-queue insert that restores
     * the original order — confirm against {@code UnreliableTransmissionChannel}).
     *
     * <p>If a message cannot be serialized, the error handler is notified and the
     * message is skipped. Previously a {@code null} payload was still enqueued.
     *
     * @param list messages to resend
     */
    private void resend(List<AckableMessage> list) {
        Set<Long> messagesOnSendQueue = this.fUnreliableTransmissionChannel.getMessagesOnSendQueue();
        for (int size = list.size() - 1; size >= 0; size--) {
            AckableMessage ackableMessage = list.get(size);
            final long sequenceNumber = ackableMessage.getSequenceNumber();
            if (messagesOnSendQueue.contains(Long.valueOf(sequenceNumber))) {
                if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                    Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Ignoring resend of message with sequence number: " + sequenceNumber + " since it is already on the queue or recently sent");
                }
                continue;
            }
            final HeaderPayload headerPayload;
            try {
                headerPayload = Messages.createHeaderPayloadForSend(ackableMessage);
            } catch (IOException e) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Exception while enqueueing messages for resend. Notifying error handler.", (Throwable) e);
                this.fUnreliableTransmissionChannel.writeErrorWithErrorHandler(e);
                // Nothing to send for this message: never enqueue a null payload.
                continue;
            }
            if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Enqueueing message with sequence number " + sequenceNumber + " for resending.");
            }
            logReliabilityEvent(SessionProfilingListener.ReliabilityEventType.ENQUEUE_FOR_RESEND, sequenceNumber);
            this.fUnreliableTransmissionChannel.enqueueMessageForSending(headerPayload, sequenceNumber, true);
        }
        // Picked up by shouldAddWriteInterestOp() on its next call.
        this.fShouldAddWriteInterestOp.set(true);
    }

    /**
     * Re-enqueues every unacknowledged message, unless the flow controller
     * currently reports that sending is blocked.
     */
    private void resendAll() {
        final boolean senderBlocked = this.fFlowController.isBlocked();
        if (senderBlocked) {
            Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Rate limiter preventing resend in resendAll");
            return;
        }

        final List<AckableMessage> unacknowledged = this.fSentButUnacknowledged.getAllForResend();
        final boolean verbose = Log.LOGGER.isLoggable(DistcompLevel.TWO);
        if (verbose) {
            Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Resending all " + unacknowledged.size() + " unacknowledged messages");
        }
        resend(unacknowledged);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Periodic resend step, run under the reconnecting lock. It is only active
     * after a reconnection ({@code fShouldResendOld}) and switches itself off once
     * the unacknowledged buffer is empty. When active and not rate-limited, it
     * resends all unacknowledged messages if any of them is older than one
     * resend-check interval.
     */
    public void resendOld() {
        this.fReconnectingLock.lock();
        try {
            if (!this.fShouldResendOld) {
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Will not check for old messages or resend");
                }
                return;
            }
            if (this.fSentButUnacknowledged.isEmpty()) {
                if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                    Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Recovered after reconnection. Disabling resending of old messages.");
                }
                this.fShouldResendOld = false;
                return;
            }
            if (this.fFlowController.isBlocked()) {
                Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Rate limiter preventing resend in resendOld");
                return;
            }

            final long oldestAllowedTimestamp = System.currentTimeMillis() - this.fResendCheckIntervalMillis;
            if (this.fSentButUnacknowledged.hasOldMessages(oldestAllowedTimestamp)) {
                final List<AckableMessage> unacknowledged = this.fSentButUnacknowledged.getAllForResend();
                if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                    Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Found old messages. Resending all " + unacknowledged.size() + " messages. ");
                }
                resend(unacknowledged);
            } else if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Found 0 old messages, will not resend.");
            }
        } finally {
            this.fReconnectingLock.unlock();
        }
    }

    /**
     * Obtains a new connection from the reconnector and installs it in the
     * underlying channel, holding the reconnecting lock throughout.
     *
     * <p>On success the select-exception flag is cleared, re-registration is
     * requested, the reconnection timestamp is refreshed and the flow controller
     * is reset.
     *
     * @throws ReconnectionFailedException if the reconnector fails or the new
     *         connection's remote instance has a different UUID; in both cases the
     *         selection key is cancelled and the maintenance scheduler shut down
     */
    private void reconnect() throws ReconnectionFailedException {
        this.fReconnectingLock.lock();
        try {
            if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Starting to reconnect, last received message: " + (this.fExpectedSequenceNumber - 1));
            }
            final Connection freshConnection = this.fReconnector.reconnect(this.fUnreliableTransmissionChannel.getReconnectionTimeLimitMillis());
            if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Reconnection succeeded, got new connection: " + freshConnection);
            }

            final Instance knownRemote = this.fUnreliableTransmissionChannel.getRemoteProcess();
            final boolean sameRemote = knownRemote.getUuid().equals(freshConnection.getRemoteInstance().getUuid());
            if (!sameRemote) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Error during reconnection, new remote instance: " + freshConnection.getRemoteInstance() + " is different from old: " + knownRemote + " Cancelling selection key. DirectCommunicationGroup should close us");
                this.fUnreliableTransmissionChannel.cancelSelectionKey();
                this.fReliabilityMaintenanceScheduledExecutorService.shutdownNow();
                // Caught by the handler below, which logs and cleans up again before rethrowing.
                throw new ReconnectionFailedException();
            }

            this.fUnreliableTransmissionChannel.replaceConnection(freshConnection);
            this.fCaughtExceptionDuringSelect = false;
            this.fShouldReRegisterFlag = true;
            this.fLastReconnectionTime.set(System.currentTimeMillis());
            this.fHasReceivedRecentlySinceLastReconnection = false;
            this.fFlowController.resetOnReconnection();
        } catch (ReconnectionFailedException failure) {
            Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Unable to reconnect. Cancelling selection key. DirectCommunicationGroup should close us.");
            this.fUnreliableTransmissionChannel.cancelSelectionKey();
            this.fReliabilityMaintenanceScheduledExecutorService.shutdown();
            throw failure;
        } finally {
            this.fReconnectingLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a received {@code AckableMessage}.
     *
     * <p>If its sequence number equals the expected one, the expected number is
     * advanced, an ack for the message is enqueued and the payload is dispatched.
     * Otherwise the message is treated as out of order: it is not dispatched and
     * an ack for {@code expected - 1} is enqueued.
     *
     * <p>Fix: accepting an in-order message advances the expected sequence number
     * before the ack is sent. The previous code skipped the dispatch when the ack
     * could not be enqueued, "expecting it to be resent". The resent copy would
     * then no longer match the expected number and would be dropped as out of
     * order, so the payload was lost. The payload is now dispatched regardless;
     * a resent copy is simply re-acknowledged on the out-of-order path.
     *
     * @param ackableMessage    the received message
     * @param channelDispatcher dispatcher that receives the payload
     * @param j                 value wrapped in the {@code MessageInfo} passed to the dispatcher
     */
    public void sendAcknowledgementAndDispatch(AckableMessage ackableMessage, ChannelDispatcher<Message> channelDispatcher, long j) {
        final long receivedSequenceNumber = ackableMessage.getSequenceNumber();
        final long expectedSequenceNumber;
        final boolean inOrder;
        synchronized (this.fExpectedSequenceNumberLock) {
            expectedSequenceNumber = this.fExpectedSequenceNumber;
            inOrder = receivedSequenceNumber == expectedSequenceNumber;
            if (inOrder) {
                this.fExpectedSequenceNumber = expectedSequenceNumber + 1;
            }
        }

        if (!inOrder) {
            if (Log.LOGGER.isLoggable(DistcompLevel.ONE)) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Received out-of-order message. Expecting sequence number " + expectedSequenceNumber + " but got " + receivedSequenceNumber);
            }
            logReliabilityEvent(SessionProfilingListener.ReliabilityEventType.RECEIVED_OUT_OF_ORDER, receivedSequenceNumber);
            try {
                // Re-acknowledge the last message accepted in order.
                enqueueAckMessageForSending(new AckMessage(expectedSequenceNumber - 1));
            } catch (IOException e) {
                Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Exception when trying to send ack for out-of-order message " + receivedSequenceNumber, (Throwable) e);
            }
            return;
        }

        // NOTE(review): the guard checks FIVE but the message is logged at SIX — confirm which level is intended.
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Received in-order message with seq number " + expectedSequenceNumber);
        }
        try {
            enqueueAckMessageForSending(new AckMessage(receivedSequenceNumber));
        } catch (IOException e) {
            // The sequence number is already consumed, so the message must still be delivered below.
            Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Exception when trying to send acknowledgement for " + receivedSequenceNumber + ". Dispatching anyway; a resent copy will be acknowledged as a duplicate", (Throwable) e);
        }

        Message payloadMessage = ackableMessage.getPayloadMessage();
        Instance remoteProcess = this.fUnreliableTransmissionChannel.getRemoteProcess();
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "actual dispatch of " + payloadMessage + " from: " + remoteProcess + " with sequence number: " + expectedSequenceNumber);
        }
        channelDispatcher.dispatch(payloadMessage, remoteProcess, new MessageInfo(j));
    }

    /**
     * Delegates a select event to the underlying channel unless a reconnection is
     * in progress or pending.
     *
     * <p>Fix: {@code EOFException} is a subclass of {@code IOException}, and the
     * previous nesting placed its handler outside the {@code IOException} handler.
     * That made it unreachable: {@code fCaughtEOFException} was never set and an
     * EOF was handled as a reconnectable I/O error. The handlers are now ordered
     * most-specific first and the lock is released in a single {@code finally}.
     *
     * @return {@code false} if the reconnecting lock is held elsewhere, if
     *         re-registration is pending, if an earlier select exception is still
     *         awaiting maintenance, or if an {@code IOException} was caught here;
     *         otherwise the underlying channel's result
     * @throws IOException only as {@code EOFException}; other I/O errors schedule
     *         an immediate {@code maintain()} run instead of propagating (an
     *         {@code InvalidHeaderDataException} is rethrown unchanged whatever its
     *         supertype)
     * @throws InvalidHeaderDataException rethrown unchanged from the underlying channel
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public boolean handleSelect() throws IOException, InvalidHeaderDataException {
        if (!this.fReconnectingLock.tryLock()) {
            // maintain()/reconnect() is running; skip this round.
            return false;
        }
        try {
            if (this.fShouldReRegisterFlag || this.fCaughtExceptionDuringSelect) {
                return false;
            }
            return this.fUnreliableTransmissionChannel.handleSelect();
        } catch (EOFException e) {
            // Must precede the IOException handler; shouldReconnect() will not reconnect after an EOF.
            this.fCaughtEOFException = true;
            throw e;
        } catch (InvalidHeaderDataException e) {
            // Listed explicitly so it is never absorbed by the IOException handler below.
            throw e;
        } catch (IOException e) {
            if (this.fCaughtExceptionDuringSelect) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Caught IOException during handleSelect. Maintenance has already been scheduled");
            } else {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Caught IOException during handleSelect, scheduling maintenance ", (Throwable) e);
                this.fCaughtExceptionDuringSelect = true;
                this.fReliabilityMaintenanceScheduledExecutorService.schedule(new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.io.ReliableTransmissionChannel.4
                    @Override // java.lang.Runnable
                    public void run() {
                        ReliableTransmissionChannel.this.maintain();
                    }
                }, 0L, TimeUnit.SECONDS);
            }
            return false;
        } finally {
            this.fReconnectingLock.unlock();
        }
    }

    /** @return the remote process reported by the underlying channel */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public Instance getRemoteProcess() {
        final Instance remote = this.fUnreliableTransmissionChannel.getRemoteProcess();
        return remote;
    }

    /**
     * Stops the maintenance scheduler (if it was ever started), detaches the rate
     * limiter listener, closes the underlying channel and drops all
     * unacknowledged messages.
     *
     * <p>Fixes: the scheduler is only created by {@code startReliabilityServices()},
     * so closing a channel that was never started used to throw a
     * {@code NullPointerException} before the underlying channel was closed. The
     * unacknowledged buffer is now also dropped when closing the underlying
     * channel throws.
     *
     * @throws IOException if closing the underlying channel fails
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void close() throws IOException {
        final ScheduledExecutorService maintenanceExecutor = this.fReliabilityMaintenanceScheduledExecutorService;
        if (maintenanceExecutor == null) {
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Closing: reliability services were never started, nothing to stop");
        } else {
            final int cancelledTasks = maintenanceExecutor.shutdownNow().size();
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Closing: stopped fReliabilityMaintenanceScheduledExecutorService, cancelled " + cancelledTasks + " tasks");
        }
        try {
            this.fUnreliableTransmissionChannel.removeRateLimiterListener(this.fRateLimiterListener);
            this.fUnreliableTransmissionChannel.close();
        } finally {
            // Release buffered messages even if the underlying close failed.
            this.fSentButUnacknowledged.dropAllMessages();
        }
    }

    /**
     * @return {@code true} between a successful {@code reconnect()} and the next
     *         completed {@code registerWithSelector()} call
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public boolean shouldReregister() {
        final boolean reRegistrationPending = this.fShouldReRegisterFlag;
        return reRegistrationPending;
    }

    /**
     * Reports whether write interest should be added. The local flag set by
     * {@code resend()} is consumed (reset to {@code false}) only when the
     * underlying channel itself answers {@code false}.
     *
     * @return {@code true} if either the underlying channel or the local flag requests it
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public boolean shouldAddWriteInterestOp() {
        if (this.fUnreliableTransmissionChannel.shouldAddWriteInterestOp()) {
            return true;
        }
        return this.fShouldAddWriteInterestOp.getAndSet(false);
    }

    /** Forwards the dispatcher and executor to the underlying channel. */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void setDispatcher(ChannelDispatcher<Message> channelDispatcher, SequentialExecutor sequentialExecutor) {
        final UnreliableTransmissionChannel delegate = this.fUnreliableTransmissionChannel;
        delegate.setDispatcher(channelDispatcher, sequentialExecutor);
    }

    /**
     * Assigns the next sequence number to {@code message}, records it as sent but
     * unacknowledged, and enqueues it on the underlying channel.
     *
     * <p>Fix: the message is now serialized before the sequence counter is
     * advanced and before it enters the unacknowledged buffer. Previously a
     * serialization failure consumed a sequence number and left the unsendable
     * message in the buffer. The peer would then never see that number and would
     * treat every later message as out of order.
     *
     * @param message the message to send; must not be an {@code AckMessage}
     * @throws IOException if the message cannot be serialized; no state is changed in that case
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void enqueueMessageForSending(Message message) throws IOException {
        synchronized (this.fEnqueueLock) {
            if (!$assertionsDisabled && (message instanceof AckMessage)) {
                throw new AssertionError("AckMessages should not be enqueued with this method!");
            }
            final long sequenceNumber = this.fSequenceNumberForSending;
            if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Enqueueing message " + message + " with seq num " + sequenceNumber + " ");
            }
            final AckableMessage ackableMessage = message instanceof BufferTransferrable ? new AckableBufferTransferrableMessage(sequenceNumber, message) : new AckableMessage(sequenceNumber, message);
            // May throw: do this before committing any state.
            final HeaderPayload headerPayload = Messages.createHeaderPayloadForSend(ackableMessage);
            this.fSequenceNumberForSending = sequenceNumber + 1;
            // Buffer before enqueueing, as before, so an ack cannot arrive for an unknown message.
            this.fSentButUnacknowledged.add(ackableMessage);
            this.fUnreliableTransmissionChannel.enqueueMessageForSending(headerPayload, sequenceNumber, false);
        }
    }

    /**
     * Enqueues the given ack on the underlying channel, wrapped in a
     * rate-limiter-bypassing holder.
     *
     * @param ackMessage the acknowledgement to send
     * @throws IOException if building the control payload fails
     */
    private void enqueueAckMessageForSending(AckMessage ackMessage) throws IOException {
        final boolean verbose = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
        if (verbose) {
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Enqueueing: " + ackMessage);
        }
        final UnreliableTransmissionChannel.RateLimiterBypassingMessageHolder holder =
                new UnreliableTransmissionChannel.RateLimiterBypassingMessageHolder(
                        Messages.createControlHeaderPayloadForSend(ackMessage));
        this.fUnreliableTransmissionChannel.enqueueAck(holder);
    }

    /** @return the read-byte total reported by the underlying channel */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public long getTotalReadBytes() {
        final long bytesRead = this.fUnreliableTransmissionChannel.getTotalReadBytes();
        return bytesRead;
    }

    /** @return the written-byte total reported by the underlying channel */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public long getTotalWriteBytes() {
        final long bytesWritten = this.fUnreliableTransmissionChannel.getTotalWriteBytes();
        return bytesWritten;
    }

    /**
     * Registers the underlying channel with {@code selector}, enqueues a
     * heartbeat, adds interest op {@code 4} and clears the re-register flag, all
     * under the reconnecting lock.
     *
     * <p>Fix: a {@code ClosedChannelException} from registration used to be caught
     * by the heartbeat's {@code IOException} handler and logged as a heartbeat
     * problem, so callers never saw the declared exception. Only heartbeat
     * failures are handled locally now. As before, a heartbeat failure leaves the
     * re-register flag set and skips adding the interest op.
     *
     * @param selector selector to register with
     * @param i        interest ops passed to the underlying registration
     * @param obj      attachment passed to the underlying registration
     * @throws ClosedChannelException if the underlying registration fails because the channel is closed
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void registerWithSelector(Selector selector, int i, Object obj) throws ClosedChannelException {
        this.fReconnectingLock.lock();
        try {
            // A ClosedChannelException from here propagates to the caller.
            this.fUnreliableTransmissionChannel.registerWithSelector(selector, i, obj);
            try {
                sendHeartbeat();
            } catch (IOException e) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Unable to enqueue heartbeat after reconnect", (Throwable) e);
                return;
            }
            // 4 is the value of java.nio.channels.SelectionKey.OP_WRITE.
            addInterestOps(4);
            this.fShouldReRegisterFlag = false;
        } finally {
            this.fReconnectingLock.unlock();
        }
    }

    /** Forwards the interest-op bits to the underlying channel. */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void addInterestOps(int i) {
        final UnreliableTransmissionChannel delegate = this.fUnreliableTransmissionChannel;
        delegate.addInterestOps(i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return a new {@code ReliableMessageHandler} bound to this channel */
    public MessageHandler getNewReliableMessageHandler() {
        final MessageHandler handler = new ReliableMessageHandler();
        return handler;
    }

    /**
     * Reports a reliability event to the profiling listener, if one was supplied.
     *
     * @param reliabilityEventType kind of event
     * @param j                    value passed as the listener's second argument (callers pass a sequence number or -1)
     */
    private void logReliabilityEvent(SessionProfilingListener.ReliabilityEventType reliabilityEventType, long j) {
        final SessionProfilingListener listener = this.fSessionProfilingListener;
        if (listener == null) {
            return;
        }
        listener.reliabilityEvent(reliabilityEventType, j, this.fUnreliableTransmissionChannel.getRemoteProcess());
    }

    /** @return the log prefix built in the constructor */
    @Override
    public String toString() {
        final String description = this.fLogPrefix;
        return description;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return this channel's flow controller */
    public NetworkFlowController getReliableFlowController() {
        final NetworkFlowController controller = this.fFlowController;
        return controller;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the listener that sends rate-limiter control messages for this channel */
    public RateLimiterListener getRateLimiterListener() {
        final RateLimiterListener listener = this.fRateLimiterListener;
        return listener;
    }

    static {
        $assertionsDisabled = !ReliableTransmissionChannel.class.desiredAssertionStatus();
    }
}
