package eu.dnetlib.doiboost.orcid;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData;
import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:eu/dnetlib/doiboost/orcid/SparkDownloadOrcidAuthors.class */
/**
 * Spark job that downloads the ORCID author records modified since the last recorded update.
 *
 * <p>The job reads {@code last_update.txt} from the working path and scans the "lambda" sequence
 * file of (ORCID id, last-modified date) pairs. It keeps the authors whose date is after the last
 * update and fetches each one from {@code https://api.orcid.org/v3.0/<id>/record}. It then writes
 * the pairs produced by {@link DownloadedRecordData#toTuple2()} as a compressed Hadoop sequence
 * file under {@code workingPath + outputPath}.
 */
public class SparkDownloadOrcidAuthors {
    static Logger logger = LoggerFactory.getLogger(SparkDownloadOrcidAuthors.class);
    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /** Number of characters in a timestamp formatted with {@link #DATE_FORMAT}. */
    private static final int DATE_LENGTH = 19;

    /**
     * Entry point of the job.
     *
     * @param args command-line arguments, as described by
     *     {@code /eu/dnetlib/dhp/doiboost/download_orcid_data.json}
     * @throws Exception if argument parsing or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String argumentsSpec;
        try (InputStream in = SparkDownloadOrcidAuthors.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/download_orcid_data.json")) {
            argumentsSpec = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(argumentsSpec);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        final String workingPath = parser.get("workingPath");
        logger.info("workingPath: {}", workingPath);
        final String outputPath = parser.get("outputPath");
        logger.info("outputPath: {}", outputPath);
        // Not logged: it is sent as the Bearer credential to the ORCID API.
        final String token = parser.get("token");
        final String lambdaFileName = parser.get("lambdaFileName");
        logger.info("lambdaFileName: {}", lambdaFileName);
        final String hdfsServerUri = parser.get("hdfsServerUri");

        SparkSessionSupport.runWithSparkSession(new SparkConf(), isSparkSessionManaged, spark -> {
            String lastUpdate = HDFSUtil.readFromTextFile(hdfsServerUri, workingPath, "last_update.txt");
            logger.info("lastUpdate: {}", lastUpdate);
            if (StringUtils.isBlank(lastUpdate)) {
                throw new RuntimeException("last update info not found");
            }
            JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

            LongAccumulator parsedRecordsAcc = spark.sparkContext().longAccumulator("parsed_records");
            LongAccumulator modifiedRecordsAcc = spark.sparkContext().longAccumulator("to_download_records");
            LongAccumulator downloadedRecordsAcc = spark.sparkContext().longAccumulator("downloaded_records");
            LongAccumulator errorHTTP403Acc = spark.sparkContext().longAccumulator("error_HTTP_403");
            LongAccumulator errorHTTP404Acc = spark.sparkContext().longAccumulator("error_HTTP_404");
            LongAccumulator errorHTTP409Acc = spark.sparkContext().longAccumulator("error_HTTP_409");
            LongAccumulator errorHTTP503Acc = spark.sparkContext().longAccumulator("error_HTTP_503");
            LongAccumulator errorHTTP525Acc = spark.sparkContext().longAccumulator("error_HTTP_525");
            LongAccumulator errorHTTPGenericAcc = spark.sparkContext().longAccumulator("error_HTTP_Generic");

            logger.info("Retrieving data from lamda sequence file");
            JavaPairRDD<Text, Text> lambdaFileRDD = sc.sequenceFile(workingPath + lambdaFileName, Text.class, Text.class);
            logger.info("Data retrieved: {}", lambdaFileRDD.count());

            // Keeps the (orcidId, lastModifiedDate) pairs modified after lastUpdate.
            Function<Tuple2<String, String>, Boolean> isModifiedAfterFilter = record -> {
                parsedRecordsAcc.add(1L);
                if (!isModified(record._1(), record._2(), lastUpdate)) {
                    return false;
                }
                modifiedRecordsAcc.add(1L);
                return true;
            };

            // Fetches one author record from the ORCID API and tallies the HTTP outcome.
            Function<Tuple2<String, String>, Tuple2<String, String>> downloadRecordFunction = record -> {
                String orcidId = record._1();
                DownloadedRecordData downloaded = new DownloadedRecordData();
                downloaded.setOrcidId(orcidId);
                downloaded.setLastModifiedDate(record._2());
                HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
                httpGet.addHeader("Accept", "application/vnd.orcid+xml");
                httpGet.addHeader("Authorization", String.format("Bearer %s", token));
                // try-with-resources: the client and response are released on every status code,
                // not only on 200.
                try (CloseableHttpClient client = HttpClients.createDefault()) {
                    long start = System.currentTimeMillis();
                    try (CloseableHttpResponse response = client.execute(httpGet)) {
                        long elapsed = System.currentTimeMillis() - start;
                        // Pad every request to at least one second, so each task issues at most
                        // about one request per second (presumably for ORCID API rate limits).
                        if (elapsed < 1000) {
                            Thread.sleep(1000 - elapsed);
                        }
                        int statusCode = response.getStatusLine().getStatusCode();
                        downloaded.setStatusCode(statusCode);
                        if (statusCode == 200) {
                            downloadedRecordsAcc.add(1L);
                            downloaded.setCompressedData(ArgumentApplicationParser.compressArgument(
                                IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)));
                        } else {
                            // Exactly one counter per failed request; "generic" covers the rest.
                            switch (statusCode) {
                                case 403:
                                    errorHTTP403Acc.add(1L);
                                    break;
                                case 404:
                                    errorHTTP404Acc.add(1L);
                                    break;
                                case 409:
                                    errorHTTP409Acc.add(1L);
                                    break;
                                case 503:
                                    errorHTTP503Acc.add(1L);
                                    break;
                                case 525:
                                    errorHTTP525Acc.add(1L);
                                    break;
                                default:
                                    errorHTTPGenericAcc.add(1L);
                                    break;
                            }
                        }
                    }
                }
                return downloaded.toTuple2();
            };

            sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");

            logger.info("Start execution ...");
            // Text values are converted to String before caching because Hadoop reuses Writable
            // instances. Caching the filtered RDD lets the count below and the download job share
            // one evaluation of the filter, so its accumulators are not counted twice.
            JavaPairRDD<String, String> authorsModifiedRDD = lambdaFileRDD
                .mapToPair(t -> new Tuple2<String, String>(t._1().toString(), t._2().toString()))
                .filter(isModifiedAfterFilter)
                .cache();
            logger.info("Authors modified count: {}", authorsModifiedRDD.count());
            logger.info("Start downloading ...");
            authorsModifiedRDD
                .repartition(100)
                .map(downloadRecordFunction)
                .mapToPair(t -> new Tuple2<Text, Text>(new Text(t._1()), new Text(t._2())))
                .saveAsNewAPIHadoopFile(
                    workingPath.concat(outputPath),
                    Text.class,
                    Text.class,
                    SequenceFileOutputFormat.class,
                    sc.hadoopConfiguration());
            authorsModifiedRDD.unpersist();

            logger.info("parsedRecordsAcc: {}", parsedRecordsAcc.value());
            logger.info("modifiedRecordsAcc: {}", modifiedRecordsAcc.value());
            logger.info("downloadedRecordsAcc: {}", downloadedRecordsAcc.value());
            logger.info("errorHTTP403Acc: {}", errorHTTP403Acc.value());
            logger.info("errorHTTP404Acc: {}", errorHTTP404Acc.value());
            logger.info("errorHTTP409Acc: {}", errorHTTP409Acc.value());
            logger.info("errorHTTP503Acc: {}", errorHTTP503Acc.value());
            logger.info("errorHTTP525Acc: {}", errorHTTP525Acc.value());
            logger.info("errorHTTPGenericAcc: {}", errorHTTPGenericAcc.value());
        });
    }

    /**
     * Tells whether an author record was modified strictly after the last update.
     *
     * <p>Both timestamps are parsed with {@link #DATE_FORMAT}. Inputs whose length is not
     * {@value #DATE_LENGTH} are cut to their first {@value #DATE_LENGTH} characters first.
     *
     * @param orcidId ORCID identifier, used only in the error message
     * @param modifiedDate last-modified timestamp of the record; the literal
     *     {@code "last_modified"} (presumably a header row) always yields {@code false}
     * @param lastUpdate timestamp of the last update
     * @return {@code true} if {@code modifiedDate} is after {@code lastUpdate}
     * @throws RuntimeException if either value is null, too short or unparsable; the original
     *     exception is kept as the cause
     */
    public static boolean isModified(String orcidId, String modifiedDate, String lastUpdate) {
        try {
            if (modifiedDate.equals("last_modified")) {
                return false;
            }
            // A fresh formatter per call: SimpleDateFormat is not thread-safe and Spark runs
            // several tasks concurrently in one executor JVM.
            SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
            return format.parse(truncateDate(modifiedDate)).after(format.parse(truncateDate(lastUpdate)));
        } catch (Exception e) {
            throw new RuntimeException(
                "[" + orcidId + "] modifiedDate <" + modifiedDate + "> lastUpdate <" + lastUpdate
                    + "> Parsing date: " + e.getMessage(),
                e);
        }
    }

    /**
     * Cuts a timestamp to the {@value #DATE_LENGTH} characters expected by {@link #DATE_FORMAT}.
     *
     * @throws StringIndexOutOfBoundsException if {@code date} is shorter than {@value #DATE_LENGTH}
     */
    private static String truncateDate(String date) {
        return date.length() != DATE_LENGTH ? date.substring(0, DATE_LENGTH) : date;
    }
}
