/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.SparkReporter;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;
import eu.dnetlib.pace.util.Reporter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

/**
 * Static Spark pipeline steps used to find candidate duplicate records.
 *
 * <p>The steps visible in this class are: parse JSON entities into {@link MapDocument}s keyed by
 * identifier ({@link #mapToVertexes}), group the documents into blocks by the grouping keys
 * returned from {@code DedupUtility.getGroupingKeys} ({@link #createBlocks} /
 * {@link #createsortedBlocks}), and run a {@link BlockProcessor} over every block to collect the
 * relations it reports ({@link #computeRelations} / {@link #computeRelations2}).
 */
public class Deduper
implements scala.Serializable {
    private static final Log log = LogFactory.getLog(Deduper.class);

    /**
     * Runs the whole pipeline: {@link #mapToVertexes}, then {@link #createBlocks}, then
     * {@link #computeRelations}.
     *
     * @param context  Spark context, forwarded to every step
     * @param entities serialized entities, one per RDD element
     * @param config   dedup configuration, forwarded to every step
     * @return the distinct relations reported while processing the blocks
     */
    public static JavaPairRDD<String, String> dedup(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
        // The accumulators are built by computeRelations itself; building a second, unused set
        // here only produced a map that was thrown away.
        JavaPairRDD<String, MapDocument> mapDocs = Deduper.mapToVertexes(context, entities, config);
        JavaPairRDD<String, Iterable<MapDocument>> blocks = Deduper.createBlocks(context, mapDocs, config);
        return Deduper.computeRelations(context, blocks, config);
    }

    /**
     * Processes every block with {@link BlockProcessor#process} and returns the distinct relations
     * collected by the per-block {@link SparkReporter}.
     *
     * @param context Spark context whose underlying {@code SparkContext} is handed to
     *                {@code DedupUtility.constructAccumulator}
     * @param blocks  documents grouped by blocking key
     * @param config  dedup configuration handed to the block processor
     * @return the reported relations with exact duplicates removed
     */
    public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, Iterable<MapDocument>> blocks, DedupConfig config) {
        Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
        JavaPairRDD<String, String> relations = blocks.flatMapToPair(block -> {
            // A fresh reporter per block, so getRelations() only holds this block's output.
            SparkReporter reporter = new SparkReporter(accumulators);
            new BlockProcessor(config).process(block._1(), block._2(), reporter);
            return reporter.getRelations().iterator();
        });
        // distinct() compares the (source, target) tuples themselves. Keying on the plain
        // concatenation source + target could merge different pairs ("ab"+"c" vs "a"+"bc").
        return relations.distinct();
    }

    /**
     * Groups documents into blocks: one entry per grouping key, holding every document that
     * produced that key.
     *
     * @param context Spark context (not used by this step)
     * @param mapDocs documents keyed by identifier; only one document per identifier is kept
     * @param config  dedup configuration used to compute the grouping keys
     * @return blocking key mapped to the documents sharing it
     */
    public static JavaPairRDD<String, Iterable<MapDocument>> createBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
        JavaPairRDD<String, MapDocument> keyedDocs = Deduper.oneDocumentPerId(mapDocs).flatMapToPair(doc ->
            DedupUtility.getGroupingKeys(config, doc).stream()
                .map(key -> new Tuple2<String, MapDocument>(key, doc))
                .collect(Collectors.toList())
                .iterator());
        return keyedDocs.groupByKey();
    }

    /**
     * Groups documents into blocks like {@link #createBlocks}, but each block is a list sorted by
     * the string value of the configured order field and capped at the configured maximum size.
     *
     * <p>Sorting and capping happen only when two partial lists are merged, so a block that never
     * gets merged (a single document) is returned as is.
     *
     * @param context Spark context (not used by this step)
     * @param mapDocs documents keyed by identifier; only one document per identifier is kept
     * @param config  dedup configuration supplying the grouping keys, {@code getOrderField()} and
     *                {@code getGroupMaxSize()}
     * @return blocking key mapped to the sorted, size-limited list of documents
     */
    public static JavaPairRDD<String, List<MapDocument>> createsortedBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
        String of = config.getWf().getOrderField();
        int maxQueueSize = config.getWf().getGroupMaxSize();
        JavaPairRDD<String, List<MapDocument>> singletons = Deduper.oneDocumentPerId(mapDocs).flatMapToPair(doc ->
            DedupUtility.getGroupingKeys(config, doc).stream()
                .map(key -> {
                    // Each key gets its own mutable list because the merge step below adds to it.
                    List<MapDocument> single = new ArrayList<>();
                    single.add(doc);
                    return new Tuple2<String, List<MapDocument>>(key, single);
                })
                .collect(Collectors.toList())
                .iterator());
        return singletons.reduceByKey((merged, other) -> {
            merged.addAll(other);
            // NOTE(review): a document without the order field makes this comparator throw a
            // NullPointerException; confirm every document is guaranteed to carry it.
            merged.sort(Comparator.comparing(doc -> doc.getFieldMap().get(of).stringValue()));
            if (merged.size() > maxQueueSize) {
                // Copy the head so the result does not keep the oversized backing list alive.
                return new ArrayList<>(merged.subList(0, maxQueueSize));
            }
            return merged;
        });
    }

    /**
     * Parses every serialized entity with {@link MapDocumentUtil#asMapDocumentWithJPath} and keys
     * the resulting document by its identifier.
     *
     * @param context  Spark context (not used by this step)
     * @param entities serialized entities, one per RDD element
     * @param config   dedup configuration handed to the parser
     * @return document identifier mapped to the parsed document
     */
    public static JavaPairRDD<String, MapDocument> mapToVertexes(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
        return entities.mapToPair(json -> {
            MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, json);
            return new Tuple2<String, MapDocument>(mapDocument.getIdentifier(), mapDocument);
        });
    }

    /**
     * Processes every sorted block with {@link BlockProcessor#processSortedBlock} and returns the
     * distinct relations collected by the per-block {@link SparkReporter}.
     *
     * @param context Spark context whose underlying {@code SparkContext} is handed to
     *                {@code DedupUtility.constructAccumulator}
     * @param blocks  sorted documents grouped by blocking key
     * @param config  dedup configuration handed to the block processor
     * @return the reported relations with exact duplicates removed
     * @throws RuntimeException (inside the Spark task) wrapping any failure while processing a
     *                          block; the message is the identifier of the block's first document,
     *                          or the block key when no document is available
     */
    public static JavaPairRDD<String, String> computeRelations2(JavaSparkContext context, JavaPairRDD<String, List<MapDocument>> blocks, DedupConfig config) {
        Map<String, LongAccumulator> accumulators = DedupUtility.constructAccumulator(config, context.sc());
        JavaPairRDD<String, String> relations = blocks.flatMapToPair(block -> {
            try {
                SparkReporter reporter = new SparkReporter(accumulators);
                new BlockProcessor(config).processSortedBlock(block._1(), block._2(), reporter);
                return reporter.getRelations().iterator();
            }
            catch (Exception e) {
                // Building the message must never throw, otherwise the real cause is lost.
                throw new RuntimeException(Deduper.failureLabel(block._1(), block._2()), e);
            }
        });
        // See computeRelations: tuple equality avoids the ambiguity of a concatenated key.
        return relations.distinct();
    }

    /** Drops the identifier key, keeping a single (arbitrary) document per identifier. */
    private static JavaRDD<MapDocument> oneDocumentPerId(JavaPairRDD<String, MapDocument> mapDocs) {
        return mapDocs.reduceByKey((kept, ignored) -> kept).map(entry -> entry._2());
    }

    /** Returns the first document's identifier, or the block key when no document is available. */
    private static String failureLabel(String blockKey, List<MapDocument> docs) {
        if (docs == null || docs.isEmpty() || docs.get(0) == null) {
            return blockKey;
        }
        return docs.get(0).getIdentifier();
    }
}

