/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.affro;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.affro.AggregateAuthorUDF;
import eu.dnetlib.dhp.actionmanager.affro.AggregateResultUDF;
import eu.dnetlib.dhp.actionmanager.affro.PrepareDataset;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that joins "exploded" result records with affiliation matchings and
 * re-aggregates them, first per author and then per result.
 *
 * <p>For each entry in {@link #DATASOURCES} the job reads {@code explodedResultPath + datasource},
 * joins it with the records found at {@code matchingsPath} on the raw affiliation string,
 * aggregates the joined rows through the {@code aggregateAuthor} and {@code aggregateResult}
 * UDFs, and writes gzip-compressed JSON to {@code outputPath + datasource}, overwriting any
 * previous content.
 *
 * <p>Paths are combined by plain string concatenation, so {@code explodedResultPath} and
 * {@code outputPath} are expected to already end with whatever separator is needed.
 */
public class RedistributeRelations
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(RedistributeRelations.class);

    /** Classpath location of the JSON file describing the accepted command-line parameters. */
    private static final String PARAMETER_FILE =
        "/eu/dnetlib/dhp/actionmanager/affro/input_redistribute_relations_parameter.json";

    /** Suffixes appended to the input and output base paths, one run per entry. */
    private static final String[] DATASOURCES = {"iis", "oalex", "oaire", "publishers"};

    /** Number of salt buckets used to spread each join key over several partitions. */
    private static final int NUM_SALTS = 100;

    /**
     * Entry point: parses the arguments described by {@link #PARAMETER_FILE} and runs the
     * redistribution inside a Spark session.
     *
     * @param args command-line arguments; {@code explodedResultPath}, {@code matchingsPath}
     *             and {@code outputPath} are read from them
     * @throws IllegalStateException if the parameter file is missing from the classpath
     * @throws Exception if argument parsing or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream deterministically and fail with a clear message when the
        // resource is missing instead of an opaque NullPointerException inside IOUtils.
        try (InputStream parameterStream = RedistributeRelations.class.getResourceAsStream(PARAMETER_FILE)) {
            if (parameterStream == null) {
                throw new IllegalStateException("Parameter file not found on classpath: " + PARAMETER_FILE);
            }
            jsonConfiguration = IOUtils.toString(parameterStream, StandardCharsets.UTF_8);
        }
        log.info("read parameter file");

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String explodedPath = parser.get("explodedResultPath");
        log.info("explodedPath: {}", explodedPath);

        String matchingsPath = parser.get("matchingsPath");
        log.info("matchingsPath: {}", matchingsPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> redistributeRelations(spark, explodedPath, matchingsPath, outputPath));
    }

    /**
     * Registers the UDFs used by the aggregation and processes every datasource in
     * {@link #DATASOURCES}.
     *
     * @param spark         active Spark session
     * @param explodedPath  base path of the exploded records; the datasource name is appended
     * @param matchingsPath path of the affiliation matchings, shared by all datasources
     * @param outputPath    base output path; the datasource name is appended
     */
    private static void redistributeRelations(SparkSession spark, String explodedPath, String matchingsPath, String outputPath) {
        // Grouping key for the per-author aggregation: the id alone when no full name is
        // available, otherwise id and full name concatenated. The typed cast gives the lambda
        // String parameters (UDF2 is already Serializable, so no intersection cast is needed).
        spark.udf().register(
            "insertKey",
            (UDF2<String, String, String>) (id, fullname) -> StringUtils.isNotEmpty(fullname) ? id + fullname : id,
            DataTypes.StringType);
        // The same-package Constants is referenced by its fully qualified name because the
        // simple name is taken by the imported eu.dnetlib.dhp.actionmanager.Constants.
        spark.udf().register(
            "aggregateAuthor",
            (UDF1<?, ?>) new AggregateAuthorUDF(),
            eu.dnetlib.dhp.actionmanager.affro.Constants.AUTHOR_AGGREGATED_SCHEMA);
        spark.udf().register(
            "aggregateResult",
            (UDF1<?, ?>) new AggregateResultUDF(),
            eu.dnetlib.dhp.actionmanager.affro.Constants.RESULT_MATCHED_SCHEMA);

        for (String datasource : DATASOURCES) {
            redistributeForDatasource(spark, explodedPath + datasource, matchingsPath, outputPath + datasource);
        }
    }

    /**
     * Joins one datasource's exploded records with the matchings and writes the aggregated
     * result.
     *
     * <p>The join is salted: every exploded row gets a random salt in {@code [0, NUM_SALTS)}
     * and every matching row is replicated once per salt value, so rows sharing the same
     * affiliation string are spread over {@link #NUM_SALTS} join keys (presumably to mitigate
     * skew on very frequent affiliation strings).
     *
     * @param spark         active Spark session
     * @param explodedPath  path of the exploded records for this datasource
     * @param matchingsPath path of the affiliation matchings
     * @param outputPath    destination of the gzip-compressed JSON output (overwritten)
     */
    private static void redistributeForDatasource(SparkSession spark, String explodedPath, String matchingsPath, String outputPath) {
        Dataset<?> exploded = spark.read()
            .schema(eu.dnetlib.dhp.actionmanager.affro.Constants.DATASET_SCHEMA)
            .json(explodedPath);
        Dataset<?> matchings = spark.read()
            .schema(eu.dnetlib.dhp.actionmanager.affro.Constants.AFFILIATION_SCHEMA)
            .json(matchingsPath);

        Dataset<?> explodedWithSalt = exploded.withColumn(
            "salt", functions.expr("CAST(FLOOR(RAND() * " + NUM_SALTS + ") AS INT)"));
        Dataset<?> saltedMatchings = matchings.withColumn(
            "salt", functions.explode(functions.expr("sequence(0, " + (NUM_SALTS - 1) + ")")));

        Column joinCondition = explodedWithSalt.col("raw_affiliation_string")
            .equalTo(saltedMatchings.col("Affiliation"))
            .and(explodedWithSalt.col("salt").equalTo(saltedMatchings.col("salt")));
        // Only rows whose affiliation actually produced at least one matching are kept.
        Column hasMatchings = functions.col("Matchings").isNotNull()
            .and(functions.size(functions.col("Matchings")).gt(0));

        Dataset<?> joined = explodedWithSalt
            .join(saltedMatchings, joinCondition)
            .filter(hasMatchings)
            .select(
                explodedWithSalt.col("id"),
                explodedWithSalt.col("fullname"),
                explodedWithSalt.col("raw_affiliation_string"),
                functions.col("Matchings"),
                functions.col("corresponding"),
                functions.col("contributor_roles"))
            .withColumn("key", functions.expr("insertKey(id, fullname)"));

        // First level: collect all rows sharing the same key and collapse them per author.
        Dataset<?> groupedDf = joined
            .groupBy("key")
            .agg(functions.collect_list(functions.struct(joined.col("*"))).alias("group"))
            .withColumn("aggAuthor", functions.expr("aggregateAuthor(group)"))
            .select("aggAuthor.*");

        // Second level: collect the aggregated authors of each result id and collapse them.
        Dataset<?> resultDf = groupedDf
            .groupBy("id")
            .agg(functions.collect_list(functions.struct(groupedDf.col("*"))).alias("group"))
            .withColumn("result", functions.expr("aggregateResult(group)"))
            .select("result.*");

        resultDf.write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }
}

