/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.person;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
import eu.dnetlib.dhp.common.person.Coauthors;
import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
import eu.dnetlib.dhp.person.OrcidIndicators;
import eu.dnetlib.dhp.person.ResultSubset;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that links ORCID-identified authors (person entities) into the graph stored under
 * {@code sourcePath}.
 *
 * <p>Three steps run in sequence:
 * <ol>
 *   <li>{@code extractRelations}: builds "hasAuthored" person-to-result relations and co-authorship
 *       relations (via {@link CoAuthorshipIterator}) and appends them to {@code sourcePath/relation};</li>
 *   <li>{@code addIndicators}: aggregates, per ORCID, the measures of the results it authored and
 *       stores them as "downloads"/"citations" measures on the matching person;</li>
 *   <li>{@code removeIsolatedPerson}: keeps only persons that are the source of at least one relation.</li>
 * </ol>
 * Intermediate data is written under the working path ({@code outputPath} argument).
 */
public class SparkExtractPersonRelationsAndAddIndicators {
    private static final Logger log = LoggerFactory.getLogger(SparkExtractPersonRelationsAndAddIndicators.class);

    /** Classpath resource holding the job's argument definitions. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json";

    /** Id prefix of ORCID-based persons; a full person id is {@code PERSON_PREFIX + "::" + md5(orcid)}. */
    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";

    /** Pid classid (matched case-insensitively) treated as ORCID; preferred when an author has both kinds. */
    private static final String ORCID_CLASSID = "orcid";

    /** Pid classid (matched case-insensitively) used only when an author has no {@code orcid} pid. */
    private static final String ORCID_PENDING_CLASSID = "orcid_pending";

    /** Provenance attached to every authorship relation built by {@code getRelation}. */
    public static final DataInfo DATAINFO = OafMapperUtils.dataInfo(
        (Boolean) false, "openaire", (Boolean) true, (Boolean) false,
        OafMapperUtils.qualifier(
            "sysimport:crosswalk:repository", "sysimport:crosswalk:repository",
            "dnet:provenanceActions", "dnet:provenanceActions"),
        "0.85");

    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the resource stream and decode it explicitly as UTF-8 (not the platform charset).
        try (InputStream in = SparkExtractPersonRelationsAndAddIndicators.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(in, "missing classpath resource " + PARAMETERS_RESOURCE),
                StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String sourcePath = parser.get("sourcePath") + "/";
        log.info("sourcePath: {}", sourcePath);

        final String workingPath = parser.get("outputPath");
        log.info("workingPath: {}", workingPath);

        final SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            extractRelations(spark, sourcePath, workingPath);
            addIndicators(spark, sourcePath, workingPath);
            removeIsolatedPerson(spark, sourcePath, workingPath);
        });
    }

    /**
     * Reads the results at {@code path} that are not deleted by inference, not invisible, and have at
     * least one author carrying an ORCID (or orcid_pending) pid with a value.
     */
    private static Dataset<Result> readResultsWithOrcid(SparkSession spark, String path) {
        return spark
            .read()
            .schema(Encoders.bean(Result.class).schema())
            .json(path)
            .as(Encoders.bean(Result.class))
            .filter((FilterFunction<Result>) r -> !r.getDataInfo().getDeletedbyinference()
                && !r.getDataInfo().getInvisible()
                && r.getAuthor() != null)
            .filter((FilterFunction<Result>) r -> r.getAuthor().stream()
                .anyMatch(a -> getPreferredOrcid(a.getPid()) != null));
    }

    /**
     * Returns the value of the author's first {@code orcid} pid, falling back to the first
     * {@code orcid_pending} pid; {@code null} when {@code pids} is null or neither kind has a value.
     */
    private static String getPreferredOrcid(List<StructuredProperty> pids) {
        if (pids == null) {
            return null;
        }
        return findPidValue(pids, ORCID_CLASSID)
            .orElseGet(() -> findPidValue(pids, ORCID_PENDING_CLASSID).orElse(null));
    }

    /** Value of the first pid whose classid equals {@code classid} ignoring case, if that value is non-null. */
    private static Optional<String> findPidValue(List<StructuredProperty> pids, String classid) {
        return pids
            .stream()
            .filter(p -> classid.equalsIgnoreCase(p.getQualifier().getClassid()))
            .findFirst()
            // Optional.map (not Stream.map) so that a null value yields empty instead of an NPE.
            .map(StructuredProperty::getValue);
    }

    /**
     * Aggregates per-ORCID measures from results and attaches them to the matching persons, rewriting
     * {@code sourcePath/person}.
     */
    private static void addIndicators(SparkSession spark, String sourcePath, String workingPath) {
        // Subset of every result type that has ORCID-identified authors; Append accumulates across types.
        ModelSupport.entityTypes
            .keySet()
            .stream()
            .filter(ModelSupport::isResult)
            .forEach(e -> readResultsWithOrcid(spark, sourcePath + e.name())
                .map((MapFunction<Result, ResultSubset>) ResultSubset::newInstance, Encoders.bean(ResultSubset.class))
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(workingPath + "/resultWithPid"));

        Dataset<ResultSubset> resultSubset = spark
            .read()
            .schema(Encoders.bean(ResultSubset.class).schema())
            .json(workingPath + "/resultWithPid")
            .as(Encoders.bean(ResultSubset.class));

        resultSubset
            .filter((FilterFunction<ResultSubset>) rs -> rs.getMeasures() != null)
            .flatMap(
                (FlatMapFunction<ResultSubset, OrcidIndicators>) SparkExtractPersonRelationsAndAddIndicators::getOrcidIndicators,
                Encoders.bean(OrcidIndicators.class))
            .distinct()
            .groupByKey((MapFunction<OrcidIndicators, String>) OrcidIndicators::getOrcid, Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, OrcidIndicators, OrcidIndicators>) (orcid, it) -> {
                OrcidIndicators acc = it.next();
                it.forEachRemaining(oi -> acc.addIndicators(oi.getDownloads(), oi.getCitations()));
                return acc;
            }, Encoders.bean(OrcidIndicators.class))
            .write()
            // Single write: Overwrite keeps a rerun from duplicating indicator rows (and thus persons in the join).
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath + "/orcidIndicators");

        Dataset<Person> person = spark
            .read()
            .schema(Encoders.bean(Person.class).schema())
            .json(sourcePath + "person")
            .as(Encoders.bean(Person.class));
        Dataset<OrcidIndicators> orcidIndicators = spark
            .read()
            .schema(Encoders.bean(OrcidIndicators.class).schema())
            .json(workingPath + "/orcidIndicators")
            .as(Encoders.bean(OrcidIndicators.class));

        // Person ids are PERSON_PREFIX + "::" + md5(orcid) (see getRelation), while indicators are keyed by
        // the ORCID value, so hash it the same way before joining.
        // NOTE(review): assumes OrcidIndicators.getOrcid returns the raw pid value and that Spark's md5
        // (lowercase hex of the UTF-8 bytes) matches IdentifierFactory.md5 — confirm.
        person
            .joinWith(
                orcidIndicators,
                person.col("id").equalTo(functions.concat(
                    functions.lit(PERSON_PREFIX + "::"), functions.md5(orcidIndicators.col("orcid")))),
                "left")
            .map((MapFunction<Tuple2<Person, OrcidIndicators>, Person>) t2 -> {
                Person p = t2._1();
                OrcidIndicators oi = t2._2();
                if (oi != null) {
                    p.setMeasures(Arrays.asList(
                        getMeasure("downloads", String.valueOf(oi.getDownloads())),
                        getMeasure("citations", String.valueOf(oi.getCitations()))));
                }
                return p;
            }, Encoders.bean(Person.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath + "/person");

        // Copy back through the working path: the input cannot be overwritten while it is being read.
        spark
            .read()
            .schema(Encoders.bean(Person.class).schema())
            .json(workingPath + "/person")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(sourcePath + "person");
    }

    /** Emits one OrcidIndicators per author of {@code r} that has an ORCID, carrying the result's measures. */
    private static Iterator<OrcidIndicators> getOrcidIndicators(ResultSubset r) {
        return r
            .getAuthor()
            .stream()
            .map(a -> getPreferredOrcid(a.getPid()))
            .filter(Objects::nonNull)
            .map(orcid -> OrcidIndicators.newInstance(r.getId(), orcid, r.getMeasures()))
            .iterator();
    }

    /** Builds a measure named {@code measureName} with a single "score" unit holding {@code measureValue}. */
    private static Measure getMeasure(String measureName, String measureValue) {
        Measure measure = new Measure();
        measure.setId(measureName);
        KeyValue kv = new KeyValue();
        kv.setKey("score");
        kv.setValue(measureValue);
        measure.setUnit(Arrays.asList(kv));
        return measure;
    }

    /** Rewrites {@code sourcePath/person} keeping only persons whose id is the source of some relation. */
    private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) {
        Dataset<Person> personDataset = spark
            .read()
            .schema(Encoders.bean(Person.class).schema())
            .json(sourcePath + "person")
            .as(Encoders.bean(Person.class));
        Dataset<Relation> relationDataset = spark
            .read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(sourcePath + "relation")
            .as(Encoders.bean(Relation.class));

        // Same "/person" staging directory as addIndicators (the original omitted the path separator).
        personDataset
            .join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi")
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(workingPath + "/person");

        spark
            .read()
            .schema(Encoders.bean(Person.class).schema())
            .json(workingPath + "/person")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(sourcePath + "person");
    }

    /**
     * Writes authorship and co-authorship relations for every result type into the working path, then
     * appends their distinct union to {@code sourcePath/relation}.
     */
    private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) {
        ModelSupport.entityTypes.keySet().stream().filter(ModelSupport::isResult).forEach(e -> {
            Dataset<Result> resultWithOrcids = readResultsWithOrcid(spark, sourcePath + e.name());

            resultWithOrcids
                .flatMap(
                    (FlatMapFunction<Result, Relation>) SparkExtractPersonRelationsAndAddIndicators::getAuthorshipRelations,
                    Encoders.bean(Relation.class))
                .distinct()
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(workingPath);

            resultWithOrcids
                .map(
                    (MapFunction<Result, Coauthors>) SparkExtractPersonRelationsAndAddIndicators::getAuthorsPidList,
                    Encoders.bean(Coauthors.class))
                .flatMap(
                    (FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
                    Encoders.bean(Relation.class))
                .distinct()
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(workingPath);
        });

        spark
            .read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(workingPath)
            .as(Encoders.bean(Relation.class))
            .distinct()
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(sourcePath + "relation");
    }

    /** Collects the preferred ORCID of every author of {@code r} that has one. */
    private static Coauthors getAuthorsPidList(Result r) {
        Coauthors coauth = new Coauthors();
        coauth.setCoauthors(r
            .getAuthor()
            .stream()
            .map(a -> getPreferredOrcid(a.getPid()))
            .filter(Objects::nonNull)
            .collect(Collectors.toList()));
        return coauth;
    }

    /** Builds one "hasAuthored" relation from each ORCID-identified author of {@code r} to {@code r}. */
    private static Iterator<Relation> getAuthorshipRelations(Result r) {
        List<Relation> relationList = new ArrayList<>();
        for (Author a : r.getAuthor()) {
            String orcid = getPreferredOrcid(a.getPid());
            if (orcid != null) {
                relationList.add(getRelation(orcid, r.getId()));
            }
        }
        return relationList.iterator();
    }

    /** Builds the resultPerson/authorship "hasAuthored" relation from the person with {@code orcid} to {@code resultId}. */
    private static Relation getRelation(String orcid, String resultId) {
        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
        return OafMapperUtils.getRelation(
            source, resultId, "resultPerson", "authorship", "hasAuthored", null, DATAINFO, null);
    }
}

