/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.SparkCopyRelationsNoOpenorgs;
import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
import eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels;
import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
import eu.dnetlib.dhp.oa.dedup.SparkPropagateRelation;
import eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity;
import eu.dnetlib.dhp.oa.dedup.SparkWhitelistSimRels;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.OpenAccessRoute;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(value={MockitoExtension.class})
@TestMethodOrder(value=MethodOrderer.OrderAnnotation.class)
public class SparkDedupTest
implements Serializable {
    // Flag constant; it is not referenced anywhere else in this file.
    static final boolean CHECK_CARDINALITIES = true;
    // Serializable Mockito mock of the lookup service. It is stubbed in setUp()
    // and handed to the run(...) method of every Spark job exercised below.
    @Mock(serializable=true)
    ISLookUpService isLookUpService;
    // Local Spark session shared by all tests; created in cleanUp().
    private static SparkSession spark;
    // Java wrapper around the context of the session above; created in cleanUp().
    private static JavaSparkContext jsc;
    // Absolute path of the classpath resource /eu/dnetlib/dhp/dedup/entities (passed as "-i").
    private static String testGraphBasePath;
    // Temporary directory passed as "-w" to the jobs; also hosts the Spark warehouse dir.
    private static String testOutputBasePath;
    // Temporary directory passed as "-o" to the update/copy jobs and as "-i" to the propagation job.
    private static String testDedupGraphBasePath;
    // Temporary directory passed as "-o" to the propagation job.
    private static String testConsistencyGraphBasePath;
    // Value passed as "-asi"; also used as a path segment below testOutputBasePath.
    private static final String testActionSetId = "test-orchestrator";
    // Absolute path of the classpath resource /eu/dnetlib/dhp/dedup/whitelist.simrels.txt.
    private static String whitelistPath;
    // Lines of the whitelist file, loaded in cleanUp().
    private static List<String> whiteList;
    // Token used to split a whitelist line into source ([0]) and target ([1]);
    // assigned in the static initializer of this class.
    private static String WHITELIST_SEPARATOR;

    /**
     * One-time fixture setup executed before any test of this class.
     *
     * <p>Resolves the input graph and the whitelist from the classpath, creates three
     * temporary directories (working dir, dedup graph, consistency graph), loads the
     * whitelist lines and starts a local Spark session.
     *
     * @throws IOException if a temporary directory cannot be created or deleted, or the
     *         whitelist cannot be read
     * @throws URISyntaxException if a classpath resource URL cannot be converted to a URI
     */
    @BeforeAll
    public static void cleanUp() throws IOException, URISyntaxException {
        final String tempPrefix = SparkDedupTest.class.getSimpleName() + "-";

        testGraphBasePath = Paths.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/entities").toURI()).toFile().getAbsolutePath();
        testOutputBasePath = Files.createTempDirectory(tempPrefix).toAbsolutePath().toString();
        testDedupGraphBasePath = Files.createTempDirectory(tempPrefix).toAbsolutePath().toString();
        testConsistencyGraphBasePath = Files.createTempDirectory(tempPrefix).toAbsolutePath().toString();
        whitelistPath = Paths.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/whitelist.simrels.txt").toURI()).toFile().getAbsolutePath();

        // Close the reader once the lines are loaded; it used to be left open.
        try (Reader whitelistReader = new FileReader(whitelistPath)) {
            whiteList = IOUtils.readLines(whitelistReader);
        }

        // Only the names of these two directories are kept; the directories themselves are
        // removed again (presumably so the Spark jobs can create them on their own -- confirm).
        FileUtils.deleteDirectory(new File(testOutputBasePath));
        FileUtils.deleteDirectory(new File(testDedupGraphBasePath));

        SparkConf conf = new SparkConf();
        conf.set("spark.sql.shuffle.partitions", "200");
        conf.set("spark.sql.warehouse.dir", testOutputBasePath + "/spark-warehouse");
        spark = SparkSession.builder().appName(SparkDedupTest.class.getSimpleName()).master("local[*]").config(conf).getOrCreate();
        jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    }

    /**
     * Stubs the mocked lookup service before each test: a query containing a given
     * fragment is answered with the content of the associated classpath resource.
     * The stubs are registered leniently and in a fixed order.
     *
     * @throws IOException if one of the resources cannot be read
     * @throws ISLookUpException declared by the stubbed service method
     */
    @BeforeEach
    public void setUp() throws IOException, ISLookUpException {
        stubLookup(testActionSetId, "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator.xml");
        stubLookup("organization", "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json");
        stubLookup("publication", "/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json");
        stubLookup("software", "/eu/dnetlib/dhp/dedup/conf/sw.curr.conf.json");
        stubLookup("dataset", "/eu/dnetlib/dhp/dedup/conf/ds.curr.conf.json");
        stubLookup("otherresearchproduct", "/eu/dnetlib/dhp/dedup/conf/orp.curr.conf.json");
    }

    /** Registers one lenient stub: queries containing {@code queryFragment} return the text of {@code resourcePath}. */
    private void stubLookup(String queryFragment, String resourcePath) throws IOException, ISLookUpException {
        Mockito.lenient()
            .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(queryFragment)))
            .thenReturn(classPathResourceAsString(resourcePath));
    }

    /**
     * Runs {@code SparkCreateSimRels} on the test graph and checks the number of
     * similarity relations written for each of the five entity types.
     */
    @Test
    @Order(value=1)
    void createSimRelsTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        String[] jobArgs = {"-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-np", "50"};
        parser.parseArgument(jobArgs);
        new SparkCreateSimRels(parser, spark).run(isLookUpService);

        // Parallel tables: entity type, label used in the console output, expected count.
        String[] entityTypes = {"organization", "publication", "software", "dataset", "otherresearchproduct"};
        String[] labels = {"orgs_simrel = ", "pubs_simrel = ", "sw_simrel = ", "ds_simrel = ", "orp_simrel = "};
        long[] expectedCounts = {720L, 566L, 113L, 148L, 280L};

        long[] actualCounts = new long[entityTypes.length];
        for (int i = 0; i < entityTypes.length; i++) {
            String simRelPath = DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, entityTypes[i]);
            actualCounts[i] = spark.read().load(simRelPath).count();
        }
        for (int i = 0; i < entityTypes.length; i++) {
            System.out.println(labels[i] + actualCounts[i]);
        }
        for (int i = 0; i < entityTypes.length; i++) {
            Assertions.assertEquals(expectedCounts[i], actualCounts[i]);
        }
    }

    /**
     * Runs {@code SparkWhitelistSimRels} with the whitelist file and checks the similarity
     * relation counts again. Only the software count is expected to differ from
     * {@code createSimRelsTest} (115 instead of 113), and the first two whitelist entries
     * must be present among the software relations.
     */
    @Test
    @Order(value=2)
    void whitelistSimRelsTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/whitelistSimRels_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        String[] jobArgs = {"-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-np", "50", "-wl", whitelistPath};
        parser.parseArgument(jobArgs);
        new SparkWhitelistSimRels(parser, spark).run(isLookUpService);

        long organizationRels = spark.read().load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization")).count();
        long publicationRels = spark.read().load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "publication")).count();
        long datasetRels = spark.read().load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "dataset")).count();
        long otherRels = spark.read().load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "otherresearchproduct")).count();
        Dataset<?> softwareRels = spark.read().load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "software"));

        System.out.println("orgs_simrel = " + organizationRels);
        System.out.println("pubs_simrel = " + publicationRels);
        System.out.println("ds_simrel = " + datasetRels);
        System.out.println("orp_simrel = " + otherRels);
        System.out.println("sw_simrel = " + softwareRels.count());

        Assertions.assertEquals(720L, organizationRels);
        Assertions.assertEquals(566L, publicationRels);
        Assertions.assertEquals(148L, datasetRels);
        Assertions.assertEquals(280L, otherRels);
        Assertions.assertEquals(115L, softwareRels.count());

        // The first two whitelist lines must each show up as a software relation.
        for (int line = 0; line < 2; line++) {
            final int whitelistIndex = line;
            long matches = softwareRels
                .as(Encoders.bean(Relation.class))
                .toJavaRDD()
                .filter(rel -> {
                    String[] pair = whiteList.get(whitelistIndex).split(WHITELIST_SEPARATOR);
                    return rel.getSource().equalsIgnoreCase(pair[0]) && rel.getTarget().equalsIgnoreCase(pair[1]);
                })
                .count();
            Assertions.assertTrue(matches > 0L);
        }
    }

    /**
     * Runs {@code SparkCreateMergeRels} with {@code -cc 3} and asserts that, for every
     * entity type, no source has more than three "merges" targets. The merge relation
     * directories are deleted afterwards.
     *
     * <p>NOTE(review): this test shares {@code @Order(3)} with
     * {@code createMergeRelsWithPivotHistoryTest}, so the annotations in this file do not
     * pin which of the two runs first.
     */
    @Test
    @Order(value=3)
    void cutMergeRelsTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        String[] jobArgs = {"-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-cc", "3", "-h", ""};
        parser.parseArgument(jobArgs);
        new SparkCreateMergeRels(parser, spark).run(isLookUpService);

        String actionSetDir = testOutputBasePath + "/" + testActionSetId;
        String[] mergeRelDirs = {"/organization_mergerel", "/publication_mergerel", "/software_mergerel", "/dataset_mergerel", "/otherresearchproduct_mergerel"};

        // Per entity type: number of sources that merge more than three targets.
        long[] oversizedGroups = new long[mergeRelDirs.length];
        for (int i = 0; i < mergeRelDirs.length; i++) {
            oversizedGroups[i] = spark.read()
                .load(actionSetDir + mergeRelDirs[i])
                .as(Encoders.bean(Relation.class))
                .filter((FilterFunction<Relation>) r -> r.getRelClass().equalsIgnoreCase("merges"))
                .groupBy("source")
                .agg(functions.count("target").alias("cnt"))
                .select("source", "cnt")
                .where("cnt > 3")
                .count();
        }
        for (long oversized : oversizedGroups) {
            Assertions.assertEquals(0L, oversized);
        }
        for (String mergeRelDir : mergeRelDirs) {
            FileUtils.deleteDirectory(new File(actionSetDir + mergeRelDir));
        }
    }

    /**
     * Runs {@code SparkCreateMergeRels} with a pivot history database. One table per key
     * of {@code ModelSupport.oafTypes} is first populated from the {@code pivot_history}
     * test resource. Afterwards the merge relations of one known publication group (four
     * members in each direction) and the overall merge relation counts are verified.
     */
    @Test
    @Order(value=3)
    void createMergeRelsWithPivotHistoryTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);

        spark.sql("CREATE DATABASE IF NOT EXISTS pivot_history_test");
        ModelSupport.oafTypes.keySet().forEach(type -> {
            final String historyPath;
            try {
                historyPath = Paths.get(SparkDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/pivot_history").toURI()).toFile().getAbsolutePath();
            }
            catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
            spark.read().json(historyPath).write().mode("overwrite").saveAsTable("pivot_history_test." + type);
        });

        String[] jobArgs = {"-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-h", "", "-pivotHistoryDatabase", "pivot_history_test"};
        parser.parseArgument(jobArgs);
        new SparkCreateMergeRels(parser, spark).run(isLookUpService);

        String actionSetDir = testOutputBasePath + "/" + testActionSetId;
        long organizationRels = spark.read().load(actionSetDir + "/organization_mergerel").count();
        Dataset<Relation> pubs = spark.read().load(actionSetDir + "/publication_mergerel").as(Encoders.bean(Relation.class));
        long softwareRels = spark.read().load(actionSetDir + "/software_mergerel").count();
        long datasetRels = spark.read().load(actionSetDir + "/dataset_mergerel").count();
        long otherRels = spark.read().load(actionSetDir + "/otherresearchproduct_mergerel").count();

        // Outgoing "merges" relations of the group root.
        List<Relation> merges = pubs.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'").collectAsList();
        Assertions.assertEquals(4, merges.size());
        Set<String> dups = Sets.newHashSet(
            "50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
            "50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
            "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c",
            "50|arXiv_dedup_::c93aeb433eb90ed7a86e29be00791b7c");
        for (Relation r : merges) {
            Assertions.assertEquals("resultResult", r.getRelType());
            Assertions.assertEquals("dedup", r.getSubRelType());
            Assertions.assertEquals("merges", r.getRelClass());
            Assertions.assertTrue(dups.contains(r.getTarget()));
        }

        // Incoming "isMergedIn" relations of the group root.
        List<Relation> mergedIn = pubs.filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'").collectAsList();
        Assertions.assertEquals(4, mergedIn.size());
        for (Relation r : mergedIn) {
            Assertions.assertEquals("resultResult", r.getRelType());
            Assertions.assertEquals("dedup", r.getSubRelType());
            Assertions.assertEquals("isMergedIn", r.getRelClass());
            Assertions.assertTrue(dups.contains(r.getSource()));
        }

        System.out.println("orgs_mergerel = " + organizationRels);
        System.out.println("pubs_mergerel = " + pubs.count());
        System.out.println("sw_mergerel = " + softwareRels);
        System.out.println("ds_mergerel = " + datasetRels);
        System.out.println("orp_mergerel = " + otherRels);

        Assertions.assertEquals(1280L, organizationRels);
        Assertions.assertEquals(1158L, pubs.count());
        Assertions.assertEquals(292L, softwareRels);
        Assertions.assertEquals(476L, datasetRels);
        Assertions.assertEquals(742L, otherRels);
    }

    /**
     * Runs {@code SparkCreateMergeRels} without a pivot history and verifies the merge
     * relations of one known publication group (three members in each direction) as well
     * as the overall merge relation counts per entity type.
     */
    @Test
    @Order(value=4)
    void createMergeRelsTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        String[] jobArgs = {"-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-h", ""};
        parser.parseArgument(jobArgs);
        new SparkCreateMergeRels(parser, spark).run(isLookUpService);

        String actionSetDir = testOutputBasePath + "/" + testActionSetId;
        long organizationRels = spark.read().load(actionSetDir + "/organization_mergerel").count();
        Dataset<Relation> pubs = spark.read().load(actionSetDir + "/publication_mergerel").as(Encoders.bean(Relation.class));
        long softwareRels = spark.read().load(actionSetDir + "/software_mergerel").count();
        long datasetRels = spark.read().load(actionSetDir + "/dataset_mergerel").count();
        long otherRels = spark.read().load(actionSetDir + "/otherresearchproduct_mergerel").count();

        // Outgoing "merges" relations of the group root.
        List<Relation> merges = pubs.filter("source == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'").collectAsList();
        Assertions.assertEquals(3, merges.size());
        Set<String> dups = Sets.newHashSet(
            "50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
            "50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
            "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");
        for (Relation r : merges) {
            Assertions.assertEquals("resultResult", r.getRelType());
            Assertions.assertEquals("dedup", r.getSubRelType());
            Assertions.assertEquals("merges", r.getRelClass());
            Assertions.assertTrue(dups.contains(r.getTarget()));
        }

        // Incoming "isMergedIn" relations of the group root.
        List<Relation> mergedIn = pubs.filter("target == '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'").collectAsList();
        Assertions.assertEquals(3, mergedIn.size());
        for (Relation r : mergedIn) {
            Assertions.assertEquals("resultResult", r.getRelType());
            Assertions.assertEquals("dedup", r.getSubRelType());
            Assertions.assertEquals("isMergedIn", r.getRelClass());
            Assertions.assertTrue(dups.contains(r.getSource()));
        }

        System.out.println("orgs_mergerel = " + organizationRels);
        System.out.println("pubs_mergerel = " + pubs.count());
        System.out.println("sw_mergerel = " + softwareRels);
        System.out.println("ds_mergerel = " + datasetRels);
        System.out.println("orp_mergerel = " + otherRels);

        Assertions.assertEquals(1280L, organizationRels);
        Assertions.assertEquals(1156L, pubs.count());
        Assertions.assertEquals(292L, softwareRels);
        Assertions.assertEquals(476L, datasetRels);
        Assertions.assertEquals(742L, otherRels);
    }

    /**
     * Runs {@code SparkCreateDedupRecord} and checks how many dedup records were written
     * per entity type. The publication records are additionally parsed so that one known
     * root record can be inspected by {@code verifyRoot_1}.
     */
    @Test
    @Order(value=5)
    void createDedupRecordTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        String[] jobArgs = {"-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath};
        parser.parseArgument(jobArgs);
        new SparkCreateDedupRecord(parser, spark).run(isLookUpService);

        // Unknown JSON properties are tolerated when parsing the publication records.
        ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        String actionSetDir = testOutputBasePath + "/" + testActionSetId;

        Dataset<Publication> pubs = spark.read()
            .textFile(actionSetDir + "/publication_deduprecord")
            .map((MapFunction<String, Publication>) json -> mapper.readValue(json, Publication.class), Encoders.bean(Publication.class));
        long organizationRecords = jsc.textFile(actionSetDir + "/organization_deduprecord").count();
        long softwareRecords = jsc.textFile(actionSetDir + "/software_deduprecord").count();
        long datasetRecords = jsc.textFile(actionSetDir + "/dataset_deduprecord").count();
        long otherRecords = jsc.textFile(actionSetDir + "/otherresearchproduct_deduprecord").count();

        System.out.println("orgs_deduprecord = " + organizationRecords);
        System.out.println("pubs_deduprecord = " + pubs.count());
        System.out.println("sw_deduprecord = " + softwareRecords);
        System.out.println("ds_deduprecord = " + datasetRecords);
        System.out.println("orp_deduprecord = " + otherRecords);

        Assertions.assertEquals(87L, organizationRecords);
        Assertions.assertEquals(96L, pubs.count());
        Assertions.assertEquals(47L, softwareRecords);
        Assertions.assertEquals(97L, datasetRecords);
        Assertions.assertEquals(92L, otherRecords);

        verifyRoot_1(mapper, pubs);
    }

    /**
     * Compares the dedup root {@code 50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032}
     * with its Crossref duplicate read from the input graph: journal name, printed ISSN
     * and publisher must match, the pid sets must overlap, and the root must carry a
     * Crossref instance with the expected access right, host and instance type.
     *
     * @param mapper JSON mapper used to parse the input publications
     * @param pubs   parsed publication dedup records
     */
    private static void verifyRoot_1(ObjectMapper mapper, Dataset<Publication> pubs) {
        Publication root = pubs.filter("id = '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'").first();
        Assertions.assertNotNull(root);

        Dataset<String> publication = spark.read().textFile(DedupUtility.createEntityPath(testGraphBasePath, "publication"));
        Publication crossref_duplicate = publication
            .map((MapFunction<String, Publication>) json -> mapper.readValue(json, Publication.class), Encoders.bean(Publication.class))
            .filter("id = '50|doi_________::d5021b53204e4fdeab6ff5d5bc468032'")
            .collectAsList()
            .get(0);

        Assertions.assertEquals(crossref_duplicate.getJournal().getName(), root.getJournal().getName());
        Assertions.assertEquals(crossref_duplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
        Assertions.assertEquals(crossref_duplicate.getPublisher().getValue(), root.getPublisher().getValue());

        Set<String> rootPids = root.getPid().stream().map(StructuredProperty::getValue).collect(Collectors.toCollection(HashSet::new));
        Set<String> dupPids = crossref_duplicate.getPid().stream().map(StructuredProperty::getValue).collect(Collectors.toCollection(HashSet::new));
        Assertions.assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());
        Assertions.assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716"));

        Optional<Instance> instance_cr = root.getInstance().stream().filter(i -> i.getCollectedfrom().getValue().equals("Crossref")).findFirst();
        Assertions.assertTrue(instance_cr.isPresent());

        // Presence was asserted just above, so unwrapping is safe from here on.
        Instance crossrefInstance = instance_cr.get();
        Assertions.assertEquals("OPEN", crossrefInstance.getAccessright().getClassid());
        Assertions.assertEquals("Open Access", crossrefInstance.getAccessright().getClassname());
        Assertions.assertEquals(OpenAccessRoute.hybrid, crossrefInstance.getAccessright().getOpenAccessRoute());
        Assertions.assertEquals("IEEE Journal of Selected Topics in Quantum Electronics", crossrefInstance.getHostedby().getValue());
        Assertions.assertEquals("0001", crossrefInstance.getInstancetype().getClassid());
        Assertions.assertEquals("Article", crossrefInstance.getInstancetype().getClassname());
    }

    /**
     * Runs {@code SparkUpdateEntity} and checks the size of every entity collection in
     * the dedup graph. For the five deduplicated entity types it also asserts that the
     * number of records flagged as deleted by inference equals the number of distinct
     * targets of the "merges" relations.
     */
    @Test
    @Order(value=6)
    void updateEntityTest() throws Exception {
        String parameterSpec = classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        String[] jobArgs = {"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath};
        parser.parseArgument(jobArgs);
        new SparkUpdateEntity(parser, spark).run(isLookUpService);

        long organizations = jsc.textFile(testDedupGraphBasePath + "/organization").count();
        long publications = jsc.textFile(testDedupGraphBasePath + "/publication").count();
        long projects = jsc.textFile(testDedupGraphBasePath + "/project").count();
        long datasource = jsc.textFile(testDedupGraphBasePath + "/datasource").count();
        long softwares = jsc.textFile(testDedupGraphBasePath + "/software").count();
        long dataset = jsc.textFile(testDedupGraphBasePath + "/dataset").count();
        long otherresearchproduct = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").count();

        // Distinct targets of the "merges" relations, per entity type.
        String actionSetDir = testOutputBasePath + "/" + testActionSetId;
        long mergedOrgs = spark.read().load(actionSetDir + "/organization_mergerel")
            .as(Encoders.bean(Relation.class)).where("relClass=='merges'")
            .javaRDD().map(Relation::getTarget).distinct().count();
        long mergedPubs = spark.read().load(actionSetDir + "/publication_mergerel")
            .as(Encoders.bean(Relation.class)).where("relClass=='merges'")
            .javaRDD().map(Relation::getTarget).distinct().count();
        long mergedSw = spark.read().load(actionSetDir + "/software_mergerel")
            .as(Encoders.bean(Relation.class)).where("relClass=='merges'")
            .javaRDD().map(Relation::getTarget).distinct().count();
        long mergedDs = spark.read().load(actionSetDir + "/dataset_mergerel")
            .as(Encoders.bean(Relation.class)).where("relClass=='merges'")
            .javaRDD().map(Relation::getTarget).distinct().count();
        long mergedOrp = spark.read().load(actionSetDir + "/otherresearchproduct_mergerel")
            .as(Encoders.bean(Relation.class)).where("relClass=='merges'")
            .javaRDD().map(Relation::getTarget).distinct().count();

        System.out.println("publications = " + publications);
        System.out.println("organizations = " + organizations);
        System.out.println("projects = " + projects);
        System.out.println("datasource = " + datasource);
        System.out.println("software = " + softwares);
        System.out.println("dataset = " + dataset);
        System.out.println("otherresearchproduct = " + otherresearchproduct);

        Assertions.assertEquals(930L, publications);
        Assertions.assertEquals(840L, organizations);
        Assertions.assertEquals(100L, projects);
        Assertions.assertEquals(100L, datasource);
        Assertions.assertEquals(196L, softwares);
        Assertions.assertEquals(389L, dataset);
        Assertions.assertEquals(520L, otherresearchproduct);

        // Records flagged as deleted by inference, per entity type.
        long deletedOrgs = jsc.textFile(testDedupGraphBasePath + "/organization").filter(this::isDeletedByInference).count();
        long deletedPubs = jsc.textFile(testDedupGraphBasePath + "/publication").filter(this::isDeletedByInference).count();
        long deletedSw = jsc.textFile(testDedupGraphBasePath + "/software").filter(this::isDeletedByInference).count();
        long deletedDs = jsc.textFile(testDedupGraphBasePath + "/dataset").filter(this::isDeletedByInference).count();
        long deletedOrp = jsc.textFile(testDedupGraphBasePath + "/otherresearchproduct").filter(this::isDeletedByInference).count();

        Assertions.assertEquals(mergedOrgs, deletedOrgs);
        Assertions.assertEquals(mergedPubs, deletedPubs);
        Assertions.assertEquals(mergedSw, deletedSw);
        Assertions.assertEquals(mergedDs, deletedDs);
        Assertions.assertEquals(mergedOrp, deletedOrp);
    }

    /**
     * Runs {@code SparkCopyRelationsNoOpenorgs} and prints the number of relation lines
     * written to the dedup graph. The test makes no assertion on that number; it only
     * fails if the job or the read of the output throws.
     */
    @Test
    @Order(value=6)
    void copyRelationsNoOpenorgsTest() throws Exception {
        final String parameterSpec;
        // Close the resource stream after reading it; it used to be left open.
        try (InputStream specStream = SparkCopyRelationsNoOpenorgs.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")) {
            parameterSpec = IOUtils.toString(specStream);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        parser.parseArgument(new String[]{"-i", testGraphBasePath, "-w", testOutputBasePath, "-o", testDedupGraphBasePath});
        new SparkCopyRelationsNoOpenorgs(parser, spark).run(isLookUpService);

        Dataset<?> outputRels = spark.read().text(testDedupGraphBasePath + "/relation");
        System.out.println(outputRels.count());
    }

    /**
     * Runs {@code SparkPropagateRelation} from the dedup graph into the consistency graph
     * and checks the output relations: none may be flagged as deleted by inference or
     * invisible, exactly five are neither "merges" nor "isMergedIn", and the total equals
     * those five plus all merge relations found under the working directory.
     */
    @Test
    @Order(value=7)
    void propagateRelationTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString("/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json"));
        parser.parseArgument(new String[]{"-i", testDedupGraphBasePath, "-w", testOutputBasePath, "-o", testConsistencyGraphBasePath});
        new SparkPropagateRelation(parser, spark).run(isLookUpService);

        // Size of the job's input, printed for reference only.
        long relations = jsc.textFile(testDedupGraphBasePath + "/relation").count();
        System.out.println("relations = " + relations);

        Dataset<Relation> mergeRels = spark.read().load(DedupUtility.createMergeRelPath(testOutputBasePath, "*", "*")).as(Encoders.bean(Relation.class));
        // The input relations used to be loaded here as JSON as well, but the dataset was
        // never used; the read (and its schema inference pass) has been dropped.
        Dataset<?> outputRels = spark.read().json(testConsistencyGraphBasePath + "/relation");

        Assertions.assertEquals(0L, outputRels.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true").count());
        Assertions.assertEquals(5L, outputRels.filter("relClass NOT IN ('merges', 'isMergedIn')").count());
        Assertions.assertEquals(5L + mergeRels.count(), outputRels.count());
    }

    /**
     * Reads the relations of the dedup graph and of the consistency graph with the
     * {@code Relation} bean schema and checks that their sizes differ, that the
     * consistency graph holds no relation flagged as deleted by inference or invisible,
     * and that exactly five of its relations are neither "merges" nor "isMergedIn".
     */
    @Test
    @Order(value=8)
    void testCleanedPropagatedRelations() throws Exception {
        Dataset<?> beforePropagation = spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(testDedupGraphBasePath + "/relation");
        Dataset<?> afterPropagation = spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(testConsistencyGraphBasePath + "/relation");

        Assertions.assertNotEquals(beforePropagation.count(), afterPropagation.count());

        long flagged = afterPropagation.filter("dataInfo.deletedbyinference == true OR dataInfo.invisible == true").count();
        Assertions.assertEquals(0L, flagged);

        long nonMergeRelations = afterPropagation.filter("relClass NOT IN ('merges', 'isMergedIn')").count();
        Assertions.assertEquals(5L, nonMergeRelations);
    }

    /**
     * Checks total and distinct relation counts for the two sample relation files:
     * 12 total / 10 distinct for the first, 10 total / 2 distinct for the second.
     */
    @Test
    @Order(value=10)
    void testRelations() throws Exception {
        final String firstSample = "/eu/dnetlib/dhp/dedup/test/relation_1.json";
        final String secondSample = "/eu/dnetlib/dhp/dedup/test/relation_2.json";

        testUniqueness(firstSample, 12, 10);
        testUniqueness(secondSample, 10, 2);
    }

    /**
     * Parses the relations stored one JSON document per line in a classpath resource and
     * asserts both the total number of relations and the number of distinct ones.
     *
     * @param path            classpath location of the relation file
     * @param expected_total  expected number of relations in the file
     * @param expected_unique expected number of relations after {@code distinct()}
     * @throws URISyntaxException if the resource URL cannot be converted to a URI
     */
    private void testUniqueness(String path, int expected_total, int expected_unique) throws URISyntaxException {
        // URL.getPath() keeps percent-encoding (a space becomes "%20"), which is not a valid
        // file system path; go through the URI instead, as the class-level setup does.
        String resourcePath = Paths.get(getClass().getResource(path).toURI()).toFile().getAbsolutePath();

        // One mapper for the whole file instead of a new one per record.
        ObjectMapper mapper = new ObjectMapper();
        Dataset<Relation> rel = spark.read()
            .textFile(resourcePath)
            .map((MapFunction<String, Relation>) s -> mapper.readValue(s, Relation.class), Encoders.bean(Relation.class));

        Assertions.assertEquals((long) expected_total, rel.count());
        Assertions.assertEquals((long) expected_unique, rel.distinct().count());
    }

    /**
     * Removes the three temporary directories used by this class once all tests have run.
     *
     * @throws IOException if a directory cannot be deleted
     */
    @AfterAll
    public static void finalCleanUp() throws IOException {
        String[] scratchDirs = {testOutputBasePath, testDedupGraphBasePath, testConsistencyGraphBasePath};
        for (String scratchDir : scratchDirs) {
            FileUtils.deleteDirectory(new File(scratchDir));
        }
    }

    /**
     * Tells whether a serialized record carries the literal marker
     * {@code "deletedbyinference":true}. The check is a plain substring search on the raw
     * text; the JSON is not parsed.
     *
     * @param s one serialized record
     * @return {@code true} if the marker occurs in {@code s}
     */
    public boolean isDeletedByInference(String s) {
        final String marker = "\"deletedbyinference\":true";
        return s.indexOf(marker) >= 0;
    }

    /**
     * Reads a classpath resource completely and returns its content as text.
     *
     * @param path classpath location of the resource, resolved relative to this class
     * @return the resource content decoded as UTF-8
     * @throws IOException if the resource does not exist or cannot be read
     */
    private static String classPathResourceAsString(String path) throws IOException {
        // try-with-resources closes the stream, which used to be left open.
        try (InputStream resource = SparkDedupTest.class.getResourceAsStream(path)) {
            if (resource == null) {
                // Fail with the offending path instead of an anonymous NullPointerException.
                throw new IOException("Classpath resource not found: " + path);
            }
            // Explicit charset: the result must not depend on the platform default encoding.
            return IOUtils.toString(resource, java.nio.charset.StandardCharsets.UTF_8);
        }
    }

    // Static initializer: sets the token that separates the two identifiers on each
    // whitelist line (the whitelist test splits on it and reads elements [0] and [1]).
    static {
        WHITELIST_SEPARATOR = "####";
    }
}

