/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.resulttocommunityfromsemrel;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that enriches results with additional community contexts.
 *
 * <p>The job reads a dataset of results and a dataset of {@link ResultCommunityList} records,
 * left-outer-joins them on {@code result.id == resultCommunityList.resultId}, and for every
 * matched result appends one {@link Context} per listed community id that the result does not
 * already carry. The outcome is written as gzip-compressed JSON, overwriting the output path.
 */
public class SparkResultToCommunityThroughSemRelJob {
    private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityThroughSemRelJob.class);

    /** Classpath location of the JSON document describing the accepted command line arguments. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromsemrel/input_communitytoresult_parameters.json";

    /**
     * Entry point: parses the arguments, then runs the propagation inside a Hive-enabled Spark
     * session obtained through {@link SparkSessionSupport#runWithSparkHiveSession}.
     *
     * <p>Arguments read: {@code sourcePath}, {@code outputPath}, {@code preparedInfoPath},
     * {@code hive_metastore_uris}, {@code resultTableName} (fully qualified name of a
     * {@link Result} subclass) and the optional {@code saveGraph} (treated as {@code true} when
     * absent). When {@code saveGraph} is false nothing is written.
     *
     * @param args command line arguments, interpreted by {@link ArgumentApplicationParser}
     * @throws IllegalStateException if the parameter description resource is not on the classpath
     * @throws ClassCastException if {@code resultTableName} names a class that is not a {@link Result}
     * @throws Exception on any other failure while parsing arguments or running the job
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(loadParameterSpec());
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        String preparedInfoPath = parser.get("preparedInfoPath");
        log.info("preparedInfoPath: {}", preparedInfoPath);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

        String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", resultClassName);

        boolean saveGraph = Optional.ofNullable(parser.get("saveGraph")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("saveGraph: {}", saveGraph);

        // asSubclass performs a checked narrowing: a class name that is not a Result subclass
        // fails here with a ClassCastException instead of being passed on unchecked.
        Class<? extends Result> resultClazz = Class.forName(resultClassName).asSubclass(Result.class);

        SparkSessionSupport.runWithSparkHiveSession(conf, isSparkSessionManaged, spark -> {
            if (PropagationConstant.isTest(parser)) {
                PropagationConstant.removeOutputDir(spark, outputPath);
            }
            if (saveGraph) {
                execPropagation(spark, inputPath, outputPath, preparedInfoPath, resultClazz);
            }
        });
    }

    /**
     * Loads the JSON argument description from the classpath, decoding it as UTF-8 and closing
     * the underlying stream.
     *
     * @return the content of {@link #PARAMETERS_RESOURCE}
     * @throws IllegalStateException if the resource cannot be found
     * @throws Exception if reading the resource fails
     */
    private static String loadParameterSpec() throws Exception {
        try (InputStream in = SparkResultToCommunityThroughSemRelJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Joins the results with the prepared community lists and writes the updated results.
     *
     * @param spark active Spark session
     * @param inputPath location of the results to enrich
     * @param outputPath destination of the enriched results (overwritten, gzip-compressed JSON)
     * @param preparedInfoPath location of the {@link ResultCommunityList} records
     * @param resultClazz concrete result type used for reading and for the bean encoder
     * @param <R> concrete result type
     */
    private static <R extends Result> void execPropagation(SparkSession spark, String inputPath, String outputPath, String preparedInfoPath, Class<R> resultClazz) {
        Dataset<ResultCommunityList> possibleUpdates = PropagationConstant.readPath(spark, preparedInfoPath, ResultCommunityList.class);
        Dataset<R> result = PropagationConstant.readPath(spark, inputPath, resultClazz);

        // left_outer keeps every result; those without a matching update pass through unchanged.
        result
            .joinWith(possibleUpdates, result.col("id").equalTo(possibleUpdates.col("resultId")), "left_outer")
            .map(contextUpdaterFn(), Encoders.bean(resultClazz))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }

    /**
     * Builds the function applied to each joined pair: it returns the result, extended with one
     * new {@link Context} for every community id in the paired {@link ResultCommunityList} that
     * the result does not already have.
     *
     * <p>A pair without updates (right side {@code null}, or a {@code null} community list)
     * leaves the result untouched. A result whose context list is {@code null} gets a fresh
     * list. A community id occurring several times in the update list is added only once.
     *
     * @param <R> concrete result type
     * @return the mapping function
     */
    private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> contextUpdaterFn() {
        return value -> {
            R ret = value._1();
            ResultCommunityList updates = value._2();
            if (updates == null || updates.getCommunityList() == null) {
                return ret;
            }

            List<Context> current = ret.getContext();
            if (current == null) {
                current = new ArrayList<>();
                ret.setContext(current);
            }

            Set<String> knownIds = new HashSet<>();
            for (Context existing : current) {
                knownIds.add(existing.getId());
            }

            for (String communityId : updates.getCommunityList()) {
                // Set.add is false when the id is already known, which also covers ids
                // repeated within the update list itself.
                if (knownIds.add(communityId)) {
                    Context newContext = new Context();
                    newContext.setId(communityId);
                    newContext.setDataInfo(Arrays.asList(PropagationConstant.getDataInfo("propagation", "result:community:semrel", " Propagation of result belonging to community through semantic relation", "dnet:provenanceActions")));
                    current.add(newContext);
                }
            }
            return ret;
        };
    }
}

