/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.merge;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that merges graph entities sharing the same identifier and writes them back split by
 * entity type.
 *
 * <p>For every entry of {@link ModelSupport#entityTypes} whose directory exists under
 * {@code graphInputPath}, the job does the following:
 * <ul>
 *   <li>reads the JSON records and keeps only those with a non-empty {@code id};</li>
 *   <li>applies the COAR vocabularies;</li>
 *   <li>merges records with equal {@code id} through {@link MergeUtils#mergeById};</li>
 *   <li>optionally drops entities flagged invisible;</li>
 *   <li>writes each entity type as gzip-compressed JSON to {@code outputPath/<ENTITY_TYPE>}.</li>
 * </ul>
 */
public class GroupEntitiesSparkJob {
    private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);

    /** Kryo encoder used to carry heterogeneous {@link OafEntity} subclasses in one Dataset. */
    private static final Encoder<OafEntity> OAFENTITY_KRYO_ENC = Encoders.kryo(OafEntity.class);

    /** Classpath resource holding the argument definitions of this job. */
    private static final String PARAMETERS_RESOURCE =
            "/eu/dnetlib/dhp/oa/merge/group_graph_entities_parameters.json";

    private final ArgumentApplicationParser parser;

    /**
     * @param parser already-parsed job arguments; read by {@link #run}
     */
    public GroupEntitiesSparkJob(ArgumentApplicationParser parser) {
        this.parser = parser;
    }

    /**
     * Command-line entry point: parses the arguments, resolves the IS lookup service and runs the job.
     *
     * <p>{@code isSparkSessionManaged} defaults to {@code true} when absent.
     *
     * @param args command-line arguments, as described by the parameters JSON resource
     * @throws IllegalStateException if the parameters resource is not on the classpath
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream and decode it as UTF-8 instead of the platform default charset.
        try (InputStream in = GroupEntitiesSparkJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource " + PARAMETERS_RESOURCE);
            }
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", isLookupUrl);

        ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
        new GroupEntitiesSparkJob(parser).run(isSparkSessionManaged, isLookupService);
    }

    /**
     * Loads the vocabularies and runs the grouping inside a Spark session.
     *
     * <p>{@code filterInvisible} is parsed with {@link Boolean#parseBoolean}, so it is {@code false}
     * when the argument is absent. The checkpoint path is removed before grouping starts.
     *
     * @param isSparkSessionManaged passed through to {@link SparkSessionSupport#runWithSparkSession}
     * @param isLookUpService service the vocabularies are loaded from
     * @throws ISLookUpException if the vocabularies cannot be loaded
     */
    public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) throws ISLookUpException {
        String graphInputPath = parser.get("graphInputPath");
        log.info("graphInputPath: {}", graphInputPath);

        String checkpointPath = parser.get("checkpointPath");
        log.info("checkpointPath: {}", checkpointPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        boolean filterInvisible = Boolean.parseBoolean(parser.get("filterInvisible"));
        log.info("filterInvisible: {}", filterInvisible);

        SparkConf conf = new SparkConf();
        VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS(isLookUpService);

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            HdfsSupport.remove(checkpointPath, spark.sparkContext().hadoopConfiguration());
            groupEntities(spark, graphInputPath, checkpointPath, outputPath, filterInvisible, vocs);
        });
    }

    /**
     * Reads, merges and writes all entity types.
     *
     * <p>NOTE(review): {@code checkpointPath} is cleared by {@link #run} but is not written here. The
     * pivoted result is cached with {@code persist()} and reused for every per-type write.
     *
     * @param spark active session
     * @param inputPath graph root; one sub-directory per {@link EntityType} name
     * @param checkpointPath unused here (see note above)
     * @param outputPath output root; one sub-directory per {@link EntityType} name is written
     * @param filterInvisible when {@code true}, entities whose data info marks them invisible are dropped
     * @param vocs vocabularies used for COAR cleaning and merging
     */
    private static void groupEntities(SparkSession spark, String inputPath, String checkpointPath,
            String outputPath, boolean filterInvisible, VocabularyGroup vocs) {
        Dataset<OafEntity> allEntities = spark.emptyDataset(OAFENTITY_KRYO_ENC);
        for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
            String entityInputPath = inputPath + "/" + e.getKey().name();
            if (!HdfsSupport.exists(entityInputPath, spark.sparkContext().hadoopConfiguration())) {
                continue;
            }
            allEntities = allEntities.union(readEntities(spark, entityInputPath, entityClassOf(e)));
        }

        Dataset<Tuple2<String, OafEntity>> merged = allEntities
                .map((MapFunction<OafEntity, OafEntity>) entity ->
                        GraphCleaningFunctions.applyCoarVocabularies(entity, vocs), OAFENTITY_KRYO_ENC)
                .groupByKey((MapFunction<OafEntity, String>) OafEntity::getId, Encoders.STRING())
                .mapGroups((MapGroupsFunction<String, OafEntity, OafEntity>) (key, group) ->
                        (OafEntity) MergeUtils.mergeById(group, vocs), OAFENTITY_KRYO_ENC)
                // A null 'invisible' flag is treated as visible, instead of failing on unboxing.
                .filter((FilterFunction<OafEntity>) entity -> !filterInvisible
                        || entity.getDataInfo() == null
                        || !Boolean.TRUE.equals(entity.getDataInfo().getInvisible()))
                // Tag each entity with its concrete class name so the types can be split apart below.
                .map((MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity ->
                                new Tuple2<>(entity.getClass().getName(), entity),
                        Encoders.tuple(Encoders.STRING(), OAFENTITY_KRYO_ENC));

        // Pivot on "_1": one column per entity type, holding "_2" only for rows of that class.
        Dataset<Row> pivoted = merged.toDF();
        for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
            pivoted = pivoted.withColumn(e.getKey().name(),
                    functions.when(functions.col("_1").equalTo(entityClassOf(e).getName()), functions.col("_2")));
        }

        // Cache once and write every type from the cached data. Without this, each write
        // would recompute the whole read/merge shuffle.
        Dataset<Row> byType = pivoted.drop("_1", "_2").persist();
        try {
            for (Map.Entry<EntityType, Class> e : ModelSupport.entityTypes.entrySet()) {
                String entityType = e.getKey().name();
                writeEntities(byType, entityType, entityClassOf(e), outputPath + "/" + entityType);
            }
        } finally {
            byType.unpersist();
        }
    }

    /** Returns the entity class registered in a {@link ModelSupport#entityTypes} entry. */
    @SuppressWarnings("unchecked")
    private static Class<? extends OafEntity> entityClassOf(Map.Entry<EntityType, Class> entry) {
        return (Class<? extends OafEntity>) entry.getValue();
    }

    /** Reads one entity type from JSON, drops empty ids and re-encodes the rows with Kryo. */
    private static <T extends OafEntity> Dataset<OafEntity> readEntities(SparkSession spark, String path,
            Class<T> entityClass) {
        Encoder<T> beanEncoder = Encoders.bean(entityClass);
        return spark.read()
                .schema(beanEncoder.schema())
                .json(path)
                .filter("length(id) > 0")
                .as(beanEncoder)
                .map((MapFunction<T, OafEntity>) entity -> entity, OAFENTITY_KRYO_ENC);
    }

    /** Writes the non-null values of one pivoted column as gzip-compressed JSON beans. */
    private static <T extends OafEntity> void writeEntities(Dataset<Row> byType, String column,
            Class<T> entityClass, String path) {
        byType.select(functions.col(column).as("value"))
                .filter("value IS NOT NULL")
                .as(OAFENTITY_KRYO_ENC)
                .map((MapFunction<OafEntity, T>) entityClass::cast, Encoders.bean(entityClass))
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(path);
    }
}

