/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dedup;

import com.google.common.hash.Hashing;
import eu.dnetlib.dedup.DedupUtility;
import eu.dnetlib.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

/**
 * Spark job that groups the records of one entity type into connected components of the
 * similarity graph and writes, for every component with more than one member, a pair of
 * {@code merges} / {@code isMergedIn} relations between the component id and each member id.
 */
public class SparkCreateConnectedComponent {

    /** Classpath location of the JSON description of this job's command-line arguments. */
    private static final String PARAMETERS_PATH = "/eu/dnetlib/dhp/dedup/dedup_parameters.json";

    /**
     * Runs the job.
     *
     * <p>Arguments read from {@code args}: {@code master}, {@code sourcePath}, {@code entity},
     * {@code targetPath}, {@code dedupConf}. Records are read as text lines from
     * {@code sourcePath/entity}. Each line is assumed to be a JSON document whose id is found at
     * the dedup config's {@code idPath} (NOTE(review): confirm input format against the producer).
     * Similarity relations are loaded from {@link DedupUtility#createSimRelPath} and the resulting
     * merge relations are written, replacing existing output, to
     * {@link DedupUtility#createMergeRelPath}. Both load and save use Spark's default data source.
     *
     * @param args command-line arguments as described by {@link #PARAMETERS_PATH}
     * @throws Exception if the parameter specification cannot be read or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(readParameterSpec());
        parser.parseArgument(args);
        SparkSession spark = SparkSession.builder()
                .appName(SparkCreateConnectedComponent.class.getSimpleName())
                .master(parser.get("master"))
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        String inputPath = parser.get("sourcePath");
        String entity = parser.get("entity");
        String targetPath = parser.get("targetPath");
        DedupConfig dedupConf = DedupConfig.load(parser.get("dedupConf"));
        // Resolved on the driver so the executor-side lambda captures a String, not the config.
        String idPath = dedupConf.getWf().getIdPath();

        // Vertices: (64-bit hash of the record id, original record id).
        JavaPairRDD<Object, String> vertexes = sc.textFile(inputPath + "/" + entity)
                .map(json -> MapDocumentUtil.getJPathString(idPath, json))
                .mapToPair(id -> new Tuple2<Object, String>(getHashcode(id), id));

        // Edges: one per similarity relation, between hashed endpoints, labelled with relClass.
        Dataset<Relation> similarityRelations = spark.read()
                .load(DedupUtility.createSimRelPath(targetPath, entity))
                .as(Encoders.bean(Relation.class));
        RDD<Edge<String>> edges = similarityRelations.javaRDD()
                .map(rel -> new Edge<String>(
                        getHashcode(rel.getSource()), getHashcode(rel.getTarget()), rel.getRelClass()))
                .rdd();

        // Components with a single member have nothing to merge, so only groups of 2+ are kept.
        JavaRDD<Relation> mergeRelations = GraphProcessor
                .findCCs(vertexes.rdd(), edges, dedupConf.getWf().getMaxIterations())
                .toJavaRDD()
                .filter(cc -> cc.getDocIds().size() > 1)
                .flatMap(cc -> cc.getDocIds().stream()
                        .flatMap(id -> toMergeRelations(cc.getCcId(), id))
                        .iterator());

        spark.createDataset(mergeRelations.rdd(), Encoders.bean(Relation.class))
                .write()
                .mode("overwrite")
                .save(DedupUtility.createMergeRelPath(targetPath, entity));
    }

    /**
     * Hashes an identifier to the vertex id used in the similarity graph.
     *
     * <p>Returns the first eight bytes (as a little-endian {@code long}) of the murmur3 128-bit
     * hash of the id's UTF-16 chars. Distinct ids can in principle map to the same value.
     *
     * @param id identifier to hash; must not be {@code null}
     * @return 64-bit hash of {@code id}
     */
    public static long getHashcode(String id) {
        return Hashing.murmur3_128().hashUnencodedChars(id).asLong();
    }

    /**
     * Reads the argument specification at {@link #PARAMETERS_PATH} as UTF-8 text, closing the
     * stream afterwards.
     *
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws IOException if the resource cannot be read
     */
    private static String readParameterSpec() throws IOException {
        try (InputStream in = SparkCreateConnectedComponent.class.getResourceAsStream(PARAMETERS_PATH)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource " + PARAMETERS_PATH);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Builds the two relations linking a component to one of its members:
     * {@code ccId -merges-> id} and {@code id -isMergedIn-> ccId}.
     */
    private static Stream<Relation> toMergeRelations(String ccId, String id) {
        Relation merges = new Relation();
        merges.setSource(ccId);
        merges.setTarget(id);
        merges.setRelClass("merges");

        Relation mergedIn = new Relation();
        mergedIn.setTarget(ccId);
        mergedIn.setSource(id);
        mergedIn.setRelClass("isMergedIn");

        return Stream.of(merges, mergedIn);
    }
}

