package com.mathworks.toolbox.distcomp.pmode;

import com.mathworks.toolbox.distcomp.pmode.shared.CommunicationObserver;
import com.mathworks.toolbox.distcomp.pmode.shared.Dispatcher;
import com.mathworks.toolbox.distcomp.pmode.shared.ErrorHandler;
import com.mathworks.toolbox.distcomp.pmode.shared.FinalReturnMessage;
import com.mathworks.toolbox.distcomp.pmode.shared.Instance;
import com.mathworks.toolbox.distcomp.pmode.shared.MessageObserver;
import com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessage;
import com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessageRegistry;
import com.mathworks.toolbox.distcomp.pmode.shared.ReturnMessage;
import com.mathworks.toolbox.distcomp.util.LimitedQueueExecutor;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import com.mathworks.util.Pair;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/ReturnMessageDispatcherImpl.class */
public class ReturnMessageDispatcherImpl implements Dispatcher<ReturnMessage>, ObservableMessageRegistry, CommunicationObserver {
    private ExecutorService fDispatchExec;
    private ErrorHandler fErrorHandler;
    private final ConcurrentHashMap<Pair<Long, Instance>, List<MessageObserver>> fMessageObserverMap = new ConcurrentHashMap<>();
    private final Object fNewMessageObserverListLock = new Object();
    private List<MessageObserver> fNewMessageObserverList = Collections.synchronizedList(new LinkedList());

    public static ReturnMessageDispatcherImpl create(ExecutorService executorService, ErrorHandler errorHandler) {
        return new ReturnMessageDispatcherImpl(executorService, errorHandler);
    }

    protected ReturnMessageDispatcherImpl(ExecutorService executorService, ErrorHandler errorHandler) {
        this.fDispatchExec = new LimitedQueueExecutor(executorService, 50);
        this.fErrorHandler = errorHandler;
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.DispatchDefinition
    public Class<ReturnMessage> getRootMessageClass() {
        return ReturnMessage.class;
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.Dispatcher
    public void dispatch(final ReturnMessage returnMessage, final Instance instance) {
        if (PackageInfo.LOGGER.isLoggable(DistcompLevel.SIX)) {
            PackageInfo.LOGGER.log(DistcompLevel.SIX, "Received a ReturnMessage with ID:" + returnMessage.getOriginalSequenceNumber() + " from src:" + instance);
        }
        Pair pair = new Pair(Long.valueOf(returnMessage.getOriginalSequenceNumber()), instance);
        List<MessageObserver> remove = returnMessage instanceof FinalReturnMessage ? this.fMessageObserverMap.remove(pair) : this.fMessageObserverMap.get(pair);
        if (remove == null) {
            if (PackageInfo.LOGGER.isLoggable(DistcompLevel.SIX)) {
                PackageInfo.LOGGER.log(DistcompLevel.SIX, "No observers for message with ID:" + returnMessage.getOriginalSequenceNumber() + " from src:" + instance);
            }
        } else {
            try {
                final List<MessageObserver> list = remove;
                this.fDispatchExec.execute(new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.ReturnMessageDispatcherImpl.1
                    @Override // java.lang.Runnable
                    public void run() {
                        for (MessageObserver messageObserver : list) {
                            if (messageObserver != null) {
                                try {
                                    if (PackageInfo.LOGGER.isLoggable(DistcompLevel.SIX)) {
                                        PackageInfo.LOGGER.log(DistcompLevel.SIX, "Message " + returnMessage.getClass().getSimpleName() + " " + returnMessage.getOriginalSequenceNumber() + " dispatched to " + messageObserver);
                                    }
                                    messageObserver.completed(returnMessage, instance);
                                } catch (Throwable th) {
                                    PackageInfo.LOGGER.log(DistcompLevel.ONE, "ReturnMessageObserver threw a Throwable.", th);
                                }
                            }
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                this.fErrorHandler.executorError(e);
            }
        }
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessageRegistry
    public void addReturnMessageObserver(ObservableMessage observableMessage, Collection<Instance> collection, MessageObserver messageObserver) {
        Iterator<Instance> it = collection.iterator();
        while (it.hasNext()) {
            addOneReturnMessageObserver(observableMessage, it.next(), messageObserver);
        }
    }

    private void addOneReturnMessageObserver(ObservableMessage observableMessage, Instance instance, MessageObserver messageObserver) {
        List<MessageObserver> putIfAbsent;
        Pair<Long, Instance> pair = new Pair<>(Long.valueOf(observableMessage.getSequenceNumber()), instance);
        synchronized (this.fNewMessageObserverListLock) {
            putIfAbsent = this.fMessageObserverMap.putIfAbsent(pair, this.fNewMessageObserverList);
            if (putIfAbsent == null) {
                putIfAbsent = this.fNewMessageObserverList;
                this.fNewMessageObserverList = Collections.synchronizedList(new LinkedList());
            }
        }
        putIfAbsent.add(messageObserver);
        if (PackageInfo.LOGGER.isLoggable(DistcompLevel.SIX)) {
            PackageInfo.LOGGER.log(DistcompLevel.SIX, "Size of MessageObserverMap = " + this.fMessageObserverMap.size());
        }
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessageRegistry
    public Dispatcher<ReturnMessage> getDispatcher() {
        return this;
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.ObservableMessageRegistry
    public void destroy() {
        this.fMessageObserverMap.clear();
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.CommunicationObserver
    public void communicationLost(final Instance instance, Throwable th) {
        if (PackageInfo.LOGGER.isLoggable(DistcompLevel.FIVE)) {
            PackageInfo.LOGGER.log(DistcompLevel.FIVE, "ReturnMessageDispatcherImpl.communicationLost(" + instance + ", ...)", th);
        }
        ConcurrentHashMap.KeySetView<Pair> keySet = this.fMessageObserverMap.keySet();
        final LinkedList linkedList = new LinkedList();
        for (Pair pair : keySet) {
            if (((Instance) pair.getSecond()).equals(instance)) {
                linkedList.add(new Pair(this.fMessageObserverMap.get(pair), pair.getFirst()));
                this.fMessageObserverMap.remove(pair);
            }
        }
        try {
            this.fDispatchExec.execute(new Runnable() { // from class: com.mathworks.toolbox.distcomp.pmode.ReturnMessageDispatcherImpl.2
                @Override // java.lang.Runnable
                public void run() {
                    for (Pair pair2 : linkedList) {
                        long longValue = ((Long) pair2.getSecond()).longValue();
                        Iterator it = ((List) pair2.getFirst()).iterator();
                        while (it.hasNext()) {
                            try {
                                ((MessageObserver) it.next()).aborted(longValue, instance);
                            } catch (Throwable th2) {
                                PackageInfo.LOGGER.log(DistcompLevel.ONE, "ReturnMessageObserver threw a Throwable.", th2);
                            }
                        }
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            this.fErrorHandler.executorError(e);
        }
    }

    @Override // com.mathworks.toolbox.distcomp.pmode.shared.CommunicationObserver
    public void communicationEstablished(Instance instance) {
    }
}
