package eu.dnetlib.data.hadoop.action;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

/* loaded from: input_file:WEB-INF/lib/dnet-hadoop-service-1.0.5.jar:eu/dnetlib/data/hadoop/action/SubmitMapreduceJobAction.class */
/**
 * Blackboard action that builds a {@link JobConf} from a job profile and the blackboard job
 * parameters, submits it through the job-tracker client of the requested cluster and registers
 * the resulting running job in the job registry.
 */
public class SubmitMapreduceJobAction extends AbstractSubmitAction {
    private static final Log log = LogFactory.getLog(SubmitMapreduceJobAction.class);

    /**
     * Submits the mapreduce job named by the {@code job.name} parameter to the cluster named by the
     * {@code cluster} parameter.
     *
     * <p>On success the job is registered in the job registry and the blackboard job is marked as
     * ongoing. Any {@link Throwable} raised along the way, including a missing or unknown
     * {@code cluster} parameter, is logged and reported through
     * {@link BlackboardServerHandler#failed}.
     *
     * @param blackboardServerHandler handler used to report the ongoing / failed state
     * @param blackboardJob the blackboard job carrying the submission parameters
     */
    @Override // eu.dnetlib.data.hadoop.action.AbstractHadoopAction
    public void executeAsync(BlackboardServerHandler blackboardServerHandler, BlackboardJob blackboardJob) {
        String jobName = blackboardJob.getParameters().get("job.name");
        try {
            // Resolved inside the try block so that an absent or invalid "cluster" parameter is
            // reported as a failed blackboard job instead of escaping this method.
            ClusterName clusterName = ClusterName.valueOf(blackboardJob.getParameters().get("cluster"));

            JobProfile jobProfile = loadISJobConfiguration(jobName, blackboardJob.getParameters());
            validateJobParams(blackboardServerHandler, blackboardJob, jobName, jobProfile);
            JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, blackboardJob.getParameters());
            if (!this.clientMap.isMapreduceAvailable(clusterName)) {
                throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());
            }
            logJobDetails(jobConf);

            RunningJob runningJob = this.clientMap.getJtClient(clusterName).submitJob(jobConf);
            String jobId = newJobId(clusterName, Integer.valueOf(runningJob.getID().getId()));
            MapreduceJobMonitor monitor =
                    new MapreduceJobMonitor(runningJob, newCompletionCallback(blackboardServerHandler, blackboardJob, jobId));
            this.jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile, monitor));

            updateJobStatus(jobName);
            blackboardServerHandler.ongoing(blackboardJob);
        } catch (Throwable th) {
            log.error("error executing hadoop job: " + jobName, th);
            blackboardServerHandler.failed(blackboardJob, th);
        }
    }

    /**
     * Creates the {@link JobConf} for the given job.
     *
     * <p>The configuration is copied into a new {@code JobConf}; the job name, the
     * {@code dnet.mapred.job.description} property and the job jar are set; then the profile's job
     * definition is applied, followed by the given parameter map. When the profile lists
     * {@link TableInputFormat#INPUT_TABLE} among its required parameters and provides scan
     * properties, the {@link TableInputFormat#SCAN} property is set as well.
     *
     * @param configuration base cluster configuration
     * @param str the job name
     * @param jobProfile the job profile (description, job definition, required params, scan properties)
     * @param map additional properties, applied after the job definition
     * @return the prepared job configuration
     * @throws IOException declared by the property-setting helper
     * @throws HadoopServiceException if the job library path cannot be determined
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public JobConf prepareJob(Configuration configuration, String str, JobProfile jobProfile, Map<String, String> map) throws IOException, HadoopServiceException {
        log.info("creating job: " + str);
        JobConf jobConf = new JobConf(configuration);
        jobConf.setJobName(str);
        jobConf.set("dnet.mapred.job.description", jobProfile.getDescription());
        jobConf.setJar(new Path(getJobLib(configuration, jobProfile)).toString());
        set(jobConf, jobProfile.getJobDefinition());
        set(jobConf, map);
        ScanProperties scanProperties = jobProfile.getScanProperties();
        if (jobProfile.getRequiredParams().contains(TableInputFormat.INPUT_TABLE) && scanProperties != null) {
            jobConf.set(TableInputFormat.SCAN, ScanFactory.getScan(scanProperties));
        }
        return jobConf;
    }

    /**
     * Determines the location of the job library.
     *
     * <p>The default library path (derived from {@code fs.defaultFS}) is used unless the job
     * definition contains a {@code job.lib} entry, which then takes precedence. A path that does
     * not start with {@code hdfs://} is prefixed with the value of {@code fs.defaultFS}.
     *
     * @param configuration configuration providing {@code fs.defaultFS}
     * @param jobProfile profile whose job definition may override the library path
     * @return the library path, logged at info level before being returned
     * @throws HadoopServiceException if the resulting path is {@code null} or empty
     */
    protected String getJobLib(Configuration configuration, JobProfile jobProfile) throws HadoopServiceException {
        String defaultFs = configuration.get("fs.defaultFS");
        String jobLib = getDefaultLibPath(defaultFs);
        if (jobProfile.getJobDefinition().containsKey("job.lib")) {
            jobLib = jobProfile.getJobDefinition().get("job.lib");
        }
        if (jobLib == null || jobLib.isEmpty()) {
            throw new HadoopServiceException("job.lib must refer to an absolute or relative HDFS path");
        }
        if (!jobLib.startsWith("hdfs://")) {
            jobLib = defaultFs + jobLib;
        }
        log.info("using job.lib: " + jobLib);
        return jobLib;
    }

    /**
     * Copies every entry of the map into the job configuration.
     *
     * <p>Entries accepted by {@code checkHdfsProperty} get the job's {@code fs.defaultFS} value
     * prepended before being copied.
     *
     * @param jobConf the job configuration to populate
     * @param map the properties to copy
     * @throws IOException declared for callers; not raised by the visible statements
     */
    protected void set(JobConf jobConf, Map<String, String> map) throws IOException {
        for (Map.Entry<String, String> entry : map.entrySet()) {
            if (checkHdfsProperty(entry)) {
                // NOTE(review): this rewrites the entry of the caller's map in place, so the map
                // passed in is modified too. Kept as is because callers may rely on it; confirm.
                entry.setValue(jobConf.get("fs.defaultFS") + entry.getValue());
            }
            jobConf.set(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Writes the job configuration, as indented XML, to the debug log.
     *
     * <p>Nothing is serialised when debug logging is disabled. A failure to serialise the
     * configuration is logged as a warning (with its cause) and otherwise ignored, since logging
     * the details is best-effort.
     *
     * @param jobConf the job configuration to log
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void logJobDetails(JobConf jobConf) {
        if (!log.isDebugEnabled()) {
            // Skip the XML serialisation entirely when its output would be discarded.
            return;
        }
        StringWriter stringWriter = new StringWriter();
        try {
            jobConf.writeXml(stringWriter);
            log.debug("\n" + IndentXmlString.apply(stringWriter.toString()));
        } catch (IOException e) {
            log.warn("unable to log job details: " + jobConf.getJobName(), e);
        }
    }
}
