/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;

import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.FOSDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Subject;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrepareFOSSparkJob
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(PrepareFOSSparkJob.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)PrepareFOSSparkJob.class.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String sourcePath = parser.get("sourcePath");
        log.info("sourcePath: {}", (Object)sourcePath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> PrepareFOSSparkJob.distributeFOSdois(spark, sourcePath, outputPath));
    }

    private static void distributeFOSdois(SparkSession spark, String sourcePath, String outputPath) {
        Dataset<FOSDataModel> fosDataset = Constants.readPath(spark, sourcePath, FOSDataModel.class);
        fosDataset.groupByKey((MapFunction & Serializable)v -> v.getDoi().toLowerCase(), Encoders.STRING()).mapGroups((MapGroupsFunction & Serializable)(k, it) -> {
            Result r = new Result();
            FOSDataModel first = (FOSDataModel)it.next();
            r.setId(DHPUtils.generateUnresolvedIdentifier((String)k, (String)"doi"));
            HashSet<String> level1 = new HashSet<String>();
            HashSet<String> level2 = new HashSet<String>();
            HashSet<String> level3 = new HashSet<String>();
            HashSet<String> level4 = new HashSet<String>();
            PrepareFOSSparkJob.addLevels(level1, level2, level3, level4, first);
            it.forEachRemaining(v -> PrepareFOSSparkJob.addLevels(level1, level2, level3, level4, v));
            ArrayList sbjs = new ArrayList();
            level1.forEach(l -> PrepareFOSSparkJob.add(sbjs, Constants.getSubject(l, "FOS", "Fields of Science and Technology classification", "subject:fos")));
            level2.forEach(l -> PrepareFOSSparkJob.add(sbjs, Constants.getSubject(l, "FOS", "Fields of Science and Technology classification", "subject:fos")));
            level3.forEach(l -> PrepareFOSSparkJob.add(sbjs, Constants.getSubject(l, "FOS", "Fields of Science and Technology classification", "subject:fos", true)));
            level4.forEach(l -> PrepareFOSSparkJob.add(sbjs, Constants.getSubject(l, "FOS", "Fields of Science and Technology classification", "subject:fos", true)));
            r.setSubject(sbjs);
            r.setDataInfo(OafMapperUtils.dataInfo((Boolean)false, null, (Boolean)true, (Boolean)false, (Qualifier)OafMapperUtils.qualifier((String)"sysimport:enrich", null, (String)"dnet:provenanceActions", (String)"dnet:provenanceActions"), null));
            return r;
        }, Encoders.bean(Result.class)).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath + "/fos");
    }

    private static void add(List<Subject> sbsjs, Subject sbj) {
        if (sbj != null) {
            sbsjs.add(sbj);
        }
    }

    private static void addLevels(HashSet<String> level1, HashSet<String> level2, HashSet<String> level3, HashSet<String> level4, FOSDataModel first) {
        level1.add(first.getLevel1());
        level2.add(first.getLevel2());
        if (Optional.ofNullable(first.getLevel3()).isPresent() && !first.getLevel3().equalsIgnoreCase("N/A") && !first.getLevel3().equalsIgnoreCase("NULL") && first.getLevel3() != null) {
            level3.add(first.getLevel3() + "@@" + first.getScoreL3());
        } else {
            level3.add("NULL");
        }
        if (Optional.ofNullable(first.getLevel4()).isPresent() && !first.getLevel4().equalsIgnoreCase("N/A") && !first.getLevel4().equalsIgnoreCase("NULL") && first.getLevel4() != null) {
            level4.add(first.getLevel4() + "@@" + first.getScoreL4());
        } else {
            level4.add("NULL");
        }
    }
}

