package com.mathworks.toolbox.distcomp.mjs.cwo;

import com.mathworks.toolbox.distcomp.mjs.core.util.RepeatingRunner;
import com.mathworks.toolbox.distcomp.util.concurrent.SequentialExecutor;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:com/mathworks/toolbox/distcomp/mjs/cwo/CWOInputStreamHandler.class */
public class CWOInputStreamHandler implements TaskInputStreamHandler {
    private static final long THREAD_TIMEOUT_MILLIS = 10000;
    private final WriteRequestor fWriteRequestor;
    private final int fMaxBytesToTransferPerThread;
    private final SequentialExecutor fSequentialExecutor;
    private final Map<TaskCWOStreamIdentifier, InputStream> fStreams = new ConcurrentHashMap();
    private final Object fLock = new Object();
    private final Set<TaskCWOStreamIdentifier> fStreamsToRead = new HashSet();
    private volatile RepeatingRunner fReadAvailableBytesFromStreams;
    static final /* synthetic */ boolean $assertionsDisabled;

    public static CWOInputStreamHandler createAndStart(WriteRequestor writeRequestor, int i, int i2) {
        CWOInputStreamHandler cWOInputStreamHandler = new CWOInputStreamHandler(writeRequestor, i, i2);
        cWOInputStreamHandler.init();
        return cWOInputStreamHandler;
    }

    private CWOInputStreamHandler(WriteRequestor writeRequestor, int i, int i2) {
        this.fWriteRequestor = writeRequestor;
        this.fMaxBytesToTransferPerThread = i;
        this.fSequentialExecutor = new SequentialExecutor(CWOInputStreamHandler.class.getSimpleName() + " executor -", i2, THREAD_TIMEOUT_MILLIS);
    }

    private void init() {
        this.fReadAvailableBytesFromStreams = RepeatingRunner.createAndPrepareToRun(this::readAvailableBytesFromStreams);
    }

    private void readAvailableBytesFromStreams() {
        ArrayList<TaskCWOStreamIdentifier> arrayList;
        synchronized (this.fLock) {
            arrayList = new ArrayList(this.fStreamsToRead);
            this.fStreamsToRead.clear();
        }
        for (TaskCWOStreamIdentifier taskCWOStreamIdentifier : arrayList) {
            this.fSequentialExecutor.execute(() -> {
                readAvailableBytesFromStream(taskCWOStreamIdentifier);
            }, taskCWOStreamIdentifier);
        }
    }

    private void readAvailableBytesFromStream(TaskCWOStreamIdentifier taskCWOStreamIdentifier) {
        InputStream inputStream = this.fStreams.get(taskCWOStreamIdentifier);
        if (inputStream == null) {
            return;
        }
        byte[] bArr = new byte[this.fMaxBytesToTransferPerThread];
        try {
            int read = inputStream.read(bArr);
            if (read > 0) {
                Log.LOGGER.log(DistcompLevel.FIVE, read + " bytes read from " + taskCWOStreamIdentifier);
                try {
                    this.fWriteRequestor.write(taskCWOStreamIdentifier.getTaskUuid(), Arrays.copyOfRange(bArr, 0, read));
                } catch (WriteFailedException e) {
                    Log.LOGGER.log(DistcompLevel.ONE, "Failed to write bytes for: " + taskCWOStreamIdentifier, (Throwable) e);
                }
            }
            if (read >= this.fMaxBytesToTransferPerThread) {
                Log.LOGGER.log(DistcompLevel.SIX, "More bytes to read from " + taskCWOStreamIdentifier);
                notifyBytesAvailable(taskCWOStreamIdentifier);
            }
        } catch (IOException e2) {
            Log.LOGGER.log(DistcompLevel.ONE, "IOException reading from " + taskCWOStreamIdentifier, (Throwable) e2);
            unregisterStreamForTask(taskCWOStreamIdentifier);
        }
    }

    @Override // com.mathworks.toolbox.distcomp.mjs.cwo.TaskInputStreamHandler
    public void registerStreamForTask(TaskCWOStreamIdentifier taskCWOStreamIdentifier, InputStream inputStream) {
        Log.LOGGER.log(DistcompLevel.FOUR, "Registering input stream with CWOInputStreamHandler for task: " + taskCWOStreamIdentifier);
        if (this.fStreams.put(taskCWOStreamIdentifier, inputStream) != null) {
            if (!$assertionsDisabled) {
                throw new AssertionError("Should not register a stream with the same ID twice!");
            }
            Log.LOGGER.log(DistcompLevel.ONE, "Stream " + taskCWOStreamIdentifier + " has already been registered");
        }
    }

    @Override // com.mathworks.toolbox.distcomp.mjs.cwo.NotifiableTaskReader
    public void notifyBytesAvailable(TaskCWOStreamIdentifier taskCWOStreamIdentifier) {
        Log.LOGGER.log(DistcompLevel.SIX, "Received a notification for " + taskCWOStreamIdentifier);
        synchronized (this.fLock) {
            if (this.fStreamsToRead.add(taskCWOStreamIdentifier)) {
                this.fReadAvailableBytesFromStreams.markForRun();
            }
        }
    }

    @Override // com.mathworks.toolbox.distcomp.mjs.cwo.TaskInputStreamHandler
    public void unregisterStreamForTask(TaskCWOStreamIdentifier taskCWOStreamIdentifier) {
        Log.LOGGER.log(DistcompLevel.FOUR, "Unregistering input stream with CWOInputStreamHandler for task: " + taskCWOStreamIdentifier);
        InputStream remove = this.fStreams.remove(taskCWOStreamIdentifier);
        if (remove != null) {
            try {
                remove.close();
            } catch (IOException e) {
            }
        }
    }

    @Override // com.mathworks.toolbox.distcomp.mjs.cwo.TaskInputStreamHandler
    public void shutdown(long j) throws InterruptedException {
        this.fReadAvailableBytesFromStreams.shutdown();
        this.fReadAvailableBytesFromStreams.awaitShutdown(j);
    }

    static {
        $assertionsDisabled = !CWOInputStreamHandler.class.desiredAssertionStatus();
    }
}
