/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.project;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProgramme;
import eu.dnetlib.dhp.actionmanager.project.utils.model.CSVProject;
import eu.dnetlib.dhp.actionmanager.project.utils.model.JsonTopic;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.H2020Classification;
import eu.dnetlib.dhp.schema.oaf.H2020Programme;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that joins the project, programme and topic inputs and writes the resulting
 * {@link Project} records as {@link AtomicAction}s into a Hadoop sequence file.
 *
 * <p>Each input is read as a text file with one JSON document per line. Projects are joined to
 * programmes on {@code programme == code}; projects without a matching programme are dropped.
 * The remaining projects are joined to topics on {@code id == projectID}, rewritten to use the
 * identifier produced by {@link #createOpenaireId}, merged per identifier and serialised.
 */
public class SparkAtomicActionJob {
    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/actionmanager/project/action_set_parameters.json";

    /**
     * Entry point: parses the command-line arguments against the bundled parameter
     * description, removes any previous output and runs the job.
     *
     * @param args command-line arguments ({@code isSparkSessionManaged}, {@code projectPath},
     *             {@code outputPath}, {@code programmePath}, {@code topicPath})
     * @throws Exception if the parameter description cannot be read or the job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream deterministically and decode it with a fixed charset
        // instead of the platform default.
        try (InputStream parameters = SparkAtomicActionJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            Objects.requireNonNull(parameters, "missing classpath resource " + PARAMETERS_RESOURCE);
            jsonConfiguration = IOUtils.toString(parameters, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        // The session is treated as managed unless the argument explicitly says otherwise.
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String projectPath = parser.get("projectPath");
        log.info("projectPath: {}", projectPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
        String programmePath = parser.get("programmePath");
        log.info("programmePath: {}", programmePath);
        String topicPath = parser.get("topicPath");
        log.info("topic path: {}", topicPath);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            getAtomicActions(spark, projectPath, programmePath, topicPath, outputPath);
        });
    }

    /**
     * Removes {@code path} through {@link HdfsSupport}, using the Hadoop configuration of the
     * given session.
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /**
     * Builds the project atomic actions and saves them under {@code outputPath}.
     *
     * @param spark         active Spark session
     * @param projectPath   location of the project records (one JSON document per line)
     * @param programmePath location of the programme records (one JSON document per line)
     * @param topicPath     location of the topic records (one JSON document per line)
     * @param outputPath    destination of the sequence file of (class name, JSON action) pairs
     */
    private static void getAtomicActions(SparkSession spark, String projectPath, String programmePath, String topicPath, String outputPath) {
        Dataset<CSVProject> project = readPath(spark, projectPath, CSVProject.class);
        Dataset<CSVProgramme> programme = readPath(spark, programmePath, CSVProgramme.class);
        Dataset<JsonTopic> topic = readPath(spark, topicPath, JsonTopic.class);

        // Left join, then drop the rows for which no programme matched.
        Dataset<Project> aaproject = project
            .joinWith(programme, project.col("programme").equalTo(programme.col("code")), "left")
            .map((MapFunction<Tuple2<CSVProject, CSVProgramme>, Project>) c -> {
                CSVProject csvProject = c._1();
                return Optional.ofNullable(c._2())
                    .map(csvProgramme -> buildProject(csvProject, csvProgramme))
                    .orElse(null);
            }, Encoders.bean(Project.class))
            .filter(Objects::nonNull);

        aaproject
            .joinWith(topic, aaproject.col("id").equalTo(topic.col("projectID")), "left")
            .map((MapFunction<Tuple2<Project, JsonTopic>, Project>) p -> {
                Optional<JsonTopic> matchedTopic = Optional.ofNullable(p._2());
                Project rp = p._1();
                rp.setId(createOpenaireId((String) ModelSupport.entityIdPrefix.get("project"), "corda__h2020", rp.getId()));
                // The topic description is only available when the left join found a topic.
                matchedTopic.ifPresent(t -> rp.setH2020topicdescription(t.getTitle()));
                return rp;
            }, Encoders.bean(Project.class))
            .filter(Objects::nonNull)
            .groupByKey((MapFunction<Project, String>) OafEntity::getId, Encoders.STRING())
            // Collapse all records sharing an identifier into the first one of the group.
            .mapGroups((MapGroupsFunction<String, Project, Project>) (s, it) -> {
                Project first = it.next();
                it.forEachRemaining(first::mergeFrom);
                return first;
            }, Encoders.bean(Project.class))
            .toJavaRDD()
            .map((Function<Project, AtomicAction<Project>>) p -> new AtomicAction<>(Project.class, p))
            .mapToPair((PairFunction<AtomicAction<Project>, Text, Text>) aa -> new Tuple2<>(
                new Text(aa.getClazz().getCanonicalName()),
                new Text(OBJECT_MAPPER.writeValueAsString(aa))))
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
    }

    /**
     * Creates the {@link Project} for a project row and its matching programme row, carrying
     * the project id, topic code and a single {@link H2020Classification}.
     */
    private static Project buildProject(CSVProject csvProject, CSVProgramme csvProgramme) {
        Project pp = new Project();
        pp.setId(csvProject.getId());
        pp.setH2020topiccode(csvProject.getTopics());

        H2020Programme pm = new H2020Programme();
        pm.setCode(csvProject.getProgramme());

        H2020Classification h2020classification = new H2020Classification();
        h2020classification.setClassification(csvProgramme.getClassification());
        h2020classification.setH2020Programme(pm);
        setLevelsandProgramme(h2020classification, csvProgramme.getClassification_short());

        pp.setH2020classification(Arrays.asList(h2020classification));
        return pp;
    }

    /**
     * Splits {@code classification_short} on {@code " | "} and stores the first three parts as
     * level 1, 2 and 3; the last part becomes the description of the attached programme.
     *
     * <p>When the value is {@code null}, or consists only of separators so that the split
     * yields no parts, nothing is set.
     *
     * @param h2020Classification  classification to fill; its programme must already be set
     * @param classification_short separator-delimited short classification, may be {@code null}
     */
    private static void setLevelsandProgramme(H2020Classification h2020Classification, String classification_short) {
        if (classification_short == null) {
            return;
        }
        String[] tmp = classification_short.split(" \\| ");
        // String.split drops trailing empty strings, so an input made only of separators
        // produces an empty array.
        if (tmp.length == 0) {
            return;
        }
        h2020Classification.setLevel1(tmp[0]);
        if (tmp.length > 1) {
            h2020Classification.setLevel2(tmp[1]);
        }
        if (tmp.length > 2) {
            h2020Classification.setLevel3(tmp[2]);
        }
        h2020Classification.getH2020Programme().setDescription(tmp[tmp.length - 1]);
    }

    /**
     * Reads a text file and deserialises every line as JSON into an instance of {@code clazz}.
     *
     * @param spark     active Spark session
     * @param inputPath location of the text file(s)
     * @param clazz     bean class each line is mapped to
     * @param <R>       type of the resulting records
     * @return dataset of the deserialised records, encoded as beans
     */
    public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
        return spark.read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

    /**
     * Formats an identifier as {@code prefix|nsPrefix::md5(id)}.
     *
     * @param prefix   entity prefix
     * @param nsPrefix namespace prefix
     * @param id       original identifier, hashed with {@link DHPUtils#md5}
     * @return the composed identifier
     */
    public static String createOpenaireId(String prefix, String nsPrefix, String id) {
        return String.format("%s|%s::%s", prefix, nsPrefix, DHPUtils.md5(id));
    }
}

