/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.Deduper;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.MapDocumentUtil;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import scala.Tuple2;

/**
 * Spark job that computes similarity relations between graph entities, once per deduplication
 * configuration returned for the given action set.
 *
 * <p>For every configuration it loads the entities of the configured sub-entity type, groups them
 * into blocks with {@link Deduper#createsortedBlocks}, compares the members of each block with
 * {@link Deduper#computeRelations2} and writes the resulting {@link Relation}s under the working
 * path (overwriting any previous output). The class is {@link Serializable} because
 * {@link #createSimRel} is an instance method invoked from inside a Spark closure, which therefore
 * captures {@code this}.
 */
public class SparkCreateSimRels implements Serializable {

    private static final Log log = LogFactory.getLog(SparkCreateSimRels.class);

    /** Classpath location of the JSON descriptor of the accepted command-line arguments. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json";

    /**
     * Shared Jackson mapper. Building an ObjectMapper is expensive and an unmodified instance can
     * be shared, so one is created per JVM instead of one per call. Being static, it is never
     * serialized together with Spark closures.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Entry point: parses the command-line arguments against the bundled parameter descriptor and
     * runs the job.
     *
     * @param args command-line arguments
     * @throws Exception if the descriptor cannot be read, the arguments cannot be parsed, or the
     *     job fails
     */
    public static void main(String[] args) throws Exception {
        final String parametersJson;
        try (InputStream in = SparkCreateSimRels.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("cannot find classpath resource " + PARAMETERS_RESOURCE);
            }
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            parametersJson = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);
        new SparkCreateSimRels().run(parser);
    }

    /**
     * Runs the similarity computation for every dedup configuration of the action set.
     *
     * @param parser parsed arguments; reads {@code graphBasePath}, {@code isLookUpUrl},
     *     {@code actionSetId}, {@code workingPath} (and {@code master} via
     *     {@link #getSparkSession})
     * @throws ISLookUpException if the configurations cannot be fetched from the IS lookup
     * @throws DocumentException if a configuration document cannot be parsed
     */
    private void run(ArgumentApplicationParser parser) throws ISLookUpException, DocumentException {
        final String graphBasePath = parser.get("graphBasePath");
        final String isLookUpUrl = parser.get("isLookUpUrl");
        final String actionSetId = parser.get("actionSetId");
        final String workingPath = parser.get("workingPath");
        log.info(String.format("graphBasePath: '%s'", graphBasePath));
        log.info(String.format("isLookUpUrl: '%s'", isLookUpUrl));
        log.info(String.format("actionSetId: '%s'", actionSetId));
        log.info(String.format("workingPath: '%s'", workingPath));

        try (SparkSession spark = getSparkSession(parser)) {
            final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

            for (DedupConfig dedupConf : DedupUtility.getConfigurations(isLookUpUrl, actionSetId)) {
                final String entity = dedupConf.getWf().getEntityType();
                final String subEntity = dedupConf.getWf().getSubEntityValue();

                // Parse each input line into a MapDocument keyed by its identifier.
                final JavaPairRDD<String, MapDocument> mapDocument = sc
                    .textFile(DedupUtility.createEntityPath(graphBasePath, subEntity))
                    .mapToPair(s -> {
                        MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                        return new Tuple2<String, MapDocument>(d.getIdentifier(), d);
                    });

                // Group candidates into blocks, then compare only documents in the same block.
                final JavaPairRDD<String, List<MapDocument>> blocks =
                    Deduper.createsortedBlocks(sc, mapDocument, dedupConf);
                final JavaPairRDD<String, String> dedupRels =
                    Deduper.computeRelations2(sc, blocks, dedupConf);

                final JavaRDD<Relation> relationsRDD =
                    dedupRels.map(r -> createSimRel(r._1(), r._2(), entity));

                spark
                    .createDataset(relationsRDD.rdd(), Encoders.bean(Relation.class))
                    .write()
                    .mode("overwrite")
                    .save(DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
            }
        }
    }

    /**
     * Converts a relation into a key/value pair of Hadoop {@link Text}s.
     *
     * @param relation the relation to wrap
     * @return key {@code source@relClass@target}; value the JSON serialization of an
     *     {@link AtomicAction} wrapping {@code relation}
     * @throws JsonProcessingException if the action cannot be serialized to JSON
     */
    public Tuple2<Text, Text> createSequenceFileRow(Relation relation) throws JsonProcessingException {
        final String id = relation.getSource() + "@" + relation.getRelClass() + "@" + relation.getTarget();
        final AtomicAction<Relation> aa = new AtomicAction<>(Relation.class, relation);
        return new Tuple2<>(new Text(id), new Text(OBJECT_MAPPER.writeValueAsString(aa)));
    }

    /**
     * Builds a similarity relation between two entity identifiers.
     *
     * @param source identifier of the source entity
     * @param target identifier of the target entity
     * @param entity entity type from the dedup configuration; {@code "result"} and
     *     {@code "organization"} get a type-specific relClass, anything else gets
     *     {@code "isSimilarTo"}
     * @return a new relation with only source, target and relClass set
     */
    public Relation createSimRel(String source, String target, String entity) {
        final Relation r = new Relation();
        r.setSource(source);
        r.setTarget(target);
        switch (entity) {
            case "result":
                r.setRelClass("resultResult_dedupSimilarity_isSimilarTo");
                break;
            case "organization":
                r.setRelClass("organizationOrganization_dedupSimilarity_isSimilarTo");
                break;
            default:
                r.setRelClass("isSimilarTo");
                break;
        }
        return r;
    }

    /**
     * Creates (or reuses) the Spark session for this job.
     *
     * @param parser parsed arguments; the {@code master} argument is used as the Spark master URL
     * @return a session named after this class
     */
    private static SparkSession getSparkSession(ArgumentApplicationParser parser) {
        final SparkConf conf = new SparkConf();
        return SparkSession
            .builder()
            .appName(SparkCreateSimRels.class.getSimpleName())
            .master(parser.get("master"))
            .config(conf)
            .getOrCreate();
    }
}

