/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
import eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels;
import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
import eu.dnetlib.dhp.oa.dedup.SparkPropagateRelation;
import eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import scala.Tuple2;

/**
 * Runs the Spark deduplication stages one after another against the fixtures under
 * {@code /eu/dnetlib/dhp/dedup/entities} and checks the size of what each stage writes.
 *
 * <p>The test methods are ordered with {@link Order}: they share one working directory and one
 * output graph directory, and later stages read files written by earlier ones (for example
 * {@code updateEntityTest} reads the {@code *_mergerel} datasets checked by
 * {@code createMergeRelsTest}).
 *
 * <p>The class is {@link Serializable} and the lookup-service mock is declared serializable
 * because {@code this::isDeletedByInference} is handed to Spark as a filter function, which binds
 * the test instance into that function.
 */
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest implements Serializable {

    private static final String testActionSetId = "test-orchestrator";

    /** Reused for every parsed line instead of building a new mapper per record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static SparkSession spark;
    private static JavaSparkContext jsc;
    private static String testGraphBasePath;
    private static String testOutputBasePath;
    private static String testDedupGraphBasePath;

    @Mock(serializable = true)
    ISLookUpService isLookUpService;

    /**
     * Resolves the input graph location, reserves two unique temporary paths (working directory and
     * deduplicated graph) and starts a local Spark session.
     *
     * @throws IOException if a temporary directory cannot be created or removed
     * @throws URISyntaxException if the fixture URL cannot be converted to a URI
     */
    @BeforeAll
    public static void cleanUp() throws IOException, URISyntaxException {
        testGraphBasePath = Paths
            .get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI())
            .toFile()
            .getAbsolutePath();

        String tempPrefix = SparkDedupTest.class.getSimpleName() + "-";
        testOutputBasePath = Files.createTempDirectory(tempPrefix).toAbsolutePath().toString();
        testDedupGraphBasePath = Files.createTempDirectory(tempPrefix).toAbsolutePath().toString();

        // Only the unique names are kept; the directories themselves are removed again right away.
        // NOTE(review): presumably the Spark jobs expect to create these locations themselves.
        FileUtils.deleteDirectory(new File(testOutputBasePath));
        FileUtils.deleteDirectory(new File(testDedupGraphBasePath));

        spark = SparkSession
            .builder()
            .appName(SparkDedupTest.class.getSimpleName())
            .master("local[*]")
            .config(new SparkConf())
            .getOrCreate();
        jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    }

    /**
     * Stubs the lookup service so that a query containing the action set id or one of the entity
     * names returns the matching fixture (orchestrator profile or dedup configuration).
     *
     * @throws IOException if a fixture cannot be read
     * @throws ISLookUpException declared by the stubbed service method
     */
    @BeforeEach
    public void setUp() throws IOException, ISLookUpException {
        stubProfile(testActionSetId, "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml");
        stubProfile("organization", "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json");
        stubProfile("publication", "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json");
        stubProfile("software", "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json");
        stubProfile("dataset", "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json");
        stubProfile("otherresearchproduct", "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json");
    }

    /** Runs the similarity-relation stage and checks the number of rows written per entity type. */
    @Test
    @Order(1)
    public void createSimRelsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkCreateSimRels.class, "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath,
            "-asi", testActionSetId,
            "-la", "lookupurl",
            "-w", testOutputBasePath
        });

        new SparkCreateSimRels(parser, spark).run(isLookUpService);

        long orgs_simrel = countWorkingRows("organization_simrel");
        long pubs_simrel = countWorkingRows("publication_simrel");
        long sw_simrel = countWorkingRows("software_simrel");
        long ds_simrel = countWorkingRows("dataset_simrel");
        long orp_simrel = countWorkingRows("otherresearchproduct_simrel");

        Assertions.assertEquals(3432L, orgs_simrel);
        Assertions.assertEquals(7152L, pubs_simrel);
        Assertions.assertEquals(344L, sw_simrel);
        Assertions.assertEquals(458L, ds_simrel);
        Assertions.assertEquals(6750L, orp_simrel);
    }

    /** Runs the merge-relation stage and checks the number of rows written per entity type. */
    @Test
    @Order(2)
    public void createMergeRelsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkCreateMergeRels.class, "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath,
            "-asi", testActionSetId,
            "-la", "lookupurl",
            "-w", testOutputBasePath
        });

        new SparkCreateMergeRels(parser, spark).run(isLookUpService);

        long orgs_mergerel = countWorkingRows("organization_mergerel");
        long pubs_mergerel = countWorkingRows("publication_mergerel");
        long sw_mergerel = countWorkingRows("software_mergerel");
        long ds_mergerel = countWorkingRows("dataset_mergerel");
        long orp_mergerel = countWorkingRows("otherresearchproduct_mergerel");

        Assertions.assertEquals(1276L, orgs_mergerel);
        Assertions.assertEquals(1442L, pubs_mergerel);
        Assertions.assertEquals(288L, sw_mergerel);
        Assertions.assertEquals(472L, ds_mergerel);
        Assertions.assertEquals(718L, orp_mergerel);
    }

    /** Runs the dedup-record stage and checks the number of text lines written per entity type. */
    @Test
    @Order(3)
    public void createDedupRecordTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkCreateDedupRecord.class, "/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath,
            "-asi", testActionSetId,
            "-la", "lookupurl",
            "-w", testOutputBasePath
        });

        new SparkCreateDedupRecord(parser, spark).run(isLookUpService);

        long orgs_deduprecord = countLines(workingPath("organization_deduprecord"));
        long pubs_deduprecord = countLines(workingPath("publication_deduprecord"));
        long sw_deduprecord = countLines(workingPath("software_deduprecord"));
        long ds_deduprecord = countLines(workingPath("dataset_deduprecord"));
        long orp_deduprecord = countLines(workingPath("otherresearchproduct_deduprecord"));

        Assertions.assertEquals(82L, orgs_deduprecord);
        Assertions.assertEquals(66L, pubs_deduprecord);
        Assertions.assertEquals(51L, sw_deduprecord);
        Assertions.assertEquals(96L, ds_deduprecord);
        Assertions.assertEquals(89L, orp_deduprecord);
    }

    /**
     * Runs the entity-update stage, checks the size of every entity file in the output graph, and
     * checks that for each deduplicated entity type the number of distinct {@code merges} targets
     * equals the number of output lines flagged as deleted by inference.
     */
    @Test
    @Order(4)
    public void updateEntityTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkUpdateEntity.class, "/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath,
            "-w", testOutputBasePath,
            "-o", testDedupGraphBasePath
        });

        new SparkUpdateEntity(parser, spark).run(isLookUpService);

        long organizations = countLines(dedupGraphPath("organization"));
        long publications = countLines(dedupGraphPath("publication"));
        long projects = countLines(dedupGraphPath("project"));
        long datasource = countLines(dedupGraphPath("datasource"));
        long softwares = countLines(dedupGraphPath("software"));
        long dataset = countLines(dedupGraphPath("dataset"));
        long otherresearchproduct = countLines(dedupGraphPath("otherresearchproduct"));

        Assertions.assertEquals(897L, publications);
        Assertions.assertEquals(835L, organizations);
        Assertions.assertEquals(100L, projects);
        Assertions.assertEquals(100L, datasource);
        Assertions.assertEquals(200L, softwares);
        Assertions.assertEquals(388L, dataset);
        Assertions.assertEquals(517L, otherresearchproduct);

        String[] dedupedEntities = {"organization", "publication", "software", "dataset", "otherresearchproduct"};
        for (String entity : dedupedEntities) {
            long merged = countMergedTargets(entity);
            long deleted = jsc.textFile(dedupGraphPath(entity)).filter(this::isDeletedByInference).count();
            Assertions.assertEquals(merged, deleted, "merged vs deletedbyinference mismatch for " + entity);
        }
    }

    /**
     * Runs the relation-propagation stage, checks the number of relations written, and checks that
     * every output relation whose source and target are both merged identifiers is flagged as
     * deleted by inference.
     */
    @Test
    @Order(5)
    public void propagateRelationTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkPropagateRelation.class, "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath,
            "-w", testOutputBasePath,
            "-o", testDedupGraphBasePath
        });

        new SparkPropagateRelation(parser, spark).run(isLookUpService);

        long relations = countLines(dedupGraphPath("relation"));
        Assertions.assertEquals(4975L, relations);

        Dataset<Relation> mergeRels = spark
            .read()
            .load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*"))
            .as(Encoders.bean(Relation.class));

        // Identifiers that were merged into a representative; the "d" value is only a join placeholder.
        JavaPairRDD<String, String> mergedIds = mergeRels
            .where("relClass == 'merges'")
            .select(mergeRels.col("target"))
            .distinct()
            .toJavaRDD()
            .mapToPair(r -> new Tuple2<>(r.getString(0), "d"));

        // Two successive inner joins keep only the relations whose source AND target were merged.
        JavaRDD<String> toCheck = jsc
            .textFile(dedupGraphPath("relation"))
            .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.source", json), json))
            .join(mergedIds)
            .map(t -> t._2()._1())
            .mapToPair(json -> new Tuple2<>(MapDocumentUtil.getJPathString("$.target", json), json))
            .join(mergedIds)
            .map(t -> t._2()._1());

        long deletedbyinference = toCheck.filter(this::isDeletedByInference).count();
        long updated = toCheck.count();

        Assertions.assertEquals(updated, deletedbyinference);
    }

    /** Checks total and distinct {@link Relation} counts for two relation fixtures. */
    @Test
    @Order(6)
    public void testRelations() throws Exception {
        testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_1.json", 12, 10);
        testUniqueness("/eu/dnetlib/dhp/dedup/test/relation_2.json", 10, 2);
    }

    /**
     * Parses each line of a classpath fixture as a {@link Relation} and asserts the total and the
     * distinct number of records.
     *
     * @param path classpath location of the fixture, one JSON relation per line
     * @param expected_total expected number of parsed records
     * @param expected_unique expected number of records after {@code distinct()}
     * @throws URISyntaxException if the fixture URL cannot be converted to a URI
     */
    private void testUniqueness(String path, int expected_total, int expected_unique) throws URISyntaxException {
        // Go through the URI (as cleanUp does): URL.getPath() stays percent-encoded and breaks
        // when the checkout location contains spaces or non-ASCII characters.
        String fixturePath = Paths.get(getClass().getResource(path).toURI()).toFile().getAbsolutePath();

        Dataset<Relation> rel = spark
            .read()
            .textFile(fixturePath)
            .map(
                (MapFunction<String, Relation>) s -> OBJECT_MAPPER.readValue(s, Relation.class),
                Encoders.bean(Relation.class));

        Assertions.assertEquals(expected_total, rel.count());
        Assertions.assertEquals(expected_unique, rel.distinct().count());
    }

    /**
     * Removes the working directory and the output graph directory.
     *
     * @throws IOException if a directory cannot be deleted
     */
    @AfterAll
    public static void finalCleanUp() throws IOException {
        FileUtils.deleteDirectory(new File(testOutputBasePath));
        FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
    }

    /**
     * Tells whether a serialized record carries the deleted-by-inference flag.
     *
     * @param s one JSON line of an entity or relation file
     * @return {@code true} if the line contains {@code "deletedbyinference":true} verbatim
     */
    public boolean isDeletedByInference(String s) {
        return s.contains("\"deletedbyinference\":true");
    }

    /** Makes the mocked lookup service answer queries containing {@code queryFragment} with a fixture. */
    private void stubProfile(String queryFragment, String resourcePath) throws IOException, ISLookUpException {
        // Read the fixture first so that an I/O failure cannot leave a half-finished stubbing behind.
        String profile = readResource(SparkDedupTest.class, resourcePath);
        Mockito
            .lenient()
            .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(queryFragment)))
            .thenReturn(profile);
    }

    /** Reads a classpath resource as UTF-8 text, closing the stream and failing clearly if it is missing. */
    private static String readResource(Class<?> owner, String resourcePath) throws IOException {
        try (InputStream in = owner.getResourceAsStream(resourcePath)) {
            if (in == null) {
                throw new IOException("classpath resource not found: " + resourcePath);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /** Builds the location of a dataset inside the action-set folder of the working directory. */
    private static String workingPath(String datasetName) {
        return testOutputBasePath + "/" + testActionSetId + "/" + datasetName;
    }

    /** Builds the location of an entity or relation file inside the deduplicated graph. */
    private static String dedupGraphPath(String name) {
        return testDedupGraphBasePath + "/" + name;
    }

    /** Counts the rows of a dataset stored in the working directory, read with Spark's default loader. */
    private static long countWorkingRows(String datasetName) {
        return spark.read().load(workingPath(datasetName)).count();
    }

    /** Counts the lines of a text output. */
    private static long countLines(String location) {
        return jsc.textFile(location).count();
    }

    /** Counts the distinct targets of the {@code merges} relations written for one entity type. */
    private static long countMergedTargets(String entity) {
        return spark
            .read()
            .load(workingPath(entity + "_mergerel"))
            .as(Encoders.bean(Relation.class))
            .where("relClass=='merges'")
            .javaRDD()
            .map(Relation::getTarget)
            .distinct()
            .count();
    }
}

