/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.entitytoorganizationfromsemrel;

import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.entitytoorganizationfromsemrel.Leaves;
import eu.dnetlib.dhp.entitytoorganizationfromsemrel.PropagationCounter;
import eu.dnetlib.dhp.entitytoorganizationfromsemrel.StepActions;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkEntityToOrganizationFromSemRel
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SparkEntityToOrganizationFromSemRel.class);
    private static final int MAX_ITERATION = 5;
    public static final String NEW_RESULT_RELATION_PATH = "/newResultRelation";
    public static final String NEW_PROJECT_RELATION_PATH = "/newProjectRelation";

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/wf/subworkflows/entitytoorganizationfromsemrel/input_propagation_parameter.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String relationPath = parser.get("relationPath");
        log.info("relationPath: {}", (Object)relationPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String leavesPath = parser.get("leavesPath");
        log.info("leavesPath: {}", (Object)leavesPath);
        String childParentPath = parser.get("childParentPath");
        log.info("childParentPath: {}", (Object)childParentPath);
        String resultOrganizationPath = parser.get("resultOrgPath");
        log.info("resultOrganizationPath: {}", (Object)resultOrganizationPath);
        String projectOrganizationPath = parser.get("projectOrganizationPath");
        log.info("projectOrganizationPath: {}", (Object)projectOrganizationPath);
        String workingPath = parser.get("workingDir");
        log.info("workingPath: {}", (Object)workingPath);
        int iterations = Optional.ofNullable(parser.get("iterations")).map(v -> {
            if (Integer.valueOf(v) < 5) {
                return Integer.valueOf(v);
            }
            return 5;
        }).orElse(5);
        log.info("iterations: {}", (Object)iterations);
        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
        SparkSessionSupport.runWithSparkHiveSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> SparkEntityToOrganizationFromSemRel.execPropagation(spark, leavesPath, childParentPath, resultOrganizationPath, projectOrganizationPath, relationPath, workingPath, outputPath, iterations));
    }

    public static void execPropagation(SparkSession spark, String leavesPath, String childParentPath, String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath, String outputPath, int iterations) {
        if (iterations == 1) {
            SparkEntityToOrganizationFromSemRel.doPropagateOnce(spark, leavesPath, childParentPath, resultOrganizationPath, projectOrganizationPath, graphPath, workingPath, outputPath);
        } else {
            LongAccumulator iterationOne = spark.sparkContext().longAccumulator("ExitAtFirstIteration");
            LongAccumulator iterationTwo = spark.sparkContext().longAccumulator("ExitAtSecondIteration");
            LongAccumulator iterationThree = spark.sparkContext().longAccumulator("ExitAtThirdIteration");
            LongAccumulator iterationFour = spark.sparkContext().longAccumulator("ExitAtFourthIteration");
            LongAccumulator iterationFive = spark.sparkContext().longAccumulator("ExitAtFifthIteration");
            LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator("ExitAtNoFirstParentReached");
            PropagationCounter propagationCounter = new PropagationCounter(iterationOne, iterationTwo, iterationThree, iterationFour, iterationFive, notReachedFirstParent);
            SparkEntityToOrganizationFromSemRel.doPropagate(spark, leavesPath, childParentPath, resultOrganizationPath, projectOrganizationPath, graphPath, workingPath, outputPath, propagationCounter, iterations);
        }
    }

    private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath, String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath, String outputPath) {
        StepActions.execStep(spark, graphPath + "/result", workingPath + NEW_RESULT_RELATION_PATH, leavesPath, childParentPath, resultOrganizationPath, "hasAuthorInstitution");
        SparkEntityToOrganizationFromSemRel.addNewRelations(spark, workingPath + NEW_RESULT_RELATION_PATH, outputPath);
        StepActions.execStep(spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH, leavesPath, childParentPath, projectOrganizationPath, "hasParticipant");
        SparkEntityToOrganizationFromSemRel.addNewRelations(spark, workingPath + NEW_PROJECT_RELATION_PATH, outputPath);
    }

    private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, String resultOrganizationPath, String projectOrganizationPath, String graphPath, String workingPath, String outputPath, PropagationCounter propagationCounter, int iterations) {
        long leavesCount;
        int iteration = 0;
        do {
            StepActions.execStep(spark, graphPath + "/result", workingPath + NEW_RESULT_RELATION_PATH, leavesPath, childParentPath, resultOrganizationPath, "hasAuthorInstitution");
            StepActions.execStep(spark, graphPath + "/project", workingPath + NEW_PROJECT_RELATION_PATH, leavesPath, childParentPath, projectOrganizationPath, "hasParticipant");
            StepActions.prepareForNextStep(spark, workingPath, resultOrganizationPath, projectOrganizationPath, leavesPath, childParentPath, workingPath + "/leaves", workingPath + "/resOrg", workingPath + "/projOrg");
            SparkEntityToOrganizationFromSemRel.moveOutput(spark, workingPath, leavesPath, resultOrganizationPath, projectOrganizationPath);
        } while ((leavesCount = PropagationConstant.readPath(spark, leavesPath, Leaves.class).count()) > 0L && ++iteration < iterations);
        if (leavesCount == 0L) {
            switch (String.valueOf(iteration)) {
                case "1": {
                    propagationCounter.getIterationOne().add(1L);
                    break;
                }
                case "2": {
                    propagationCounter.getIterationTwo().add(1L);
                    break;
                }
                case "3": {
                    propagationCounter.getIterationThree().add(1L);
                    break;
                }
                case "4": {
                    propagationCounter.getIterationFour().add(1L);
                    break;
                }
                case "5": {
                    propagationCounter.getIterationFive().add(1L);
                    break;
                }
            }
        } else {
            propagationCounter.getNotReachedFirstParent().add(1L);
        }
        SparkEntityToOrganizationFromSemRel.addNewRelations(spark, workingPath + NEW_RESULT_RELATION_PATH, outputPath);
        SparkEntityToOrganizationFromSemRel.addNewRelations(spark, workingPath + NEW_PROJECT_RELATION_PATH, outputPath);
    }

    private static void moveOutput(SparkSession spark, String workingPath, String leavesPath, String resultOrganizationPath) {
        PropagationConstant.readPath(spark, workingPath + "/leaves", Leaves.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(leavesPath);
        PropagationConstant.readPath(spark, workingPath + "/resOrg", KeyValueSet.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(resultOrganizationPath);
    }

    private static void moveOutput(SparkSession spark, String workingPath, String leavesPath, String resultOrganizationPath, String projectOrganizationPath) {
        PropagationConstant.readPath(spark, workingPath + "/leaves", Leaves.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(leavesPath);
        PropagationConstant.readPath(spark, workingPath + "/resOrg", KeyValueSet.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(resultOrganizationPath);
        PropagationConstant.readPath(spark, workingPath + "/projOrg", KeyValueSet.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(projectOrganizationPath);
    }

    private static void addNewRelations(SparkSession spark, String newRelationPath, String outputPath) {
        Dataset<Relation> relation = PropagationConstant.readPath(spark, newRelationPath, Relation.class);
        relation.groupByKey((MapFunction & Serializable)r -> r.getSource() + r.getTarget(), Encoders.STRING()).mapGroups((MapGroupsFunction & Serializable)(k, it) -> (Relation)it.next(), Encoders.bean(Relation.class)).flatMap((FlatMapFunction & Serializable)r -> {
            if (r.getSource().startsWith("50|")) {
                return Arrays.asList(r, PropagationConstant.getAffiliationRelation(r.getTarget(), r.getSource(), "isAuthorInstitutionOf")).iterator();
            }
            return Arrays.asList(r, PropagationConstant.getParticipantRelation(r.getTarget(), r.getSource(), "isParticipant")).iterator();
        }, Encoders.bean(Relation.class)).write().mode(SaveMode.Append).option("compression", "gzip").json(outputPath);
    }
}

