package org.ros.internal.node.topic;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.ros.concurrent.ListenerGroup;
import org.ros.concurrent.SignalRunnable;
import org.ros.internal.node.server.NodeIdentifier;
import org.ros.internal.transport.ProtocolNames;
import org.ros.internal.transport.queue.IncomingMessageQueue;
import org.ros.internal.transport.tcp.TcpClientManager;
import org.ros.message.MessageDeserializer;
import org.ros.message.MessageListener;
import org.ros.node.topic.DefaultSubscriberListener;
import org.ros.node.topic.Subscriber;
import org.ros.node.topic.SubscriberListener;

/* loaded from: input_file:org/ros/internal/node/topic/DefaultSubscriber.class */
public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Subscriber<T> {
    public int DEFAULT_SHUTDOWN_TIMEOUT;
    private final NodeIdentifier nodeIdentifier;
    private final ScheduledExecutorService executorService;
    private final IncomingMessageQueue<T> incomingMessageQueue;
    private final Set<PublisherIdentifier> knownPublishers;
    private final TcpClientManager tcpClientManager;
    private final Object mutex;
    private final ListenerGroup<SubscriberListener<T>> subscriberListeners;
    private static final Log log = LogFactory.getLog(DefaultSubscriber.class);
    private static final TimeUnit DEFAULT_SHUTDOWN_TIMEOUT_UNITS = TimeUnit.SECONDS;

    public static <S> DefaultSubscriber<S> newDefault(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, ScheduledExecutorService scheduledExecutorService, MessageDeserializer<S> messageDeserializer) {
        return new DefaultSubscriber<>(nodeIdentifier, topicDeclaration, messageDeserializer, scheduledExecutorService);
    }

    private DefaultSubscriber(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, MessageDeserializer<T> messageDeserializer, ScheduledExecutorService scheduledExecutorService) {
        super(topicDeclaration);
        this.DEFAULT_SHUTDOWN_TIMEOUT = 2;
        this.nodeIdentifier = nodeIdentifier;
        this.executorService = scheduledExecutorService;
        this.incomingMessageQueue = new IncomingMessageQueue<>(messageDeserializer, scheduledExecutorService);
        this.knownPublishers = Sets.newHashSet();
        this.tcpClientManager = new TcpClientManager(scheduledExecutorService);
        this.mutex = new Object();
        this.tcpClientManager.addNamedChannelHandler(new SubscriberHandshakeHandler(toDeclaration().toConnectionHeader(), this.incomingMessageQueue, scheduledExecutorService));
        this.subscriberListeners = new ListenerGroup<>(scheduledExecutorService);
        this.subscriberListeners.add(new DefaultSubscriberListener<T>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.1
            @Override // org.ros.node.topic.DefaultSubscriberListener, org.ros.internal.node.RegistrantListener
            public void onMasterRegistrationSuccess(Subscriber<T> subscriber) {
                DefaultSubscriber.log.info("Subscriber registered: " + DefaultSubscriber.this);
            }

            @Override // org.ros.node.topic.DefaultSubscriberListener, org.ros.internal.node.RegistrantListener
            public void onMasterRegistrationFailure(Subscriber<T> subscriber) {
                DefaultSubscriber.log.info("Subscriber registration failed: " + DefaultSubscriber.this);
            }

            @Override // org.ros.node.topic.DefaultSubscriberListener, org.ros.internal.node.RegistrantListener
            public void onMasterUnregistrationSuccess(Subscriber<T> subscriber) {
                DefaultSubscriber.log.info("Subscriber unregistered: " + DefaultSubscriber.this);
            }

            @Override // org.ros.node.topic.DefaultSubscriberListener, org.ros.internal.node.RegistrantListener
            public void onMasterUnregistrationFailure(Subscriber<T> subscriber) {
                DefaultSubscriber.log.info("Subscriber unregistration failed: " + DefaultSubscriber.this);
            }
        });
    }

    public SubscriberIdentifier toIdentifier() {
        return new SubscriberIdentifier(this.nodeIdentifier, getTopicDeclaration().getIdentifier());
    }

    public SubscriberDeclaration toDeclaration() {
        return new SubscriberDeclaration(toIdentifier(), getTopicDeclaration());
    }

    public Collection<String> getSupportedProtocols() {
        return ProtocolNames.SUPPORTED;
    }

    @Override // org.ros.node.topic.Subscriber
    public boolean getLatchMode() {
        return this.incomingMessageQueue.getLatchMode();
    }

    @Override // org.ros.node.topic.Subscriber
    public void addMessageListener(MessageListener<T> messageListener, int i) {
        this.incomingMessageQueue.addListener(messageListener, i);
    }

    @Override // org.ros.node.topic.Subscriber
    public void addMessageListener(MessageListener<T> messageListener) {
        addMessageListener(messageListener, 1);
    }

    @VisibleForTesting
    public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress inetSocketAddress) {
        synchronized (this.mutex) {
            if (this.knownPublishers.contains(publisherIdentifier)) {
                this.knownPublishers.remove(publisherIdentifier);
            }
            this.tcpClientManager.connect(toString(), inetSocketAddress);
            this.knownPublishers.add(publisherIdentifier);
            signalOnNewPublisher(publisherIdentifier);
        }
    }

    public void updatePublishers(Collection<PublisherIdentifier> collection) {
        Iterator<PublisherIdentifier> it = collection.iterator();
        while (it.hasNext()) {
            this.executorService.execute(new UpdatePublisherRunnable(this, this.nodeIdentifier, it.next()));
        }
    }

    @Override // org.ros.node.topic.Subscriber
    public void shutdown(long j, TimeUnit timeUnit) {
        signalOnShutdown(j, timeUnit);
        this.incomingMessageQueue.shutdown();
        this.tcpClientManager.shutdown();
        this.subscriberListeners.shutdown();
    }

    @Override // org.ros.node.topic.Subscriber
    public void shutdown() {
        shutdown(this.DEFAULT_SHUTDOWN_TIMEOUT, DEFAULT_SHUTDOWN_TIMEOUT_UNITS);
    }

    @Override // org.ros.node.topic.Subscriber
    public void addSubscriberListener(SubscriberListener<T> subscriberListener) {
        this.subscriberListeners.add(subscriberListener);
    }

    @Override // org.ros.internal.node.topic.DefaultTopicParticipant
    public void signalOnMasterRegistrationSuccess() {
        this.subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.2
            @Override // org.ros.concurrent.SignalRunnable
            public void run(SubscriberListener<T> subscriberListener) {
                subscriberListener.onMasterRegistrationSuccess(this);
            }
        });
    }

    @Override // org.ros.internal.node.topic.DefaultTopicParticipant
    public void signalOnMasterRegistrationFailure() {
        this.subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.3
            @Override // org.ros.concurrent.SignalRunnable
            public void run(SubscriberListener<T> subscriberListener) {
                subscriberListener.onMasterRegistrationFailure(this);
            }
        });
    }

    @Override // org.ros.internal.node.topic.DefaultTopicParticipant
    public void signalOnMasterUnregistrationSuccess() {
        this.subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.4
            @Override // org.ros.concurrent.SignalRunnable
            public void run(SubscriberListener<T> subscriberListener) {
                subscriberListener.onMasterUnregistrationSuccess(this);
            }
        });
    }

    @Override // org.ros.internal.node.topic.DefaultTopicParticipant
    public void signalOnMasterUnregistrationFailure() {
        this.subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.5
            @Override // org.ros.concurrent.SignalRunnable
            public void run(SubscriberListener<T> subscriberListener) {
                subscriberListener.onMasterUnregistrationFailure(this);
            }
        });
    }

    public void signalOnNewPublisher(final PublisherIdentifier publisherIdentifier) {
        this.subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.6
            @Override // org.ros.concurrent.SignalRunnable
            public void run(SubscriberListener<T> subscriberListener) {
                subscriberListener.onNewPublisher(this, publisherIdentifier);
            }
        });
    }

    private void signalOnShutdown(long j, TimeUnit timeUnit) {
        try {
            this.subscriberListeners.signal(new SignalRunnable<SubscriberListener<T>>() { // from class: org.ros.internal.node.topic.DefaultSubscriber.7
                @Override // org.ros.concurrent.SignalRunnable
                public void run(SubscriberListener<T> subscriberListener) {
                    subscriberListener.onShutdown(this);
                }
            }, j, timeUnit);
        } catch (InterruptedException e) {
        }
    }

    public String toString() {
        return "Subscriber<" + getTopicDeclaration() + ">";
    }
}
