/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.graph.merge;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.graph.merge.DatasourceCompatibilityComparator;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that merges one graph table (records of a single {@link Oaf} class) coming from two
 * graph instances, BETA and PROD, into a single output table.
 *
 * <p>Each input is read as JSON lines and grouped by record id. The two grouped tables are then
 * full-outer joined on the id, and every joined pair is reduced to one record:
 * <ul>
 *   <li>{@link Datasource} records are merged field by field;</li>
 *   <li>any other record is taken from PROD or BETA according to the {@code priority} parameter.</li>
 * </ul>
 * The result is written as gzip-compressed JSON lines.
 */
public class MergeGraphTableSparkJob {
    private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String PRIORITY_DEFAULT = "BETA";

    /**
     * Entry point.
     *
     * <p>Reads the job parameters (declared in {@code merge_graphs_parameters.json}) and removes the
     * output directory. It then runs the merge in a Spark session that uses Kryo serialization for
     * the OAF model classes.
     *
     * @param args command-line arguments parsed by {@link ArgumentApplicationParser}
     * @throws Exception if the parameters cannot be read or parsed, the table class cannot be
     *     loaded, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the classpath resource once read, and decode it explicitly as UTF-8 instead of the
        // platform default charset.
        try (InputStream parameters = Objects.requireNonNull(
                MergeGraphTableSparkJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"),
                "job parameter definition not found on the classpath")) {
            jsonConfiguration = IOUtils.toString(parameters, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        String priority = Optional.ofNullable(parser.get("priority")).orElse(PRIORITY_DEFAULT);
        log.info("priority: {}", priority);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String betaInputPath = parser.get("betaInputPath");
        log.info("betaInputPath: {}", betaInputPath);
        String prodInputPath = parser.get("prodInputPath");
        log.info("prodInputPath: {}", prodInputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
        String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass gives the Oaf bound required by mergeGraphTable's type parameters. It fails
        // fast with a ClassCastException when the configured class is not an Oaf.
        Class<? extends Oaf> entityClazz = Class.forName(graphTableClassName).asSubclass(Oaf.class);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            mergeGraphTable(spark, priority, betaInputPath, prodInputPath, entityClazz, entityClazz, outputPath);
        });
    }

    /**
     * Joins the BETA and PROD versions of one graph table on the record id and writes the merged
     * records to {@code outputPath} as gzip-compressed JSON lines.
     *
     * <p>For each id, a {@link Datasource} is merged by {@link #mergeDatasource}. Any other record
     * is taken from PROD when {@code priority} is {@code "PROD"}, and from BETA for any other value.
     * Ids whose merge yields {@code null} are dropped.
     *
     * @param spark         active Spark session
     * @param priority      {@code "PROD"} to prefer the PROD record; any other value prefers BETA
     * @param betaInputPath path of the BETA table (one JSON record per line)
     * @param prodInputPath path of the PROD table (one JSON record per line)
     * @param p_clazz       record class of the PROD table, also used for the merged records
     * @param b_clazz       record class of the BETA table
     * @param outputPath    destination path; existing content is overwritten
     */
    private static <P extends Oaf, B extends Oaf> void mergeGraphTable(SparkSession spark, String priority, String betaInputPath, String prodInputPath, Class<P> p_clazz, Class<B> b_clazz, String outputPath) {
        Dataset<Tuple2<String, B>> beta = readTableAndGroupById(spark, betaInputPath, b_clazz);
        Dataset<Tuple2<String, P>> prod = readTableAndGroupById(spark, prodInputPath, p_clazz);

        prod
            // The grouped key is the first column of each table. Referencing it by position avoids
            // depending on its name, which is "value" up to Spark 2.4 and "key" from Spark 3.0.
            .joinWith(beta, prod.col(prod.columns()[0]).equalTo(beta.col(beta.columns()[0])), "full_outer")
            .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) pair -> {
                Optional<P> p = Optional.ofNullable(pair._1()).map(Tuple2::_2);
                Optional<B> b = Optional.ofNullable(pair._2()).map(Tuple2::_2);

                // The record type is decided by whichever side exists, PROD first. If neither side
                // exists, mergeDatasource returns null, which is filtered out below.
                Oaf sample = p.isPresent() ? p.get() : b.orElse(null);
                if (sample == null || sample instanceof Datasource) {
                    return mergeDatasource(p, b);
                }
                return "PROD".equals(priority) ? mergeWithPriorityToPROD(p, b) : mergeWithPriorityToBETA(p, b);
            }, Encoders.kryo(p_clazz))
            .filter((FilterFunction<P>) Objects::nonNull)
            // Read the static mapper inside the lambda so it is not captured into the closure.
            .map((MapFunction<P, String>) oaf -> OBJECT_MAPPER.writeValueAsString(oaf), Encoders.STRING())
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .text(outputPath);
    }

    /**
     * Merges the PROD and BETA versions of the same datasource.
     *
     * <p>If only one side is present, it is returned unchanged; if neither is, {@code null} is
     * returned. When both are present, the PROD record is updated in place and returned:
     * <ul>
     *   <li>{@code openairecompatibility} becomes the minimum of the two values according to
     *       {@link DatasourceCompatibilityComparator};</li>
     *   <li>{@code collectedfrom}, {@code originalId} and {@code pid} become the distinct union of
     *       both sides, with null lists and null elements dropped.</li>
     * </ul>
     *
     * @param p PROD version; when both sides are present it must be a {@link Datasource}
     * @param b BETA version; when both sides are present it must be a {@link Datasource}
     * @return the merged record, or {@code null} when both sides are empty
     */
    @SuppressWarnings("unchecked")
    protected static <P extends Oaf, B extends Oaf> P mergeDatasource(Optional<P> p, Optional<B> b) {
        if (!b.isPresent()) {
            return p.orElse(null);
        }
        if (!p.isPresent()) {
            return (P) b.get();
        }
        Datasource dp = (Datasource) p.get();
        Datasource db = (Datasource) b.get();

        // NOTE(review): if either compatibility can be null, this relies on
        // DatasourceCompatibilityComparator handling nulls; confirm.
        List<Qualifier> compatibilities = Arrays.asList(dp.getOpenairecompatibility(), db.getOpenairecompatibility());
        dp.setOpenairecompatibility(Collections.min(compatibilities, new DatasourceCompatibilityComparator()));
        dp.setCollectedfrom(mergeLists(dp.getCollectedfrom(), db.getCollectedfrom()));
        dp.setOriginalId(mergeLists(dp.getOriginalId(), db.getOriginalId()));
        dp.setPid(mergeLists(dp.getPid(), db.getPid()));
        return (P) dp;
    }

    /**
     * Concatenates the given lists into a new list of distinct elements, preserving encounter order.
     * Null lists and null elements are skipped.
     *
     * @param lists lists to merge; any of them may be {@code null}
     * @return a new, never-null list
     */
    @SafeVarargs
    private static <T> List<T> mergeLists(List<T>... lists) {
        return Arrays.stream(lists)
            .filter(Objects::nonNull)
            .flatMap(Collection::stream)
            .filter(Objects::nonNull)
            .distinct()
            .collect(Collectors.toList());
    }

    /**
     * Returns the PROD record when present, otherwise the BETA record, otherwise {@code null}.
     */
    @SuppressWarnings("unchecked")
    private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> p, Optional<B> b) {
        if (p.isPresent()) {
            return p.get();
        }
        return b.isPresent() ? (P) b.get() : null;
    }

    /**
     * Returns the BETA record when present, otherwise the PROD record, otherwise {@code null}.
     */
    @SuppressWarnings("unchecked")
    private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToBETA(Optional<P> p, Optional<B> b) {
        if (b.isPresent()) {
            return (P) b.get();
        }
        return p.orElse(null);
    }

    /**
     * Reads a graph table stored as one JSON record per line and groups the records by id.
     *
     * <p>Records that share an id are combined with {@link GroupingAggregator}.
     *
     * @param spark           active Spark session
     * @param inputEntityPath path of the table
     * @param clazz           record class used for deserialization and the Kryo encoder
     * @return a dataset of {@code (id, groupedRecord)} pairs
     */
    private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableAndGroupById(SparkSession spark, String inputEntityPath, Class<T> clazz) {
        TypedColumn<T, T> aggregator = new GroupingAggregator<>(clazz).toColumn();
        log.info("Reading Graph table from: {}", inputEntityPath);
        return spark
            .read()
            .textFile(inputEntityPath)
            .map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.kryo(clazz))
            .groupByKey((MapFunction<T, String>) oaf -> ModelSupport.idFn().apply(oaf), Encoders.STRING())
            .agg(aggregator);
    }

    /** Deletes {@code path} using the Hadoop configuration of the given Spark session. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /**
     * Typed aggregator that folds all records sharing an id into one.
     *
     * <p>The buffer starts as {@code null}. Two non-null {@link OafEntity} records are combined with
     * {@code MergeUtils.merge}, and two non-null {@link Relation} records with
     * {@code MergeUtils.mergeRelation}. Otherwise the non-null one wins; if both are non-null, the
     * incoming one wins. Buffer and output use Kryo encoders for {@code clazz}.
     *
     * @param <T> record type
     */
    public static class GroupingAggregator<T extends Oaf>
    extends Aggregator<T, T, T> {
        private final Class<T> clazz;

        public GroupingAggregator(Class<T> clazz) {
            this.clazz = clazz;
        }

        @Override
        public T zero() {
            return null;
        }

        @Override
        public T reduce(T b, T a) {
            return this.mergeAndGet(b, a);
        }

        /** Combines buffer {@code b} with record {@code a}; see the class description. */
        @SuppressWarnings("unchecked")
        private T mergeAndGet(T b, T a) {
            if (Objects.nonNull(a) && Objects.nonNull(b)) {
                if (ModelSupport.isSubClass(a, OafEntity.class) && ModelSupport.isSubClass(b, OafEntity.class)) {
                    return (T) MergeUtils.merge(b, a);
                }
                // NOTE(review): entities are merged as merge(b, a) but relations as
                // mergeRelation(a, b); confirm the argument order is intended.
                if (a instanceof Relation && b instanceof Relation) {
                    return (T) MergeUtils.mergeRelation((Relation) a, (Relation) b);
                }
            }
            return Objects.isNull(a) ? b : a;
        }

        @Override
        public T merge(T b, T a) {
            return this.mergeAndGet(b, a);
        }

        @Override
        public T finish(T j) {
            return j;
        }

        @Override
        public Encoder<T> bufferEncoder() {
            return Encoders.kryo(this.clazz);
        }

        @Override
        public Encoder<T> outputEncoder() {
            return Encoders.kryo(this.clazz);
        }
    }
}

