/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.bulktag;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.bulktag.SparkBulkTagJob;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkTagJobTest {
    // Jackson mapper used by the tests to deserialize each JSON line written by the job.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // Inline JSON passed to the job as the "-pathMap" argument. Its "protoMap" object associates a field name
    // (author, title, orcid, subject, ...) with a JSONPath expression; the "publicationyear" entry additionally
    // declares an "action" (class, method and From/To parameters).
    public static final String pathMap = "{\"protoMap\":{\"author\":{\"path\":\"$['author'][*]['fullname']\"}, \"title\":{\"path\":\"$['title'][*]['value']\"},  \"orcid\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid')]['value']\"} ,  \"orcid_pending\":{\"path\":\"$['author'][*]['pid'][*][?(@['qualifier']['classid']=='orcid_pending')]['value']\"} ,\"contributor\" : {\"path\":\"$['contributor'][*]['value']\"}, \"description\" : {\"path\":\"$['description'][*]['value']\"}, \"subject\" :{\"path\":\"$['subject'][*]['value']\"},  \"fos\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"} , \"sdg\" : {\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"},\"journal\":{\"path\":\"$['journal'].name\"},\"hostedby\":{\"path\":\"$['instance'][*]['hostedby']['key']\"},\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"},\"publisher\":{\"path\":\"$['publisher'].value\"},\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\",  \"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\",\"method\":\"execSubstring\",\"params\":[{\"paramName\":\"From\",  \"paramValue\":0}, {\"paramName\":\"To\",\"paramValue\":4}]}}}}";
    // Spark session shared by all tests; created in beforeAll() and stopped in afterAll().
    private static SparkSession spark;
    // Temporary directory created in beforeAll(); used as the job output root and deleted in afterAll().
    private static java.nio.file.Path workingDir;
    // Declared without an initializer here; it is assigned elsewhere in the class (not visible in this excerpt).
    private static final Logger log;
    // Value passed to the job as the "-taggingConf" argument; its assignment is not visible in this excerpt.
    private static String taggingConf;

    /**
     * Prepares the fixtures shared by every test: a fresh temporary working directory and a local
     * Spark session whose warehouse locations point inside that directory.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @BeforeAll
    public static void beforeAll() throws IOException {
        final String testName = BulkTagJobTest.class.getSimpleName();

        workingDir = Files.createTempDirectory(testName);
        log.info("using work dir {}", workingDir);

        final SparkConf sparkConf = new SparkConf()
            .setAppName(testName)
            .setMaster("local[*]")
            .set("spark.driver.host", "localhost")
            .set("hive.metastore.local", "true")
            .set("spark.ui.enabled", "false")
            .set("spark.sql.warehouse.dir", workingDir.toString())
            .set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        spark = SparkSession.builder().appName(testName).config(sparkConf).getOrCreate();
    }

    /**
     * Removes the temporary working directory and stops the shared Spark session.
     *
     * @throws IOException if the working directory cannot be deleted
     */
    @AfterAll
    public static void afterAll() throws IOException {
        try {
            FileUtils.deleteDirectory(workingDir.toFile());
        } finally {
            // Stop the session even when the cleanup above fails, so it does not leak past this class.
            spark.stop();
        }
    }

    /**
     * Runs the bulk-tagging job on the {@code no_updates} dataset sample and verifies that the 10 records
     * are written out with no context data info marked with the {@code bulktagging} inference provenance.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void noUpdatesTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath();

        // The class-level pathMap constant is passed directly; a local of the same name initialized
        // from itself would not compile.
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Dataset> tmp = sc
            .textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");

        String query = "select id, MyT.id community from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        Assertions.assertEquals(0L, spark.sql(query).count());
    }

    /**
     * Runs the bulk-tagging job on the {@code update_subject/nocontext} dataset sample and verifies the
     * contexts added with provenance {@code community:subject}: 5 in total, spread over the
     * covid-19, mes, fam and aginfra communities and attached to the expected record identifiers.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagBySubjectNoPreviousContextTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/nocontext/").getPath();

        // The class-level pathMap constant is passed directly; a local of the same name initialized
        // from itself would not compile.
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Dataset> tmp = sc
            .textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
        Assertions.assertEquals(5L, idExplodeCommunity.count());

        Assertions.assertEquals(5L, idExplodeCommunity.filter("provenance = 'community:subject'").count());
        Assertions.assertEquals(5L, idExplodeCommunity.filter("name = 'Bulktagging for Community - Subject'").count());

        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'covid-19'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'mes'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'fam'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'aginfra'").count());

        Assertions.assertEquals(1L, idExplodeCommunity.filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'covid-19' and id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'").count());

        Assertions.assertEquals(2L, idExplodeCommunity.filter("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("(community = 'covid-19' or community = 'aginfra') and id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b'").count());

        Assertions.assertEquals(2L, idExplodeCommunity.filter("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("(community = 'mes' or community = 'fam') and id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62'").count());
    }

    /**
     * Runs the bulk-tagging job on the {@code update_subject/contextnoprovenance} dataset sample, reading
     * the path map from a copy placed under the working directory, and verifies the data info entries
     * attached to the {@code covid-19} context.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagBySubjectPreviousContextNoProvenanceTest() throws Exception {
        String protoMapPath = workingDir.toString() + "/data/bulktagging/protoMap";

        // Make the path-map resources available on the local file system for the job to load.
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        fs.copyFromLocalFile(
            false,
            new Path(this.getClass().getResource("/eu/dnetlib/dhp/bulktag/pathMap/").getPath()),
            new Path(protoMapPath));

        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/").getPath();

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", protoMapPath,
            "-nameNode", "local"
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<Dataset> tmp = sc
            .textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyT.id = 'covid-19' ";

        org.apache.spark.sql.Dataset<Row> communityContext = spark.sql(query);
        Assertions.assertEquals(3L, communityContext.count());

        Assertions.assertEquals(2L, communityContext.filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529'").count());
        Assertions.assertEquals(1L, communityContext.filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'community:subject'").count());
        Assertions.assertEquals(1L, communityContext.filter("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and provenance = 'propagation:community:productsthroughsemrel'").count());

        query = "select id, MyT.id community, size(MyT.datainfo) datainfosize from dataset lateral view explode (context) as MyT where size(MyT.datainfo) > 0";

        Assertions.assertEquals(
            2,
            spark.sql(query)
                .select("datainfosize")
                .where("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and community = 'covid-19'")
                .collectAsList()
                .get(0)
                .getInt(0));
    }

    /**
     * Runs the bulk-tagging job on the {@code publication/update_datasource} sample and verifies the
     * contexts added to publications with provenance {@code community:datasource}.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagByDatasourceTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/").getPath();

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-baseURL", "https://services.openaire.eu/openaire/community/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Publication> tmp = sc
            .textFile(workingDir.toString() + "/publication")
            .map(item -> OBJECT_MAPPER.readValue(item, Publication.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Publication> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Publication.class));
        verificationDataset.createOrReplaceTempView("publication");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from publication lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);

        Assertions.assertEquals(5L, idExplodeCommunity.count());
        Assertions.assertEquals(5L, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
        Assertions.assertEquals(5L, idExplodeCommunity.filter("name = 'Bulktagging for Community - Datasource'").count());

        Assertions.assertEquals(3L, idExplodeCommunity.filter("community = 'fam'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'aginfra'").count());

        Assertions.assertEquals(3L, idExplodeCommunity.filter("community = 'fam' and (id = '50|ec_fp7health::000085c89f4b96dc2269bd37edb35306' or id = '50|ec_fp7health::000b9e61f83f5a4b0c35777b7bccdf38' or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'aginfra' and (id = '50|ec_fp7health::000c8195edd542e4e64ebb32172cbf89' or id = '50|ec_fp7health::0010eb63e181e3e91b8b6dc6b3e1c798')").count());
    }

    /**
     * Runs the bulk-tagging job on the {@code publication/update_datasource} sample, with the path map
     * read from a file copied under the working directory, and verifies the contexts added to the
     * {@code datasource} output with provenance {@code community:datasource}.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void datasourceTag() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/").getPath();
        String protoMapDir = workingDir.toString() + "/data/bulktagging/protoMap";

        // Make the path-map resources available on the local file system for the job to load.
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        fs.copyFromLocalFile(
            false,
            new Path(this.getClass().getResource("/eu/dnetlib/dhp/bulktag/pathMap/").getPath()),
            new Path(protoMapDir));

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-baseURL", "https://services.openaire.eu/openaire/community/",
            "-pathMap", protoMapDir + "/pathMap",
            "-nameNode", "local"
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Datasource> tmp = sc
            .textFile(workingDir.toString() + "/datasource")
            .map(item -> OBJECT_MAPPER.readValue(item, Datasource.class));

        Assertions.assertEquals(3L, tmp.count());

        org.apache.spark.sql.Dataset<Datasource> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Datasource.class));
        verificationDataset.createOrReplaceTempView("datasource");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from datasource lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
        idExplodeCommunity.show(false);

        Assertions.assertEquals(3L, idExplodeCommunity.count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("name = 'Bulktagging for Community - Datasource'").count());

        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'dh-ch'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'clarin'").count());
    }

    /**
     * Runs the bulk-tagging job on the {@code publication/update_datasource} sample, with the path map
     * read from a file copied under the working directory, and verifies the contexts added to the
     * {@code organization} output with provenance {@code community:organization}.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void organizationTag() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/").getPath();
        String protoMapDir = workingDir.toString() + "/data/bulktagging/protoMap";

        // Make the path-map resources available on the local file system for the job to load.
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        fs.copyFromLocalFile(
            false,
            new Path(this.getClass().getResource("/eu/dnetlib/dhp/bulktag/pathMap/").getPath()),
            new Path(protoMapDir));

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-baseURL", "https://services.openaire.eu/openaire/community/",
            "-pathMap", protoMapDir + "/pathMap",
            "-nameNode", "local"
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Organization> tmp = sc
            .textFile(workingDir.toString() + "/organization")
            .map(item -> OBJECT_MAPPER.readValue(item, Organization.class));

        Assertions.assertEquals(4L, tmp.count());

        org.apache.spark.sql.Dataset<Organization> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Organization.class));
        verificationDataset.createOrReplaceTempView("organization");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from organization lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
        idExplodeCommunity.show(false);

        Assertions.assertEquals(3L, idExplodeCommunity.count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("provenance = 'community:organization'").count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("name = 'Bulktagging for Community - Organization'").count());

        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'netherlands'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'beopen'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'mes'").count());
    }

    /**
     * Runs the bulk-tagging job on the {@code publication/update_datasource} sample, with the path map
     * read from a file copied under the working directory, and verifies the contexts added to the
     * {@code project} output with provenance {@code community:project}.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void projectTag() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/publication/update_datasource/").getPath();
        String protoMapDir = workingDir.toString() + "/data/bulktagging/protoMap";

        // Make the path-map resources available on the local file system for the job to load.
        LocalFileSystem fs = FileSystem.getLocal(new Configuration());
        fs.copyFromLocalFile(
            false,
            new Path(this.getClass().getResource("/eu/dnetlib/dhp/bulktag/pathMap/").getPath()),
            new Path(protoMapDir));

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-baseURL", "https://services.openaire.eu/openaire/community/",
            "-pathMap", protoMapDir + "/pathMap",
            "-nameNode", "local"
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Project> tmp = sc
            .textFile(workingDir.toString() + "/project")
            .map(item -> OBJECT_MAPPER.readValue(item, Project.class));

        Assertions.assertEquals(4L, tmp.count());

        org.apache.spark.sql.Dataset<Project> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Project.class));
        verificationDataset.createOrReplaceTempView("project");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from project lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
        idExplodeCommunity.show(false);

        Assertions.assertEquals(4L, idExplodeCommunity.count());
        Assertions.assertEquals(4L, idExplodeCommunity.filter("provenance = 'community:project'").count());
        Assertions.assertEquals(4L, idExplodeCommunity.filter("name = 'Bulktagging for Community - Project'").count());

        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'enermaps'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'clarin'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'dh-ch'").count());
    }

    /**
     * Runs the bulk-tagging job on the {@code otherresearchproduct/update_zenodocommunity} sample and
     * verifies the contexts added with provenance {@code community:zenodocommunity}, the size of the
     * data info list on a pre-existing context, and that no context id contains a Zenodo community URL.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagByZenodoCommunityTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/otherresearchproduct/update_zenodocommunity/").getPath();

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<OtherResearchProduct> tmp = sc
            .textFile(workingDir.toString() + "/otherresearchproduct")
            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<OtherResearchProduct> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(OtherResearchProduct.class));
        verificationDataset.createOrReplaceTempView("orp");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from orp lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);

        Assertions.assertEquals(8L, idExplodeCommunity.count());
        Assertions.assertEquals(8L, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count());
        Assertions.assertEquals(8L, idExplodeCommunity.filter("name = 'Bulktagging for Community - Zenodo'").count());

        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'covid-19'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'aginfra'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'beopen'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'fam'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'mes'").count());

        Assertions.assertEquals(1L, idExplodeCommunity.filter("id = '50|od______2017::0750a4d0782265873d669520f5e33c07' and community = 'covid-19'").count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("id = '50|od______2017::1bd97baef19dbd2db3203b112bb83bc5' and (community = 'aginfra' or community = 'mes' or community = 'fam')").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("id = '50|od______2017::1e400f1747487fd15998735c41a55c72' and community = 'beopen'").count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' and (community = 'beopen' or community = 'fam' or community = 'mes')").count());

        query = "select id, MyT.id community, size(MyT.datainfo) datainfosize from orp lateral view explode (context) as MyT where size(MyT.datainfo) > 0";

        Assertions.assertEquals(
            2,
            spark.sql(query)
                .select("datainfosize")
                .where("id = '50|od______2017::210281c5bc1c739a11ccceeeca806396' and community = 'beopen'")
                .collectAsList()
                .get(0)
                .getInt(0));

        // No context id may contain a Zenodo community URL.
        query = "select id, MyT.id community from orp lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD ";

        org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);

        Assertions.assertEquals(
            0L,
            tmp2.select("community")
                .where(tmp2.col("community").contains("zenodo.org/communities/"))
                .count());
    }

    /**
     * Runs the bulk-tagging job on the {@code dataset/update_subject_datasource} sample and verifies the
     * contexts added with provenance {@code community:subject} and {@code community:datasource}, together
     * with the number of data info entries per record/community pair.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagBySubjectDatasourceTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject_datasource/").getPath();

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Dataset> tmp = sc
            .textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);

        Assertions.assertEquals(7L, idExplodeCommunity.count());
        Assertions.assertEquals(5L, idExplodeCommunity.filter("provenance = 'community:subject'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("provenance = 'community:datasource'").count());

        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'covid-19'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'fam'").count());
        Assertions.assertEquals(2L, idExplodeCommunity.filter("community = 'aginfra'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'mes'").count());

        query = "select id, MyT.id community, size(MyT.datainfo) datainfosize from dataset lateral view explode (context) as MyT where size(MyT.datainfo) > 0";

        org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);

        Assertions.assertEquals(2, tmp2.select("datainfosize").where("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and community = 'aginfra'").collectAsList().get(0).getInt(0));
        Assertions.assertEquals(1, tmp2.select("datainfosize").where("id = '50|od______3989::05d8c751462f9bb8d2b06956dfbc5c7b' and community = 'covid-19'").collectAsList().get(0).getInt(0));
        Assertions.assertEquals(2, tmp2.select("datainfosize").where("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and community = 'fam'").collectAsList().get(0).getInt(0));
        Assertions.assertEquals(2, tmp2.select("datainfosize").where("id = '50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529' and community = 'covid-19'").collectAsList().get(0).getInt(0));
        Assertions.assertEquals(1, tmp2.select("datainfosize").where("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and community = 'fam'").collectAsList().get(0).getInt(0));
        Assertions.assertEquals(1, tmp2.select("datainfosize").where("id = '50|od______3989::0f89464c4ac4c398fe0c71433b175a62' and community = 'mes'").collectAsList().get(0).getInt(0));
    }

    /**
     * Runs the bulk-tagging job on the {@code software} sample and verifies the contexts added with
     * provenance {@code community:subject}, {@code community:datasource} and
     * {@code community:zenodocommunity}, together with the number of data info entries on two
     * record/community pairs.
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagBySubjectDatasourceZenodoCommunityTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/software/").getPath();

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Software> tmp = sc
            .textFile(workingDir.toString() + "/software")
            .map(item -> OBJECT_MAPPER.readValue(item, Software.class));

        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Software> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Software.class));
        verificationDataset.createOrReplaceTempView("software");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from software lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);

        Assertions.assertEquals(10L, idExplodeCommunity.count());
        idExplodeCommunity.show(false);

        Assertions.assertEquals(3L, idExplodeCommunity.filter("provenance = 'community:subject'").count());
        Assertions.assertEquals(3L, idExplodeCommunity.filter("provenance = 'community:datasource'").count());
        Assertions.assertEquals(4L, idExplodeCommunity.filter("provenance = 'community:zenodocommunity'").count());

        Assertions.assertEquals(3L, idExplodeCommunity.filter("community = 'covid-19'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'dh-ch'").count());
        Assertions.assertEquals(4L, idExplodeCommunity.filter("community = 'aginfra'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'dth'").count());
        Assertions.assertEquals(1L, idExplodeCommunity.filter("community = 'fam'").count());

        Assertions.assertEquals(2L, idExplodeCommunity.filter("provenance = 'community:zenodocommunity' and id = '50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4' and (community = 'dh-ch' or community = 'dth')").count());

        query = "select id, MyT.id community, size(MyT.datainfo) datainfosize from software lateral view explode (context) as MyT where size(MyT.datainfo) > 0";

        org.apache.spark.sql.Dataset<Row> tmp2 = spark.sql(query);

        Assertions.assertEquals(2, tmp2.select("datainfosize").where("id = '50|od______1582::501b25d420f808c8eddcd9b16e917f11' and community = 'covid-19'").collectAsList().get(0).getInt(0));
        Assertions.assertEquals(3, tmp2.select("datainfosize").where("id = '50|od______1582::581621232a561b7e8b4952b18b8b0e56' and community = 'aginfra'").collectAsList().get(0).getInt(0));
    }

    /**
     * Runs the bulk-tagging job on the {@code dataset/update_datasourcewithconstraints} sample, checks
     * that 12 records are written, and prints the contexts carrying the {@code bulktagging} inference
     * provenance (no assertion is made on them).
     *
     * @throws Exception if the job or the verification fails
     */
    @Test
    void bulktagDatasourcewithConstraintsTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/").getPath();

        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // Typed RDD: the lambda receives each output line as a String, as ObjectMapper.readValue requires.
        JavaRDD<Dataset> tmp = sc
            .textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(12L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset = spark
            .createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");

        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";

        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
        idExplodeCommunity.show(false);
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/jupyter} sample and checks that 10
     * other-research-product records are written and that none of them has a subject whose value is
     * {@code EOSC::Jupyter Notebook}.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void bulkTagOtherJupyter() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Parse the output once and reuse the typed RDD for every assertion.
        JavaRDD<OtherResearchProduct> orps = sc.textFile(workingDir.toString() + "/otherresearchproduct")
            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));

        Assertions.assertEquals(10L, orps.count());
        // NOTE(review): this subject check used to be asserted twice with identical logic; if a check
        // on the EOSC IF guidelines was intended instead, it has to be added explicitly.
        Assertions.assertEquals(0L, orps
            .filter(orp -> orp.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
            .count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/jupyter} sample and checks that 10 dataset
     * records are written and that none of them carries {@code EOSC::Jupyter Notebook}, either as a
     * subject value or as an EOSC IF guideline code.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    public void bulkTagDatasetJupyter() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Parse the output once and reuse the typed RDD for every assertion.
        JavaRDD<Dataset> datasets = sc.textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(10L, datasets.count());
        Assertions.assertEquals(0L, datasets
            .filter(ds -> ds.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Jupyter Notebook")))
            .count());
        Assertions.assertEquals(0L, datasets
            .filter(ds -> ds.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals("EOSC::Jupyter Notebook")))
            .count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/jupyter} sample and verifies the software
     * output: 10 records overall, 4 of them with the {@code EOSC::Jupyter Notebook} guideline code,
     * and, for a set of known ids, the expected number of subjects and guidelines. The tag must
     * never appear as a subject value.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    public void bulkTagSoftwareJupyter() throws Exception {
        final String jupyterTag = "EOSC::Jupyter Notebook";
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/jupyter/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Software> tmp = sc.textFile(workingDir.toString() + "/software")
            .map(item -> OBJECT_MAPPER.readValue(item, Software.class));

        Assertions.assertEquals(10L, tmp.count());
        Assertions.assertEquals(4L, tmp
            .filter(s -> s.getEoscifguidelines() != null)
            .filter(s -> s.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals(jupyterTag)))
            .count());

        // Per-record expectations: id, number of subjects, number of EOSC IF guidelines.
        String[] ids = {
            "50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4",
            "50|od______1582::501b25d420f808c8eddcd9b16e917f11",
            "50|od______1582::581621232a561b7e8b4952b18b8b0e56",
            "50|od______1582::5aec1186054301b66c0c5dc35972a589",
            "50|od______1582::639909adfad9d708308f2aedb733e4a0"
        };
        int[] expectedSubjects = {1, 5, 8, 5, 8};
        int[] expectedGuidelines = {1, 0, 1, 0, 1};

        for (int i = 0; i < ids.length; i++) {
            final String id = ids[i];
            // Fetch the record once instead of running one Spark job per assertion.
            Software record = tmp.filter(sw -> sw.getId().equals(id)).collect().get(0);

            Assertions.assertEquals(expectedSubjects[i], record.getSubject().size(), id);
            Assertions.assertFalse(
                record.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(jupyterTag)), id);
            Assertions.assertEquals(expectedGuidelines[i], record.getEoscifguidelines().size(), id);
            if (expectedGuidelines[i] > 0) {
                Assertions.assertTrue(
                    record.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals(jupyterTag)), id);
            }
        }

        Software withSubjects = tmp
            .filter(sw -> sw.getId().equals("50|od______1582::6e7a9b21a2feef45673890432af34244"))
            .collect()
            .get(0);
        Assertions.assertEquals(7, withSubjects.getSubject().size());
        String[] expectedValues = {
            "jupyter",
            "Modeling and Simulation",
            "structure granulaire",
            "algorithme",
            "simulation num\u00e9rique",
            "flux de gaz",
            "flux de liquide"
        };
        for (String expected : expectedValues) {
            Assertions.assertTrue(
                withSubjects.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(expected)), expected);
        }
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/galaxy} sample and verifies the
     * other-research-product output: 10 records, none with {@code EOSC::Galaxy Workflow} as a subject
     * value, exactly one with it as an EOSC IF guideline code, plus per-id subject/guideline counts.
     * Every record is also printed as JSON.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void galaxyOtherTest() throws Exception {
        final String galaxyTag = "EOSC::Galaxy Workflow";
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<OtherResearchProduct> orps = sc.textFile(workingDir.toString() + "/otherresearchproduct")
            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));

        Assertions.assertEquals(10L, orps.count());
        Assertions.assertEquals(0L, orps
            .filter(p -> p.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(galaxyTag)))
            .count());
        orps.foreach(p -> System.out.println(OBJECT_MAPPER.writeValueAsString(p)));
        Assertions.assertEquals(1L, orps
            .filter(p -> p.getEoscifguidelines() != null)
            .filter(p -> p.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals(galaxyTag)))
            .count());

        // The single tagged record: fetched once, then checked on subjects and guidelines.
        OtherResearchProduct tagged = orps
            .filter(p -> p.getId().equals("50|od______2017::0750a4d0782265873d669520f5e33c07"))
            .collect()
            .get(0);
        Assertions.assertEquals(2, tagged.getSubject().size());
        Assertions.assertFalse(tagged.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(galaxyTag)));
        Assertions.assertEquals(1, tagged.getEoscifguidelines().size());
        Assertions.assertTrue(tagged.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals(galaxyTag)));

        // Records for which only the subjects are checked.
        String[] subjectOnlyIds = {
            "50|od______2017::1bd97baef19dbd2db3203b112bb83bc5",
            "50|od______2017::1e400f1747487fd15998735c41a55c72"
        };
        for (String id : subjectOnlyIds) {
            OtherResearchProduct record = orps.filter(p -> p.getId().equals(id)).collect().get(0);
            Assertions.assertEquals(2, record.getSubject().size(), id);
            Assertions.assertFalse(record.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(galaxyTag)), id);
        }
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/galaxy} sample and verifies the software
     * output: 11 records, none with {@code EOSC::Galaxy Workflow} as a subject value, exactly one
     * with a non-empty guideline list (which contains that code), plus per-id subject/guideline
     * counts.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void galaxySoftwareTest() throws Exception {
        final String galaxyTag = "EOSC::Galaxy Workflow";
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/galaxy/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Software> tmp = sc.textFile(workingDir.toString() + "/software")
            .map(item -> OBJECT_MAPPER.readValue(item, Software.class));

        Assertions.assertEquals(11L, tmp.count());
        Assertions.assertEquals(0L, tmp
            .filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(galaxyTag)))
            .count());
        Assertions.assertEquals(1L, tmp.filter(s -> s.getEoscifguidelines().size() > 0).count());
        Assertions.assertEquals(1L, tmp
            .filter(s -> s.getEoscifguidelines().size() > 0)
            .filter(s -> s.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals(galaxyTag)))
            .count());

        // Each record is fetched once and then checked, instead of one Spark job per assertion.
        Software tagged = tmp
            .filter(sw -> sw.getId().equals("50|od______1582::4132f5ec9496f0d6adc7b00a50a56ff4"))
            .collect()
            .get(0);
        Assertions.assertEquals(1, tagged.getSubject().size());
        Assertions.assertFalse(tagged.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(galaxyTag)));
        Assertions.assertEquals(1, tagged.getEoscifguidelines().size());
        Assertions.assertTrue(tagged.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals(galaxyTag)));

        Software fiveSubjects = tmp
            .filter(sw -> sw.getId().equals("50|od______1582::501b25d420f808c8eddcd9b16e917f11"))
            .collect()
            .get(0);
        Assertions.assertEquals(5, fiveSubjects.getSubject().size());

        Software eightSubjects = tmp
            .filter(sw -> sw.getId().equals("50|od______1582::581621232a561b7e8b4952b18b8b0e56"))
            .collect()
            .get(0);
        Assertions.assertEquals(8, eightSubjects.getSubject().size());
        Assertions.assertFalse(eightSubjects.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals(galaxyTag)));
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/twitter} sample and checks that 11 dataset
     * records are written, 3 of which carry the {@code EOSC::Twitter Data} guideline code.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void twitterDatasetTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Dataset> dats = sc.textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(11L, dats.count());
        Assertions.assertEquals(3L, dats
            .filter(d -> d.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals("EOSC::Twitter Data")))
            .count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/twitter} sample and checks the
     * other-research-product output: 10 records, none with {@code EOSC::Twitter Data} as a subject
     * value and 3 with it as an EOSC IF guideline code.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void twitterOtherTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<OtherResearchProduct> orps = sc.textFile(workingDir.toString() + "/otherresearchproduct")
            .map(item -> OBJECT_MAPPER.readValue(item, OtherResearchProduct.class));

        Assertions.assertEquals(10L, orps.count());
        Assertions.assertEquals(0L, orps
            .filter(p -> p.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
            .count());
        Assertions.assertEquals(3L, orps
            .filter(p -> p.getEoscifguidelines().stream().anyMatch(eig -> eig.getCode().equals("EOSC::Twitter Data")))
            .count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code eosctag/twitter} sample and checks that 10 software
     * records are written and that none of them has {@code EOSC::Twitter Data} as a subject value.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void twitterSoftwareTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/eosctag/twitter/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Software> tmp = sc.textFile(workingDir.toString() + "/software")
            .map(item -> OBJECT_MAPPER.readValue(item, Software.class));

        Assertions.assertEquals(10L, tmp.count());
        Assertions.assertEquals(0L, tmp
            .filter(s -> s.getSubject().stream().anyMatch(sbj -> sbj.getValue().equals("EOSC::Twitter Data")))
            .count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code bulktag/eosc/dataset} sample and checks that 8
     * dataset records are written, that 4 of them have a context with id {@code eosc}, and that each
     * of four known ids is among those tagged records.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void EoscContextTagTest() throws Exception {
        String sourcePath = this.getClass().getResource("/eu/dnetlib/dhp/bulktag/eosc/dataset/").getPath();
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Dataset> tmp = sc.textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));

        Assertions.assertEquals(8L, tmp.count());
        Assertions.assertEquals(4L, tmp
            .filter(d -> d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
            .count());

        String[] taggedIds = {
            "50|475c1990cbb2::0fecfb874d9395aa69d2f4d7cd1acbea",
            "50|475c1990cbb2::3185cd5d8a2b0a06bb9b23ef11748eb1",
            "50|475c1990cbb2::3894c94123e96df8a21249957cf160cb",
            "50|475c1990cbb2::449f28eefccf9f70c04ad70d61e041c7"
        };
        for (String id : taggedIds) {
            Assertions.assertEquals(1L, tmp
                .filter(d -> d.getId().equals(id) && d.getContext().stream().anyMatch(c -> c.getId().equals("eosc")))
                .count(), id);
        }
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code update_datasourcewithconstraints} sample with the
     * default tagging configuration and checks that 12 dataset records are written and that 3
     * bulk-tagged context entries refer to the {@code dth} community.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void removeTest() throws Exception {
        // Use the class constant directly: a local "String pathMap = pathMap" reads itself before it
        // is assigned and does not compile.
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_datasourcewithconstraints/").getPath(),
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", BulkTagJobTest.pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Dataset> tmp = sc.textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
        Assertions.assertEquals(12L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
            spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");
        String query = "select id, MyT.id community, MyD.provenanceaction.classid provenance, MyD.provenanceaction.classname name from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";
        org.apache.spark.sql.Dataset<Row> idExplodeCommunity = spark.sql(query);
        Assertions.assertEquals(3L, idExplodeCommunity.filter("community = 'dth'").count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code no_updates} sample and checks that 10 dataset
     * records are written and that none of their contexts carries a {@code bulktagging} inference
     * provenance.
     *
     * @throws Exception if the job or the parsing of its output fails
     */
    @Test
    void newConfTest() throws Exception {
        // Use the class constant directly: a local "String pathMap = pathMap" reads itself before it
        // is assigned and does not compile.
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/no_updates/").getPath(),
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", BulkTagJobTest.pathMap,
            "-taggingConf", taggingConf
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Dataset> tmp = sc.textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
            spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");
        String query = "select id, MyT.id community from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";
        Assertions.assertEquals(0L, spark.sql(query).count());
    }

    /**
     * Runs {@link SparkBulkTagJob} on the {@code publicationyear} sample using the
     * {@code tagging_conf_publicationdate.xml} configuration and checks that 10 dataset records are
     * written and that the bulk-tagging query returns 5 rows, one for each of five known ids.
     *
     * @throws Exception if the configuration cannot be read or the job or the parsing of its output
     *                   fails
     */
    @Test
    void pubdateTest() throws Exception {
        // Close the resource stream once read and decode it with an explicit charset rather than the
        // platform default.
        String publicationDateConf;
        try (InputStream confStream = BulkTagJobTest.class
            .getResourceAsStream("/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_publicationdate.xml")) {
            publicationDateConf = IOUtils.toString(confStream, "UTF-8");
        }

        // Use the class constant directly: a local "String pathMap = pathMap" reads itself before it
        // is assigned and does not compile.
        SparkBulkTagJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", this.getClass().getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/publicationyear/").getPath(),
            "-taggingConf", publicationDateConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", BulkTagJobTest.pathMap
        });

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<Dataset> tmp = sc.textFile(workingDir.toString() + "/dataset")
            .map(item -> OBJECT_MAPPER.readValue(item, Dataset.class));
        Assertions.assertEquals(10L, tmp.count());

        org.apache.spark.sql.Dataset<Dataset> verificationDataset =
            spark.createDataset(tmp.rdd(), Encoders.bean(Dataset.class));
        verificationDataset.createOrReplaceTempView("dataset");
        String query = "select id, MyT.id community, MyD.provenanceaction.classid from dataset lateral view explode(context) c as MyT lateral view explode(MyT.datainfo) d as MyD where MyD.inferenceprovenance = 'bulktagging'";
        org.apache.spark.sql.Dataset<Row> queryResult = spark.sql(query);
        queryResult.show(false);
        Assertions.assertEquals(5L, queryResult.count());

        String[] taggedIds = {
            "50|od______3989::02dd5d2c222191b0b9bd4f33c8e96529",
            "50|od______3989::2f4f3c820c450bd08dac08d07cc82dcf",
            "50|od______3989::7fcbe3a03280663cddebfd3cb9203177",
            "50|od______3989::d791339867bec6d3eb2104deeb4e4961",
            "50|od______3989::d90d3a1f64ad264b5ebed8a35b280343"
        };
        for (String id : taggedIds) {
            // The explicit FilterFunction cast picks the Java overload of Dataset.filter.
            Assertions.assertEquals(1L, queryResult
                .filter((FilterFunction<Row>) r -> r.getAs("id").equals(id))
                .count(), id);
        }
    }

    /**
     * Copies the {@code bulktag/pathMap} test resources into the working directory, parses an inline
     * proto-map JSON with Gson (the parsed value is not used further), and runs
     * {@link SparkBulkTagJob} with {@code -pathMap} pointing at the copied file. No assertion is
     * made on the output.
     *
     * @throws Exception if the copy or the job fails
     */
    @Test
    public void prova() throws Exception {
        String protoMapDir = workingDir.toString() + "/data/bulktagging/protoMap";
        LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
        Path pathMapResources = new Path(this.getClass().getResource("/eu/dnetlib/dhp/bulktag/pathMap/").getPath());
        localFs.copyFromLocalFile(false, pathMapResources, new Path(protoMapDir));

        String sourcePath = this.getClass()
            .getResource("/eu/dnetlib/dhp/bulktag/sample/dataset/update_subject/contextnoprovenance/")
            .getPath();

        String protoMapJson = "{\"author\":{\"path\":\"$['author'][]['fullname']\"},\"title\":{\"path\":\"$['title'][]['value']\"},\"orcid\":{\"path\":\"$['author'][]['pid'][][?(@['qualifier']['classid']=='orcid')]['value']\"},\"orcid_pending\":{\"path\":\"$['author'][]['pid'][][?(@['qualifier']['classid']=='orcid_pending')]['value']\"},\"contributor\":{\"path\":\"$['contributor'][]['value']\"},\"description\":{\"path\":\"$['description'][]['value']\"},\"subject\":{\"path\":\"$['subject'][]['value']\"},\"fos\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='FOS')].value\"},\"sdg\":{\"path\":\"$['subject'][?(@['qualifier']['classid']=='SDG')].value\"},\"journal\":{\"path\":\"$['journal'].name\"},\"hostedby\":{\"path\":\"$['instance'][]['hostedby']['key']\"},\"collectedfrom\":{\"path\":\"$['instance'][*]['collectedfrom']['key']\"},\"publisher\":{\"path\":\"$['publisher'].value\"},\"publicationyear\":{\"path\":\"$['dateofacceptance'].value\",\"action\":{\"clazz\":\"eu.dnetlib.dhp.bulktag.actions.ExecSubstringAction\",\"method\":\"execSubstring\",\"params\":[{\"paramName\":\"From\",\"paramValue\":0},{\"paramName\":\"To\",\"paramValue\":4}]}}}";
        // Parsed but otherwise unused.
        ProtoMap parsedProtoMap = new Gson().fromJson(protoMapJson, ProtoMap.class);

        String[] jobArgs = {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-sourcePath", sourcePath,
            "-taggingConf", taggingConf,
            "-outputPath", workingDir.toString() + "/",
            "-pathMap", protoMapDir + "/pathMap",
            "-baseURL", "none",
            "-nameNode", "local"
        };
        SparkBulkTagJob.main(jobArgs);
    }

    /**
     * Calls the community API helpers in {@link Utils} against the public OpenAIRE endpoint and
     * prints the organization and datasource community maps as JSON. Nothing is asserted; the test
     * fails only if one of the calls throws.
     *
     * @throws IOException if a call to the API helpers or the JSON serialization fails
     */
    @Test
    public void testApi() throws IOException {
        String baseURL = "https://services.openaire.eu/openaire/community/";
        ObjectMapper mapper = new ObjectMapper();

        // The results of these two calls are deliberately discarded.
        Utils.getSubcommunities("clarin", baseURL);
        Utils.getCommunityConfiguration(baseURL);

        System.out.println(mapper.writeValueAsString(Utils.getOrganizationCommunityMap(baseURL)));
        System.out.println(mapper.writeValueAsString(Utils.getDatasourceCommunities(baseURL)));
    }

    /**
     * Fetches the community configuration from the public OpenAIRE endpoint and prints every
     * configured community as JSON. Nothing is asserted; the test fails only if a call throws.
     *
     * @throws IOException if a call to the API helpers fails
     */
    @Test
    public void getConfigurationApi() throws IOException {
        String baseURL = "https://services.openaire.eu/openaire/community/";

        // The result of this call is deliberately discarded.
        Utils.getSubcommunities("clarin", baseURL);

        CommunityConfiguration cc = Utils.getCommunityConfiguration(baseURL);
        // One mapper for the whole loop instead of a new instance per community.
        ObjectMapper mapper = new ObjectMapper();
        cc.getCommunities().values().forEach(community -> {
            try {
                System.out.println(mapper.writeValueAsString(community));
            } catch (JsonProcessingException e) {
                // forEach cannot propagate checked exceptions; rethrow with the cause preserved.
                throw new RuntimeException(e);
            }
        });
    }

    // Creates the class logger and loads the default tagging configuration from the
    // tagging_conf_remove.xml classpath resource. If reading fails with an IOException the
    // configuration stays empty and the failure is logged.
    static {
        log = LoggerFactory.getLogger(BulkTagJobTest.class);
        taggingConf = "";
        // try-with-resources closes the resource stream; the charset is explicit so the result does
        // not depend on the platform default.
        try (InputStream confStream = BulkTagJobTest.class
            .getResourceAsStream("/eu/dnetlib/dhp/bulktag/communityconfiguration/tagging_conf_remove.xml")) {
            taggingConf = IOUtils.toString(confStream, "UTF-8");
        } catch (IOException e) {
            log.error("Unable to load the default tagging configuration", e);
        }
    }
}

