package eu.dnetlib.data.mapreduce.wf;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.mapreduce.HadoopJob;
import eu.dnetlib.data.mapreduce.JobClientFactory;
import eu.dnetlib.data.mapreduce.wf.progress.MapreduceProgressProvider;
import eu.dnetlib.workflow.AsyncJobNode;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.springframework.beans.factory.annotation.Required;

/* loaded from: input_file:eu/dnetlib/data/mapreduce/wf/SetupMapreduceJobNode.class */
/**
 * Workflow node that configures a Hadoop mapreduce job, submits it to the
 * {@link ClusterName#DM} cluster and blocks until the job has finished.
 *
 * <p>Subclasses supply the job parameters through {@link #prepareJob(NodeToken)} and may hook into
 * the lifecycle via {@link #beforeStart(Engine, NodeToken)},
 * {@link #beforeCompleted(NodeToken, RunningJob)}, {@link #beforeFailed(NodeToken, RunningJob)} and
 * {@link #handleCompletion(Engine, NodeToken)}.
 */
public abstract class SetupMapreduceJobNode extends AsyncJobNode {
    private static final Log log = LogFactory.getLog(SetupMapreduceJobNode.class);

    @Resource
    private JobClientFactory jobClientFactory;
    private HadoopJob hadoopJob;

    /**
     * Builds the job-specific properties that are handed to
     * {@code HadoopJob.setJobDetails} when the job configuration is created.
     *
     * @param nodeToken the token of the workflow node being executed
     * @return the properties describing the job to run
     */
    protected abstract Properties prepareJob(NodeToken nodeToken);

    /**
     * Configures, submits and waits for the mapreduce job.
     *
     * <p>The jar to ship is read from the {@code job.lib} env attribute and must be a readable local
     * file. Every entry of the job configuration is copied into the token env before submission.
     * On success {@link #beforeCompleted} and {@link #handleCompletion} are invoked; on an
     * unsuccessful job, or on any {@link Throwable} raised along the way, {@link #beforeFailed} is
     * invoked and the failure is reported through {@code failed(...)}.
     *
     * @param engine    the workflow engine driving this node
     * @param nodeToken the token of the workflow node being executed
     */
    public void executeAsync(Engine engine, NodeToken nodeToken) {
        try {
            beforeStart(engine, nodeToken);
            JobConf jobConf = getJobConf(nodeToken, ClusterName.DM);

            String jobLib = nodeToken.getEnv().getAttribute("job.lib");
            if (jobLib == null || !new File(jobLib).canRead()) {
                throw new IllegalArgumentException("unable to read mapreduce library: " + jobLib);
            }
            jobConf.setJobName(getHadoopJob().getName());
            jobConf.setJar(jobLib);

            // Always publish the mapper/reducer class names, falling back to "" when unset.
            nodeToken.getEnv().setAttribute("mapreduce.map.class", jobConf.get("mapreduce.map.class", ""));
            nodeToken.getEnv().setAttribute("mapreduce.reduce.class", jobConf.get("mapreduce.reduce.class", ""));

            log.info("submitting job " + jobConf.getJobName());
            // Mirror the complete job configuration into the workflow env.
            for (Map.Entry<String, String> entry : jobConf) {
                log.debug(entry.getKey() + ": " + entry.getValue());
                nodeToken.getEnv().setAttribute(entry.getKey(), entry.getValue());
            }

            // NOTE(review): presumably dropped so the client's own default file system applies — confirm.
            jobConf.unset("fs.default.name");

            JobClient jobClient = this.jobClientFactory.newInstance(ClusterName.DM.toString());
            try {
                RunningJob runningJob = jobClient.submitJob(jobConf);
                log.info("submitted job " + jobConf.getJobName());
                setProgressProvider(new MapreduceProgressProvider(runningJob));
                nodeToken.getEnv().setTransientAttribute("hadoopConf", jobConf);
                nodeToken.getEnv().setTransientAttribute("runningJob", runningJob);

                runningJob.waitForCompletion();
                if (runningJob.isSuccessful()) {
                    beforeCompleted(nodeToken, runningJob);
                    handleCompletion(engine, nodeToken);
                } else {
                    beforeFailed(nodeToken, runningJob);
                    failed(engine, nodeToken, new RuntimeException(runningJob.getTrackingURL() + "\n" + runningJob.getFailureInfo()));
                }
            } finally {
                // Close the client exactly once, whether or not the body above threw.
                jobClient.close();
            }
        } catch (Throwable th) {
            // No RunningJob is passed here: the failure may have happened before submission.
            beforeFailed(nodeToken, null);
            failed(engine, nodeToken, th);
        }
    }

    /**
     * Creates the job configuration from the configured {@link HadoopJob} and the properties
     * returned by {@link #prepareJob(NodeToken)}.
     */
    private JobConf getJobConf(NodeToken nodeToken, ClusterName clusterName) {
        return new JobConf(getHadoopJob().setJobDetails(clusterName, prepareJob(nodeToken)).getConfiguration());
    }

    /**
     * Hook invoked first in {@link #executeAsync}, before the job configuration is built.
     * The default implementation does nothing.
     */
    protected void beforeStart(Engine engine, NodeToken nodeToken) {
    }

    /**
     * Hook invoked after the job finished successfully and before {@link #handleCompletion}.
     * The default implementation does nothing.
     */
    protected void beforeCompleted(NodeToken nodeToken, RunningJob runningJob) {
    }

    /**
     * Hook invoked before a failure is reported. The default implementation does nothing.
     *
     * @param runningJob the unsuccessful job, or {@code null} when invoked from the generic
     *                   error handler of {@link #executeAsync}
     */
    protected void beforeFailed(NodeToken nodeToken, RunningJob runningJob) {
    }

    /**
     * Completes the node on the engine along {@link Arc#DEFAULT_ARC}.
     */
    protected void handleCompletion(Engine engine, NodeToken nodeToken) {
        engine.complete(nodeToken, Arc.DEFAULT_ARC);
    }

    /**
     * Dumps every entry of the given job configuration at debug level.
     *
     * @param jobConf the configuration to dump
     */
    protected void log(JobConf jobConf) {
        if (!log.isDebugEnabled()) {
            // Skip walking the whole configuration when nothing would be written.
            return;
        }
        for (Map.Entry<String, String> entry : jobConf) {
            log.debug(entry.getKey() + " " + entry.getValue());
        }
    }

    /**
     * Reads an attribute from the env of the given token.
     *
     * @param nodeToken the token whose env is queried
     * @param str       the attribute name
     * @return the value returned by the env for that name
     */
    public String env(NodeToken nodeToken, String str) {
        return nodeToken.getEnv().getAttribute(str);
    }

    /**
     * Recursively deletes the given path if it exists; logs at debug level otherwise.
     *
     * <p>NOTE(review): the file system is closed only after an actual delete. {@code FileSystem.get}
     * may return a shared cached instance, so closing it can affect other users of the same
     * instance; whether to close always or never should be confirmed against the Hadoop version
     * and cache settings in use. Behaviour is intentionally left as it was.
     *
     * @param str           the URI of the path to delete
     * @param configuration the Hadoop configuration used to resolve the file system
     * @throws RuntimeException wrapping any {@link IOException} raised by the file system
     */
    public synchronized void deleteHdfsFile(String str, Configuration configuration) {
        try {
            FileSystem fileSystem = FileSystem.get(URI.create(str), configuration);
            Path path = new Path(str);
            if (!fileSystem.exists(path)) {
                log.debug("nothing to delete in: " + path);
            } else {
                fileSystem.delete(path, true);
                fileSystem.close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the Hadoop job definition this node submits
     */
    public HadoopJob getHadoopJob() {
        return this.hadoopJob;
    }

    /**
     * @param hadoopJob the Hadoop job definition this node submits (required)
     */
    @Required
    public void setHadoopJob(HadoopJob hadoopJob) {
        this.hadoopJob = hadoopJob;
    }
}
