package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.Predicate;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that reads every graph entity file under {@code graphInputPath} (skipping any path whose
 * name contains "relation"), groups the entities by identifier and merges the records sharing the
 * same id into one entity via {@link OafMapperUtils#mergeEntities}.
 *
 * <p>Each merged entity is written to {@code outputPath} as a gzip-compressed text line of the form
 * {@code <entity class name>|<entity json>}. Any previous content of {@code outputPath} is removed.
 */
public class GroupEntitiesSparkJob {
    private static final String ID_JPATH = "$.id";
    private static final String RESULT_TYPE_JPATH = "$.resulttype.classid";
    private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /**
     * JsonPath configuration with {@link Option#SUPPRESS_EXCEPTIONS}, so a missing path yields null
     * instead of throwing. Built once here rather than once per parsed record.
     */
    private static final Configuration JSON_PATH_CONFIGURATION =
        Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS);

    /**
     * Typed aggregator that folds all the entities sharing a key into a single entity.
     *
     * <p>The buffer starts as {@code null}. Two non-null entities are combined with
     * {@link OafMapperUtils#mergeEntities}; otherwise the non-null one (if any) is kept.
     */
    public static class GroupingAggregator extends Aggregator<OafEntity, OafEntity, OafEntity> {

        /** Returns the empty buffer: no entity has been seen yet. */
        @Override
        public OafEntity zero() {
            return null;
        }

        @Override
        public OafEntity reduce(OafEntity buffer, OafEntity entity) {
            return mergeAndGet(buffer, entity);
        }

        @Override
        public OafEntity merge(OafEntity left, OafEntity right) {
            return mergeAndGet(left, right);
        }

        @Override
        public OafEntity finish(OafEntity reduction) {
            return reduction;
        }

        @Override
        public Encoder<OafEntity> bufferEncoder() {
            return Encoders.kryo(OafEntity.class);
        }

        @Override
        public Encoder<OafEntity> outputEncoder() {
            return Encoders.kryo(OafEntity.class);
        }

        /**
         * Merges the two entities when both are present; otherwise returns whichever one is
         * non-null (or null when both are null).
         */
        private OafEntity mergeAndGet(OafEntity first, OafEntity second) {
            if (Objects.nonNull(first) && Objects.nonNull(second)) {
                return OafMapperUtils.mergeEntities(first, second);
            }
            return Objects.isNull(second) ? first : second;
        }
    }

    /**
     * Entry point.
     *
     * <p>Arguments are parsed against {@code group_graph_entities_parameters.json}. It reads
     * {@code isSparkSessionManaged} (defaults to true), {@code graphInputPath} and {@code outputPath}.
     *
     * @param args command-line arguments
     * @throws Exception if argument parsing or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the classpath resource once read and decode it explicitly as UTF-8
        // instead of relying on the platform default charset.
        try (InputStream parameters = GroupEntitiesSparkJob.class
            .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json")) {
            jsonConfiguration = IOUtils.toString(parameters, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String graphInputPath = parser.get("graphInputPath");
        log.info("graphInputPath: {}", graphInputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
            groupEntities(spark, graphInputPath, outputPath);
        });
    }

    /**
     * Reads all the non-relation entity files, drops records with a blank id, merges the records
     * sharing the same id and writes them as gzip'd {@code className|json} text lines.
     */
    private static void groupEntities(SparkSession spark, String inputPath, String outputPath) {
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Explicit functional-interface casts select Spark's Java overloads
        // (otherwise the lambdas are ambiguous with the Scala Function1 overloads).
        spark
            .read()
            .textFile(DHPUtils.toSeq(listEntityPaths(inputPath, sc)))
            .map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class))
            .filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank(ModelSupport.idFn().apply(e)))
            .groupByKey((MapFunction<OafEntity, String>) e -> ModelSupport.idFn().apply(e), Encoders.STRING())
            .agg(new GroupingAggregator().toColumn())
            .map(
                (MapFunction<Tuple2<String, OafEntity>, String>) t -> t._2().getClass().getName()
                    + "|" + OBJECT_MAPPER.writeValueAsString(t._2()),
                Encoders.STRING())
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .text(outputPath);
    }

    /**
     * Deserializes one JSON record into the concrete entity class selected by the prefix of its
     * {@code id} (the text before the first '|').
     *
     * <p>"10" maps to Datasource, "20" to Organization and "40" to Project. "50" is a result whose
     * concrete class is chosen by {@code resulttype.classid}.
     *
     * @param json the JSON record
     * @return the parsed entity
     * @throws IllegalArgumentException if the id is blank, the prefix is unknown, or a result has a
     *     missing/unknown result type
     * @throws UncheckedIOException if Jackson fails to deserialize the record
     */
    public static OafEntity parseOaf(String json) {
        final DocumentContext document = JsonPath.parse(json, JSON_PATH_CONFIGURATION);
        final String id = document.read(ID_JPATH);
        if (StringUtils.isBlank(id)) {
            throw new IllegalArgumentException(String.format("invalid oaf: '%s'", json));
        }
        final String prefix = StringUtils.substringBefore(id, "|");
        switch (prefix) {
            case "10":
                return parse(json, Datasource.class);
            case "20":
                return parse(json, Organization.class);
            case "40":
                return parse(json, Project.class);
            case "50":
                return parseResult(json, document);
            default:
                throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
        }
    }

    /** Parses a result record into the subclass named by its {@code resulttype.classid}. */
    private static OafEntity parseResult(String json, DocumentContext document) {
        final String resultType = document.read(RESULT_TYPE_JPATH);
        // The lookup yields null when the path is absent (SUPPRESS_EXCEPTIONS); guard it so a
        // missing result type raises the intended IllegalArgumentException instead of an NPE.
        if (resultType != null) {
            switch (resultType) {
                case "publication":
                    return parse(json, Publication.class);
                case "dataset":
                    return parse(json, Dataset.class);
                case "software":
                    return parse(json, Software.class);
                case "other":
                    return parse(json, OtherResearchProduct.class);
                default:
                    break;
            }
        }
        throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
    }

    /** Deserializes {@code json} into {@code type} with the shared lenient ObjectMapper. */
    private static <T extends OafEntity> OafEntity parse(String json, Class<T> type) {
        try {
            return OBJECT_MAPPER.readValue(json, type);
        } catch (IOException e) {
            throw new UncheckedIOException(
                String.format("unable to deserialize %s", type.getSimpleName()), e);
        }
    }

    /**
     * Lists the files under {@code path}, excluding any whose path contains "relation"
     * (case-insensitive, compared with {@link Locale#ROOT} so the result does not depend on the
     * default locale).
     */
    private static List<String> listEntityPaths(String path, JavaSparkContext sc) {
        return HdfsSupport
            .listFiles(path, sc.hadoopConfiguration())
            .stream()
            .filter(f -> !f.toLowerCase(Locale.ROOT).contains("relation"))
            .collect(Collectors.toList());
    }
}
