package com.mathworks.toolbox.distcomp.pmode.io;

import com.mathworks.toolbox.distcomp.mjs.core.util.ConcurrencyUtil;
import com.mathworks.toolbox.distcomp.pmode.SessionProfilingListener;
import com.mathworks.toolbox.distcomp.pmode.peermessaging.PeerMessagingRuntimeException;
import com.mathworks.toolbox.distcomp.pmode.shared.ChannelDispatcher;
import com.mathworks.toolbox.distcomp.pmode.shared.CommunicationObserver;
import com.mathworks.toolbox.distcomp.pmode.shared.Connection;
import com.mathworks.toolbox.distcomp.pmode.shared.Dispatcher;
import com.mathworks.toolbox.distcomp.pmode.shared.ErrorHandler;
import com.mathworks.toolbox.distcomp.pmode.shared.Instance;
import com.mathworks.toolbox.distcomp.pmode.shared.Message;
import com.mathworks.toolbox.distcomp.pmode.shared.MessageInfo;
import com.mathworks.toolbox.distcomp.pmode.shared.MessageObserver;
import com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessage;
import com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessageRegistry;
import com.mathworks.toolbox.distcomp.pmode.shared.ReturnMessage;
import com.mathworks.toolbox.distcomp.util.PackageInfo;
import com.mathworks.toolbox.distcomp.util.Pair;
import com.mathworks.toolbox.distcomp.util.concurrent.SequentialExecutor;
import com.mathworks.toolbox.parallel.pctutil.concurrent.NamedThreadFactory;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import com.mathworks.toolbox.parallel.util.concurrent.Awaitable;
import com.mathworks.toolbox.parallel.util.concurrent.ReentrantLock;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/DirectCommunicationGroup.class */
public final class DirectCommunicationGroup implements Runnable, CommunicationGroup {
    // Pool-size and idle-timeout constants. The constructor passes the same literal values (8, 60000L)
    // to its two executors; presumably the compiler inlined these constants there — TODO confirm.
    private static final int MAX_SELECT_THREADS = 8;
    private static final int MAX_DISPATCHER_THREADS = 8;
    private static final long THREAD_TIMEOUT_MILLISECONDS = 60000;
    // Selector opened in the constructor, woken by invokeOnSelectThread()/closeStreams(), closed by closeStreams().
    private final Selector fSelector;
    // Synchronized map from remote instance to its channel; iterate its views only while holding its monitor.
    private final Map<Instance, TransmissionChannel> fTransmissionChannelAddressBook;
    // Every instance ever passed to addConnection(); lets getCurrentDestinations() tell "never known"
    // apart from "known earlier but no longer in the address book".
    private final Set<Instance> fSeenInstances;
    // Created as a LinkedHashSet in the constructor; its use is not visible in this part of the file.
    private HashSet<TransmissionChannel> fPendingChannels;
    // Executor handed to each channel together with the dispatcher (see setDispatcher()).
    private final SequentialExecutor fDispatchExec;
    // Set to false by closeStreams() to ask the select loop in run() to wind down.
    private final AtomicBoolean fKeepGoing;
    // The daemon thread running run(); created by startSelectThread().
    private Thread fThread;
    // Registry that receives observers for observable messages (see sendToWithObserverImpl()).
    private final ObservableMessageRegistry fReturnRegistry;
    // Receives write errors from sendToChannel() and Errors caught in run().
    private final ErrorHandler fErrorHandler;
    // The local endpoint, returned by getLocalInstance().
    private final Instance fThisInstance;
    // Default rate limiters used by the three-argument build(); initialised outside the visible source.
    private static final RateLimiter INCOMING_RATE_LIMITER;
    private static final RateLimiter OUTGOING_RATE_LIMITER;
    // Rate limiters passed to every TransmissionChannel created in addConnection().
    private final RateLimiter fIncomingRateLimiter;
    private final RateLimiter fOutgoingRateLimiter;
    // Thread pool created in the constructor and handed to ChannelHandle instances — presumably by doSelect(); verify.
    private final ExecutorService fChannelHandleExec;
    // Timeouts handed to doSelect() by run() while the group is not shutting down.
    private final long fSelectTimeoutMillis;
    private final long fShortSelectTimeoutMillis;
    // Decompiler rendering of the compiler-generated assertion switch; true when assertions are disabled.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Tasks waiting to be executed on the select thread (drained by drainRunnableQueue()).
    private final ConcurrentUniqueQueue<Runnable> fRunnables = new ConcurrentUniqueQueue<>();
    // Dispatcher currently attached to the channels; null until setDispatcher() is called.
    private ChannelDispatcher<Message> fDispatch = null;
    // Instances whose channel has been registered with the selector. In the visible code it is only
    // modified from runnables executed on the select thread and read in detectLostInstances().
    private final Set<Instance> fExpectedInstances = new HashSet();
    // Single-thread daemon executor; presumably used to deliver lost-communication notifications — its use is not visible here.
    private final ExecutorService fCommunicationLostExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.createDaemonThreadFactory(getClass().getSimpleName() + " fCommunicationLostExecutor-", PackageInfo.LOGGER));
    // Observers told about newly established communication (see addConnection()).
    private final List<CommunicationObserver> fCommunicationObservers = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/DirectCommunicationGroup$AddOpWrite.class */
    /**
     * Select-thread task that adds write interest to one channel's selection key.
     *
     * <p>Two tasks are equal when they target equal channels, so a queue that rejects
     * duplicates holds at most one pending request per channel.
     */
    public static final class AddOpWrite implements Runnable {
        private final TransmissionChannel fTransmissionChannel;

        private AddOpWrite(TransmissionChannel transmissionChannel) {
            this.fTransmissionChannel = transmissionChannel;
        }

        /** Adds {@link SelectionKey#OP_WRITE} to the channel's interest set. */
        @Override
        public void run() {
            fTransmissionChannel.addInterestOps(SelectionKey.OP_WRITE);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // The class is final, so instanceof is equivalent to an exact class comparison.
            if (!(obj instanceof AddOpWrite)) {
                return false;
            }
            AddOpWrite other = (AddOpWrite) obj;
            return fTransmissionChannel.equals(other.fTransmissionChannel);
        }

        @Override
        public int hashCode() {
            return fTransmissionChannel.hashCode();
        }

        @Override
        public String toString() {
            return "addInterestOps(SelectionKey.OP_WRITE) for " + fTransmissionChannel;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/DirectCommunicationGroup$BrokenInstance.class */
    /** Pairs an instance whose communication failed with the throwable that was observed. */
    public static class BrokenInstance {
        private final Instance fInstance;
        private final Throwable fThrownException;

        private BrokenInstance(Instance instance, Throwable th) {
            fInstance = instance;
            fThrownException = th;
        }

        /**
         * Removes the broken instance from the given group and then reports the lost
         * communication, together with the recorded throwable, through the group.
         *
         * @param directCommunicationGroup group the instance belongs to
         */
        public void removeInstanceAndNotifyErrorHandler(DirectCommunicationGroup directCommunicationGroup) {
            final Instance broken = fInstance;
            directCommunicationGroup.removeInstanceNoNotification(broken);
            directCommunicationGroup.notifyLostCommunication(broken, fThrownException);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/DirectCommunicationGroup$ChannelHandle.class */
    /**
     * Submits {@code handleSelect()} for one channel to an executor and keeps the resulting
     * future so the caller can collect the outcome later.
     */
    public static final class ChannelHandle {
        private final TransmissionChannel fTransmissionChannel;
        private final Future<Boolean> fFuture;

        /**
         * @param transmissionChannel channel whose {@code handleSelect()} is run asynchronously
         * @param executorService executor that runs the call
         */
        ChannelHandle(TransmissionChannel transmissionChannel, ExecutorService executorService) {
            this.fTransmissionChannel = transmissionChannel;
            this.fFuture = executorService.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    return Boolean.valueOf(ChannelHandle.this.fTransmissionChannel.handleSelect());
                }
            });
        }

        /**
         * Waits for the submitted {@code handleSelect()} call and returns its result.
         *
         * @return the value returned by {@code handleSelect()}
         * @throws InterruptedException if the waiting thread is interrupted
         * @throws Throwable the failure raised by {@code handleSelect()}, unwrapped from the
         *         {@link ExecutionException}
         */
        public boolean getWorkDeferred() throws Throwable {
            try {
                return this.fFuture.get().booleanValue();
            } catch (InterruptedException e) {
                // An InterruptedException carries no cause; "throw e.getCause()" would be
                // "throw null" and surface as a NullPointerException, hiding the interruption.
                throw e;
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                // Fall back to the wrapper itself in the unlikely event that no cause was recorded.
                throw cause != null ? cause : e;
            }
        }

        /** @return the channel this handle was created for */
        public TransmissionChannel getChannel() {
            return this.fTransmissionChannel;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/DirectCommunicationGroup$ConcurrentUniqueQueue.class */
    /**
     * FIFO queue that ignores an element while an equal element is already waiting.
     * Every operation runs under a single lock.
     *
     * @param <E> element type; nulls are rejected
     */
    public static final class ConcurrentUniqueQueue<E> {
        private final ReentrantLock fLock = new ReentrantLock();
        /** Elements in arrival order. */
        private final Queue<E> fQueue = new LinkedList<E>();
        /** Mirror of the queue contents, used for the duplicate check. */
        private final Set<E> fSet = new HashSet<E>();

        private ConcurrentUniqueQueue() {
        }

        /**
         * Appends the element unless an equal one is already queued.
         *
         * @throws NullPointerException if {@code e} is null
         */
        public void add(E e) {
            if (e == null) {
                throw new NullPointerException("A ConcurrentUniqueQueue can not contain nulls");
            }
            fLock.lock();
            try {
                boolean notYetQueued = fSet.add(e);
                if (notYetQueued) {
                    fQueue.add(e);
                }
            } finally {
                fLock.unlock();
            }
        }

        /**
         * Removes and returns the oldest element.
         *
         * @return the head of the queue, or null when the queue is empty
         */
        public E poll() {
            fLock.lock();
            try {
                E head = fQueue.poll();
                if (head == null) {
                    return null;
                }
                fSet.remove(head);
                return head;
            } finally {
                fLock.unlock();
            }
        }
    }

    /* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/io/DirectCommunicationGroup$ForwardingDispatcher.class */
    /**
     * Adapts a plain {@code Dispatcher} to the {@code ChannelDispatcher} interface by
     * dropping the {@code MessageInfo} argument.
     */
    private static final class ForwardingDispatcher implements ChannelDispatcher<Message> {
        private final Dispatcher<Message> fMessageDispatcher;

        private ForwardingDispatcher(Dispatcher<Message> dispatcher) {
            fMessageDispatcher = dispatcher;
        }

        /** Forwards the message and its sender to the wrapped dispatcher; {@code messageInfo} is ignored. */
        @Override
        public void dispatch(Message message, Instance instance, MessageInfo messageInfo) {
            fMessageDispatcher.dispatch(message, instance);
        }

        @Override
        public String toString() {
            return "ForwardingDispatcher";
        }
    }

    /**
     * Initialises the group's state, its executors and its selector. The select thread is
     * not started here; see {@code build}.
     *
     * @param errorHandler receiver of communication errors
     * @param observableMessageRegistry registry for return-message observers
     * @param instance the local instance
     * @param rateLimiter limiter handed to new channels as the incoming limiter
     * @param rateLimiter2 limiter handed to new channels as the outgoing limiter
     * @param j select timeout in milliseconds
     * @param j2 short select timeout in milliseconds
     * @throws PeerMessagingRuntimeException if the selector cannot be opened
     */
    private DirectCommunicationGroup(ErrorHandler errorHandler, ObservableMessageRegistry observableMessageRegistry, Instance instance, RateLimiter rateLimiter, RateLimiter rateLimiter2, long j, long j2) {
        // Timeouts and rate limiters.
        this.fSelectTimeoutMillis = j;
        this.fShortSelectTimeoutMillis = j2;
        this.fIncomingRateLimiter = rateLimiter;
        this.fOutgoingRateLimiter = rateLimiter2;
        Log.LOGGER.log(DistcompLevel.FIVE, "In DirectCommunicationGroup constructor.");

        // Collaborators and bookkeeping collections.
        this.fErrorHandler = errorHandler;
        this.fReturnRegistry = observableMessageRegistry;
        this.fThisInstance = instance;
        this.fKeepGoing = new AtomicBoolean(true);
        this.fTransmissionChannelAddressBook = Collections.synchronizedMap(new HashMap<Instance, TransmissionChannel>());
        this.fSeenInstances = Collections.synchronizedSet(new HashSet<Instance>());
        this.fPendingChannels = new LinkedHashSet<TransmissionChannel>();

        Log.LOGGER.log(DistcompLevel.FOUR, "Executing handle select using thread pool");
        this.fChannelHandleExec = ConcurrencyUtil.createThreadPool("fChannelHandleExec", MAX_SELECT_THREADS, THREAD_TIMEOUT_MILLISECONDS);

        Log.LOGGER.log(DistcompLevel.SIX, "About to call Selector.open()");
        try {
            this.fSelector = Selector.open();
            Log.LOGGER.log(DistcompLevel.SIX, "Called Selector.open()");
            this.fDispatchExec = new SequentialExecutor("fDispatcher", MAX_DISPATCHER_THREADS, THREAD_TIMEOUT_MILLISECONDS);
            Log.LOGGER.log(DistcompLevel.FIVE, "Exiting DirectCommunicationGroup constructor.");
        } catch (IOException openFailure) {
            throw new PeerMessagingRuntimeException("Unexpected IOException while opening a selector for " + instance, openFailure);
        }
    }

    /**
     * Creates a group with explicit rate limiters and select timeouts and starts its
     * select thread.
     *
     * @param j select timeout in milliseconds
     * @param j2 short select timeout in milliseconds
     * @return the new group, with its select thread already started
     */
    public static DirectCommunicationGroup build(ErrorHandler errorHandler, ObservableMessageRegistry observableMessageRegistry, Instance instance, RateLimiter rateLimiter, RateLimiter rateLimiter2, long j, long j2) {
        final DirectCommunicationGroup group =
                new DirectCommunicationGroup(errorHandler, observableMessageRegistry, instance, rateLimiter, rateLimiter2, j, j2);
        // The thread is started only after construction has completed.
        group.startSelectThread();
        return group;
    }

    /**
     * Creates a started group with explicit rate limiters, using
     * {@code IoConstants.sSELECT_TIMEOUT_MILLIS} as the select timeout and 200 ms as the
     * short select timeout.
     */
    public static DirectCommunicationGroup build(ErrorHandler errorHandler, ObservableMessageRegistry observableMessageRegistry, Instance instance, RateLimiter rateLimiter, RateLimiter rateLimiter2) {
        final long shortSelectTimeoutMillis = 200L;
        return build(errorHandler, observableMessageRegistry, instance, rateLimiter, rateLimiter2,
                IoConstants.sSELECT_TIMEOUT_MILLIS, shortSelectTimeoutMillis);
    }

    /** Creates a started group that uses the class-wide default incoming and outgoing rate limiters. */
    public static DirectCommunicationGroup build(ErrorHandler errorHandler, ObservableMessageRegistry observableMessageRegistry, Instance instance) {
        return build(
                errorHandler,
                observableMessageRegistry,
                instance,
                INCOMING_RATE_LIMITER,
                OUTGOING_RATE_LIMITER);
    }

    /** Creates the daemon thread that executes {@link #run()} and starts it. */
    private void startSelectThread() {
        final Thread selectThread = new Thread(this, "CommGroup select thread " + this);
        selectThread.setDaemon(true);
        // Publish the reference before the thread starts running.
        this.fThread = selectThread;
        selectThread.start();
    }

    /**
     * Takes a snapshot of all channels currently in the address book.
     *
     * @return a new array; later changes to the address book do not affect it
     */
    private TransmissionChannel[] getAllTransmissionChannels() {
        // Copying from a view of a synchronized map requires holding the map's monitor.
        synchronized (this.fTransmissionChannelAddressBook) {
            Collection<TransmissionChannel> channels = this.fTransmissionChannelAddressBook.values();
            return channels.toArray(new TransmissionChannel[channels.size()]);
        }
    }

    /**
     * Installs a plain dispatcher by wrapping it in a {@code ForwardingDispatcher}.
     *
     * @return the awaitable produced by the {@code ChannelDispatcher} overload
     */
    @Override
    public Awaitable setDispatcher(Dispatcher<Message> dispatcher) {
        ChannelDispatcher<Message> forwarding = new ForwardingDispatcher(dispatcher);
        return setDispatcher(forwarding);
    }

    /**
     * Records the dispatcher and attaches it, together with the dispatch executor, to every
     * channel currently in the address book.
     *
     * @return an awaitable whose {@code await} returns {@code true} immediately
     */
    public Awaitable setDispatcher(ChannelDispatcher<Message> channelDispatcher) {
        this.fDispatch = channelDispatcher;
        String attachMessage = "Attaching dispatcher: " + channelDispatcher + " to " + getAddressBookSize() + " channels";
        Log.LOGGER.log(DistcompLevel.FIVE, attachMessage);
        TransmissionChannel[] channels = getAllTransmissionChannels();
        for (int i = 0; i < channels.length; i++) {
            channels[i].setDispatcher(channelDispatcher, this.fDispatchExec);
        }
        // The work above is synchronous, so there is nothing left to wait for.
        return new Awaitable() {
            public boolean await(long j, TimeUnit timeUnit) {
                return true;
            }
        };
    }

    /** Sends {@code message} to every instance in {@code list}. */
    @Override
    public void sendTo(List<Instance> list, Message message) {
        final boolean traceEnabled = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
        if (traceEnabled) {
            Log.LOGGER.log(DistcompLevel.FIVE, "sendTo(list) - that's " + list.size());
        }
        sendToImpl(list, message);
    }

    /** Sends a return message to a single instance. */
    @Override
    public void returnTo(Instance instance, ReturnMessage returnMessage) {
        List<Instance> soleRecipient = Collections.singletonList(instance);
        sendToImpl(soleRecipient, returnMessage);
    }

    /** Sends a return message to every instance in {@code list}. */
    @Override
    public void returnTo(List<Instance> list, ReturnMessage returnMessage) {
        final boolean traceEnabled = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
        if (traceEnabled) {
            Log.LOGGER.log(DistcompLevel.FIVE, "returnTo(list) - that's " + list.size());
        }
        sendToImpl(list, returnMessage);
    }

    /** Sends an observable message to every instance in {@code list} and registers {@code messageObserver} for the returns. */
    @Override
    public void sendTo(List<Instance> list, ObservableMessage observableMessage, MessageObserver messageObserver) {
        final boolean traceEnabled = Log.LOGGER.isLoggable(DistcompLevel.FIVE);
        if (traceEnabled) {
            Log.LOGGER.log(DistcompLevel.FIVE, "sendTo(list) with observer - that's " + list.size());
        }
        sendToWithObserverImpl(list, observableMessage, messageObserver);
    }

    /** Sends an observable message to a single instance and registers {@code messageObserver} for the return. */
    @Override
    public void sendTo(Instance instance, ObservableMessage observableMessage, MessageObserver messageObserver) {
        List<Instance> soleRecipient = Collections.singletonList(instance);
        sendToWithObserverImpl(soleRecipient, observableMessage, messageObserver);
    }

    /** Sends {@code message} to a single instance. */
    @Override
    public void sendTo(Instance instance, Message message) {
        List<Instance> soleRecipient = Collections.singletonList(instance);
        sendToImpl(soleRecipient, message);
    }

    /**
     * Resolves the requested recipients to the channels currently in the address book.
     *
     * <p>A recipient that was connected earlier but has no channel any more is logged and
     * left out. A recipient that was never seen aborts the whole lookup.
     *
     * @param list requested recipients
     * @param message message about to be sent; used for logging and the exception only
     * @return map from each reachable recipient to its channel
     * @throws UnableToSendDueToNoAddressBookEntryException if a recipient was never added to this group
     */
    private Map<Instance, TransmissionChannel> getCurrentDestinations(List<Instance> list, Message message) {
        Map<Instance, TransmissionChannel> destinations = new HashMap<Instance, TransmissionChannel>();
        for (Instance target : list) {
            TransmissionChannel channel = this.fTransmissionChannelAddressBook.get(target);
            if (channel != null) {
                destinations.put(target, channel);
                continue;
            }
            if (this.fSeenInstances.contains(target)) {
                // Known earlier, gone now: skip this recipient.
                Log.LOGGER.log(DistcompLevel.TWO, "DirectCommunicationGroup can no longer send " + message + " to " + target);
                continue;
            }
            Log.LOGGER.log(DistcompLevel.TWO, "DirectCommunicationGroup cannot send " + message + " to " + target);
            throw new UnableToSendDueToNoAddressBookEntryException(message, target);
        }
        return destinations;
    }

    /**
     * Sends an observable message to the reachable subset of {@code list}.
     *
     * <p>The observer is always told which instances returns are expected from. It is only
     * registered with the return registry, and the message only sent, when at least one
     * destination is reachable.
     */
    private void sendToWithObserverImpl(List<Instance> list, ObservableMessage observableMessage, MessageObserver messageObserver) {
        Map<Instance, TransmissionChannel> reachable = getCurrentDestinations(list, observableMessage);
        messageObserver.expectReturnsFrom(observableMessage.getSequenceNumber(), new ArrayList<Instance>(reachable.keySet()));
        if (reachable.isEmpty()) {
            Log.LOGGER.log(DistcompLevel.ONE, "DirectCommunicationGroup cannot send command " + observableMessage + " None of the specified destinations were found in the address book: " + list);
            return;
        }
        // The registry receives the full requested list, not just the reachable subset.
        this.fReturnRegistry.addReturnMessageObserver(observableMessage, list, messageObserver);
        sendToChannel(reachable.values(), observableMessage);
    }

    /** Resolves the recipients to their current channels and enqueues the message on each. */
    private void sendToImpl(List<Instance> list, Message message) {
        Map<Instance, TransmissionChannel> reachable = getCurrentDestinations(list, message);
        sendToChannel(reachable.values(), message);
    }

    /**
     * Enqueues the message on each channel and asks the select thread to add write interest
     * for that channel. A failure on one channel is reported to the error handler and does
     * not stop the remaining channels from being processed.
     */
    private void sendToChannel(Collection<TransmissionChannel> collection, Message message) {
        for (TransmissionChannel channel : collection) {
            try {
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "Enqueuing a message: " + message + " to: " + channel.getRemoteProcess());
                }
                channel.enqueueMessageForSending(message);
                invokeOnSelectThread(new AddOpWrite(channel));
            } catch (IOException ioFailure) {
                Log.LOGGER.log(DistcompLevel.ONE, "IOException during sendTo", ioFailure);
                this.fErrorHandler.writeError(channel.getRemoteProcess(), ioFailure);
            } catch (Throwable failure) {
                // launderThrowable rethrows Errors and hands back RuntimeExceptions.
                RuntimeException unchecked = launderThrowable(failure);
                Log.LOGGER.log(DistcompLevel.ONE, "RuntimeException during sendTo: ", unchecked);
                this.fErrorHandler.writeError(channel.getRemoteProcess(), unchecked);
            }
        }
    }

    /**
     * Classifies a throwable caught by a catch-all handler.
     *
     * @param th the caught throwable
     * @return {@code th} itself when it is a {@link RuntimeException}
     * @throws Error {@code th} itself when it is an {@link Error}, or its cause when that
     *         cause is an {@link OutOfMemoryError}
     * @throws IllegalStateException wrapping {@code th} when it is a checked exception
     */
    private RuntimeException launderThrowable(Throwable th) {
        if (th instanceof Error) {
            throw (Error) th;
        }
        Throwable cause = th.getCause();
        if (cause instanceof OutOfMemoryError) {
            throw (Error) cause;
        }
        if (!(th instanceof RuntimeException)) {
            throw new IllegalStateException("Not unchecked", th);
        }
        return (RuntimeException) th;
    }

    /**
     * @return a new list holding the instances currently in the address book
     */
    @Override
    public List<Instance> getConnectedInstances() {
        // Copying the key view of a synchronized map requires holding the map's monitor.
        synchronized (this.fTransmissionChannelAddressBook) {
            return new ArrayList<Instance>(this.fTransmissionChannelAddressBook.keySet());
        }
    }

    /** @return the instance this group was constructed for */
    @Override
    public Instance getLocalInstance() {
        final Instance local = this.fThisInstance;
        return local;
    }

    /**
     * Reports the byte counters of every channel currently in the address book.
     *
     * @return a new map from instance to a pair of (total bytes read, total bytes written)
     */
    @Override
    public Map<Instance, Pair<Long, Long>> getBytesTransferredToInstances() {
        // The address book is a Collections.synchronizedMap: iterating one of its views without
        // holding the map's monitor can fail with ConcurrentModificationException, and a second
        // get() per key can return null if the instance is removed in between. Copy the entries
        // once under the lock and read the counters from that private snapshot.
        final Map<Instance, TransmissionChannel> snapshot;
        synchronized (this.fTransmissionChannelAddressBook) {
            snapshot = new HashMap<Instance, TransmissionChannel>(this.fTransmissionChannelAddressBook);
        }
        Map<Instance, Pair<Long, Long>> bytesByInstance = new HashMap<Instance, Pair<Long, Long>>();
        for (Map.Entry<Instance, TransmissionChannel> entry : snapshot.entrySet()) {
            TransmissionChannel channel = entry.getValue();
            bytesByInstance.put(entry.getKey(), new Pair<>(Long.valueOf(channel.getTotalReadBytes()), Long.valueOf(channel.getTotalWriteBytes())));
        }
        return bytesByInstance;
    }

    /** @return the number of entries currently in the address book */
    private int getAddressBookSize() {
        final int entryCount = this.fTransmissionChannelAddressBook.size();
        return entryCount;
    }

    /**
     * Shuts the group down: asks the select thread to stop, waits up to 200 ms for it,
     * closes every channel, empties the address book, closes the selector and finally
     * shuts down the channel-handling and dispatch executors.
     */
    @Override
    public void closeStreams() {
        Log.LOGGER.log(DistcompLevel.FIVE, "Closing channels");

        // Signal the select loop and wake it so it notices the flag.
        this.fKeepGoing.set(false);
        this.fSelector.wakeup();
        try {
            this.fThread.join(200L);
            Log.LOGGER.log(DistcompLevel.FIVE, "Joined select() thread, closing channels. fThread.isAlive() = " + this.fThread.isAlive());
        } catch (InterruptedException interrupted) {
            Log.LOGGER.log(DistcompLevel.FIVE, "Interruption during join", interrupted);
            // Keep the interrupt visible to the caller and carry on with the cleanup.
            Thread.currentThread().interrupt();
        }

        TransmissionChannel[] openChannels = getAllTransmissionChannels();
        for (int i = 0; i < openChannels.length; i++) {
            try {
                openChannels[i].close();
            } catch (IOException closeFailure) {
                Log.LOGGER.log(DistcompLevel.TWO, "IOException during close: ", closeFailure);
            }
        }

        Log.LOGGER.log(DistcompLevel.FIVE, "Clearing TransmissionChannelAddressBook");
        this.fTransmissionChannelAddressBook.clear();

        try {
            this.fSelector.close();
        } catch (IOException selectorFailure) {
            Log.LOGGER.log(DistcompLevel.TWO, "IOException during selector close: ", selectorFailure);
        }

        this.fChannelHandleExec.shutdownNow();
        this.fDispatchExec.shutdownNow();
    }

    /**
     * Adds a connection to the group: builds its transmission channel, records it in the
     * address book, notifies the communication observers and schedules the channel's
     * registration with the selector on the select thread.
     *
     * @param connection the new connection; its remote instance is the address-book key
     * @param sessionProfilingListener listener handed to the channel factory; the
     *        single-argument overload passes null
     */
    @Override
    public void addConnection(final Connection connection, SessionProfilingListener sessionProfilingListener) {
        final TransmissionChannel buildTransmissionChannel = TransmissionChannelFactory.buildTransmissionChannel(connection, this.fErrorHandler, this.fIncomingRateLimiter, this.fOutgoingRateLimiter, sessionProfilingListener);
        Instance remoteInstance = connection.getRemoteInstance();
        if (!$assertionsDisabled && this.fTransmissionChannelAddressBook.containsKey(remoteInstance)) {
            throw new AssertionError("Address book should not contain the same remote instance twice");
        }
        // Only report an overwrite when put() actually replaced an entry. Logging the message
        // unconditionally produced a misleading level-ONE record for every normal add.
        TransmissionChannel replaced = this.fTransmissionChannelAddressBook.put(remoteInstance, buildTransmissionChannel);
        if (replaced != null) {
            Log.LOGGER.log(DistcompLevel.ONE, "Adding remoteInstance " + remoteInstance + " to address book already containing this instance, will overwrite old instance.");
        }
        this.fSeenInstances.add(remoteInstance);
        for (CommunicationObserver observer : this.fCommunicationObservers) {
            observer.communicationEstablished(connection.getRemoteInstance());
        }
        // Selector registration and fExpectedInstances are only touched on the select thread.
        invokeOnSelectThread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                        Log.LOGGER.log(DistcompLevel.FIVE, "Registering " + buildTransmissionChannel + " with selector.");
                    }
                    // Interest op 1 is SelectionKey.OP_READ; the channel is its own key attachment.
                    buildTransmissionChannel.registerWithSelector(DirectCommunicationGroup.this.fSelector, 1, buildTransmissionChannel);
                    DirectCommunicationGroup.this.fExpectedInstances.add(connection.getRemoteInstance());
                    if (DirectCommunicationGroup.this.fDispatch != null) {
                        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                            Log.LOGGER.log(DistcompLevel.FIVE, "Adding dispatcher to " + buildTransmissionChannel);
                        }
                        buildTransmissionChannel.setDispatcher(DirectCommunicationGroup.this.fDispatch, DirectCommunicationGroup.this.fDispatchExec);
                    }
                } catch (IOException e) {
                    Log.LOGGER.log(DistcompLevel.TWO, "IOException while adding a new connection", (Throwable) e);
                }
            }

            @Override
            public String toString() {
                return "addConnection(" + connection + ")";
            }
        });
    }

    /** Adds a connection without a session profiling listener. */
    @Override
    public void addConnection(Connection connection) {
        final SessionProfilingListener noListener = null;
        addConnection(connection, noListener);
    }

    /**
     * Removes the instance from the group and then reports lost communication for it,
     * with no associated throwable.
     */
    @Override
    public void removeInstance(Instance instance) {
        removeInstanceNoNotification(instance);
        final Throwable noCause = null;
        notifyLostCommunication(instance, noCause);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes the instance from the address book immediately and schedules, on the select
     * thread, the closing of its channel and its removal from the expected instances.
     * Sends no lost-communication notification.
     *
     * @param instance instance to remove; an unknown instance is logged at level FIVE and
     *        the select-thread task is still scheduled
     */
    public void removeInstanceNoNotification(final Instance instance) {
        final TransmissionChannel removedChannel = this.fTransmissionChannelAddressBook.remove(instance);
        if (removedChannel == null) {
            if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                Log.LOGGER.log(DistcompLevel.FIVE, "Attempted to remove " + instance + " that is not in address book");
            }
        }
        invokeOnSelectThread(new Runnable() {
            @Override
            public void run() {
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "About to call safeCloseTransmissionChannel for " + instance);
                }
                // Close the channel if there was one; a failure to close is logged, not propagated.
                if (removedChannel != null) {
                    try {
                        removedChannel.close();
                    } catch (IOException closeFailure) {
                        Log.LOGGER.log(DistcompLevel.TWO, "IOException closing " + removedChannel, closeFailure);
                    }
                }
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "About to remove " + instance + " from fExpectedInstances");
                }
                DirectCommunicationGroup.this.fExpectedInstances.remove(instance);
                if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "Finished removing " + instance + " from fExpectedInstances");
                }
            }

            @Override
            public String toString() {
                return "removeInstance(" + instance + ")";
            }
        });
    }

    /** Registers an observer that is told when communication with an instance is established. */
    @Override
    public void addCommunicationObserver(CommunicationObserver communicationObserver) {
        final List<CommunicationObserver> observers = this.fCommunicationObservers;
        observers.add(communicationObserver);
    }

    /** Unregisters a previously added communication observer. */
    @Override
    public void removeCommunicationObserver(CommunicationObserver communicationObserver) {
        final List<CommunicationObserver> observers = this.fCommunicationObservers;
        observers.remove(communicationObserver);
    }

    /**
     * Queues a task for the select thread and wakes the selector so the task is picked up
     * without waiting for the current select timeout.
     */
    private void invokeOnSelectThread(Runnable runnable) {
        final ConcurrentUniqueQueue<Runnable> pending = this.fRunnables;
        pending.add(runnable);
        this.fSelector.wakeup();
    }

    /**
     * Body of the select thread.
     *
     * <p>Each iteration handles pending write-interest requests, drains the queued runnables,
     * performs one select pass and then checks for lost instances and reconnections. Once
     * shutdown has been requested the loop continues with shortened timeouts for about
     * 100 ms more and then returns. An {@link Error} is reported to the error handler and
     * rethrown. The exit message is logged on every way out.
     */
    @Override
    public void run() {
        try {
            long shutdownDeadline = -1;
            long selectTimeout = this.fSelectTimeoutMillis;
            long shortSelectTimeout = this.fShortSelectTimeoutMillis;
            for (;;) {
                boolean stopRequested = !this.fKeepGoing.get();
                if (stopRequested && shutdownDeadline == -1) {
                    // First iteration after the stop request: set a deadline and shorten the timeouts.
                    Log.LOGGER.log(DistcompLevel.FIVE, "Shutting down select() thread");
                    shutdownDeadline = System.currentTimeMillis() + 100;
                    selectTimeout = 50;
                    shortSelectTimeout = selectTimeout / 2;
                }
                if (shutdownDeadline != -1 && System.currentTimeMillis() > shutdownDeadline) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "select() thread returning");
                    return;
                }
                detectAndHandleShouldAddWriteOp();
                drainRunnableQueue();
                if (doSelect(selectTimeout, shortSelectTimeout)) {
                    Log.LOGGER.log(DistcompLevel.FIVE, "DirectCommunicationGroup.doSelect() returned TRUE, select thread exiting");
                    return;
                }
                detectLostInstances();
                detectAndHandleReconnections();
            }
        } catch (Error fatal) {
            Log.LOGGER.log(DistcompLevel.ONE, "DirectCommunicationGroup caught an Error.", fatal);
            this.fErrorHandler.communicationError(fatal);
            throw fatal;
        } finally {
            Log.LOGGER.log(DistcompLevel.TWO, "DirectCommunicationGroup select thread exiting.");
        }
    }

    /**
     * Runs every queued task on the calling thread until the queue is empty.
     * A {@link RuntimeException} from a task is logged and the drain continues;
     * an {@link Error} is logged and rethrown.
     */
    private void drainRunnableQueue() {
        Runnable task;
        while ((task = this.fRunnables.poll()) != null) {
            try {
                if (Log.LOGGER.isLoggable(DistcompLevel.SIX)) {
                    Log.LOGGER.log(DistcompLevel.SIX, "Running " + task + " on select thread");
                }
                task.run();
            } catch (Error fatal) {
                Log.LOGGER.log(DistcompLevel.ONE, "Error thrown by runnable on select thread.", fatal);
                throw fatal;
            } catch (RuntimeException failure) {
                Log.LOGGER.log(DistcompLevel.ONE, "RuntimeException thrown by runnable on select thread.", failure);
            }
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:20:0x00d8  */
    /* JADX WARN: Removed duplicated region for block: B:87:? A[RETURN, SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Performs one select pass for the select thread.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this method. The body below is a
     * placeholder that always throws, so this source is not functionally equivalent to the
     * compiled class. Restore the real implementation from the original source or a bytecode
     * dump before relying on it.
     *
     * @param r8 passed by run() as the current select timeout in milliseconds
     * @param r10 passed by run() as the current short select timeout in milliseconds
     * @return run() treats {@code true} as the signal to exit the select thread
     */
    private boolean doSelect(long r8, long r10) {
        /*
            Method dump skipped, instructions count: 720
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.mathworks.toolbox.distcomp.pmode.io.DirectCommunicationGroup.doSelect(long, long):boolean");
    }

    /**
     * Logs, at the given level, every key of the selector that is no longer valid.
     * Nothing is inspected unless level FIVE is loggable.
     *
     * @param level level at which each invalid key is reported
     * @param selector selector whose key set is inspected
     */
    private static void logInvalidKeys(Level level, Selector selector) {
        // NOTE(review): the guard tests level FIVE, not the level the messages are logged at — confirm this is intended.
        if (!Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            return;
        }
        for (SelectionKey key : selector.keys()) {
            if (key.isValid()) {
                continue;
            }
            Log.LOGGER.log(level, key + " is not valid.");
        }
    }

    /** Removes each broken instance from this group and reports its lost communication, in list order. */
    private void removeInstancesAndNotifyErrorHandler(List<BrokenInstance> list) {
        for (BrokenInstance broken : list) {
            broken.removeInstanceAndNotifyErrorHandler(this);
        }
    }

    /**
     * Compares the instances expected to be registered with the selector against those
     * actually registered. An expected instance that is missing is removed from the group.
     * A registered instance that is not expected is logged and, with assertions enabled,
     * triggers an {@link AssertionError}. Does nothing once shutdown has been requested.
     */
    private void detectLostInstances() {
        if (!this.fKeepGoing.get()) {
            return;
        }
        Collection<Instance> registered = getRegisteredInstances();
        for (Instance missing : setdiff(this.fExpectedInstances, registered)) {
            if (Log.LOGGER.isLoggable(DistcompLevel.FOUR)) {
                Log.LOGGER.log(DistcompLevel.FOUR, "Expected " + missing + " to be registered with selector but it was not");
            }
            removeInstance(missing);
        }
        boolean unexpectedRegistration = !setdiff(registered, this.fExpectedInstances).isEmpty();
        if (unexpectedRegistration) {
            Log.LOGGER.log(DistcompLevel.ONE, "DirectCommunicationGroup in unexpected state - there are Instances registered with the selector that we did not put there.");
            if (!$assertionsDisabled) {
                throw new AssertionError("Unexpected instance registered with selector");
            }
        }
    }

    /**
     * Re-registers with the selector every {@code TransmissionChannel} that reports
     * {@code shouldReregister()}.
     *
     * <p>A {@link ClosedChannelException} or {@link ClosedSelectorException} ends the pass
     * and is logged rather than propagated; a closed selector is logged at a lower
     * severity when {@code fKeepGoing} is already false.
     */
    private void detectAndHandleReconnections() {
        try {
            // Candidates are collected first, presumably so that the selector's key set is
            // not being iterated while keys are cancelled and re-registered below.
            final Collection<SelectionKey> keysToReregister = new ArrayList<SelectionKey>();
            for (SelectionKey key : this.fSelector.keys()) {
                final Object attached = key.attachment();
                if (!(attached instanceof TransmissionChannel)) {
                    continue;
                }
                if (((TransmissionChannel) attached).shouldReregister()) {
                    keysToReregister.add(key);
                }
            }

            for (SelectionKey staleKey : keysToReregister) {
                final Object attached = staleKey.attachment();
                if (!(attached instanceof TransmissionChannel)) {
                    continue;
                }
                final TransmissionChannel channel = (TransmissionChannel) attached;
                if (Log.LOGGER.isLoggable(DistcompLevel.TWO)) {
                    Log.LOGGER.log(DistcompLevel.TWO, "DirectCommunicationGroup attempting to reregister transmissionChannel");
                }
                staleKey.cancel();
                // NOTE(review): 1 equals SelectionKey.OP_READ; presumably this argument is the
                // interest set - confirm against TransmissionChannel.registerWithSelector.
                channel.registerWithSelector(this.fSelector, 1, channel);
            }
        } catch (ClosedChannelException channelClosed) {
            Log.LOGGER.log(DistcompLevel.ONE, "Unable to reregister selection key after reconnection.", (Throwable) channelClosed);
        } catch (ClosedSelectorException selectorClosed) {
            if (this.fKeepGoing.get()) {
                Log.LOGGER.log(DistcompLevel.ONE, "Selector was found to be closed unexpectedly in detectAndHandleReconnections - returning", (Throwable) selectorClosed);
            } else {
                Log.LOGGER.log(DistcompLevel.FOUR, "Selector was found to be closed as expected in detectAndHandleReconnections - returning", (Throwable) selectorClosed);
            }
        }
    }

    /**
     * Adds write interest to every registered {@code TransmissionChannel} that reports
     * {@code shouldAddWriteInterestOp()}.
     *
     * <p>A {@link CancelledKeyException} on one channel is logged and the scan continues
     * with the next key. A {@link ClosedSelectorException} ends the scan and is logged,
     * at a lower severity when {@code fKeepGoing} is already false.
     */
    private void detectAndHandleShouldAddWriteOp() {
        try {
            for (SelectionKey key : this.fSelector.keys()) {
                final Object attached = key.attachment();
                if (!(attached instanceof TransmissionChannel)) {
                    continue;
                }
                final TransmissionChannel channel = (TransmissionChannel) attached;
                if (!channel.shouldAddWriteInterestOp()) {
                    continue;
                }
                try {
                    // Same value (4) as the literal this constant replaces.
                    channel.addInterestOps(SelectionKey.OP_WRITE);
                } catch (CancelledKeyException ignored) {
                    Log.LOGGER.log(DistcompLevel.TWO, "Attempted to send acknowledgement on TransmissionChannel with cancelled selection key, which probably means the TransmissionChannel has been closed");
                }
            }
        } catch (ClosedSelectorException selectorClosed) {
            if (this.fKeepGoing.get()) {
                Log.LOGGER.log(DistcompLevel.ONE, "Selector was found to be closed unexpectedly in detectAndHandleShouldAddWriteOp - returning", (Throwable) selectorClosed);
            } else {
                Log.LOGGER.log(DistcompLevel.FOUR, "Selector was found to be closed as expected in detectAndHandleShouldAddWriteOP - returning", (Throwable) selectorClosed);
            }
        }
    }

    /**
     * Returns a new collection holding the elements of {@code collection} that are not
     * contained in {@code collection2}. Neither argument is modified.
     *
     * @param collection  elements to start from
     * @param collection2 elements to exclude
     * @return a fresh list containing the difference
     */
    private Collection<Instance> setdiff(Collection<Instance> collection, Collection<Instance> collection2) {
        final Collection<Instance> difference = new ArrayList<Instance>(collection);
        difference.removeAll(collection2);
        return difference;
    }

    /**
     * Collects the remote process of every {@code TransmissionChannel} attached to a key
     * of the selector.
     *
     * <p>An attachment of any other type is logged and, when assertions are enabled,
     * causes an {@link AssertionError}.
     *
     * @return a new collection with one entry per key that has a {@code TransmissionChannel} attached
     */
    private Collection<Instance> getRegisteredInstances() {
        final Vector<Instance> registered = new Vector<Instance>(this.fSelector.keys().size());
        for (SelectionKey key : this.fSelector.keys()) {
            final Object attached = key.attachment();
            if (attached instanceof TransmissionChannel) {
                registered.add(((TransmissionChannel) attached).getRemoteProcess());
                continue;
            }
            Log.LOGGER.log(DistcompLevel.ONE, "DirectCommunicationGroup in unexpected state - found non-Selectable [" + attached + "] attached to selection key.");
            if (!$assertionsDisabled) {
                throw new AssertionError("Found non-Selectable attached to selection key");
            }
        }
        return registered;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a task to {@code fCommunicationLostExecutor} that reports the loss of
     * communication with {@code instance}: first to every registered communication
     * observer, then to the error handler.
     *
     * @param instance the instance communication was lost to; a null value triggers an
     *                 {@link AssertionError} when assertions are enabled
     * @param th       the cause passed on to the observers and error handler, may be null
     */
    public void notifyLostCommunication(final Instance instance, @Nullable final Throwable th) {
        if (!$assertionsDisabled && instance == null) {
            throw new AssertionError("Must specify the instance we've lost communication to");
        }
        if (Log.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            Log.LOGGER.log(DistcompLevel.FIVE, "Notifying error handler of loss of communication to " + instance);
        }

        final Runnable notifier = new Runnable() {
            @Override
            public void run() {
                // Observers are told first; the error handler is told last.
                for (Iterator<?> observers = DirectCommunicationGroup.this.fCommunicationObservers.iterator(); observers.hasNext();) {
                    final CommunicationObserver observer = (CommunicationObserver) observers.next();
                    observer.communicationLost(instance, th);
                }
                DirectCommunicationGroup.this.fErrorHandler.lostCommunication(instance, th);
            }
        };
        this.fCommunicationLostExecutor.submit(notifier);
    }

    // Class initialisation: sets the assertion flag, then creates the two rate limiters.
    static {
        // Assertions count as disabled unless the class's desired assertion status is true.
        // The checks of the form `if (!$assertionsDisabled)` elsewhere in this class read this flag.
        $assertionsDisabled = !DirectCommunicationGroup.class.desiredAssertionStatus();
        // Two limiters labelled "INCOMING" and "OUTGOING", built from the same pair of IoConstants values.
        // NOTE(review): judging by the constant names these are presumably heap and direct
        // buffer ceilings in KB - confirm against RateLimiter's constructor.
        INCOMING_RATE_LIMITER = new RateLimiter("INCOMING", IoConstants.sHEAP_KB_MAX, IoConstants.sDIRECT_KB_MAX);
        OUTGOING_RATE_LIMITER = new RateLimiter("OUTGOING", IoConstants.sHEAP_KB_MAX, IoConstants.sDIRECT_KB_MAX);
    }
}
