/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.actionmanager.blackboard;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.data.actionmanager.blackboard.ActionManagerBlackboardCallback;
import eu.dnetlib.data.actionmanager.blackboard.ActionManagerBlackboardJobListener;
import eu.dnetlib.data.hadoop.HadoopIsClient;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobListener;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopBlackboardActions;
import eu.dnetlib.rmi.data.hadoop.HadoopService;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerException;
import eu.dnetlib.rmi.data.hadoop.actionmanager.ActionManagerSet;
import eu.dnetlib.rmi.enabling.ISLookUpException;
import eu.dnetlib.rmi.enabling.ISLookUpService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;

public class ActionManagerJobLauncher {
    public static final String ALL_SETS = "__ALL__";
    public static final String SEMICOLON = ";";
    public static final String COLON = ",";
    public static final String SEQFILE_INPUTFORMAT = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat";
    private static final String UPDATE_ACTION_PROFILE = "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'ActionManagerSetDSResourceType' and .//SET/@id = '%s'] return update delete $x//RAW_SETS/EXPIRED[@id = '%s']";
    private static final Log log = LogFactory.getLog(ActionManagerJobLauncher.class);
    private final Executor executor = Executors.newCachedThreadPool();
    @Resource
    private UniqueServiceLocator serviceLocator;
    private HadoopIsClient isClient;
    @Resource
    private BlackboardClientHandler blackboardClientHandler;
    @Resource
    private BlackboardJobRegistry jobRegistry;
    @Value(value="${services.actionmanager.promote.mapper.class}")
    private String promoteJobMapperClass;

    public void executePromoteFromHDFSJob(Set<String> sets, String targetTable, ActionManagerBlackboardCallback callback) throws ActionManagerException, ISLookUpException {
        log.info((Object)("Starting commit m/r job; sets=" + sets));
        ArrayList paths = Lists.newArrayList();
        if (sets == null || sets.isEmpty()) {
            paths.addAll(this.isClient.listSets().stream().map(ActionManagerSet::getPathToLatest).collect(Collectors.toList()));
        } else {
            for (String setId : sets) {
                if (this.isClient.existsSet(setId)) {
                    ActionManagerSet set = this.isClient.getSet(setId);
                    paths.add(set.getPathToLatest());
                    continue;
                }
                log.error((Object)("Invalid set " + setId));
                throw new ActionManagerException("Invalid set " + setId);
            }
        }
        HadoopService hadoopService = (HadoopService)this.serviceLocator.getService(HadoopService.class);
        ArrayList existingPaths = Lists.newArrayList((Iterable)Iterables.filter((Iterable)paths, path -> {
            try {
                boolean exist = hadoopService.existHdfsPath(ClusterName.DM.toString(), path);
                if (!exist) {
                    log.warn((Object)String.format("path '%s' doesn't exist on DM cluster'", path));
                }
                return exist;
            }
            catch (HadoopServiceException e) {
                log.error((Object)e);
                return false;
            }
        }));
        HashMap params = Maps.newHashMap();
        params.put("mapred.input.dir.formats", Joiner.on((String)COLON).join(Iterables.transform((Iterable)existingPaths, path -> path + SEMICOLON + SEQFILE_INPUTFORMAT)));
        params.put("mapred.input.dir.mappers", Joiner.on((String)COLON).join(Iterables.transform((Iterable)existingPaths, path -> path + SEMICOLON + this.promoteJobMapperClass)));
        params.put("hbase.mapred.outputtable", targetTable);
        params.put("hbase.mapreduce.outputtable", targetTable);
        log.info((Object)("promoting HDFS rawsets: " + paths));
        this.executeHDFS("promoteMultipleActionSetsJob", params, callback);
    }

    private void executeHDFS(String jobName, Map<String, String> params, ActionManagerBlackboardCallback callback) throws ActionManagerException {
        if (params == null || params.isEmpty()) {
            throw new ActionManagerException("Missing HDFS paths");
        }
        this.executor.execute(() -> {
            try {
                String serviceId = this.findHadoopServiceProfileID();
                BlackboardJob bbJob = this.blackboardClientHandler.newJob(serviceId);
                bbJob.setAction(HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString());
                bbJob.getParameters().put("job.name", jobName);
                bbJob.getParameters().put("cluster", ClusterName.DM.toString());
                bbJob.getParameters().putAll(params);
                this.jobRegistry.registerJobListener(bbJob, (BlackboardJobListener)new ActionManagerBlackboardJobListener(callback));
                this.blackboardClientHandler.assign(bbJob);
            }
            catch (Exception e) {
                log.error((Object)("Error launching m/r job: " + jobName), (Throwable)e);
                throw new RuntimeException("Error launching m/r job: " + jobName, e);
            }
        });
    }

    private String findHadoopServiceProfileID() throws Exception {
        return ((ISLookUpService)this.serviceLocator.getService(ISLookUpService.class)).getResourceProfileByQuery("collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()");
    }

    public HadoopIsClient getIsClient() {
        return this.isClient;
    }

    @Required
    public void setIsClient(HadoopIsClient isClient) {
        this.isClient = isClient;
    }
}

