/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcid;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.collection.CollectorException;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData;
import eu.dnetlib.doiboost.orcid.util.DownloadsReport;
import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
import eu.dnetlib.doiboost.orcid.util.MultiAttemptsHttpConnector;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkDownloadOrcidAuthors {
    static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class);
    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkDownloadOrcidAuthors.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String workingPath = parser.get("workingPath");
        logger.info("workingPath: {}", (Object)workingPath);
        String outputPath = parser.get("outputPath");
        logger.info("outputPath: {}", (Object)outputPath);
        String token = parser.get("token");
        String lambdaFileName = parser.get("lambdaFileName");
        logger.info("lambdaFileName: {}", (Object)lambdaFileName);
        String hdfsServerUri = parser.get("hdfsServerUri");
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            String lastUpdate = HDFSUtil.readFromTextFile(hdfsServerUri, workingPath, "last_update.txt");
            logger.info("lastUpdate: {}", (Object)lastUpdate);
            if (StringUtils.isBlank((CharSequence)lastUpdate)) {
                throw new FileNotFoundException("last update info not found");
            }
            JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
            LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records");
            LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records");
            LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
            LongAccumulator errorsAcc = spark.sparkContext().longAccumulator("errors");
            String lambdaFilePath = workingPath + lambdaFileName;
            logger.info("Retrieving data from lamda sequence file: " + lambdaFilePath);
            JavaPairRDD lamdaFileRDD = sc.sequenceFile(lambdaFilePath, Text.class, Text.class);
            long lamdaFileRDDCount = lamdaFileRDD.count();
            logger.info("Data retrieved: {}", (Object)lamdaFileRDDCount);
            Function & Serializable isModifiedAfterFilter = (Function & Serializable)data -> {
                String orcidId = ((Text)data._1()).toString();
                String lastModifiedDate = ((Text)data._2()).toString();
                parsedRecordsAcc.add(1L);
                if (SparkDownloadOrcidAuthors.isModified(orcidId, lastModifiedDate, lastUpdate)) {
                    modifiedRecordsAcc.add(1L);
                    return true;
                }
                return false;
            };
            Function & Serializable downloadRecordFn = (Function & Serializable)data -> {
                String orcidId = ((Text)data._1()).toString();
                String lastModifiedDate = ((Text)data._2()).toString();
                DownloadedRecordData downloaded = new DownloadedRecordData();
                downloaded.setOrcidId(orcidId);
                downloaded.setLastModifiedDate(lastModifiedDate);
                HttpClientParams clientParams = new HttpClientParams();
                MultiAttemptsHttpConnector httpConnector = new MultiAttemptsHttpConnector(clientParams);
                httpConnector.setAuthMethod("BEARER");
                httpConnector.setAcceptHeaderValue("application/vnd.orcid+xml");
                httpConnector.setAuthToken(token);
                String apiUrl = "https://api.orcid.org/v3.0/" + orcidId + "/record";
                DownloadsReport report = new DownloadsReport();
                long startReq = System.currentTimeMillis();
                boolean downloadCompleted = false;
                String record = "";
                try {
                    record = httpConnector.getInputSource(apiUrl, report);
                    downloadCompleted = true;
                }
                catch (CollectorException ce) {
                    if (!report.isEmpty()) {
                        int errCode = (Integer)report.keySet().stream().findFirst().get();
                        report.forEach((k, v) -> logger.error(k + " " + v));
                        downloaded.setStatusCode(errCode);
                    } else {
                        downloaded.setStatusCode(-4);
                    }
                    errorsAcc.add(1L);
                }
                long endReq = System.currentTimeMillis();
                long reqTime = endReq - startReq;
                if (reqTime < 1000L) {
                    Thread.sleep(1000L - reqTime);
                }
                if (downloadCompleted) {
                    downloaded.setStatusCode(200);
                    downloadedRecordsAcc.add(1L);
                    downloaded.setCompressedData(ArgumentApplicationParser.compressArgument((String)record));
                }
                return downloaded.toTuple2();
            };
            sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
            logger.info("Start execution ...");
            JavaPairRDD authorsModifiedRDD = lamdaFileRDD.filter((Function)isModifiedAfterFilter);
            long authorsModifiedCount = authorsModifiedRDD.count();
            logger.info("Authors modified count: {}", (Object)authorsModifiedCount);
            JavaPairRDD pairRDD = authorsModifiedRDD.repartition(100).map((Function)downloadRecordFn).mapToPair((PairFunction & Serializable)t -> new Tuple2((Object)new Text((String)t._1()), (Object)new Text((String)t._2())));
            SparkDownloadOrcidAuthors.saveAsSequenceFile(workingPath, outputPath, sc, (JavaPairRDD<Text, Text>)pairRDD);
            logger.info("parsedRecordsAcc: {}", (Object)parsedRecordsAcc.value());
            logger.info("modifiedRecordsAcc: {}", (Object)modifiedRecordsAcc.value());
            logger.info("downloadedRecordsAcc: {}", (Object)downloadedRecordsAcc.value());
            logger.info("errorsAcc: {}", (Object)errorsAcc.value());
        });
    }

    private static void saveAsSequenceFile(String workingPath, String outputPath, JavaSparkContext sc, JavaPairRDD<Text, Text> pairRDD) {
        pairRDD.saveAsNewAPIHadoopFile(workingPath.concat(outputPath), Text.class, Text.class, SequenceFileOutputFormat.class, sc.hadoopConfiguration());
    }

    public static boolean isModified(String orcidId, String modifiedDate, String lastUpdate) {
        Date lastUpdateDt;
        Date modifiedDateDt;
        String lastUpdateRedux = "";
        try {
            if (modifiedDate.equals("last_modified")) {
                return false;
            }
            if (modifiedDate.length() != 19) {
                modifiedDate = modifiedDate.substring(0, 19);
            }
            lastUpdateRedux = lastUpdate.length() != 19 ? lastUpdate.substring(0, 19) : lastUpdate;
            modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
            lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdateRedux);
        }
        catch (Exception e) {
            throw new RuntimeException("[" + orcidId + "] modifiedDate <" + modifiedDate + "> lastUpdate <" + lastUpdate + "> Parsing date: " + e.getMessage());
        }
        return modifiedDateDt.after(lastUpdateDt);
    }
}

