package com.mathworks.toolbox.distcomp.pmode.io;

import com.mathworks.toolbox.distcomp.pmode.SessionProfilingListener;
import com.mathworks.toolbox.distcomp.pmode.peermessaging.PeerMessagingRuntimeException;
import com.mathworks.toolbox.distcomp.pmode.shared.ChannelDispatcher;
import com.mathworks.toolbox.distcomp.pmode.shared.Connection;
import com.mathworks.toolbox.distcomp.pmode.shared.ErrorHandler;
import com.mathworks.toolbox.distcomp.pmode.shared.Instance;
import com.mathworks.toolbox.distcomp.pmode.shared.Message;
import com.mathworks.toolbox.distcomp.pmode.shared.MessageInfo;
import com.mathworks.toolbox.distcomp.pmode.shared.PmodeSerializable;
import com.mathworks.toolbox.distcomp.ui.model.Property;
import com.mathworks.toolbox.distcomp.util.ByteBufferHandle;
import com.mathworks.toolbox.distcomp.util.concurrent.SequentialExecutor;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel.class */
public final class UnreliableTransmissionChannel implements TransmissionChannel {
    // Completed messages parked by dispatchOrQueue() until setDispatcher() installs a dispatcher and drains them.
    private final BlockingQueue<HeaderPayload> fReceiveQueue;
    // Outgoing messages; some enqueue paths insert at the head via offerFirst().
    private final BlockingDeque<MessageHolderAndSequenceNumber> fSendQueue;
    // Message currently being assembled by handleRead(); replaced with a fresh instance after each completed message.
    private volatile HeaderPayload fPartiallyReceivedMessage;
    // Entry handleWrite() is currently working on (peeked from fSendQueue when null).
    private MessageHolderAndSequenceNumber fNowSendingHolderAndSequence;
    // Read state carried between iterations (and, when fResumeIncompleteReads is set, between calls) of performHandleSelect().
    private volatile boolean fReadPending;
    private Connection fConnection;
    // Not assigned anywhere in the visible portion of this file; used by performHandleSelect() and close().
    private SelectionKey fSelectionKey;
    private MessageHandler fMessageHandler;
    private NetworkFlowController fRateLimiterHandler;
    private final ErrorHandler fErrorHandler;
    // Taken from Connection.mustResumeIncompleteReads() in the constructor.
    private final boolean fResumeIncompleteReads;
    private final RateLimiter fIncomingRateLimiter;
    private final RateLimiter fOutgoingRateLimiter;
    private final Instance fRemoteInstance;
    // May be null; notifyProfilingMessageEvent() checks before use.
    private final SessionProfilingListener fProfilingListener;
    // Upper bound on read/write attempts performed in one performHandleSelect() call.
    private static final int MAX_NUM_READS_OR_WRITES_PER_SELECT = 20;
    private final String fLogPrefix;
    // NOTE(review): looks like the compiler-synthesized flag behind `assert` statements, surfaced by the decompiler;
    // its initialization is not visible in this portion of the file — confirm.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Taken around accesses to fConnection / fSelectionKey (write lock in the constructor, read lock elsewhere).
    private final ReentrantReadWriteLock fLock = new ReentrantReadWriteLock();
    private ChannelDispatcher<Message> fDispatcher = null;
    private SequentialExecutor fDispatchExec = null;
    // Monitor for the dispatcher, dispatch executor, handlers and receive-queue hand-off.
    private final Object fDispatchLock = new Object();
    // Monitor for fHasCloseBeenCalled.
    private final Object fHasCloseBeenCalledLock = new Object();
    private boolean fHasCloseBeenCalled = false;
    // Bytes of messages interpreted in dispatchOnExec() / bytes of messages passed to enqueueMessageForSending().
    private final AtomicLong fTotalReadBytes = new AtomicLong(0);
    private final AtomicLong fTotalWriteBytes = new AtomicLong(0);
    // Both are set to true by readFromChannel() whenever a read returns more than zero bytes.
    private final AtomicBoolean fHasReceivedFlag = new AtomicBoolean(false);
    private final AtomicBoolean fHasEverReceived = new AtomicBoolean(false);
    // Set by the heartbeat/ack/rate-limiter enqueue methods; read-and-cleared by shouldAddWriteInterestOp().
    private final AtomicBoolean fShouldAddWriteInterestOp = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel$MessageHolder.class */
    /**
     * Pairs a {@link HeaderPayload} with the rate-limiter bookkeeping that goes with it.
     * Implementations in this file either delegate to a {@code RateLimiter} or do nothing at all.
     */
    public interface MessageHolder {
        /** Obtains the permits this message needs; the rate-limited implementation calls {@code RateLimiter.waitToConsume}. */
        void acquireNecessaryPermits();

        /** Gives back permits obtained earlier; the rate-limited implementation ignores the call when none are held. */
        void releaseAcquiredPermits();

        /** Obtains the permits via {@code RateLimiter.consume} in the rate-limited implementation (presumably non-blocking, per the name). */
        void consumePermitsWithoutBlocking();

        /** @return the wrapped message. */
        HeaderPayload getHeaderPayload();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel$MessageHolderAndSequenceNumber.class */
    /** Immutable send-queue entry: a message holder plus the sequence number it was enqueued with. */
    public static final class MessageHolderAndSequenceNumber {
        private final long fSequenceNumber;
        private final MessageHolder fMessageHolder;

        /**
         * @param messageHolder  the message (and its permit bookkeeping) to send
         * @param sequenceNumber sequence number of the message; callers in this file pass -1 when there is none
         */
        MessageHolderAndSequenceNumber(MessageHolder messageHolder, long sequenceNumber) {
            fSequenceNumber = sequenceNumber;
            fMessageHolder = messageHolder;
        }

        /** @return the holder supplied at construction. */
        MessageHolder getMessageHolder() {
            return fMessageHolder;
        }

        /** @return the sequence number supplied at construction. */
        long getSequenceNumber() {
            return fSequenceNumber;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel$RateLimitedMessageHolder.class */
    /**
     * {@link MessageHolder} that accounts for its message against a {@code RateLimiter}.
     * The heap and direct sizes (in KB) are read from the payload once, at construction,
     * so acquisition and release always use the same amounts.
     */
    public static class RateLimitedMessageHolder implements MessageHolder {
        private final HeaderPayload fHeaderPayload;
        private final int fHeapKb;
        private final int fDirectKb;
        private final RateLimiter fRateLimiter;
        // Wall-clock time of the most recent acquisition; -1 until permits are first taken.
        private final AtomicLong fAcquisitionTime = new AtomicLong(-1);
        // True while this holder owns permits that have not been given back yet.
        private final AtomicBoolean fHasPermitsToRelease = new AtomicBoolean(false);

        /**
         * @param headerPayload message whose size determines the permits required
         * @param rateLimiter   limiter the permits are taken from and returned to
         */
        RateLimitedMessageHolder(HeaderPayload headerPayload, RateLimiter rateLimiter) {
            this.fHeaderPayload = headerPayload;
            this.fRateLimiter = rateLimiter;
            this.fHeapKb = this.fHeaderPayload.heapKBytes();
            this.fDirectKb = this.fHeaderPayload.directKBytes();
        }

        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public HeaderPayload getHeaderPayload() {
            return this.fHeaderPayload;
        }

        /** Takes the permits through {@code RateLimiter.waitToConsume} and records the acquisition time. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public void acquireNecessaryPermits() {
            if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                // The original message lacked the separator before "direct:", fusing the two numbers.
                Log.LOGGER.log(DistcompLevel.FIVE, "Permit acquisition: heap: " + this.fHeapKb + " direct: " + this.fDirectKb);
            }
            this.fRateLimiter.waitToConsume(this.fHeapKb, this.fDirectKb);
            this.fAcquisitionTime.set(System.currentTimeMillis());
            this.fHasPermitsToRelease.set(true);
        }

        /**
         * Returns the permits to the limiter if this holder currently owns any; otherwise does nothing.
         * The ownership flag is cleared atomically so that concurrent callers cannot both release
         * the same permits.
         */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public void releaseAcquiredPermits() {
            // compareAndSet makes "check and clear" one step; a separate get()/set(false) lets two threads both release.
            if (!this.fHasPermitsToRelease.compareAndSet(true, false)) {
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "Ignoring permit release (already released):  heap: " + this.fHeapKb + " direct: " + this.fDirectKb);
                }
                return;
            }
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, "Message size: " + (this.fHeapKb + this.fDirectKb) + " Kb dealt with in " + (System.currentTimeMillis() - this.fAcquisitionTime.get()) + " ms");
            }
            this.fRateLimiter.release(this.fHeapKb, this.fDirectKb);
        }

        /** Takes the permits through {@code RateLimiter.consume} and records the acquisition time. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public void consumePermitsWithoutBlocking() {
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, "Non-blocking permit acquisition: heap: " + this.fHeapKb + " direct: " + this.fDirectKb);
            }
            this.fRateLimiter.consume(this.fHeapKb, this.fDirectKb);
            this.fAcquisitionTime.set(System.currentTimeMillis());
            this.fHasPermitsToRelease.set(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel$RateLimiterBypassingMessageHolder.class */
    /** {@link MessageHolder} whose permit operations are all no-ops; it only carries the payload. */
    public static class RateLimiterBypassingMessageHolder implements MessageHolder {
        private final HeaderPayload fHeaderPayload;

        /* JADX INFO: Access modifiers changed from: package-private */
        /** @param payload the message to carry */
        public RateLimiterBypassingMessageHolder(HeaderPayload payload) {
            fHeaderPayload = payload;
        }

        /** @return the payload supplied at construction. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public HeaderPayload getHeaderPayload() {
            return fHeaderPayload;
        }

        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public void acquireNecessaryPermits() {
            // Intentionally empty: no permits are tracked.
        }

        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public void consumePermitsWithoutBlocking() {
            // Intentionally empty: no permits are tracked.
        }

        @Override // com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.MessageHolder
        public void releaseAcquiredPermits() {
            // Intentionally empty: no permits are tracked.
        }
    }

    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel$UnreliableFlowController.class */
    /** Flow controller backed directly by the enclosing channel's incoming limiter and the holder's own permits. */
    private class UnreliableFlowController implements NetworkFlowController {
        private UnreliableFlowController() {
        }

        /** Delegates to the holder, which acquires whatever permits it needs. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.NetworkFlowController
        public void blockUntilIsAllowedToSend(MessageHolder messageHolder) {
            messageHolder.acquireNecessaryPermits();
        }

        /** @return {@code true} unless the incoming rate limiter reports its limit as exceeded. */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.NetworkFlowController
        public boolean isAllowedToReceive() {
            final boolean limitExceeded = fIncomingRateLimiter.isLimitExceeded();
            return !limitExceeded;
        }
    }

    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/UnreliableTransmissionChannel$UnreliableMessageHandler.class */
    /**
     * Message handler for a channel without reliability support: ordinary {@link Message}s are dispatched,
     * while any reliability-protocol message is logged and trips an assertion.
     */
    private class UnreliableMessageHandler implements MessageHandler {
        // The decompiler-exposed "$assertionsDisabled" field and static initializer were replaced with real
        // `assert` statements: an inner (non-static) class may not declare static members before Java 16,
        // and the compiler generates the equivalent flag itself.

        private UnreliableMessageHandler() {
        }

        /**
         * Dispatches {@code pmodeSerializable} if it is a {@link Message}.
         * Reliability messages (ackable, ack, heartbeat, rate-limiter) are logged first and, with
         * assertions enabled, raise an {@link AssertionError}.
         *
         * @param pmodeSerializable the interpreted message
         * @param channelDispatcher dispatcher the message is handed to
         * @param j                 size value passed through to {@code dispatch}
         * @throws Exception whatever the dispatch raises
         */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.MessageHandler
        public void handleMessage(PmodeSerializable pmodeSerializable, ChannelDispatcher<Message> channelDispatcher, long j) throws Exception {
            final boolean isReliabilityMessage = (pmodeSerializable instanceof AckableMessage)
                    || (pmodeSerializable instanceof AckMessage)
                    || (pmodeSerializable instanceof HeartbeatMessage)
                    || (pmodeSerializable instanceof RateLimiterMessage);
            if (isReliabilityMessage) {
                logUnexpectedReliabilityMessage(pmodeSerializable);
                assert false : "Reliability message received in unreliable mode";
            }
            if (pmodeSerializable instanceof Message) {
                UnreliableTransmissionChannel.this.dispatch((Message) pmodeSerializable, channelDispatcher, j);
            }
        }

        /**
         * Control messages are never expected here: the message is logged and, with assertions enabled,
         * an {@link AssertionError} is raised.
         *
         * @param pmodeSerializable the interpreted control message
         */
        @Override // com.mathworks.toolbox.distcomp.pmode.io.MessageHandler
        public void handleControlMessage(PmodeSerializable pmodeSerializable) {
            logUnexpectedReliabilityMessage(pmodeSerializable);
            assert false : "Reliability message received in unreliable mode";
        }

        /** Logs, at level ONE, that a reliability message of the given type arrived unexpectedly. */
        private void logUnexpectedReliabilityMessage(PmodeSerializable message) {
            Log.LOGGER.log(DistcompLevel.ONE, UnreliableTransmissionChannel.this.fLogPrefix + "Received reliability message: " + message.getClass().getSimpleName() + " in unreliable mode. This should never happen");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a channel over {@code connection} and switches its selectable channel to non-blocking mode.
     *
     * @param connection               the connection to read from and write to
     * @param errorHandler             receives read errors raised during dispatch
     * @param rateLimiter              limiter used for incoming messages
     * @param rateLimiter2             limiter used for outgoing messages
     * @param sessionProfilingListener profiling callback; may be {@code null}
     * @param i                        number appended to the log prefix
     * @throws PeerMessagingRuntimeException if the channel cannot be configured as non-blocking
     */
    public UnreliableTransmissionChannel(Connection connection, ErrorHandler errorHandler, RateLimiter rateLimiter, RateLimiter rateLimiter2, SessionProfilingListener sessionProfilingListener, int i) throws PeerMessagingRuntimeException {
        this.fLogPrefix = getClass().getSimpleName() + "-" + i + " ";
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.FIVE, this.fLogPrefix + "Constructing for " + connection);
        }
        this.fErrorHandler = errorHandler;
        this.fLock.writeLock().lock();
        try {
            this.fConnection = connection;
            this.fConnection.getSelectableChannel().configureBlocking(false);
        } catch (IOException e) {
            throw new PeerMessagingRuntimeException("IOException caught while configuring " + this.fConnection + " to not block.", e);
        } finally {
            // Exactly one unlock on every path. The previous layout unlocked in the normal path and again in a
            // catch-all, so a failure in the later initialization attempted a second unlock and masked the real error.
            this.fLock.writeLock().unlock();
        }
        this.fRemoteInstance = connection.getRemoteInstance();
        this.fSendQueue = new LinkedBlockingDeque<>();
        this.fReceiveQueue = new LinkedBlockingQueue<>();
        this.fResumeIncompleteReads = connection.mustResumeIncompleteReads();
        this.fPartiallyReceivedMessage = Messages.createHeaderPayloadForReceive();
        this.fReadPending = false;
        this.fIncomingRateLimiter = rateLimiter;
        this.fOutgoingRateLimiter = rateLimiter2;
        this.fProfilingListener = sessionProfilingListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Installs the flow controller and the message handler together, under the dispatch lock.
     *
     * @param networkFlowController controller consulted before sending and receiving
     * @param messageHandler        handler that receives interpreted messages
     */
    public void setFlowControllerAndMessageHandler(NetworkFlowController networkFlowController, MessageHandler messageHandler) {
        synchronized (fDispatchLock) {
            fMessageHandler = messageHandler;
            fRateLimiterHandler = networkFlowController;
        }
    }

    /** @return the remote instance obtained from the connection at construction time. */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public Instance getRemoteProcess() {
        return fRemoteInstance;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the connection's reconnection time limit in milliseconds, read while holding the read lock. */
    public long getReconnectionTimeLimitMillis() {
        final ReentrantReadWriteLock.ReadLock readLock = fLock.readLock();
        readLock.lock();
        try {
            final long limitMillis = fConnection.getReconnectionTimeLimitMillis();
            return limitMillis;
        } finally {
            readLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the connection's {@code toString()} value, read while holding the read lock. */
    public String getConnectionString() {
        final ReentrantReadWriteLock.ReadLock readLock = fLock.readLock();
        readLock.lock();
        try {
            final String description = fConnection.toString();
            return description;
        } finally {
            readLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Accounts for a message, waits until the flow controller allows sending, and then puts it on the send queue.
     * If the channel was closed in the meantime the permits are released and the message is dropped.
     *
     * @param headerPayload the message to send
     * @param j             sequence number stored with the queue entry (-1 is logged as "no sequence number")
     * @param z             when {@code true} the message bypasses the rate limiter and goes to the head of the queue
     */
    public void enqueueMessageForSending(HeaderPayload headerPayload, long j, boolean z) {
        final long totalBytes = headerPayload.totalBytes();
        fTotalWriteBytes.addAndGet(totalBytes);
        notifyProfilingMessageEvent(SessionProfilingListener.MessageEventType.SEND_ENQUEUED, headerPayload.getLogID(), totalBytes);

        final MessageHolder holder;
        if (z) {
            holder = new RateLimiterBypassingMessageHolder(headerPayload);
        } else {
            holder = new RateLimitedMessageHolder(headerPayload, fOutgoingRateLimiter);
        }

        if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
            Log.LOGGER.log(DistcompLevel.SIX, fLogPrefix + "Sending message with UID: " + headerPayload.getLogID() + ", bytes: " + totalBytes + " to: " + getRemoteProcess());
        }

        try {
            fRateLimiterHandler.blockUntilIsAllowedToSend(holder);
            synchronized (fHasCloseBeenCalledLock) {
                if (fHasCloseBeenCalled) {
                    // Closed while we were waiting: give the permits back and drop the message.
                    if (Log.LOGGER.isLoggable(DistcompLevel.ONE)) {
                        Log.LOGGER.log(DistcompLevel.ONE, fLogPrefix + "Transmission channel closed while waiting for rate limiter permits. Releasing permits, not enqueueing message, returning.");
                    }
                    holder.releaseAcquiredPermits();
                    return;
                }
                final MessageHolderAndSequenceNumber entry = new MessageHolderAndSequenceNumber(holder, j);
                final boolean accepted;
                if (z) {
                    accepted = fSendQueue.offerFirst(entry);
                } else {
                    accepted = fSendQueue.offer(entry);
                }
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, fLogPrefix + "Enqueued message " + (j == -1 ? Property.EMPTY_MATLAB_STRING_VALUE : " with sequence number: " + Long.toString(j)) + ", queue size now: " + fSendQueue.size());
                }
                if (!$assertionsDisabled && !accepted) {
                    throw new AssertionError("Unexpectedly failed to offer message holder to fSendQueue");
                }
            }
        } catch (InterruptedException interrupted) {
            Log.LOGGER.log(DistcompLevel.TWO, fLogPrefix + "Interrupted while waiting for rate limiter");
            holder.releaseAcquiredPermits();
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Queues a heartbeat, but only when the send queue is empty; otherwise the heartbeat is skipped.
     *
     * @param messageHolder holder carrying the heartbeat message
     */
    public void enqueueHeartbeat(MessageHolder messageHolder) {
        final boolean logFive;
        if (fSendQueue.isEmpty()) {
            logFive = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
            if (logFive) {
                Log.LOGGER.log(DistcompLevel.FIVE, fLogPrefix + "Sending heartbeat");
            }
            fSendQueue.offer(new MessageHolderAndSequenceNumber(messageHolder, -1L));
            fShouldAddWriteInterestOp.set(true);
            return;
        }
        logFive = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
        if (logFive) {
            Log.LOGGER.log(DistcompLevel.FIVE, fLogPrefix + "Skipping sending heartbeat, since there is already stuff on the send queue");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Puts an acknowledgement at the head of the send queue and requests write interest.
     *
     * @param messageHolder holder carrying the ack message
     */
    public void enqueueAck(MessageHolder messageHolder) {
        final MessageHolderAndSequenceNumber ackEntry = new MessageHolderAndSequenceNumber(messageHolder, -1L);
        fSendQueue.offerFirst(ackEntry);
        fShouldAddWriteInterestOp.set(true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Appends a rate-limiter message to the send queue and requests write interest.
     *
     * @param rateLimiterBypassingMessageHolder holder carrying the rate-limiter message
     */
    public void enqueueRateLimiterMessage(RateLimiterBypassingMessageHolder rateLimiterBypassingMessageHolder) {
        final MessageHolderAndSequenceNumber entry = new MessageHolderAndSequenceNumber(rateLimiterBypassingMessageHolder, -1L);
        fSendQueue.offer(entry);
        fShouldAddWriteInterestOp.set(true);
    }

    /**
     * Forwards a message event to the profiling listener, if one was supplied.
     *
     * @param messageEventType kind of event
     * @param j                log ID of the message
     * @param j2               byte count reported with the event
     */
    private void notifyProfilingMessageEvent(SessionProfilingListener.MessageEventType messageEventType, long j, long j2) {
        if (fProfilingListener == null) {
            return;
        }
        fProfilingListener.messageEvent(messageEventType, j, fRemoteInstance, j2);
    }

    /**
     * Hands a completely received message to the message handler.
     * Permits for the message are consumed from the incoming rate limiter first. Non-control messages are
     * interpreted and handled on the dispatch executor; control messages are interpreted and handled inline.
     * In both cases the permits are released afterwards.
     *
     * NOTE(review): the try/catch layout below looks like a decompilation artifact. As written, the catch
     * clauses only surround the holder construction and permit consumption, while the calls that could
     * plausibly raise those exceptions (execute, interpretCompletedMessage) sit after them — confirm
     * against the original source before relying on this error handling.
     *
     * @param headerPayload the completed message
     */
    private void dispatchOnExec(final HeaderPayload headerPayload) {
        final RateLimitedMessageHolder rateLimitedMessageHolder;
        synchronized (this.fDispatchLock) {
            if (!$assertionsDisabled && this.fDispatcher == null) {
                throw new AssertionError("No dispatcher!");
            }
            if (!$assertionsDisabled && this.fDispatchExec == null) {
                throw new AssertionError("No executor!");
            }
            // Captured so the queued task uses the dispatcher that was current when the message arrived.
            final ChannelDispatcher<Message> channelDispatcher = this.fDispatcher;
            try {
                try {
                    rateLimitedMessageHolder = new RateLimitedMessageHolder(headerPayload, this.fIncomingRateLimiter);
                    rateLimitedMessageHolder.consumePermitsWithoutBlocking();
                } catch (IOException | ClassNotFoundException e) {
                    Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Unable to interpret message: " + e);
                }
            } catch (InvalidHeaderDataException e2) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "Invalid header data", (Throwable) e2);
            } catch (RejectedExecutionException e3) {
                Log.LOGGER.log(DistcompLevel.ONE, this.fLogPrefix + "No executor for dispatch", (Throwable) e3);
            }
            if (!Messages.isControlMessage(headerPayload)) {
                // Ordinary message: interpret and handle it on the dispatch executor.
                this.fDispatchExec.execute(new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.io.UnreliableTransmissionChannel.1
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            try {
                                long j = headerPayload.totalBytes();
                                PmodeSerializable interpretCompletedMessage = Messages.interpretCompletedMessage(headerPayload);
                                UnreliableTransmissionChannel.this.fTotalReadBytes.addAndGet(j);
                                UnreliableTransmissionChannel.this.fMessageHandler.handleMessage(interpretCompletedMessage, channelDispatcher, j);
                                rateLimitedMessageHolder.releaseAcquiredPermits();
                            } catch (Throwable th) {
                                // Any failure is logged and reported to the error handler as a read error.
                                Log.LOGGER.log(DistcompLevel.ZERO, UnreliableTransmissionChannel.this.fLogPrefix + "Error occurred during interpretation/dispatch", th);
                                UnreliableTransmissionChannel.this.fErrorHandler.readError(UnreliableTransmissionChannel.this.fRemoteInstance, th);
                                rateLimitedMessageHolder.releaseAcquiredPermits();
                            }
                        } catch (Throwable th2) {
                            // Release the permits even if the error handling above failed.
                            rateLimitedMessageHolder.releaseAcquiredPermits();
                            throw th2;
                        }
                    }
                }, this);
                return;
            }
            // Control message: handled inline on the calling thread, still under fDispatchLock.
            this.fMessageHandler.handleControlMessage(Messages.interpretCompletedMessage(headerPayload));
            rateLimitedMessageHolder.releaseAcquiredPermits();
        }
    }

    /**
     * Dispatches a completed message if a dispatcher is installed; otherwise parks it on the receive
     * queue until {@code setDispatcher} drains it.
     *
     * @param headerPayload the completed message
     */
    private void dispatchOrQueue(HeaderPayload headerPayload) {
        synchronized (this.fDispatchLock) {
            if (this.fDispatcher != null) {
                if (!$assertionsDisabled && !this.fReceiveQueue.isEmpty()) {
                    throw new AssertionError("Receive queue was non-empty after dispatcher had been set");
                }
                dispatchOnExec(headerPayload);
            } else if (!this.fReceiveQueue.offer(headerPayload)) {
                // Queue the message we were given, not the fPartiallyReceivedMessage field: the two are only
                // the same object because of how the current caller happens to invoke this method.
                Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + "Failed to enqueue a received message");
                if (!$assertionsDisabled) {
                    throw new AssertionError("Failed to enqueue a received message");
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Passes a message to the given dispatcher, tagged with the remote instance and a {@code MessageInfo}.
     *
     * @param message           the message to deliver
     * @param channelDispatcher dispatcher that receives it
     * @param j                 value wrapped in the {@code MessageInfo}
     * @throws IOException if the dispatcher fails
     */
    public void dispatch(Message message, ChannelDispatcher<Message> channelDispatcher, long j) throws IOException {
        final boolean logFive = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
        if (logFive) {
            Log.LOGGER.log(DistcompLevel.FIVE, fLogPrefix + "actual dispatch of " + message + " from: " + fRemoteInstance);
        }
        final MessageInfo info = new MessageInfo(j);
        channelDispatcher.dispatch(message, fRemoteInstance, info);
    }

    /**
     * Installs the dispatcher and its executor, then dispatches every message that was queued
     * while no dispatcher was available.
     *
     * @param channelDispatcher  dispatcher for received messages
     * @param sequentialExecutor executor the dispatch tasks run on
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void setDispatcher(ChannelDispatcher<Message> channelDispatcher, SequentialExecutor sequentialExecutor) {
        synchronized (fDispatchLock) {
            fDispatcher = channelDispatcher;
            fDispatchExec = sequentialExecutor;
            // The queue is only touched under fDispatchLock, so polling until null drains it completely.
            HeaderPayload queued;
            while ((queued = fReceiveQueue.poll()) != null) {
                dispatchOnExec(queued);
            }
        }
    }

    /**
     * Runs one round of selector-driven I/O while holding the read lock.
     *
     * @return the result of {@code performHandleSelect()}
     * @throws IOException                on I/O failure
     * @throws InvalidHeaderDataException if a received header is invalid
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public synchronized boolean handleSelect() throws IOException, InvalidHeaderDataException {
        final ReentrantReadWriteLock.ReadLock readLock = fLock.readLock();
        readLock.lock();
        try {
            final boolean result = performHandleSelect();
            return result;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Alternates read and write attempts for as long as either is pending and the connection and key
     * remain usable, up to {@code MAX_NUM_READS_OR_WRITES_PER_SELECT} attempts.
     *
     * @return {@code true} only when the attempt budget ran out with a read still pending on a
     *         connection that resumes incomplete reads; {@code false} otherwise
     * @throws IOException                on I/O failure
     * @throws InvalidHeaderDataException if a received header is invalid (the channel is closed first)
     */
    private boolean performHandleSelect() throws IOException, InvalidHeaderDataException {
        if (!fSelectionKey.isValid()) {
            return false;
        }
        boolean writePending = fSelectionKey.isValid() && fSelectionKey.isWritable();
        if (fResumeIncompleteReads) {
            // Keep an earlier pending read alive in addition to what the key reports now.
            fReadPending |= fSelectionKey.isValid() && fSelectionKey.isReadable();
        } else {
            fReadPending = fSelectionKey.isValid() && fSelectionKey.isReadable();
        }
        int attempts = 0;
        boolean moreWork;
        do {
            final boolean mayReceive = fRateLimiterHandler.isAllowedToReceive();
            if (fConnection.isOpen() && fSelectionKey.isValid() && fReadPending) {
                if (mayReceive) {
                    try {
                        fReadPending = handleRead();
                    } catch (InvalidHeaderDataException badHeader) {
                        Log.LOGGER.log(DistcompLevel.ONE, fLogPrefix + "Detected invalid message header data. Closing Connection and TransmissionChannel", (Throwable) badHeader);
                        close();
                        throw badHeader;
                    }
                }
                // Counted even when receiving is not allowed, so the loop stays bounded.
                attempts++;
            }
            if (fConnection.isOpen() && fSelectionKey.isValid() && writePending) {
                writePending = handleWrite();
                attempts++;
            }
            moreWork = (fReadPending || writePending) && fConnection.isOpen() && fSelectionKey.isValid();
            if (moreWork && attempts >= MAX_NUM_READS_OR_WRITES_PER_SELECT) {
                return fResumeIncompleteReads && fReadPending;
            }
        } while (moreWork);
        return false;
    }

    /**
     * Marks the channel closed, releases the permits of every message still on the send queue,
     * then cancels the selection key and closes the connection under the read lock.
     *
     * @throws IOException if closing the connection fails
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void close() throws IOException {
        synchronized (fHasCloseBeenCalledLock) {
            if (Log.LOGGER.isLoggable(DistcompLevel.FOUR)) {
                // Repeat calls are logged at the quieter level FIVE.
                Log.LOGGER.log(fHasCloseBeenCalled ? DistcompLevel.FIVE : DistcompLevel.FOUR, fLogPrefix + "Closing " + this);
            }
            fHasCloseBeenCalled = true;
        }
        for (MessageHolderAndSequenceNumber queued : fSendQueue) {
            queued.getMessageHolder().releaseAcquiredPermits();
        }
        fLock.readLock().lock();
        try {
            fSelectionKey.cancel();
            fConnection.close();
        } finally {
            fLock.readLock().unlock();
        }
    }

    /** @return always {@code false} for this channel implementation. */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public boolean shouldReregister() {
        return false;
    }

    /**
     * Reports whether write interest was requested since the last call, clearing the request.
     *
     * @return the flag's value before it was reset to {@code false}
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public boolean shouldAddWriteInterestOp() {
        final boolean requested = fShouldAddWriteInterestOp.getAndSet(false);
        return requested;
    }

    /** @return the current value of the read-byte counter. */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public long getTotalReadBytes() {
        return fTotalReadBytes.get();
    }

    /** @return the current value of the write-byte counter. */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public long getTotalWriteBytes() {
        return fTotalWriteBytes.get();
    }

    /**
     * Treats a read result of -1 as end of stream: closes the channel and raises {@link EOFException}.
     * Any other value is ignored.
     *
     * @param j the value returned by the read
     * @throws IOException an {@code EOFException} on end of stream, or a failure while closing
     */
    private void detectEndOfFile(long j) throws IOException {
        if (j != -1) {
            return;
        }
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.FIVE, fLogPrefix + "Read -1: EOF for connection " + fConnection);
        }
        close();
        throw new EOFException("EOF while reading from " + fConnection);
    }

    /**
     * Logs, at level SIX, how many bytes were read into which message.
     *
     * @param j  number of bytes read
     * @param j2 log ID of the message being filled
     */
    private void logDataRead(long j, long j2) {
        if (!Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
            return;
        }
        Log.LOGGER.log(DistcompLevel.SIX, fLogPrefix + "Reading into message UID: " + j2 + ", from: " + getRemoteProcess() + ", bytes: " + j);
    }

    /**
     * Reads from the connection into a single buffer, records that data arrived, and checks for end of stream.
     *
     * @param byteBuffer destination buffer
     * @param j          log ID of the message being filled (used for logging only)
     * @return the number of bytes read
     * @throws IOException on read failure or end of stream
     */
    private long readFromChannel(ByteBuffer byteBuffer, long j) throws IOException {
        final long bytesRead = fConnection.read(byteBuffer);
        final boolean gotData = bytesRead > 0;
        if (gotData) {
            fHasReceivedFlag.set(true);
            fHasEverReceived.set(true);
        }
        logDataRead(bytesRead, j);
        detectEndOfFile(bytesRead);
        return bytesRead;
    }

    /**
     * Scattering read into several buffers. The buffers' limits are temporarily capped so that at most
     * {@code IoConstants.sMAX_RECEIVE_LIMIT_BYTES} are read, and are restored afterwards on every path.
     *
     * @param byteBufferArr destination buffers
     * @param j             log ID of the message being filled (used for logging only)
     * @return the number of bytes read
     * @throws IOException on read failure or end of stream
     */
    private long readFromChannel(ByteBuffer[] byteBufferArr, long j) throws IOException {
        final int[] savedLimits = iLimitBuffers(byteBufferArr, IoConstants.sMAX_RECEIVE_LIMIT_BYTES);
        try {
            final long bytesRead = fConnection.read(byteBufferArr);
            if (bytesRead > 0) {
                fHasReceivedFlag.set(true);
                fHasEverReceived.set(true);
            }
            logDataRead(bytesRead, j);
            detectEndOfFile(bytesRead);
            return bytesRead;
        } finally {
            iRestoreBufferLimits(byteBufferArr, savedLimits);
        }
    }

    /**
     * Unwraps an array of buffer handles into the buffers they hold, preserving order.
     *
     * @param byteBufferHandleArr handles to unwrap
     * @return a new array with {@code handle.get()} for each handle
     */
    private static ByteBuffer[] handleArr2BufArr(ByteBufferHandle[] byteBufferHandleArr) {
        final int count = byteBufferHandleArr.length;
        final ByteBuffer[] buffers = new ByteBuffer[count];
        int idx = 0;
        for (ByteBufferHandle handle : byteBufferHandleArr) {
            buffers[idx++] = handle.get();
        }
        return buffers;
    }

    /**
     * Advances reception of the message currently being assembled: reads the header if it is not yet
     * complete, then the payload. When the payload is complete the message is dispatched (or queued)
     * and a fresh receive buffer is installed.
     *
     * @return {@code true} if a whole message was completed by this call; {@code false} if the header
     *         or the payload is still incomplete
     * @throws IOException                on read failure or end of stream
     * @throws InvalidHeaderDataException declared for invalid header data
     */
    private boolean handleRead() throws IOException, InvalidHeaderDataException {
        boolean isHeaderCompletelyTransmitted = this.fPartiallyReceivedMessage.isHeaderCompletelyTransmitted();
        if (!isHeaderCompletelyTransmitted) {
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Attempting to read a header");
            }
            readFromChannel(this.fPartiallyReceivedMessage.getHeaderMessage().get(), this.fPartiallyReceivedMessage.getLogID());
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Completed reading a header");
            }
            if (this.fPartiallyReceivedMessage.isHeaderCompletelyTransmitted()) {
                // Header just became complete: report the start of reception and set up the payload.
                notifyProfilingMessageEvent(SessionProfilingListener.MessageEventType.RECEIVE_STARTED, this.fPartiallyReceivedMessage.getLogID(), this.fPartiallyReceivedMessage.totalBytes());
                Messages.configurePayload(this.fPartiallyReceivedMessage);
                isHeaderCompletelyTransmitted = true;
            }
        }
        if (!isHeaderCompletelyTransmitted) {
            return false;
        }
        if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
            Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Attempting to read the payload");
        }
        long readFromChannel = readFromChannel(handleArr2BufArr(this.fPartiallyReceivedMessage.getPayloadMessages()), this.fPartiallyReceivedMessage.getLogID());
        // NOTE(review): isHeaderCompletelyTransmitted is always true at this point (the method returned above
        // otherwise), so this short retry loop can never run as written. Presumably the condition was meant to
        // test whether the header had already been complete on entry — confirm against the original source.
        if (readFromChannel == 0 && !isHeaderCompletelyTransmitted) {
            long currentTimeMillis = System.currentTimeMillis();
            while (readFromChannel == 0 && System.currentTimeMillis() < currentTimeMillis + 5) {
                try {
                    Thread.sleep(1L);
                    readFromChannel += readFromChannel(handleArr2BufArr(this.fPartiallyReceivedMessage.getPayloadMessages()), this.fPartiallyReceivedMessage.getLogID());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (!this.fPartiallyReceivedMessage.isPayloadCompletelyTransmitted()) {
            // Payload incomplete: return false either way; the message below is only logged at level SIX.
            if (!Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                return false;
            }
            Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Have not read the payload");
            return false;
        }
        if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
            Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Finished reading message with UID: " + this.fPartiallyReceivedMessage.getLogID() + ", bytes: " + this.fPartiallyReceivedMessage.totalBytes());
        }
        notifyProfilingMessageEvent(SessionProfilingListener.MessageEventType.RECEIVE_COMPLETED, this.fPartiallyReceivedMessage.getLogID(), this.fPartiallyReceivedMessage.totalBytes());
        dispatchOrQueue(this.fPartiallyReceivedMessage);
        // Start assembling the next message in a fresh buffer.
        this.fPartiallyReceivedMessage = Messages.createHeaderPayloadForReceive();
        return true;
    }

    /**
     * Temporarily lowers the limits of the given buffers so that, summed over
     * all of them, no more than {@code i} bytes remain available.
     *
     * <p>Buffers are visited in array order and each one consumes as much of
     * the byte budget as it has remaining; once the budget is used up, later
     * buffers get their limit set to their current position.
     *
     * @param byteBufferArr the buffers whose limits are adjusted in place
     * @param i the total number of bytes that may remain across all buffers
     * @return for every buffer, its limit before the adjustment, or {@code -1}
     *         when the buffer had nothing remaining and was left untouched
     */
    private int[] iLimitBuffers(ByteBuffer[] byteBufferArr, int i) {
        final int[] savedLimits = new int[byteBufferArr.length];
        int budget = i;
        for (int idx = 0; idx < savedLimits.length; idx++) {
            final ByteBuffer buffer = byteBufferArr[idx];
            if (!buffer.hasRemaining()) {
                // Nothing to cap; -1 tells iRestoreBufferLimits to skip this buffer.
                savedLimits[idx] = -1;
                continue;
            }
            final long start = buffer.position();
            final long cappedLimit = Math.min(start + budget, buffer.limit());
            if (!$assertionsDisabled && cappedLimit > Integer.MAX_VALUE) {
                throw new AssertionError("newLim must fit into an 'int'");
            }
            // Charge the bytes this buffer is still allowed to expose against the budget.
            budget -= (int) (cappedLimit - start);
            savedLimits[idx] = buffer.limit();
            buffer.limit((int) cappedLimit);
        }
        return savedLimits;
    }

    /**
     * Puts back buffer limits previously saved by {@code iLimitBuffers}.
     *
     * @param byteBufferArr the buffers whose limits are restored in place
     * @param iArr the saved limit for each buffer; an entry of {@code -1}
     *             means the corresponding buffer is left as it is
     */
    private void iRestoreBufferLimits(ByteBuffer[] byteBufferArr, int[] iArr) {
        for (int idx = 0; idx < byteBufferArr.length; idx++) {
            final int savedLimit = iArr[idx];
            if (savedLimit == -1) {
                continue;
            }
            byteBufferArr[idx].limit(savedLimit);
        }
    }

    /**
     * Writes (part of) the message at the head of the send queue to the
     * connection.
     *
     * <p>If no message is currently being sent, the head of the send queue is
     * picked up. When the queue is empty, write interest is removed from the
     * selection key. At most {@code IoConstants.sMAX_SEND_LIMIT_BYTES} bytes are
     * offered to the connection per call; the buffer limits changed for that
     * purpose are always restored before returning.
     *
     * @return {@code false} if there was nothing to send; {@code true} if the
     *         current message is not yet completely transmitted; otherwise
     *         whether further messages are waiting on the send queue
     * @throws IOException if writing to the connection fails
     */
    private boolean handleWrite() throws IOException {
        if (this.fNowSendingHolderAndSequence == null) {
            this.fNowSendingHolderAndSequence = this.fSendQueue.peek();
        }
        if (this.fNowSendingHolderAndSequence == null) {
            // Nothing queued: stop asking the selector for write readiness.
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "removeInterestOps(SelectionKey.OP_WRITE) for " + this.fConnection);
            }
            removeInterestOps(SelectionKey.OP_WRITE);
            return false;
        }

        MessageHolder holder = this.fNowSendingHolderAndSequence.getMessageHolder();
        HeaderPayload outgoing = holder.getHeaderPayload();
        ByteBuffer[] buffers = handleArr2BufArr(outgoing.getAllMessages());

        // A first buffer still at position 0 means no byte of this message has gone out yet.
        if (buffers[0].position() == 0) {
            notifyProfilingMessageEvent(SessionProfilingListener.MessageEventType.SEND_STARTED, outgoing.getLogID(), outgoing.totalBytes());
        }

        int[] savedLimits = iLimitBuffers(buffers, IoConstants.sMAX_SEND_LIMIT_BYTES);
        try {
            long bytesWritten = this.fConnection.write(buffers);
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Write: " + bytesWritten + " to: " + this.fConnection);
            }

            // NOTE(review): this check runs while the temporary buffer limits are
            // still in place - confirm isPayloadCompletelyTransmitted() does not
            // depend on the buffers' current limits.
            if (!outgoing.isPayloadCompletelyTransmitted()) {
                return true;
            }

            notifyProfilingMessageEvent(SessionProfilingListener.MessageEventType.SEND_COMPLETED, outgoing.getLogID(), outgoing.totalBytes());
            for (ByteBufferHandle payloadHandle : outgoing.getPayloadMessages()) {
                payloadHandle.free();
            }
            this.fSendQueue.remove(this.fNowSendingHolderAndSequence);
            this.fNowSendingHolderAndSequence = null;
            holder.releaseAcquiredPermits();
            if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                Log.LOGGER.log(DistcompLevel.SIX, this.fLogPrefix + "Finished sending message with UID: " + holder.getHeaderPayload().getLogID());
            }
            return !this.fSendQueue.isEmpty();
        } finally {
            iRestoreBufferLimits(buffers, savedLimits);
        }
    }

    /**
     * Wraps the given message in a header/payload created by
     * {@code Messages.createHeaderPayloadForSend} and enqueues it for sending
     * with a sequence number of {@code -1} and the boolean flag {@code false}.
     *
     * @param message the message to send
     * @throws IOException if thrown while creating or enqueueing the header/payload
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void enqueueMessageForSending(Message message) throws IOException {
        if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
            String logLine = this.fLogPrefix + "Enqueueing message " + message + " " + message.getClass().getSimpleName();
            Log.LOGGER.log(DistcompLevel.SIX, logLine);
        }
        // -1 is the value getMessagesOnSendQueue() filters out of its result.
        enqueueMessageForSending(Messages.createHeaderPayloadForSend(message), -1L, false);
    }

    /**
     * Registers the connection's selectable channel with the given selector and
     * stores the resulting selection key, all while holding the write lock.
     *
     * <p>If the returned key is not valid, this is logged at level TWO and,
     * when assertions are enabled, an {@link AssertionError} is raised.
     *
     * @param selector the selector to register with
     * @param i the initial interest set for the key
     * @param obj the attachment for the key
     * @throws ClosedChannelException if the channel is closed
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void registerWithSelector(Selector selector, int i, Object obj) throws ClosedChannelException {
        this.fLock.writeLock().lock();
        try {
            this.fSelectionKey = this.fConnection.getSelectableChannel().register(selector, i, obj);
            if (this.fSelectionKey.isValid()) {
                return;
            }
            String problem = "Invalid selection key " + this.fSelectionKey + " returned from registering " + this.fConnection;
            Log.LOGGER.log(DistcompLevel.TWO, this.fLogPrefix + problem);
            if (!$assertionsDisabled && !this.fSelectionKey.isValid()) {
                throw new AssertionError(problem);
            }
        } finally {
            this.fLock.writeLock().unlock();
        }
    }

    /**
     * Adds the given operations to the selection key's interest set while
     * holding the read lock.
     *
     * @param i the operation bits to switch on
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.io.TransmissionChannel
    public void addInterestOps(int i) {
        this.fLock.readLock().lock();
        try {
            int currentOps = this.fSelectionKey.interestOps();
            this.fSelectionKey.interestOps(currentOps | i);
        } finally {
            this.fLock.readLock().unlock();
        }
    }

    /**
     * Clears the given operations from the selection key's interest set while
     * holding the read lock.
     *
     * @param i the operation bits to switch off
     */
    private void removeInterestOps(int i) {
        this.fLock.readLock().lock();
        try {
            int currentOps = this.fSelectionKey.interestOps();
            this.fSelectionKey.interestOps(currentOps & ~i);
        } finally {
            this.fLock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Cancels this channel's selection key while holding the read lock.
     */
    public void cancelSelectionKey() {
        this.fLock.readLock().lock();
        try {
            SelectionKey key = this.fSelectionKey;
            key.cancel();
        } finally {
            this.fLock.readLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Swaps in a new connection and resets the per-connection transfer state,
     * all while holding the write lock.
     *
     * <p>The new connection's channel is switched to non-blocking mode. After
     * that, the partially received message is replaced by a fresh one, the
     * read-pending flag is cleared, and the message currently being sent (if
     * any) is removed from the send queue and forgotten.
     *
     * @param connection the connection to use from now on
     * @throws PeerMessagingRuntimeException if the channel cannot be switched
     *         to non-blocking mode; {@code fConnection} has already been
     *         replaced at that point
     */
    public synchronized void replaceConnection(Connection connection) {
        this.fLock.writeLock().lock();
        try {
            if (Log.LOGGER.isLoggable(DistcompLevel.THREE)) {
                Log.LOGGER.log(DistcompLevel.THREE, this.fLogPrefix + "Replacing : " + this.fConnection + " with: " + connection);
            }
            this.fConnection = connection;
            this.fConnection.getSelectableChannel().configureBlocking(false);

            // Discard everything that was in flight on the previous connection.
            this.fPartiallyReceivedMessage = Messages.createHeaderPayloadForReceive();
            this.fReadPending = false;
            this.fSendQueue.remove(this.fNowSendingHolderAndSequence);
            this.fNowSendingHolderAndSequence = null;
        } catch (IOException cause) {
            throw new PeerMessagingRuntimeException("IOException caught while configuring " + this.fConnection + " to not block.", cause);
        } finally {
            this.fLock.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports a write failure by passing this channel's remote instance and the
     * given exception to the error handler's {@code writeError}.
     *
     * @param iOException the exception to report
     */
    public void writeErrorWithErrorHandler(IOException iOException) {
        this.fErrorHandler.writeError(this.fRemoteInstance, iOException);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reads the has-received flag and sets it to {@code false} in one
     * {@code getAndSet} call.
     *
     * @return the value the flag had before it was reset
     */
    public boolean getAndResetHasReceived() {
        boolean previous = this.fHasReceivedFlag.getAndSet(false);
        return previous;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the current value of the has-ever-received flag without
     * modifying it.
     *
     * @return the flag's current value
     */
    public boolean getHasEverReceived() {
        boolean everReceived = this.fHasEverReceived.get();
        return everReceived;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Collects the sequence numbers of the messages currently on the send
     * queue.
     *
     * <p>Entries whose sequence number is {@code -1} are left out; that is the
     * value {@code enqueueMessageForSending(Message)} passes for the messages
     * it enqueues.
     *
     * @return a new, mutable set with the sequence numbers found on the queue
     */
    public Set<Long> getMessagesOnSendQueue() {
        // Parameterized instead of a raw HashSet so the return needs no unchecked conversion.
        Set<Long> sequenceNumbers = new HashSet<Long>();
        for (MessageHolderAndSequenceNumber queued : this.fSendQueue) {
            long sequenceNumber = queued.getSequenceNumber();
            if (sequenceNumber != -1) {
                sequenceNumbers.add(Long.valueOf(sequenceNumber));
            }
        }
        return sequenceNumbers;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the given listener from the incoming rate limiter.
     *
     * @param rateLimiterListener the listener to remove
     */
    public void removeRateLimiterListener(RateLimiterListener rateLimiterListener) {
        this.fIncomingRateLimiter.removeListener(rateLimiterListener);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new {@code UnreliableFlowController}.
     *
     * @return a freshly constructed flow controller; every call returns a new object
     */
    public NetworkFlowController getNewUnreliableFlowController() {
        NetworkFlowController flowController = new UnreliableFlowController();
        return flowController;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new {@code UnreliableMessageHandler}.
     *
     * @return a freshly constructed message handler; every call returns a new object
     */
    public MessageHandler getNewUnreliableMessageHandler() {
        MessageHandler messageHandler = new UnreliableMessageHandler();
        return messageHandler;
    }

    /**
     * Describes this channel as its log prefix followed by
     * <code>{fConnection=...}</code>, where the value comes from
     * {@code getConnectionString()}.
     *
     * @return the textual description of this channel
     */
    @Override
    public String toString() {
        return this.fLogPrefix + "{fConnection=" + getConnectionString() + '}';
    }

    // Records once, at class initialization, whether assertions are disabled
    // for this class; the explicit "!$assertionsDisabled" checks in this
    // class read this flag.
    static {
        boolean assertionsEnabled = UnreliableTransmissionChannel.class.desiredAssertionStatus();
        $assertionsDisabled = !assertionsEnabled;
    }
}
