/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.RelationAggregator;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.Tuple3;

/**
 * Spark action that rewrites the graph relations according to the "merges" relations
 * found under the working path.
 *
 * <p>For every "merges" relation {@code source -> target}, each graph relation that has
 * {@code target} as one of its endpoints is (a) flagged as deleted by inference and
 * (b) re-emitted with that endpoint replaced by {@code source}. The rewritten relations,
 * the flagged originals and the merge relations themselves are then de-duplicated and
 * written to the relation path derived from {@code graphOutputPath}.
 */
public class SparkPropagateRelation extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);

    /** Classpath location of the JSON file describing the command line parameters. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json";

    /**
     * Separator placed between the fields of the relation grouping key. A control
     * character is used so that it cannot be confused with the content of the fields.
     */
    private static final String RELATION_KEY_SEPARATOR = "\u0001";

    /**
     * Creates the action.
     *
     * @param parser parsed command line arguments
     * @param spark  Spark session used to read and write the datasets
     */
    public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    /**
     * Command line entry point: loads the parameter specification from the classpath,
     * parses {@code args}, configures Kryo serialization for the OAF model classes and
     * runs the action.
     *
     * @param args command line arguments
     * @throws Exception if the parameters cannot be read or parsed, or the job fails
     */
    public static void main(String[] args) throws Exception {
        final String parametersJson;
        // Close the resource stream once read, and decode it with a fixed charset
        // instead of the platform default.
        try (InputStream in = SparkPropagateRelation.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            parametersJson = IOUtils.toString(
                Objects.requireNonNull(in, "missing classpath resource: " + PARAMETERS_RESOURCE),
                StandardCharsets.UTF_8);
        }

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkPropagateRelation(parser, getSparkSession(conf))
            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
    }

    /**
     * Reads the merge relations and the graph relations, propagates the merges onto the
     * graph relations and overwrites the output relation path with the result.
     *
     * @param isLookUpService lookup service handle; not used by this action
     */
    @Override
    public void run(ISLookUpService isLookUpService) {
        final String graphBasePath = parser.get("graphBasePath");
        final String workingPath = parser.get("workingPath");
        final String graphOutputPath = parser.get("graphOutputPath");

        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("workingPath: '{}'", workingPath);
        log.info("graphOutputPath: '{}'", graphOutputPath);

        final String outputRelationPath = DedupUtility.createEntityPath(graphOutputPath, "relation");
        removeOutputDir(spark, outputRelationPath);

        final Dataset<Relation> mergeRels = spark
            .read()
            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
            .as(Encoders.bean(Relation.class));

        // Pairs <target of the "merges" relation, source of the "merges" relation>:
        // _1 is the id to look up in the graph relations, _2 is the id replacing it.
        final Dataset<Tuple2<String, String>> mergedIds = mergeRels
            .where(functions.col("relClass").equalTo("merges"))
            .select(functions.col("source"), functions.col("target"))
            .distinct()
            .map(
                (MapFunction<Row, Tuple2<String, String>>) row -> new Tuple2<>(row.getString(1), row.getString(0)),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
            .cache();

        final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
        final Dataset<Relation> rels = spark
            .read()
            .textFile(relationPath)
            .map(patchRelFn(), Encoders.bean(Relation.class));

        // Relations re-pointed to the replacing ids.
        final Dataset<Relation> newRels = createNewRels(rels, mergedIds, getFixRelFn());

        // Original relations, flagged as deleted when their source and/or target was merged.
        final Dataset<Relation> flaggedOnSource = processDataset(rels, mergedIds, FieldType.SOURCE, getDeletedFn());
        final Dataset<Relation> updated = processDataset(flaggedOnSource, mergedIds, FieldType.TARGET, getDeletedFn());

        final Dataset<Relation> allRels = newRels
            .union(updated)
            .union(mergeRels)
            .map((MapFunction<Relation, Relation>) r -> r, Encoders.kryo(Relation.class));

        // Self-loops (source equal to target) are dropped after the de-duplication.
        final Dataset<Relation> output = distinctRelations(allRels)
            .filter((FilterFunction<Relation>) r -> !Objects.equals(r.getSource(), r.getTarget()));

        save(output, outputRelationPath, SaveMode.Overwrite);
    }

    /**
     * Collapses the relations sharing the same (source, target, relType, subRelType,
     * relClass) into a single relation by means of {@link RelationAggregator}.
     *
     * @param rels relations to de-duplicate
     * @return one relation per distinct key
     */
    private Dataset<Relation> distinctRelations(Dataset<Relation> rels) {
        return rels
            .filter(getRelationFilterFunction())
            .groupByKey(
                // The first argument of String.join is the delimiter: it must be an explicit
                // separator, otherwise the source id would be consumed as the delimiter.
                (MapFunction<Relation, String>) r -> String.join(
                    RELATION_KEY_SEPARATOR,
                    r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
                Encoders.STRING())
            .agg(new RelationAggregator().toColumn())
            .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, Encoders.bean(Relation.class));
    }

    /**
     * Builds the relations obtained by replacing merged endpoints with the replacing ids.
     *
     * @param rels      graph relations
     * @param mergedIds pairs &lt;id to replace, replacing id&gt;
     * @param mapRel    function turning a joined row into the rewritten relation
     * @return the distinct rewritten relations; relations whose source and target both
     *         have no match in {@code mergedIds} are not included
     */
    private static Dataset<Relation> createNewRels(
        Dataset<Relation> rels,
        Dataset<Tuple2<String, String>> mergedIds,
        MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> mapRel) {

        // <source, relation, target>
        final Dataset<Tuple3<String, Relation, String>> mapped = rels
            .map(
                (MapFunction<Relation, Tuple3<String, Relation, String>>) r -> new Tuple3<>(
                    getId(r, FieldType.SOURCE), r, getId(r, FieldType.TARGET)),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class), Encoders.STRING()));

        // < <source, relation, target>, match on the source (null when absent) >
        final Dataset<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>> relSource = mapped
            .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer");

        // < < <source, relation, target>, match on the source >, match on the target (null when absent) >
        final Dataset<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>> relSourceTarget = relSource
            .joinWith(mergedIds, relSource.col("_1._3").equalTo(mergedIds.col("_1")), "left_outer");

        return relSourceTarget
            // Keep only the rows where at least one endpoint matched. The check must look at the
            // right-hand sides of the two left outer joins (_1._2 and _2): the left-hand side
            // (_1._1) is always present and would make the predicate always true.
            .filter(
                (FilterFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>>) r ->
                    r._1()._2() != null || r._2() != null)
            .map(mapRel, Encoders.bean(Relation.class))
            .distinct();
    }

    /**
     * Left-outer-joins the relations with {@code mergedIds} on the chosen endpoint and
     * applies {@code mapFn} to every joined row.
     *
     * @param rels      relations to process
     * @param mergedIds pairs &lt;id to replace, replacing id&gt;
     * @param type      endpoint (source or target) used as join key
     * @param mapFn     function applied to each &lt;&lt;id, relation&gt;, match or null&gt; row
     * @return the relations produced by {@code mapFn}
     */
    private static Dataset<Relation> processDataset(
        Dataset<Relation> rels,
        Dataset<Tuple2<String, String>> mergedIds,
        FieldType type,
        MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> mapFn) {

        final Dataset<Tuple2<String, Relation>> mapped = rels
            .map(
                (MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(getId(r, type), r),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Relation.class)));

        return mapped
            .joinWith(mergedIds, mapped.col("_1").equalTo(mergedIds.col("_1")), "left_outer")
            .map(mapFn, Encoders.bean(Relation.class));
    }

    /**
     * Returns the predicate applied before grouping the relations: a relation is kept when
     * at least one among source, target, relType, subRelType and relClass is not blank.
     *
     * <p>NOTE(review): only relations with all five fields blank are discarded; confirm
     * whether requiring all of them to be non-blank was intended.
     *
     * @return the filter function
     */
    private FilterFunction<Relation> getRelationFilterFunction() {
        return r -> StringUtils.isNotBlank(r.getSource())
            || StringUtils.isNotBlank(r.getTarget())
            || StringUtils.isNotBlank(r.getRelType())
            || StringUtils.isNotBlank(r.getSubRelType())
            || StringUtils.isNotBlank(r.getRelClass());
    }

    /**
     * Extracts the requested endpoint id from a relation.
     *
     * @param r    the relation
     * @param type which endpoint to read
     * @return the source or the target id of {@code r}
     * @throws IllegalArgumentException if {@code type} is not a supported field type
     */
    private static String getId(Relation r, FieldType type) {
        switch (type) {
            case SOURCE:
                return r.getSource();
            case TARGET:
                return r.getTarget();
            default:
                throw new IllegalArgumentException("unsupported field type: " + type);
        }
    }

    /**
     * Returns the function producing a rewritten relation out of a joined row: the relation
     * is marked as not deleted by inference, and its source and/or target are replaced with
     * the replacing id whenever the corresponding join found a match.
     *
     * <p>The relation carried by the row is modified in place and returned.
     *
     * @return the mapping function
     */
    private static MapFunction<Tuple2<Tuple2<Tuple3<String, Relation, String>, Tuple2<String, String>>, Tuple2<String, String>>, Relation> getFixRelFn() {
        return value -> {
            final Relation r = value._1()._1()._2();
            final Tuple2<String, String> sourceMatch = value._1()._2();
            final Tuple2<String, String> targetMatch = value._2();

            final String newSource = sourceMatch != null ? sourceMatch._2() : null;
            final String newTarget = targetMatch != null ? targetMatch._2() : null;

            if (r.getDataInfo() == null) {
                r.setDataInfo(new DataInfo());
            }
            r.getDataInfo().setDeletedbyinference(false);

            if (newSource != null) {
                r.setSource(newSource);
            }
            if (newTarget != null) {
                r.setTarget(newTarget);
            }
            return r;
        };
    }

    /**
     * Returns the function flagging a relation as deleted by inference when the join on
     * one of its endpoints found a match; relations without a match are returned untouched.
     *
     * @return the mapping function
     */
    private static MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, String>>, Relation> getDeletedFn() {
        return value -> {
            final Relation r = value._1()._2();
            if (value._2() != null) {
                if (r.getDataInfo() == null) {
                    r.setDataInfo(new DataInfo());
                }
                r.getDataInfo().setDeletedbyinference(true);
            }
            return r;
        };
    }

    /** Endpoint of a relation used as join key. */
    enum FieldType {
        /** The source id of the relation. */
        SOURCE,
        /** The target id of the relation. */
        TARGET
    }
}

