package eu.dnetlib.dhp.oa.dedup;

import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.graph.ConnectedComponent;
import eu.dnetlib.dhp.oa.dedup.graph.GraphProcessor;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.MapDocumentUtil;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.class */
public class SparkCreateMergeRels extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);

    public SparkCreateMergeRels(ArgumentApplicationParser argumentApplicationParser, SparkSession sparkSession) {
        super(argumentApplicationParser, sparkSession);
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentApplicationParser argumentApplicationParser = new ArgumentApplicationParser(IOUtils.toString(SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json")));
        argumentApplicationParser.parseArgument(strArr);
        String str = argumentApplicationParser.get("isLookUpUrl");
        log.info("isLookupUrl {}", str);
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.registerKryoClasses(ModelSupport.getOafModelClasses());
        new SparkCreateMergeRels(argumentApplicationParser, getSparkSession(sparkConf)).run(ISLookupClientFactory.getLookUpService(str));
    }

    @Override // eu.dnetlib.dhp.oa.dedup.AbstractSparkAction
    public void run(ISLookUpService iSLookUpService) throws ISLookUpException, DocumentException, IOException {
        String str = this.parser.get("graphBasePath");
        String str2 = this.parser.get("workingPath");
        String str3 = this.parser.get("isLookUpUrl");
        String str4 = this.parser.get("actionSetId");
        int intValue = ((Integer) Optional.ofNullable(this.parser.get("cutConnectedComponent")).map(Integer::valueOf).orElse(0)).intValue();
        log.info("connected component cut: '{}'", Integer.valueOf(intValue));
        log.info("graphBasePath: '{}'", str);
        log.info("isLookUpUrl:   '{}'", str3);
        log.info("actionSetId:   '{}'", str4);
        log.info("workingPath:   '{}'", str2);
        JavaSparkContext fromSparkContext = JavaSparkContext.fromSparkContext(this.spark.sparkContext());
        for (DedupConfig dedupConfig : getConfigurations(iSLookUpService, str4)) {
            String subEntityValue = dedupConfig.getWf().getSubEntityValue();
            Class cls = (Class) ModelSupport.entityTypes.get(EntityType.valueOf(subEntityValue));
            log.info("Creating mergerels for: '{}'", subEntityValue);
            int maxIterations = dedupConfig.getWf().getMaxIterations();
            log.info("Max iterations {}", Integer.valueOf(maxIterations));
            String createMergeRelPath = DedupUtility.createMergeRelPath(str2, str4, subEntityValue);
            JavaPairRDD<Object, String> createVertexes = createVertexes(fromSparkContext, str, subEntityValue, dedupConfig);
            Dataset createDataset = this.spark.createDataset(GraphProcessor.findCCs(createVertexes.rdd(), this.spark.read().load(DedupUtility.createSimRelPath(str2, str4, subEntityValue)).as(Encoders.bean(Relation.class)).javaRDD().map(relation -> {
                return new Edge(hash(relation.getSource()), hash(relation.getTarget()), relation.getRelClass());
            }).rdd(), maxIterations, intValue).toJavaRDD().filter(connectedComponent -> {
                return Boolean.valueOf(connectedComponent.getIds().size() > 1);
            }).flatMap(this::ccToRels).rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
            Dataset map = this.spark.read().textFile(DedupUtility.createEntityPath(str, subEntityValue)).map(str5 -> {
                OafEntity oafEntity = (OafEntity) OBJECT_MAPPER.readValue(str5, cls);
                return new Tuple2(oafEntity.getId(), oafEntity);
            }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(cls)));
            createDataset.joinWith(map, createDataset.col("_2").equalTo(map.col("_1")), "inner").map(tuple2 -> {
                return new Tuple2(((Tuple2) tuple2._1())._1(), ((Tuple2) tuple2._2())._2());
            }, Encoders.tuple(Encoders.STRING(), Encoders.kryo(cls))).groupByKey((v0) -> {
                return v0._1();
            }, Encoders.STRING()).mapGroups(this::generateID, Encoders.bean(ConnectedComponent.class)).flatMap(connectedComponent2 -> {
                return ccToMergeRel(connectedComponent2, dedupConfig);
            }, Encoders.bean(Relation.class)).write().mode(SaveMode.Overwrite).parquet(createMergeRelPath);
        }
    }

    private <T extends OafEntity> ConnectedComponent generateID(String str, Iterator<Tuple2<String, T>> it) {
        List list = (List) Lists.newArrayList(it).stream().map(tuple2 -> {
            return Identifier.newInstance((OafEntity) tuple2._2());
        }).collect(Collectors.toList());
        String generate = IdGenerator.generate(list, str);
        if (Objects.equals(generate, str)) {
            throw new IllegalStateException("generated default ID: " + generate);
        }
        return new ConnectedComponent(generate, (Set<String>) list.stream().map(identifier -> {
            return identifier.getEntity().getId();
        }).collect(Collectors.toSet()));
    }

    private JavaPairRDD<Object, String> createVertexes(JavaSparkContext javaSparkContext, String str, String str2, DedupConfig dedupConfig) {
        return javaSparkContext.textFile(DedupUtility.createEntityPath(str, str2)).mapToPair(str3 -> {
            String jPathString = MapDocumentUtil.getJPathString(dedupConfig.getWf().getIdPath(), str3);
            return new Tuple2(Long.valueOf(hash(jPathString)), jPathString);
        });
    }

    private Iterator<Tuple2<String, String>> ccToRels(ConnectedComponent connectedComponent) {
        return connectedComponent.getIds().stream().map(str -> {
            return new Tuple2(connectedComponent.getCcId(), str);
        }).iterator();
    }

    private Iterator<Relation> ccToMergeRel(ConnectedComponent connectedComponent, DedupConfig dedupConfig) {
        return connectedComponent.getIds().stream().flatMap(str -> {
            ArrayList arrayList = new ArrayList();
            arrayList.add(rel(connectedComponent.getCcId(), str, "merges", dedupConfig));
            arrayList.add(rel(str, connectedComponent.getCcId(), "isMergedIn", dedupConfig));
            return arrayList.stream();
        }).iterator();
    }

    private Relation rel(String str, String str2, String str3, DedupConfig dedupConfig) {
        String entityType = dedupConfig.getWf().getEntityType();
        Relation relation = new Relation();
        relation.setSource(str);
        relation.setTarget(str2);
        relation.setRelClass(str3);
        relation.setRelType(entityType + entityType.substring(0, 1).toUpperCase() + entityType.substring(1));
        relation.setSubRelType("dedup");
        DataInfo dataInfo = new DataInfo();
        dataInfo.setDeletedbyinference(false);
        dataInfo.setInferred(true);
        dataInfo.setInvisible(false);
        dataInfo.setInferenceprovenance(dedupConfig.getWf().getConfigurationId());
        Qualifier qualifier = new Qualifier();
        qualifier.setClassid("sysimport:dedup");
        qualifier.setClassname("sysimport:dedup");
        qualifier.setSchemeid("dnet:provenanceActions");
        qualifier.setSchemename("dnet:provenanceActions");
        dataInfo.setProvenanceaction(qualifier);
        relation.setDataInfo(dataInfo);
        return relation;
    }

    public static long hash(String str) {
        return Hashing.murmur3_128().hashString(str).asLong();
    }

    @Override // eu.dnetlib.dhp.oa.dedup.AbstractSparkAction
    public /* bridge */ /* synthetic */ List getConfigurations(ISLookUpService iSLookUpService, String str) throws ISLookUpException, DocumentException, IOException {
        return super.getConfigurations(iSLookUpService, str);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1961048848:
                if (implMethodName.equals("lambda$run$5e1e647b$1")) {
                    z = 6;
                    break;
                }
                break;
            case -1305368322:
                if (implMethodName.equals("lambda$createVertexes$9bd97b2a$1")) {
                    z = 8;
                    break;
                }
                break;
            case -636927776:
                if (implMethodName.equals("lambda$run$712dded4$1")) {
                    z = 5;
                    break;
                }
                break;
            case 2994:
                if (implMethodName.equals("_1")) {
                    z = 2;
                    break;
                }
                break;
            case 305698128:
                if (implMethodName.equals("generateID")) {
                    z = 3;
                    break;
                }
                break;
            case 917985429:
                if (implMethodName.equals("ccToRels")) {
                    z = false;
                    break;
                }
                break;
            case 1036920684:
                if (implMethodName.equals("lambda$run$aa55ec7b$1")) {
                    z = true;
                    break;
                }
                break;
            case 1234349099:
                if (implMethodName.equals("lambda$run$24a7872c$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1364769223:
                if (implMethodName.equals("lambda$run$59115a2f$1")) {
                    z = 7;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent;)Ljava/util/Iterator;")) {
                    SparkCreateMergeRels sparkCreateMergeRels = (SparkCreateMergeRels) serializedLambda.getCapturedArg(0);
                    return sparkCreateMergeRels::ccToRels;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Ljava/lang/String;)Lscala/Tuple2;")) {
                    Class cls = (Class) serializedLambda.getCapturedArg(0);
                    return str5 -> {
                        OafEntity oafEntity = (OafEntity) OBJECT_MAPPER.readValue(str5, cls);
                        return new Tuple2(oafEntity.getId(), oafEntity);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("scala/Tuple2") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0._1();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapGroupsFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/util/Iterator;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/util/Iterator;)Leu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent;")) {
                    SparkCreateMergeRels sparkCreateMergeRels2 = (SparkCreateMergeRels) serializedLambda.getCapturedArg(0);
                    return sparkCreateMergeRels2::generateID;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/schema/oaf/Relation;)Lorg/apache/spark/graphx/Edge;")) {
                    return relation -> {
                        return new Edge(hash(relation.getSource()), hash(relation.getTarget()), relation.getRelClass());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/pace/config/DedupConfig;Leu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent;)Ljava/util/Iterator;")) {
                    SparkCreateMergeRels sparkCreateMergeRels3 = (SparkCreateMergeRels) serializedLambda.getCapturedArg(0);
                    DedupConfig dedupConfig = (DedupConfig) serializedLambda.getCapturedArg(1);
                    return connectedComponent2 -> {
                        return ccToMergeRel(connectedComponent2, dedupConfig);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lscala/Tuple2;")) {
                    return tuple2 -> {
                        return new Tuple2(((Tuple2) tuple2._1())._1(), ((Tuple2) tuple2._2())._2());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/oa/dedup/graph/ConnectedComponent;)Ljava/lang/Boolean;")) {
                    return connectedComponent -> {
                        return Boolean.valueOf(connectedComponent.getIds().size() > 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/pace/config/DedupConfig;Ljava/lang/String;)Lscala/Tuple2;")) {
                    DedupConfig dedupConfig2 = (DedupConfig) serializedLambda.getCapturedArg(0);
                    return str3 -> {
                        String jPathString = MapDocumentUtil.getJPathString(dedupConfig2.getWf().getIdPath(), str3);
                        return new Tuple2(Long.valueOf(hash(jPathString)), jPathString);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
