package com.mathworks.toolbox.distcomp.pmode.transfer;

import com.mathworks.toolbox.distcomp.pmode.poolmessaging.ProcessInstance;
import com.mathworks.toolbox.distcomp.pmode.poolmessaging.RoleOutputGroup;
import com.mathworks.toolbox.parallel.pctutil.logging.DistcompLevel;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/mathworks/toolbox/distcomp/pmode/transfer/DataReceiverImpl.class */
/**
 * Receiving side of a block-wise data transfer.
 *
 * <p>Incoming {@link TransferDataItem}s are delivered through {@link #dispatch}. A
 * {@link TransferHeader} is stored; each {@link TransferPayload} is acknowledged to the
 * source over the {@link RoleOutputGroup} and appended to an internal queue. Queued blocks
 * are handed out one at a time, in arrival order, by {@link #getNextBlock()}.
 *
 * <p>The queue is a synchronized list and the header / last-received counter are atomics;
 * the remaining fields are plain. NOTE(review): this suggests {@code dispatch} and
 * {@code getNextBlock} run on different threads, but the class does not establish that
 * itself -- confirm against callers before relying on any thread-safety guarantee.
 */
public class DataReceiverImpl extends DataReceiver {
    /** Channel used to send acknowledgements back to the source. */
    private final RoleOutputGroup fOutGroup;
    /** Sequence number of the current transfer; negative when no transfer is in progress. */
    private long fTransferSeqNumber;
    /** Index of the last block handed out by {@link #getNextBlock()}; -1 before the first. */
    private long fLastBlockReturnedIntoM;
    /** Source of the current transfer; {@code null} when no transfer is in progress. */
    private ProcessInstance fSrcID;
    /** Received payloads that have not yet been handed out, in arrival order. */
    private final List<TransferPayload> fDataQueue =
            Collections.synchronizedList(new LinkedList<TransferPayload>());
    /** Index of the last payload received; -1 before the first. */
    private final AtomicLong fLastReceivedBlock = new AtomicLong(-1);
    /** Header of the current transfer, or {@code null} if none has been received yet. */
    private final AtomicReference<TransferHeader> fTransferHeader = new AtomicReference<>(null);

    /**
     * Creates a receiver with no transfer in progress.
     *
     * @param roleOutputGroup channel used to acknowledge received blocks
     */
    public DataReceiverImpl(RoleOutputGroup roleOutputGroup) {
        this.fOutGroup = roleOutputGroup;
        reset();
    }

    /**
     * Returns the next received block that has not been handed out yet.
     *
     * @return the block's data, or {@code null} when no transfer is in progress or no
     *         unreturned block is currently queued
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.transfer.DataReceiver
    public byte[] getNextBlock() {
        if (!isTransferInProgress()) {
            return null;
        }
        byte[] data = null;
        if (this.fLastBlockReturnedIntoM < this.fLastReceivedBlock.get()) {
            assert !this.fDataQueue.isEmpty()
                    : "Data queue should not be empty when we have received more blocks than we have returned.";
            data = this.fDataQueue.remove(0).fData;
            this.fLastBlockReturnedIntoM++;
        }
        // NOTE(review): fLastBlockReturnedIntoM is a 0-based index (it starts at -1), so after
        // N blocks it is N-1. Comparing it with fNumberOfBlocks looks off by one, in which case
        // this reset never fires and cleanup only happens in prepareForTransfer(). Left as-is
        // because the wire contract for fNumberOfBlocks is not visible here -- confirm.
        TransferHeader header = this.fTransferHeader.get();
        if (header != null && this.fLastBlockReturnedIntoM == header.fNumberOfBlocks) {
            reset();
        }
        return data;
    }

    /**
     * @return the header of the current transfer, or {@code null} if none has been received
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.transfer.DataReceiver
    public TransferHeader getHeader() {
        return this.fTransferHeader.get();
    }

    /**
     * Handles one incoming transfer item: payloads are acknowledged and queued, a header is
     * stored, and anything else is logged as unexpected.
     *
     * <p>Source, in-progress state and sequence number are validated with {@code assert}
     * only, so these checks are active only when assertions are enabled.
     *
     * @param transferDataItem the received item
     * @param processInstance  the process the item came from
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.transfer.DataTransfer
    public void dispatch(TransferDataItem transferDataItem, ProcessInstance processInstance) {
        // Check the in-progress state first: when idle, fSrcID is null and comparing sources
        // first would surface as a NullPointerException instead of this diagnostic.
        assert isTransferInProgress()
                : "Received data of type " + transferDataItem.getClass().getName()
                        + " when transfer was not in progress.";
        assert this.fSrcID != null && this.fSrcID.equals(processInstance)
                : "Received a TransferDataItem from source " + processInstance
                        + ". Expected source to be " + this.fSrcID + ".";
        assertSeqNumber(transferDataItem.getTransferSeqNumber());

        if (transferDataItem instanceof TransferPayload) {
            receiveDataBlock((TransferPayload) transferDataItem);
        } else if (transferDataItem instanceof TransferHeader) {
            this.fTransferHeader.set((TransferHeader) transferDataItem);
        } else {
            String message = sMsgUnexpectedTransferCommand(transferDataItem);
            PackageInfo.LOGGER.log(DistcompLevel.ZERO, message);
            assert false : message;
        }
    }

    /**
     * Discards any state from a previous transfer and starts expecting a new one.
     *
     * @param j               sequence number of the new transfer
     * @param processInstance source the new transfer will come from
     */
    @Override // com.mathworks.toolbox.distcomp.pmode.transfer.DataTransfer
    public void prepareForTransfer(long j, ProcessInstance processInstance) {
        reset();
        this.fTransferSeqNumber = j;
        this.fSrcID = processInstance;
    }

    /** Acknowledges a payload to the source, then queues it and advances the received counter. */
    private void receiveDataBlock(TransferPayload transferPayload) {
        // The header is only needed for the log line; do not let a payload that arrives
        // before the header fail with a NullPointerException here.
        TransferHeader header = this.fTransferHeader.get();
        String totalBlocks = header == null ? "<unknown>" : String.valueOf(header.fNumberOfBlocks);
        PackageInfo.LOGGER.log(DistcompLevel.FIVE,
                "Received block " + (transferPayload.fBlockNumber + 1) + " out of " + totalBlocks + ".");

        assert transferPayload.fBlockNumber == this.fLastReceivedBlock.get() + 1
                : "Received blocks out-of-order. Expected block " + (this.fLastReceivedBlock.get() + 1)
                        + " but received block " + transferPayload.fBlockNumber + ".";

        this.fOutGroup.sendTo(this.fSrcID,
                new TransferACKOfDataReceived(this.fTransferSeqNumber, transferPayload.fBlockNumber));
        // Queue before bumping the counter so getNextBlock() never sees a count ahead of the queue.
        this.fDataQueue.add(transferPayload);
        this.fLastReceivedBlock.incrementAndGet();
    }

    /** A transfer is in progress while the sequence number is non-negative. */
    private boolean isTransferInProgress() {
        return this.fTransferSeqNumber >= 0;
    }

    /** Returns the receiver to the idle state and drops any queued blocks. */
    private void reset() {
        this.fTransferSeqNumber = -1L;
        this.fTransferHeader.set(null);
        this.fLastReceivedBlock.set(-1L);
        this.fLastBlockReturnedIntoM = -1L;
        this.fSrcID = null;
        this.fDataQueue.clear();
    }

    /** Asserts that {@code j} matches the sequence number of the current transfer. */
    private void assertSeqNumber(long j) {
        assert this.fTransferSeqNumber == j
                : "Invalid Transfer sequence number. Was " + j + ". Expected " + this.fTransferSeqNumber + ".";
    }

    /** Builds the log/assertion message for an item that is neither a payload nor a header. */
    private static String sMsgUnexpectedTransferCommand(TransferCommand transferCommand) {
        return "Received unexpected transfer object of type " + transferCommand.getClass().getName()
                + " with transfer sequence number " + transferCommand.getTransferSeqNumber() + ".";
    }
}
