package org.ros.internal.transport.queue;

import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.ros.concurrent.CancellableLoop;
import org.ros.concurrent.CircularBlockingDeque;
import org.ros.internal.message.MessageBuffers;
import org.ros.message.MessageSerializer;

/* loaded from: input_file:org/ros/internal/transport/queue/OutgoingMessageQueue.class */
/**
 * Serializes outgoing messages and fans them out to every registered channel.
 *
 * <p>Data flow, as implemented below:
 * <ol>
 *   <li>{@link #add(Object)} serializes a message into a {@link ChannelBuffer} and appends it to
 *       {@link #messageProcessQueue}.</li>
 *   <li>A single {@link MessageQueueThread} takes buffers off that queue, appends a duplicate to
 *       every per-channel queue in {@link #subscriberQueues}, and remembers the buffer as the
 *       latched message.</li>
 *   <li>One {@link DispatchThread} per channel takes buffers off its own queue and writes them to
 *       its channel, waiting for each write to finish before taking the next.</li>
 * </ol>
 *
 * <p>Both loops are submitted to the {@link ExecutorService} passed to the constructor.
 *
 * @param <T> the message type accepted by {@link #add(Object)}
 */
public class OutgoingMessageQueue<T> {
    private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);

    /** Capacity passed to each per-channel queue created by {@link #addChannel(Channel)}. */
    public static final int SUBSCRIBER_QUEUE_CAPACITY = 16;

    /** Capacity passed to {@link #messageProcessQueue}. */
    public static final int SERIALIZED_QUEUE_CAPACITY = 16;

    private final MessageSerializer<T> serializer;
    private final ExecutorService executorService;

    /** Per-channel outgoing queues, keyed by channel id. */
    public Map<Integer, CircularBlockingDeque<ChannelBuffer>> subscriberQueues = new ConcurrentHashMap<>();

    /** Per-channel dispatch loops, keyed by channel id. */
    public Map<Integer, OutgoingMessageQueue<T>.DispatchThread> subscriberDispatchers = new ConcurrentHashMap<>();

    private final ChannelGroup channelGroup = new DefaultChannelGroup();

    /** Guards reads and writes of {@link #latchedBuffer} performed by this class. */
    private final Object latchedBufferMutex = new Object();

    /** Most recently fanned-out buffer, or {@code null} if nothing has been fanned out yet. */
    public ChannelBuffer latchedBuffer = null;

    /** Serialized messages waiting to be fanned out by {@link #messageQueueThread}. */
    public CircularBlockingDeque<ChannelBuffer> messageProcessQueue =
            new CircularBlockingDeque<>(SERIALIZED_QUEUE_CAPACITY);

    public OutgoingMessageQueue<T>.MessageQueueThread messageQueueThread = new MessageQueueThread();

    /** When {@code true}, newly added channels are sent the latched message first. */
    public AtomicBoolean latchMode = new AtomicBoolean(false);

    /** Set by {@link #shutdown()}; once {@code true}, {@link #addChannel(Channel)} is refused. */
    public AtomicBoolean isShuttingDown = new AtomicBoolean(false);

    /** Id of the channel most recently passed to {@link #addChannel(Channel)}; {@code -1} initially. */
    public AtomicInteger latestChannelID = new AtomicInteger(-1);

    /**
     * Loop that drains one channel's queue and writes each buffer to that channel.
     */
    public final class DispatchThread extends CancellableLoop {
        // Key into subscriberQueues. Captured once at construction; setOutgoingChannel() does
        // not update it.
        private int channelID;
        public Channel channel;

        /**
         * @param channel the channel to write to; must not be {@code null}
         * @throws IllegalArgumentException if {@code channel} is {@code null}
         */
        public DispatchThread(Channel channel) {
            setOutgoingChannel(channel);
            this.channelID = channel.getId().intValue();
        }

        /**
         * Replaces the channel this loop writes to. The queue this loop reads from is still
         * looked up by the id of the channel given at construction.
         *
         * @param channel the new outgoing channel; must not be {@code null}
         * @throws IllegalArgumentException if {@code channel} is {@code null}
         */
        public void setOutgoingChannel(Channel channel) {
            Preconditions.checkArgument(channel != null, "The outgoing channel cannot be null.");
            this.channel = channel;
        }

        /**
         * Takes the next buffer from this channel's queue, writes it, and waits for the write
         * to complete.
         */
        @Override // org.ros.concurrent.CancellableLoop
        public void loop() throws InterruptedException {
            CircularBlockingDeque<ChannelBuffer> queue = subscriberQueues.get(channelID);
            if (queue == null) {
                // The queue is only removed by the channel-close listener, so there is nothing
                // left to dispatch. Returning without cancelling would make the enclosing loop
                // call us again immediately and spin.
                // NOTE(review): relies on CancellableLoop.cancel() being safe to call from
                // inside loop() - confirm against org.ros.concurrent.CancellableLoop.
                cancel();
                return;
            }
            channel.write(queue.takeFirst()).await();
        }
    }

    /**
     * Loop that moves buffers from {@link #messageProcessQueue} to every per-channel queue.
     */
    private final class MessageQueueThread extends CancellableLoop {
        private MessageQueueThread() {
        }

        /**
         * Takes one serialized message, appends a duplicate to every subscriber queue, then
         * stores a duplicate as the latched message.
         */
        @Override // org.ros.concurrent.CancellableLoop
        public void loop() throws InterruptedException {
            ChannelBuffer message = messageProcessQueue.takeFirst();
            // Each consumer gets its own duplicate so that reader/writer indices are not shared.
            for (CircularBlockingDeque<ChannelBuffer> queue : subscriberQueues.values()) {
                queue.addLast(message.duplicate());
            }
            setLatchedMessage(message.duplicate());
        }
    }

    /**
     * Creates the queue and immediately submits its fan-out loop to {@code executorService}.
     *
     * @param messageSerializer serializer used by {@link #add(Object)}
     * @param executorService executor that runs the fan-out loop and every dispatch loop
     */
    public OutgoingMessageQueue(MessageSerializer<T> messageSerializer, ExecutorService executorService) {
        this.serializer = messageSerializer;
        this.executorService = executorService;
        this.executorService.execute(this.messageQueueThread);
    }

    /**
     * @param z {@code true} to send the latched message to channels added from now on
     */
    public void setLatchMode(boolean z) {
        latchMode.set(z);
    }

    /**
     * @return the current latch mode flag
     */
    public boolean getLatchMode() {
        return latchMode.get();
    }

    /**
     * Creates (but does not start or register) a dispatch loop for {@code channel}.
     *
     * @param channel the channel the loop will write to; must not be {@code null}
     * @return a new dispatch loop bound to {@code channel}
     */
    public OutgoingMessageQueue<T>.DispatchThread createDispatchThread(Channel channel) {
        return new DispatchThread(channel);
    }

    /**
     * Serializes {@code t} and queues the result for fan-out to all channels.
     *
     * @param t the message to send
     */
    public void add(T t) {
        ChannelBuffer buffer = MessageBuffers.dynamicBuffer();
        serializer.serialize(t, buffer);
        messageProcessQueue.addLast(buffer.duplicate());
    }

    /**
     * Stores {@code channelBuffer} as the latched message.
     *
     * @param channelBuffer the buffer to remember
     */
    public void setLatchedMessage(ChannelBuffer channelBuffer) {
        synchronized (latchedBufferMutex) {
            latchedBuffer = channelBuffer;
        }
    }

    /**
     * Refuses further channels, cancels every dispatch loop and the fan-out loop, then closes
     * all channels in the group and waits for the close to finish.
     */
    public void shutdown() {
        // Raise the flag before cancelling anything: otherwise an addChannel() call running
        // concurrently could pass its guard and register a dispatcher after the loop below
        // has already iterated, leaving that dispatcher uncancelled.
        isShuttingDown.set(true);
        for (DispatchThread dispatcher : subscriberDispatchers.values()) {
            dispatcher.cancel();
        }
        messageQueueThread.cancel();
        channelGroup.close().awaitUninterruptibly();
    }

    /**
     * Registers {@code channel} as a destination for outgoing messages.
     *
     * <p>If latch mode is on and a latched message exists, it is written to the channel first.
     * A queue and a dispatch loop are then created for the channel, the channel is added to the
     * group, and the loop is submitted to the executor. A close listener cancels the loop and
     * removes the channel's queue and dispatcher entries when the channel closes.
     *
     * <p>Does nothing except log a warning if {@link #shutdown()} has already been called.
     *
     * @param channel the channel to add
     */
    public void addChannel(Channel channel) {
        if (isShuttingDown.get()) {
            log.warn("Failed to add channel. Cannot add channels after shutdown.");
            return;
        }
        if (latchMode.get()) {
            writeLatchedMessage(channel);
        }
        int channelId = channel.getId().intValue();
        latestChannelID.set(channelId);
        // The queue must be in the map before the dispatcher starts, since the dispatcher looks
        // it up by id on every iteration.
        subscriberQueues.put(channelId, new CircularBlockingDeque<ChannelBuffer>(SUBSCRIBER_QUEUE_CAPACITY));
        OutgoingMessageQueue<T>.DispatchThread dispatcher = createDispatchThread(channel);
        subscriberDispatchers.put(channelId, dispatcher);
        channelGroup.add(channel);
        executorService.execute(dispatcher);
        channel.getCloseFuture().addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                int closedId = channelFuture.getChannel().getId().intValue();
                OutgoingMessageQueue<T>.DispatchThread closedDispatcher = subscriberDispatchers.get(closedId);
                if (closedDispatcher == null) {
                    log.error("Failed to stop writer thread for channel " + closedId + ". Dispatch thread object was not found.");
                } else {
                    closedDispatcher.cancel();
                    subscriberDispatchers.remove(closedId);
                }
                subscriberQueues.remove(closedId);
            }
        });
    }

    /**
     * Writes the latched message, if there is one, to {@code channel}.
     *
     * @param channel the channel to write to
     */
    private void writeLatchedMessage(Channel channel) {
        // Read the field once under the lock so the null check and the write act on the same
        // buffer, and so the network write itself does not run while holding the mutex.
        ChannelBuffer latched;
        synchronized (latchedBufferMutex) {
            latched = latchedBuffer;
        }
        if (latched != null) {
            // Write a duplicate, as the fan-out loop does, so channels never share the indices
            // of the stored buffer.
            channel.write(latched.duplicate());
        }
    }

    /**
     * @return the number of channels currently in the channel group
     */
    public int getNumberOfChannels() {
        return channelGroup.size();
    }

    /**
     * @return the group holding every channel added via {@link #addChannel(Channel)}
     */
    public ChannelGroup getChannelGroup() {
        return channelGroup;
    }
}
