/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.affro;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.affro.beans.Affiliation;
import eu.dnetlib.dhp.actionmanager.affro.beans.Author;
import eu.dnetlib.dhp.actionmanager.affro.beans.IISModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that prepares the inputs for affiliation matching.
 *
 * <p>Author/affiliation pairs are exploded from four sources (OpenAlex, the result entities of
 * the OpenAIRE graph, the IIS Hive table and publisher-parsed records) into a common row layout
 * {@code (id, fullname, raw_affiliation_string, corresponding, contributor_roles)}. Each source
 * is written as gzipped JSON under {@code <outputPath>/exploded/}. The distinct raw affiliation
 * strings of all sources are written to {@code <outputPath>/all_strings}. The strings that do not
 * appear in the {@code Affiliation} column of the {@code oldMatches} input are written to
 * {@code <outputPath>/toMatch}.
 */
public class PrepareDataset implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(PrepareDataset.class);

    /** Classpath location of the argument-parser definition for this job. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/actionmanager/affro/input_preparedataset_parameter.json";

    /** Prefix of the result identifiers minted from a DOI. */
    private static final String DOI_ID_PREFIX = "50|doi_________::";

    /**
     * SQL predicate keeping only informative affiliation strings. A string must be non-null, not
     * blank after TRIM, and not one of the placeholders 'unknown' / 'none' (case-insensitive).
     */
    private static final String VALID_AFFILIATION_FILTER =
        "raw_affiliation_string IS NOT NULL AND TRIM(raw_affiliation_string) != '' AND LOWER(raw_affiliation_string) NOT IN ('unknown', 'none')";

    /** Query reading the IIS-extracted document metadata from Hive. */
    private static final String IIS_QUERY =
        "SELECT id, authors, affiliations FROM mh.extracted_document_metadata_prod";

    /**
     * Entry point: parses the job arguments and runs {@link #prepareDataset} in a Hive-enabled
     * Spark session.
     *
     * @param args command-line arguments, described by {@code input_preparedataset_parameter.json}
     * @throws Exception if the parameter definition cannot be read or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = readParameterDefinition();
        log.info("read parameter file");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String oalexPath = parser.get("oalexPath");
        log.info("oalexPath: {}", oalexPath);
        String oairePath = parser.get("oairePath");
        log.info("oairePath: {}", oairePath);
        String publishersPath = parser.get("publishersPath");
        log.info("publishersPath: {}", publishersPath);
        String iisPath = parser.get("iisPath");
        log.info("iisPath: {}", iisPath);
        String oldMatches = parser.get("oldMatches");
        log.info("oldMatches: {}", oldMatches);
        String workingDir = parser.get("outputPath");
        log.info("workingDir: {}", workingDir);
        // "applyOnAll" defaults to false, in which case strings already in oldMatches are skipped.
        Boolean startFromScratch = Optional.ofNullable(parser.get("applyOnAll"))
            .map(Boolean::valueOf)
            .orElse(Boolean.FALSE);
        log.info("applyOnAll: {}", startFromScratch);
        String hiveMetastoreUris = parser.get("hiveMetastoreUris");
        log.info("hiveMetastoreUris: {}", hiveMetastoreUris);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", hiveMetastoreUris);
        SparkSessionSupport.runWithSparkHiveSession(conf, isSparkSessionManaged, spark -> {
            Constants.removeOutputDir(spark, workingDir);
            prepareDataset(spark, oalexPath, oairePath, iisPath, publishersPath, workingDir, oldMatches, startFromScratch);
        });
    }

    /**
     * Reads the argument-parser definition from the classpath as UTF-8, closing the stream.
     *
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws IOException if the resource cannot be read
     */
    private static String readParameterDefinition() throws IOException {
        try (InputStream in = PrepareDataset.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("parameter file not found on classpath: " + PARAMETERS_RESOURCE);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Explodes every source, writes the per-source outputs, and computes the strings still to
     * be matched.
     *
     * <p>All outputs are placed under {@code workingDir} using a {@code "/"} separator. A double
     * slash produced by a trailing-slash {@code workingDir} is normalised by Hadoop {@code Path}.
     *
     * <p>NOTE(review): {@code iisPath} is accepted but unused. IIS records are read from the
     * hard-coded Hive table in {@link #IIS_QUERY}. Confirm whether this parameter should select
     * the table.
     *
     * @param startFromScratch when {@code true}, {@code oldMatches} is ignored and every string is
     *     written to {@code toMatch}
     */
    private static void prepareDataset(SparkSession spark, String oalexPath, String oairePath, String iisPath, String publishersPath, String workingDir, String oldMatches, Boolean startFromScratch) {
        registerUdfs(spark);

        Dataset<Row> oalex = readOpenAlex(spark, oalexPath);
        writeGzipJson(oalex, workingDir + "/exploded/oalex");

        Dataset<Row> oaire = readGraph(spark, oairePath);
        writeGzipJson(oaire, workingDir + "/exploded/oaire");

        Dataset<Row> iis = readIis(spark);
        writeGzipJson(iis, workingDir + "/exploded/iis");

        Dataset<Row> publishers = readPublishers(spark, publishersPath);
        writeGzipJson(publishers, workingDir + "/exploded/publishers");

        // Union is positional; every source is projected to the same column order beforehand.
        // Only the strings are needed here, so project first and de-duplicate once.
        Dataset<Row> allStrings = oalex.union(oaire).union(publishers).union(iis)
            .select("raw_affiliation_string")
            .distinct();
        writeGzipJson(allStrings, workingDir + "/all_strings");

        writeStringsToMatch(spark, workingDir, oldMatches, startFromScratch);
    }

    /** Registers the SQL UDFs used by the source readers to build result identifiers. */
    private static void registerUdfs(SparkSession spark) {
        spark.udf().register("md5HashWithPrefix",
            (UDF1<String, String>) doi -> DOI_ID_PREFIX + DHPUtils.md5(doi),
            DataTypes.StringType);
        spark.udf().register("addResultPrefix",
            (UDF1<String, String>) id -> "50|" + id,
            DataTypes.StringType);
        // Keep the graph id when present; otherwise mint one from the part of the DOI URL after "doi.org/".
        spark.udf().register("selectId",
            (UDF2<String, String, String>) (doi, id) -> StringUtils.isNotEmpty(id)
                ? id
                : DOI_ID_PREFIX + DHPUtils.md5(StringUtils.substringAfter(doi, "doi.org/")),
            DataTypes.StringType);
    }

    /** Projects a dataset onto the common output layout, in the order the union relies on. */
    private static Dataset<Row> selectOutputColumns(Dataset<Row> dataset) {
        return dataset.select("id", "fullname", "raw_affiliation_string", "corresponding", "contributor_roles");
    }

    /** Writes a dataset as gzipped JSON, overwriting any existing output at {@code path}. */
    private static void writeGzipJson(Dataset<?> dataset, String path) {
        dataset.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(path);
    }

    /**
     * Explodes OpenAlex records with a DOI into one row per (author, raw affiliation string).
     *
     * <p>The id is derived from the DOI via {@code md5HashWithPrefix}.
     */
    private static Dataset<Row> readOpenAlex(SparkSession spark, String oalexPath) {
        Dataset<Row> exploded = spark.read()
            .schema(eu.dnetlib.dhp.actionmanager.affro.Constants.OALEX_SCHEMA)
            .json(oalexPath)
            .filter(functions.col("doi").isNotNull())
            .withColumn("id", functions.expr("md5HashWithPrefix(doi)"))
            .select(functions.col("id"), functions.explode(functions.col("authorships")).alias("author"))
            .withColumn("fullname", functions.col("author.author.display_name"))
            .withColumn("raw_affiliation_strings", functions.col("author.raw_affiliation_strings"))
            .select(
                functions.col("id"),
                functions.col("fullname"),
                functions.explode(functions.col("raw_affiliation_strings")).alias("raw_affiliation_string"))
            .filter(VALID_AFFILIATION_FILTER)
            .withColumn("corresponding", functions.lit(null))
            .withColumn("contributor_roles", functions.lit(null));
        // Re-encode with DATASET_SCHEMA so the null literal columns get the schema's types.
        return selectOutputColumns(exploded)
            .as(RowEncoder.apply(eu.dnetlib.dhp.actionmanager.affro.Constants.DATASET_SCHEMA));
    }

    /**
     * Explodes the OpenAIRE graph into one row per (author, raw affiliation string).
     *
     * <p>Every entity type for which {@code ModelSupport.isResult} holds is read from
     * {@code <oairePath>/<entity name>}.
     */
    private static Dataset<Row> readGraph(SparkSession spark, String oairePath) {
        Dataset<Row> results = spark.createDataFrame(
            Collections.<Row>emptyList(), eu.dnetlib.dhp.actionmanager.affro.Constants.GRAPH_SCHEMA);
        for (EntityType entity : ModelSupport.entityTypes.keySet()) {
            if (ModelSupport.isResult(entity)) {
                results = results.union(spark.read()
                    .schema(eu.dnetlib.dhp.actionmanager.affro.Constants.GRAPH_SCHEMA)
                    .json(oairePath + "/" + entity.name()));
            }
        }
        Dataset<Row> exploded = results
            .select(functions.col("id"), functions.explode(functions.col("author")).alias("author"))
            .select(
                functions.col("id"),
                functions.col("author"),
                functions.explode(functions.col("author.rawAffiliationString")).alias("raw_affiliation_string"))
            .filter(VALID_AFFILIATION_FILTER)
            .withColumn("fullname", functions.col("author.fullName"))
            .drop("author")
            .withColumn("corresponding", functions.lit(null))
            .withColumn("contributor_roles", functions.lit(null));
        return selectOutputColumns(exploded);
    }

    /**
     * Explodes the IIS Hive table into one row per (author, referenced affiliation).
     *
     * <p>Records without authors or without affiliations are dropped, as are null author entries.
     */
    private static Dataset<Row> readIis(SparkSession spark) {
        Dataset<Row> exploded = spark.sql(IIS_QUERY)
            .as(Encoders.bean(IISModel.class))
            .filter((FilterFunction<IISModel>) PrepareDataset::hasAuthorsAndAffiliations)
            .flatMap(
                (FlatMapFunction<IISModel, Row>) value -> value.getAuthors().stream()
                    .filter(Objects::nonNull)
                    .flatMap(author -> getAuthorLines(value.getId(), author, value.getAffiliations()).stream())
                    .collect(Collectors.toList())
                    .iterator(),
                RowEncoder.apply(eu.dnetlib.dhp.actionmanager.affro.Constants.DATASET_SCHEMA))
            .filter(VALID_AFFILIATION_FILTER);
        return selectOutputColumns(exploded);
    }

    /** Returns whether an IIS record has at least one author and at least one affiliation. */
    private static boolean hasAuthorsAndAffiliations(IISModel value) {
        return value.getAuthors() != null && !value.getAuthors().isEmpty()
            && value.getAffiliations() != null && !value.getAffiliations().isEmpty();
    }

    /**
     * Explodes successfully parsed publisher records into one row per (author, raw affiliation).
     *
     * <p>{@code corresponding} and {@code contributor_roles} are taken from the author entry. The
     * id is the graph id when non-empty, otherwise one minted from the DOI ({@code selectId}).
     */
    private static Dataset<Row> readPublishers(SparkSession spark, String publishersPath) {
        Dataset<Row> exploded = spark.read()
            .schema(eu.dnetlib.dhp.actionmanager.affro.Constants.PUBLISHER_SCHEMA)
            .json(publishersPath)
            .filter(functions.col("success").equalTo(true))
            .withColumn("authors", functions.col("parsing_output.authors"))
            .select(functions.col("id"), functions.col("doi"), functions.explode(functions.col("authors")).alias("author"))
            .withColumn("graphId", functions.col("id"))
            .drop(functions.col("id"))
            .withColumn("fullname", functions.col("author.name.full"))
            .withColumn("raw_affiliation_strings", functions.col("author.raw_affiliations"))
            .withColumn("corresponding", functions.col("author.corresponding"))
            .withColumn("contributor_roles", functions.col("author.contributor_roles"))
            .drop(functions.col("author"))
            .select(
                functions.col("graphId"),
                functions.col("doi"),
                functions.col("fullname"),
                functions.explode(functions.col("raw_affiliation_strings")).alias("raw_affiliation_string"),
                functions.col("corresponding"),
                functions.col("contributor_roles"))
            .filter(VALID_AFFILIATION_FILTER)
            .withColumn("id", functions.expr("selectId(doi, graphId)"))
            .drop(functions.col("graphId"))
            .drop(functions.col("doi"));
        return selectOutputColumns(exploded);
    }

    /**
     * Writes to {@code <workingDir>/toMatch} the distinct strings of {@code <workingDir>/all_strings}.
     *
     * <p>Only strings with no equal value in the {@code Affiliation} column of {@code oldMatches}
     * are kept. When {@code startFromScratch} is {@code true}, {@code oldMatches} is not read and
     * every string is kept.
     */
    private static void writeStringsToMatch(SparkSession spark, String workingDir, String oldMatches, Boolean startFromScratch) {
        Dataset<Row> alreadyMatched = Boolean.TRUE.equals(startFromScratch)
            ? spark.createDataFrame(Collections.<Row>emptyList(), eu.dnetlib.dhp.actionmanager.affro.Constants.AFFILIATION_SCHEMA)
            : spark.read().schema(eu.dnetlib.dhp.actionmanager.affro.Constants.AFFILIATION_SCHEMA).json(oldMatches);
        // Re-read the strings just written, rather than reusing the union's lineage.
        Dataset<Row> affStrings = spark.read()
            .schema(eu.dnetlib.dhp.actionmanager.affro.Constants.AFFILIATION_STRING_SCHEMA)
            .json(workingDir + "/all_strings");
        // left_anti keeps the left rows without an equal Affiliation on the right. This matches
        // the former "left join + Affiliation IS NULL" filter.
        Dataset<Row> newToMatch = affStrings
            .join(alreadyMatched,
                affStrings.col("raw_affiliation_string").equalTo(alreadyMatched.col("Affiliation")),
                "left_anti")
            .select("raw_affiliation_string")
            .distinct();
        writeGzipJson(newToMatch, workingDir + "/toMatch");
    }

    /**
     * Builds one row per affiliation referenced by an author.
     *
     * <p>Each entry of {@code author.getAffiliationpositions()} is an index into
     * {@code affiliations}. Null, negative or out-of-range positions are skipped, as are null
     * affiliation entries. Rows have the layout
     * {@code (id, author full name, affiliation raw text, null, null)}.
     *
     * @return the rows; an empty list when the author has no affiliation positions or
     *     {@code affiliations} is null
     */
    private static List<Row> getAuthorLines(String id, Author author, List<Affiliation> affiliations) {
        List<Integer> affiliationPositions = author.getAffiliationpositions();
        if (affiliationPositions == null || affiliationPositions.isEmpty() || affiliations == null) {
            return new ArrayList<>();
        }
        return affiliationPositions.stream()
            .filter(pos -> pos != null && pos >= 0 && pos < affiliations.size())
            .map(pos -> affiliations.get(pos))
            .filter(Objects::nonNull)
            .map(affiliation -> RowFactory.create(id, author.getAuthorfullname(), affiliation.getRawtext(), null, null))
            .collect(Collectors.toList());
    }
}

