/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.opencitations;

import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.opencitations.model.COCI;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that reads delimited text files found under an input path, converts every row into a
 * {@link COCI} bean and appends the result, as gzip-compressed JSON, to an output path.
 *
 * <p>Each input file is deleted from the file system once its rows have been written.
 */
public class ReadCOCI
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(ReadCOCI.class);

    /** Classpath location of the JSON document describing the accepted command-line arguments. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/actionmanager/opencitations/input_readcoci_parameters.json";

    /** Number of partitions every loaded file is redistributed into before being mapped. */
    private static final int PARTITIONS = 100;

    /**
     * Entry point: parses the arguments, configures the file system from {@code hdfsNameNode} and
     * runs {@link #doRead} inside a Spark session.
     *
     * <p>Arguments read from the parser: {@code outputPath}, {@code hdfsNameNode},
     * {@code inputPath} and the optional {@code delimiter} (falls back to {@code ","} when absent).
     *
     * @param args command-line arguments, interpreted through the bundled parameters resource
     * @throws Exception if the parameters resource is missing, the arguments cannot be parsed, or
     *     the job itself fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the resource stream deterministically and decode it with an explicit charset
        // instead of relying on the platform default.
        try (InputStream parametersStream = ReadCOCI.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (parametersStream == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            jsonConfiguration = IOUtils.toString(parametersStream, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        String hdfsNameNode = parser.get("hdfsNameNode");
        log.info("hdfsNameNode {}", hdfsNameNode);

        Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String workingPath = parser.get("inputPath");
        log.info("workingPath {}", workingPath);

        SparkConf sconf = new SparkConf();

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsNameNode);
        // NOTE(review): the FileSystem is intentionally not closed here; FileSystem.get may hand
        // out a shared instance, so closing it could affect other users - confirm before changing.
        FileSystem fileSystem = FileSystem.get(conf);

        String delimiter = Optional.ofNullable(parser.get("delimiter")).orElse(",");

        SparkSessionSupport.runWithSparkSession(
            sconf,
            isSparkSessionManaged,
            spark -> ReadCOCI.doRead(spark, workingPath, fileSystem, outputPath, delimiter));
    }

    /**
     * Converts every file found (recursively) under {@code workingPath} and appends the outcome to
     * {@code outputPath}.
     *
     * <p>For each file: it is loaded as CSV with a header row, repartitioned, mapped to
     * {@link COCI} beans, written in append mode as gzip-compressed JSON, and finally deleted.
     *
     * @param spark the active Spark session
     * @param workingPath directory whose files are processed
     * @param fileSystem file system used to list and delete the input files
     * @param outputPath destination of the JSON output
     * @param delimiter column separator of the input files
     * @throws IOException if listing or deleting files fails
     */
    private static void doRead(SparkSession spark, String workingPath, FileSystem fileSystem, String outputPath, String delimiter) throws IOException {
        RemoteIterator<LocatedFileStatus> inputFiles = fileSystem.listFiles(new Path(workingPath), true);
        while (inputFiles.hasNext()) {
            Path inputFile = inputFiles.next().getPath();
            log.info("extracting file {}", inputFile.toString());

            Dataset<Row> cociData = spark
                .read()
                .format("csv")
                .option("sep", delimiter)
                .option("inferSchema", "true")
                .option("header", "true")
                // The Spark CSV option is named "quote"; the previous key "quotes" is not a
                // recognised option. The value matches Spark's default quote character.
                .option("quote", "\"")
                .load(inputFile.toString())
                .repartition(PARTITIONS);

            cociData
                .map((MapFunction<Row, COCI>) ReadCOCI::toCoci, Encoders.bean(COCI.class))
                .filter((FilterFunction<COCI>) c -> c != null)
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(outputPath);

            // The input file is removed only after its rows were written.
            // NOTE(review): presumably this prevents re-appending the same rows on a re-run - confirm.
            if (!fileSystem.delete(inputFile, false)) {
                log.warn("unable to delete file {}", inputFile);
            }
        }
    }

    /**
     * Builds a {@link COCI} bean from one input row: column 0 is used as the OCI, column 1 as the
     * citing identifier and column 2 as the cited identifier.
     *
     * @param row a row of the loaded CSV dataset; the first three columns are read as strings
     * @return the populated bean
     */
    private static COCI toCoci(Row row) {
        COCI coci = new COCI();
        coci.setCiting(row.getString(1));
        coci.setCited(row.getString(2));
        coci.setOci(row.getString(0));
        return coci;
    }
}

