package eu.dnetlib.data.hadoop.mapred;

import com.google.common.collect.Maps;
import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.jets3t.service.impl.soap.axis._2006_03_01.StorageClass;

/* loaded from: input_file:WEB-INF/lib/dnet-hadoop-service-2.2.3.jar:eu/dnetlib/data/hadoop/mapred/MapreduceJobMonitor.class */
/**
 * {@link JobMonitor} implementation that follows a Hadoop map-reduce {@link RunningJob}.
 *
 * <p>When run, it polls the job until it reports completion and then notifies the
 * {@link JobCompletion} callback obtained through {@code getCallback()}: on success with the
 * job counters flattened into a map, otherwise with a failure message and exception.
 */
public class MapreduceJobMonitor extends JobMonitor {
    private static final Log log = LogFactory.getLog(MapreduceJobMonitor.class);

    /** Pause between two consecutive polls of the job state, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 10000L;

    private final RunningJob runningJob;

    /**
     * Creates a monitor for the given job.
     *
     * @param runningJob    handle of the submitted map-reduce job to follow
     * @param jobCompletion callback handed to the superclass constructor; it is the object
     *                      notified by {@link #run()} via {@code getCallback()}
     */
    public MapreduceJobMonitor(RunningJob runningJob, JobCompletion jobCompletion) {
        super(jobCompletion);
        this.runningJob = runningJob;
    }

    /**
     * Polls the job every {@value #POLL_INTERVAL_MS} ms until it is complete, refreshing
     * {@code lastActivity} whenever the run state changes, then reports the outcome to the
     * callback. Any error raised while monitoring is reported through
     * {@code getCallback().failed(...)} rather than propagated.
     */
    @Override
    public void run() {
        try {
            log.info("waiting for job completion: " + getRunningJob().getID().getId());
            int lastRunState = getRunState();
            while (!getRunningJob().isComplete()) {
                Thread.sleep(POLL_INTERVAL_MS);
                final int currentRunState = getRunState();
                if (currentRunState != lastRunState) {
                    // A state transition counts as job activity.
                    lastRunState = currentRunState;
                    this.lastActivity = new Date();
                }
            }
            if (getRunningJob().isSuccessful()) {
                log.info("job successful: " + getRunningJob().getID().getId());
                getCallback().done(asMap(getRunningJob().getCounters()));
            } else {
                final String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus();
                getCallback().failed(msg, new HadoopServiceException(msg));
            }
        } catch (InterruptedException e) {
            // Report the failure exactly as before, then restore the interrupt status so that
            // whoever owns this thread can still observe the interruption. The flag is restored
            // after the callback so the callback itself does not run on an interrupted thread.
            try {
                getCallback().failed(getHadoopId(), e);
            } finally {
                Thread.currentThread().interrupt();
            }
        } catch (Throwable th) {
            // Boundary of the monitoring thread: every failure is routed to the callback.
            getCallback().failed(getHadoopId(), th);
        }
    }

    /**
     * @return the numeric part of the job ID, rendered as a string
     */
    @Override
    public String getHadoopId() {
        return String.valueOf(getRunningJob().getID().getId());
    }

    /**
     * Flattens the job counters into a map keyed by
     * {@code <group display name>.<counter display name>}, with the counter value rendered
     * as a string. If two counters produce the same key, the later one overwrites the earlier.
     *
     * @param counters job counters, may be {@code null}
     * @return a new mutable map; empty when {@code counters} is {@code null}
     */
    protected Map<String, String> asMap(Counters counters) {
        final Map<String, String> result = Maps.newHashMap();
        if (counters == null) {
            return result;
        }
        for (Counters.Group group : counters) {
            for (Counters.Counter counter : group) {
                result.put(group.getDisplayName() + "." + counter.getDisplayName(), String.valueOf(counter.getValue()));
            }
        }
        return result;
    }

    /**
     * @return the job handle this monitor was created with
     */
    public RunningJob getRunningJob() {
        return this.runningJob;
    }

    /**
     * @return the textual name of the current run state as given by
     *         {@link JobStatus#getJobRunState(int)}, or a fallback value when the job status
     *         cannot be read (the error is logged, not thrown)
     */
    @Override
    public String getStatus() {
        try {
            return JobStatus.getJobRunState(getRunState());
        } catch (IOException e) {
            log.error("error accessing job status", e);
            // NOTE(review): constant borrowed from an unrelated jets3t class, presumably the
            // string "UNKNOWN" inlined by the compiler -- confirm and replace with a local constant.
            return StorageClass._UNKNOWN;
        }
    }

    /**
     * Reads the current run state code from the job status.
     *
     * @throws IOException if the job status cannot be retrieved
     */
    private int getRunState() throws IOException {
        return getRunningJob().getJobStatus().getRunState();
    }

    /**
     * @return the value of {@code lastActivity}, which {@link #run()} updates on each
     *         run-state change
     */
    @Override
    public Date getLastActivity() {
        return this.lastActivity;
    }

    /**
     * @return the start time reported by the job status
     * @throws HadoopServiceException if the job status cannot be read
     */
    @Override
    public Date getStartTime() throws HadoopServiceException {
        try {
            return new Date(getRunningJob().getJobStatus().getStartTime());
        } catch (IOException e) {
            throw new HadoopServiceException("unable to read job start time", e);
        }
    }

    /**
     * @return the tracking URL exposed by the running job
     */
    @Override
    public String getTrackerUrl() {
        return getRunningJob().getTrackingURL();
    }

    /**
     * Requests the job to be killed. This is best-effort: an {@link IOException} raised by the
     * kill request is logged and not propagated.
     */
    @Override
    public void kill() {
        try {
            log.info("killing job: " + getHadoopId());
            getRunningJob().killJob();
        } catch (IOException e) {
            log.error("unable to kill job: " + getHadoopId(), e);
        }
    }
}
