/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.opencitations;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

import eu.dnetlib.dhp.actionmanager.opencitations.CreateActionSetSparkJob;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import scala.Tuple2;

/**
 * Spark job that rewrites the {@code citing} / {@code cited} identifiers of {@link COCI} records.
 *
 * <p>Citations are read as JSON lines from {@code <inputPath>/JSON}. The correspondence file
 * {@code <inputPath>/correspondence/omid.zip} is first extracted to
 * {@code <inputPath>/correspondence/omid.csv}. That CSV has an {@code omid} column and an
 * {@code id} column holding a space-separated list of {@code <pid type>:<pid value>} entries.
 *
 * <p>Every citation whose {@code citing} and {@code cited} values both match an {@code omid}
 * gets those fields replaced by the pid value, and {@code citing_pid} / {@code cited_pid} set to
 * the pid type. Both joins are inner joins, so citations with an unmatched side are not emitted;
 * an {@code omid} with several ids produces one output record per combination. The result is
 * appended as gzip-compressed JSON to {@code outputPath}.
 */
public class MapOCIdsInPids
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(MapOCIdsInPids.class);
    private static final String DELIMITER = ",";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Separates the pid type from the pid value inside an entry of the {@code id} column. */
    private static final String PID_SEPARATOR = ":";

    /**
     * Entry point.
     *
     * <p>Parameters (described by {@code remap_parameters.json}): {@code inputPath},
     * {@code outputPath}, {@code nameNode} and the optional {@code isSparkSessionManaged}
     * (defaults to {@code true}).
     *
     * @param args command-line arguments
     * @throws IOException if the parameter file or the correspondence zip cannot be read
     * @throws ParseException if the arguments do not match the parameter description
     */
    public static void main(String[] args) throws IOException, ParseException {
        final String jsonConfiguration;
        // close the classpath resource once it has been read
        try (InputStream parameters = Objects.requireNonNull(
            MapOCIdsInPids.class.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/opencitations/remap_parameters.json"),
            "remap_parameters.json not found on the classpath")) {
            jsonConfiguration = IOUtils.toString(parameters, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("inputPath");
        log.info("inputPath {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath {}", outputPath);

        final String nameNode = parser.get("nameNode");
        log.info("nameNode {}", nameNode);

        // the CSV must exist on the filesystem before Spark tries to load it
        unzipCorrespondenceFile(inputPath, nameNode);

        final SparkConf conf = new SparkConf();
        SparkSessionSupport
            .runWithSparkSession(conf, isSparkSessionManaged, spark -> mapIdentifiers(spark, inputPath, outputPath));
    }

    /**
     * Extracts {@code <inputPath>/correspondence/omid.zip} into {@code <inputPath>/correspondence/omid.csv}.
     *
     * <p>Every non-directory entry is written to the same target path. When the archive contains
     * more than one file, each one overwrites the previous and only the last survives; a warning is
     * logged in that case, and also when the archive contains no file at all.
     *
     * @param inputPath base input directory
     * @param hdfsNameNode value used for {@code fs.defaultFS}
     * @throws IOException on any read/write failure
     */
    private static void unzipCorrespondenceFile(String inputPath, String hdfsNameNode) throws IOException {
        final Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsNameNode);
        // FileSystem.get returns a cached, shared instance: it is intentionally not closed here
        final FileSystem fileSystem = FileSystem.get(conf);

        final Path zipPath = new Path(inputPath + "/correspondence/omid.zip");
        final Path csvPath = new Path(inputPath + "/correspondence/omid.csv");

        int extracted = 0;
        // closing the ZipInputStream also closes the underlying FSDataInputStream
        try (ZipInputStream zis = new ZipInputStream(fileSystem.open(zipPath))) {
            ZipEntry entry;
            while ((entry = zis.getNextEntry()) != null) {
                if (entry.isDirectory()) {
                    continue;
                }
                if (extracted > 0) {
                    log.warn(
                        "{} contains more than one file: entry {} overwrites {}", zipPath, entry.getName(), csvPath);
                }
                // create(Path) overwrites an existing file; copy stops at the end of the current entry
                try (FSDataOutputStream out = fileSystem.create(csvPath)) {
                    IOUtils.copy(zis, out);
                }
                extracted++;
            }
        }
        if (extracted == 0) {
            log.warn("{} contains no file entries; {} was not written", zipPath, csvPath);
        }
    }

    /**
     * Joins the citations with the correspondence table twice, remapping {@code citing} and then
     * {@code cited}, and appends the result as gzip JSON to {@code outputPath}.
     */
    private static void mapIdentifiers(SparkSession spark, String inputPath, String outputPath) {
        final Dataset<COCI> coci = spark
            .read()
            .textFile(inputPath + "/JSON")
            .map(
                (MapFunction<String, COCI>) value -> OBJECT_MAPPER.readValue(value, COCI.class),
                Encoders.bean(COCI.class));

        final Encoder<Tuple2<String, String>> pairEncoder = Encoders.tuple(Encoders.STRING(), Encoders.STRING());
        final Dataset<Tuple2<String, String>> correspondenceData = spark
            .read()
            .format("csv")
            .option("sep", DELIMITER)
            .option("inferSchema", "true")
            .option("header", "true")
            // the Spark CSV option is "quote" (the previous "quotes" key was silently ignored)
            .option("quote", "\"")
            .load(inputPath + "/correspondence/omid.csv")
            .repartition(5000)
            .flatMap(
                (FlatMapFunction<Row, Tuple2<String, String>>) MapOCIdsInPids::toCorrespondences, pairEncoder);

        final Dataset<COCI> mappedCitingDataset = coci
            .joinWith(correspondenceData, coci.col("citing").equalTo(correspondenceData.col("_1")))
            .map(
                (MapFunction<Tuple2<COCI, Tuple2<String, String>>, COCI>) MapOCIdsInPids::remapCiting,
                Encoders.bean(COCI.class));

        mappedCitingDataset
            .joinWith(correspondenceData, mappedCitingDataset.col("cited").equalTo(correspondenceData.col("_1")))
            .map(
                (MapFunction<Tuple2<COCI, Tuple2<String, String>>, COCI>) MapOCIdsInPids::remapCited,
                Encoders.bean(COCI.class))
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(outputPath);
    }

    /**
     * Expands one correspondence row into one {@code (omid, correspondent)} pair for each entry of
     * its space-separated {@code id} column.
     *
     * <p>Rows with a null {@code omid} or {@code id} yield nothing. Entries without
     * {@link #PID_SEPARATOR} (including empty tokens from repeated spaces) are dropped, because they
     * cannot be split into a pid type and a value.
     */
    private static Iterator<Tuple2<String, String>> toCorrespondences(Row r) {
        final String ocIdentifier = r.getAs("omid");
        final String correspondentIdentifiers = r.getAs("id");
        if (ocIdentifier == null || correspondentIdentifiers == null) {
            return Collections.emptyIterator();
        }
        return Arrays
            .stream(correspondentIdentifiers.split(" "))
            .filter(ci -> ci.contains(PID_SEPARATOR))
            .map(ci -> new Tuple2<>(ocIdentifier, ci))
            .collect(Collectors.toList())
            .iterator();
    }

    /** Replaces the citing side of the joined citation with the pid type/value of its correspondent. */
    private static COCI remapCiting(Tuple2<COCI, Tuple2<String, String>> joined) {
        final COCI citation = joined._1();
        final String correspondent = joined._2()._2();
        citation.setCiting_pid(pidType(correspondent));
        citation.setCiting(pidValue(correspondent));
        return citation;
    }

    /** Replaces the cited side of the joined citation with the pid type/value of its correspondent. */
    private static COCI remapCited(Tuple2<COCI, Tuple2<String, String>> joined) {
        final COCI citation = joined._1();
        final String correspondent = joined._2()._2();
        citation.setCited_pid(pidType(correspondent));
        citation.setCited(pidValue(correspondent));
        return citation;
    }

    /** Returns the text before the first {@link #PID_SEPARATOR}; the separator must be present. */
    private static String pidType(String correspondent) {
        return correspondent.substring(0, correspondent.indexOf(PID_SEPARATOR));
    }

    /** Returns the text after the first {@link #PID_SEPARATOR}; the separator must be present. */
    private static String pidValue(String correspondent) {
        return correspondent.substring(correspondent.indexOf(PID_SEPARATOR) + 1);
    }
}

