/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import scala.Tuple2;

/**
 * Spark job that computes connected components over the similarity relations of each
 * configured entity type and writes the resulting merge relations.
 *
 * <p>For every {@link DedupConfig} of the requested action set, the record ids and the
 * previously written similarity relations are loaded into a graph. Connected components are
 * computed via {@link GraphProcessor#findCCs}. Every component with more than one member
 * produces a {@code "merges"} relation (component id to member id) and an
 * {@code "isMergedIn"} relation (member id to component id) for each member.
 */
public class SparkCreateConnectedComponent {

    /** Classpath location of the argument specification read by {@link #main}. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json";

    /**
     * Entry point: parses {@code args} against the bundled parameter specification and runs
     * the job.
     *
     * @param args command-line arguments
     * @throws IllegalStateException if the parameter specification is not on the classpath
     * @throws Exception if the specification cannot be read, the arguments cannot be parsed,
     *     or the job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        try (InputStream in =
                SparkCreateConnectedComponent.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource " + PARAMETERS_RESOURCE);
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        new SparkCreateConnectedComponent().run(parser);
    }

    /**
     * Runs the connected-component computation for every dedup configuration of the action set.
     *
     * <p>Uses the parameters {@code graphBasePath}, {@code workingPath}, {@code isLookUpUrl} and
     * {@code actionSetId}. For each configuration:
     * <ul>
     *   <li>records are read as text lines from {@code graphBasePath + "/" + subEntity}; each
     *       record's id is extracted with the configured id path;</li>
     *   <li>similarity relations are read from
     *       {@link DedupUtility#createSimRelPath}{@code (workingPath, actionSetId, subEntity)};</li>
     *   <li>merge relations are written in overwrite mode to
     *       {@link DedupUtility#createMergeRelPath}{@code (workingPath, actionSetId, entity)}.</li>
     * </ul>
     *
     * @param parser parsed job arguments
     * @throws ISLookUpException propagated from {@link DedupUtility#getConfigurations}
     * @throws DocumentException propagated from {@link DedupUtility#getConfigurations}
     */
    private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
        final String graphBasePath = parser.get("graphBasePath");
        final String workingPath = parser.get("workingPath");
        final String isLookUpUrl = parser.get("isLookUpUrl");
        final String actionSetId = parser.get("actionSetId");

        try (SparkSession spark = getSparkSession(parser)) {
            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

            for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
                final String entity = dedupConf.getWf().getEntityType();
                final String subEntity = dedupConf.getWf().getSubEntityValue();
                // Copied to a local so the Spark closure below captures only this String,
                // not the whole configuration object.
                final String idPath = dedupConf.getWf().getIdPath();

                // Vertices: (hashed id, original id) for every record of the sub-entity.
                final JavaPairRDD<Object, String> vertexes = sc
                    .textFile(graphBasePath + "/" + subEntity)
                    .map(json -> MapDocumentUtil.getJPathString(idPath, json))
                    .mapToPair(id -> new Tuple2<Object, String>(getHashcode(id), id));

                // Edges: one per similarity relation, labelled with the relation class.
                final RDD<Edge<String>> edgeRdd = spark
                    .read()
                    .load(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity))
                    .as(Encoders.bean(Relation.class))
                    .javaRDD()
                    .map(rel -> new Edge<>(
                        getHashcode(rel.getSource()),
                        getHashcode(rel.getTarget()),
                        rel.getRelClass()))
                    .rdd();

                final JavaRDD<Relation> mergeRels = GraphProcessor
                    .findCCs(vertexes.rdd(), edgeRdd, dedupConf.getWf().getMaxIterations())
                    .toJavaRDD()
                    // A single-member component has nothing to merge.
                    .filter(cc -> cc.getDocIds().size() > 1)
                    .flatMap(cc -> cc.getDocIds().stream()
                        .flatMap(id -> Stream.of(
                            newRelation(cc.getCcId(), id, "merges"),
                            newRelation(id, cc.getCcId(), "isMergedIn")))
                        .iterator());

                spark
                    .createDataset(mergeRels.rdd(), Encoders.bean(Relation.class))
                    .write()
                    .mode("overwrite")
                    .save(DedupUtility.createMergeRelPath(workingPath, actionSetId, entity));
            }
        }
    }

    /**
     * Maps an identifier to a 64-bit graph vertex id: the first eight bytes of its 128-bit
     * Murmur3 hash.
     *
     * <p>The same function is used for vertices and for edge endpoints, so they line up in the
     * graph. Distinct ids can in principle collide.
     * NOTE(review): the single-argument {@code hashString} overload is kept on purpose; switching
     * to another overload or charset may change the produced ids.
     *
     * @param id identifier to hash
     * @return the vertex id for {@code id}
     */
    public static long getHashcode(String id) {
        return Hashing.murmur3_128().hashString(id).asLong();
    }

    /**
     * Creates a relation bean with the given endpoints and relation class.
     *
     * @param source relation source id
     * @param target relation target id
     * @param relClass relation class, e.g. {@code "merges"}
     * @return a new {@link Relation}
     */
    private static Relation newRelation(String source, String target, String relClass) {
        Relation r = new Relation();
        r.setSource(source);
        r.setTarget(target);
        r.setRelClass(relClass);
        return r;
    }

    /**
     * Builds (or reuses) the Spark session for this job, using the {@code master} argument.
     *
     * @param parser parsed job arguments
     * @return the Spark session
     */
    private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
        SparkConf conf = new SparkConf();
        return SparkSession
            .builder()
            // Previously mislabelled with SparkCreateSimRels' name (copy-paste bug).
            .appName(SparkCreateConnectedComponent.class.getSimpleName())
            .master(parser.get("master"))
            .config(conf)
            .getOrCreate();
    }
}

