package com.mathworks.toolbox.parallel.hadoop.link;

import com.mathworks.toolbox.parallel.hadoop.MatlabConfiguration;
import com.mathworks.toolbox.parallel.hadoop.MatlabOutputFormat;
import com.mathworks.toolbox.parallel.hadoop.SerializationUtils;
import com.mathworks.toolbox.parallel.mapreduce.Endpoint;
import com.mathworks.toolbox.parallel.mapreduce.HadoopTaskAttemptID;
import com.mathworks.toolbox.parallel.mapreduce.KeyValueMessage;
import com.mathworks.toolbox.parallel.mapreduce.KeyValueSocket;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

/* loaded from: input_file:com/mathworks/toolbox/parallel/hadoop/link/MatlabReducer.class */
public final class MatlabReducer extends Reducer<BytesWritable, BytesWritable, Void, Void> {
    private MatlabConfiguration fConfig;
    private HadoopTaskAttemptID fTaskAttemptID;
    private KeyValueSocket fKeyValueSocket;
    private String fOutputFolder;
    private String fFinalOutputFolder;
    private HadoopMatlabWorker fWorker;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mathworks/toolbox/parallel/hadoop/link/MatlabReducer$MatlabFinishedEarlyException.class */
    public static class MatlabFinishedEarlyException extends IOException {
        private MatlabFinishedEarlyException() {
        }
    }

    protected void setup(Reducer<BytesWritable, BytesWritable, Void, Void>.Context context) throws IOException, InterruptedException {
        TaskUtils.throwIfOutputFolderDoesNotExist(context);
        Configuration configuration = context.getConfiguration();
        this.fConfig = new MatlabConfiguration(configuration);
        this.fTaskAttemptID = TaskUtils.convertTaskAttemptID(context.getTaskAttemptID(), HadoopTaskAttemptID.TaskType.REDUCE);
        this.fOutputFolder = MatlabOutputFormat.getWorkOutputPath(context).toString();
        this.fFinalOutputFolder = MatlabOutputFormat.getOutputPath(context).toString();
        if (!MatlabWorkerSingleton.isInitialized()) {
            try {
                MatlabWorkerSingleton.initialize(configuration, context.getJobID().toString());
            } catch (Exception e) {
                TaskUtils.handleMatlabStartupException(e, this.fTaskAttemptID);
            }
        }
        this.fWorker = MatlabWorkerSingleton.getOrCreateWorker();
        this.fKeyValueSocket = new KeyValueSocket();
    }

    protected void cleanup(Reducer<BytesWritable, BytesWritable, Void, Void>.Context context) throws IOException, InterruptedException {
        this.fKeyValueSocket.close();
    }

    private Endpoint bindEndpoint() throws IOException {
        Endpoint endpoint = new Endpoint("ipc://ml_" + this.fTaskAttemptID.toString(), this.fWorker.getWorkerFolder());
        this.fKeyValueSocket.bind(endpoint);
        return endpoint;
    }

    private MatlabWorkerFevalFuture launchMatlab(Endpoint endpoint) throws IOException, InterruptedException {
        return this.fWorker.feval(this.fConfig.getReducerFunction(), endpoint.toString(), endpoint.getBaseFolderAsString(), this.fTaskAttemptID.toString(), this.fOutputFolder, this.fFinalOutputFolder, SerializationUtils.serializeConfiguration(this.fConfig.toConfiguration()));
    }

    private void reduce(Reducer<BytesWritable, BytesWritable, Void, Void>.Context context) throws IOException, InterruptedException {
        MatlabWorkerFevalFuture launchMatlab = launchMatlab(bindEndpoint());
        while (context.nextKey()) {
            try {
                sendKey((BytesWritable) context.getCurrentKey(), launchMatlab);
                Iterator it = context.getValues().iterator();
                while (it.hasNext()) {
                    sendValue((BytesWritable) it.next(), launchMatlab);
                }
            } catch (MatlabFinishedEarlyException e) {
            }
        }
        sendClose(launchMatlab);
        recvClose(launchMatlab);
        launchMatlab.get();
        this.fWorker.returnToPool();
    }

    public void run(Reducer<BytesWritable, BytesWritable, Void, Void>.Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            reduce(context);
        } finally {
            cleanup(context);
        }
    }

    private void doSend(KeyValueMessage keyValueMessage, Future future) throws IOException {
        while (!future.isDone()) {
            if (this.fKeyValueSocket.send(keyValueMessage, 1000L, TimeUnit.MILLISECONDS)) {
                return;
            }
        }
        throw new MatlabFinishedEarlyException();
    }

    private void sendKey(BytesWritable bytesWritable, Future future) throws IOException {
        ByteBuffer wrap = ByteBuffer.wrap(bytesWritable.getBytes());
        wrap.limit(bytesWritable.getLength());
        doSend(KeyValueMessage.createKeyMessage(wrap), future);
    }

    private void sendValue(BytesWritable bytesWritable, Future future) throws IOException {
        ByteBuffer wrap = ByteBuffer.wrap(bytesWritable.getBytes());
        wrap.limit(bytesWritable.getLength());
        doSend(KeyValueMessage.createValueMessage(wrap), future);
    }

    private void sendClose(Future future) throws IOException {
        doSend(KeyValueMessage.createCloseMessage(), future);
    }

    private void recvClose(Future future) throws IOException {
        while (!future.isDone() && this.fKeyValueSocket.recv(1000L, TimeUnit.MILLISECONDS) == null) {
        }
    }
}
