/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.sx.bio.ebi;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.CollectionUtils$;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf$;
import eu.dnetlib.dhp.sx.bio.pubmed.PMArticle;
import eu.dnetlib.dhp.sx.bio.pubmed.PMAuthor;
import eu.dnetlib.dhp.sx.bio.pubmed.PMJournal;
import eu.dnetlib.dhp.sx.bio.pubmed.PMParser;
import eu.dnetlib.dhp.sx.bio.pubmed.PubMedToOaf$;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/**
 * Spark job that refreshes a PubMed baseline with the update files published by NCBI, parses the
 * XML into {@link PMArticle} records (one per PMID) and converts them into OAF entities.
 *
 * <p>This is the Java shape of a Scala {@code object}: {@link #MODULE$} holds the singleton created
 * by the static initializer, and the {@code $anonfun$...} methods hold the bodies of lambdas used
 * by the methods below.
 */
public final class SparkCreateBaselineDataFrame$ {
    public static SparkCreateBaselineDataFrame$ MODULE$;
    private final Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> pmArticleAggregator;

    /** NCBI directory listing the PubMed update files (also the base URL for downloads). */
    private static final String UPDATE_FILES_URL = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/";

    static {
        new SparkCreateBaselineDataFrame$();
    }

    /**
     * Lists the PubMed update files that are newer than {@code maxFile}.
     *
     * <p>The NCBI directory page is fetched with {@link #requestPage(String)} and scanned line by
     * line for {@code <a href="...">} anchors. Only targets ending in {@code .gz} whose name sorts
     * lexicographically after {@code maxFile} are kept.
     *
     * @param maxFile name of the newest file already present locally ({@code ""} keeps every file)
     * @return (file name, download URL) pairs; empty when the page could not be fetched
     */
    public List<Tuple2<String, String>> requestBaseLineUpdatePage(String maxFile) {
        String data = this.requestPage(UPDATE_FILES_URL);
        List result = new StringOps(Predef$.MODULE$.augmentString(data)).linesWithSeparators()
            .map((Function1 & Serializable & scala.Serializable)l -> new StringOps(Predef$.MODULE$.augmentString(l)).stripLineEnd())
            .filter((Function1 & Serializable & scala.Serializable)l -> BoxesRunTime.boxToBoolean((boolean)l.startsWith("<a href=")))
            .map((Function1 & Serializable & scala.Serializable)l -> {
                int end = l.lastIndexOf("\">");
                int start = l.indexOf("<a href=\"");
                // substring() takes an END index (not a length); the href value starts right after
                // the 9 characters of `<a href="` and stops where `">` begins.
                if (start >= 0 && end >= start + 9) {
                    return l.substring(start + 9, end);
                }
                return "";
            })
            .filter((Function1 & Serializable & scala.Serializable)s -> BoxesRunTime.boxToBoolean((boolean)s.endsWith(".gz")))
            .filter((Function1 & Serializable & scala.Serializable)s -> BoxesRunTime.boxToBoolean((boolean)SparkCreateBaselineDataFrame$.$anonfun$requestBaseLineUpdatePage$5(maxFile, s)))
            .map((Function1 & Serializable & scala.Serializable)s -> new Tuple2(s, (Object)(UPDATE_FILES_URL + s)))
            .toList();
        return result;
    }

    /**
     * Opens a streaming download of {@code url} (60 s connect/request/socket timeouts).
     *
     * <p>The returned stream owns the underlying HTTP response and client: closing it releases both,
     * so callers must always close it.
     *
     * @param url the file to download
     * @return the response body
     * @throws IllegalStateException if the server answers with a status code of 400 or above, so that
     *     an error page is never mistaken for file content
     */
    public InputStream downloadBaselinePart(String url) {
        HttpGet r = new HttpGet(url);
        int timeout = 60;
        RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build();
        final CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
        try {
            final CloseableHttpResponse response = client.execute((HttpUriRequest)r);
            try {
                int status = response.getStatusLine().getStatusCode();
                Predef$.MODULE$.println((Object)new StringBuilder(24).append("get response with status").append(status).toString());
                if (status >= 400) {
                    throw new IllegalStateException("Unable to download " + url + ": HTTP status " + status);
                }
                // Tie the lifetime of the response and the client to the returned stream.
                return new FilterInputStream(response.getEntity().getContent()){

                    @Override
                    public void close() throws IOException {
                        try {
                            super.close();
                        }
                        finally {
                            try {
                                response.close();
                            }
                            finally {
                                client.close();
                            }
                        }
                    }
                };
            }
            catch (Throwable t) {
                try {
                    response.close();
                }
                catch (Exception suppressed) {
                    t.addSuppressed(suppressed);
                }
                throw t;
            }
        }
        catch (Throwable t) {
            try {
                client.close();
            }
            catch (Exception suppressed) {
                t.addSuppressed(suppressed);
            }
            throw t;
        }
    }

    /**
     * Fetches {@code url} as text, making up to 4 attempts (60 s timeouts each).
     *
     * <p>An attempt fails on an exception or on a status code of 400 or above. Every response is
     * closed so that retries do not exhaust the client's connection pool.
     *
     * @param url the page to fetch
     * @return the body decoded as UTF-8, or {@code ""} when every attempt failed
     */
    public String requestPage(String url) {
        HttpGet r = new HttpGet(url);
        int timeout = 60;
        RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build();
        try (CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();){
            for (int tries = 4; tries > 0; --tries) {
                Predef$.MODULE$.println((Object)new StringBuilder(11).append("requesting ").append(r.getURI()).toString());
                try (CloseableHttpResponse response = client.execute((HttpUriRequest)r);){
                    int status = response.getStatusLine().getStatusCode();
                    Predef$.MODULE$.println((Object)new StringBuilder(24).append("get response with status").append(status).toString());
                    if (status >= 400) {
                        continue;
                    }
                    return IOUtils.toString((InputStream)response.getEntity().getContent(), (Charset)StandardCharsets.UTF_8);
                }
                catch (Exception e) {
                    Predef$.MODULE$.println((Object)new StringBuilder(20).append("Error on requesting ").append(r.getURI()).toString());
                    e.printStackTrace();
                }
            }
        }
        return "";
    }

    /**
     * Downloads into {@code baselinePath} every NCBI update file whose name sorts after the
     * lexicographically greatest file name already stored there.
     *
     * @param baselinePath HDFS directory holding the baseline files (listed non-recursively)
     * @param hdfsServerUri value used for {@code fs.defaultFS}
     */
    public void downloadBaseLineUpdate(String baselinePath, String hdfsServerUri) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsServerUri);
        FileSystem fs = FileSystem.get((Configuration)conf);
        Path p = new Path(baselinePath);
        RemoteIterator files = fs.listFiles(p, false);
        String max_file = "";
        while (files.hasNext()) {
            LocatedFileStatus c = (LocatedFileStatus)files.next();
            String data = c.getPath().toString();
            String fileName = data.substring(data.lastIndexOf("/") + 1);
            if (!new StringOps(Predef$.MODULE$.augmentString(fileName)).$greater((Object)max_file)) continue;
            max_file = fileName;
        }
        List<Tuple2<String, String>> files_to_download = this.requestBaseLineUpdatePage(max_file);
        files_to_download.foreach((Function1 & Serializable & scala.Serializable)u -> {
            SparkCreateBaselineDataFrame$.$anonfun$downloadBaseLineUpdate$1(baselinePath, fs, u);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Aggregator that keeps, for each group, the first article seen with a non-null PMID. Which
     * duplicate wins depends on the order in which Spark feeds the rows.
     */
    public Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> pmArticleAggregator() {
        return this.pmArticleAggregator;
    }

    /**
     * Job entry point.
     *
     * <p>Unless {@code skipUpdate} is {@code "true"}, it downloads new update files into
     * {@code workingPath/baseline}, parses every {@code .gz} file there into {@link PMArticle}s,
     * keeps one article per PMID and overwrites {@code workingPath/baseline_dataset}. It then always
     * converts that dataset into OAF entities, drops {@code null} results and saves them to
     * {@code targetPath}.
     *
     * @param args command-line arguments described by {@code baseline_to_oaf_params.json}
     */
    public void main(String[] args) {
        SparkConf conf = new SparkConf();
        Logger log = LoggerFactory.getLogger(this.getClass());
        String paramsJson;
        try (InputStream params = this.getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json");){
            paramsJson = IOUtils.toString((InputStream)params, (Charset)StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(paramsJson);
        parser.parseArgument(args);
        String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", new Object[]{isLookupUrl});
        String workingPath = parser.get("workingPath");
        log.info("workingPath: {}", new Object[]{workingPath});
        String targetPath = parser.get("targetPath");
        log.info("targetPath: {}", new Object[]{targetPath});
        String hdfsServerUri = parser.get("hdfsServerUri");
        log.info("hdfsServerUri: {}", new Object[]{hdfsServerUri});
        String skipUpdate = parser.get("skipUpdate");
        log.info("skipUpdate: {}", new Object[]{skipUpdate});
        ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService((String)isLookupUrl);
        VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS((ISLookUpService)isLookupService);
        SparkSession spark = SparkSession$.MODULE$.builder().config(conf).appName(this.getClass().getSimpleName()).master(parser.get("master")).getOrCreate();
        SparkContext sc = spark.sparkContext();
        Encoder PMEncoder = Encoders$.MODULE$.kryo(PMArticle.class);
        Encoder PMJEncoder = Encoders$.MODULE$.kryo(PMJournal.class);
        Encoder PMAEncoder = Encoders$.MODULE$.kryo(PMAuthor.class);
        Encoder resultEncoder = Encoders$.MODULE$.kryo(Oaf.class);
        if (!"true".equalsIgnoreCase(skipUpdate)) {
            this.downloadBaseLineUpdate(new StringBuilder(9).append(workingPath).append("/baseline").toString(), hdfsServerUri);
            RDD k = sc.wholeTextFiles(new StringBuilder(9).append(workingPath).append("/baseline").toString(), 2000);
            // NOTE(review): the parser reads files downloaded from the network; consider disabling
            // external entity resolution on this factory once PMParser's needs are confirmed.
            XMLInputFactory inputFactory = XMLInputFactory.newInstance();
            Dataset ds = spark.createDataset(k.filter((Function1 & Serializable & scala.Serializable)i -> BoxesRunTime.boxToBoolean((boolean)SparkCreateBaselineDataFrame$.$anonfun$main$1(i))).flatMap((Function1 & Serializable & scala.Serializable)i -> {
                // wholeTextFiles decodes content as UTF-8, so re-encode with the same charset
                // rather than the platform default.
                XMLEventReader xml = inputFactory.createXMLEventReader(new ByteArrayInputStream(((String)i._2()).getBytes(StandardCharsets.UTF_8)));
                return new PMParser(xml);
            }, ClassTag$.MODULE$.apply(PMArticle.class)), PMEncoder);
            // Key by PMID and keep a single article per key.
            ds.map((Function1 & Serializable & scala.Serializable)p -> new Tuple2((Object)p.getPmid(), p), Encoders$.MODULE$.tuple(Encoders$.MODULE$.STRING(), PMEncoder)).groupByKey((Function1 & Serializable & scala.Serializable)x$1 -> (String)x$1._1(), spark.implicits().newStringEncoder()).agg(this.pmArticleAggregator().toColumn()).map((Function1 & Serializable & scala.Serializable)p -> (PMArticle)p._2(), PMEncoder).write().mode(SaveMode.Overwrite).save(new StringBuilder(17).append(workingPath).append("/baseline_dataset").toString());
        }
        Dataset exported_dataset = spark.read().load(new StringBuilder(17).append(workingPath).append("/baseline_dataset").toString()).as(PMEncoder);
        CollectionUtils$.MODULE$.saveDataset((Dataset<Oaf>)exported_dataset.map((Function1 & Serializable & scala.Serializable)a -> PubMedToOaf$.MODULE$.convert((PMArticle)a, vocabularies), resultEncoder).as(resultEncoder).filter((Function1 & Serializable & scala.Serializable)p -> BoxesRunTime.boxToBoolean((boolean)SparkCreateBaselineDataFrame$.$anonfun$main$7(p))), targetPath);
    }

    /** Keeps update-file names that sort lexicographically after the newest local file. */
    public static final /* synthetic */ boolean $anonfun$requestBaseLineUpdatePage$5(String maxFile$1, String s) {
        return new StringOps(Predef$.MODULE$.augmentString(s)).$greater((Object)maxFile$1);
    }

    /**
     * Downloads one update file ({@code u._1()} = file name, {@code u._2()} = URL) into
     * {@code baselinePath$1}.
     *
     * <p>The remote stream is opened (and status-checked) before the HDFS file is created. If the
     * copy fails, the partial file is deleted so that it does not count as already downloaded on
     * the next run.
     */
    public static final /* synthetic */ void $anonfun$downloadBaseLineUpdate$1(String baselinePath$1, FileSystem fs$1, Tuple2 u) {
        Path hdfsWritePath = new Path(new StringBuilder(1).append(baselinePath$1).append("/").append(u._1()).toString());
        try (InputStream i = MODULE$.downloadBaselinePart((String)u._2());){
            try (FSDataOutputStream fsDataOutputStream = fs$1.create(hdfsWritePath, true);){
                IOUtils.copy((InputStream)i, (OutputStream)fsDataOutputStream);
            }
            catch (Throwable t) {
                try {
                    fs$1.delete(hdfsWritePath, false);
                }
                catch (Exception suppressed) {
                    t.addSuppressed(suppressed);
                }
                throw t;
            }
        }
        Predef$.MODULE$.println((Object)new StringBuilder(18).append("Downloaded ").append(u._2()).append(" into ").append(baselinePath$1).append("/").append(u._1()).toString());
    }

    /** Keeps only files (keyed by path) whose name ends in {@code .gz}. */
    public static final /* synthetic */ boolean $anonfun$main$1(Tuple2 i) {
        return ((String)i._1()).endsWith(".gz");
    }

    /** Drops conversion results that are {@code null}. */
    public static final /* synthetic */ boolean $anonfun$main$7(Oaf p) {
        return p != null;
    }

    /** Registers the singleton and builds the per-PMID article aggregator. */
    private SparkCreateBaselineDataFrame$() {
        MODULE$ = this;
        this.pmArticleAggregator = new Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle>(){

            /** Empty buffer: an article without PMID, replaced by the first real input. */
            public PMArticle zero() {
                return new PMArticle();
            }

            /** Keeps the buffer once it holds an article with a PMID; otherwise takes the input. */
            public PMArticle reduce(PMArticle b, Tuple2<String, PMArticle> a) {
                if (b != null && b.getPmid() != null) {
                    return b;
                }
                return (PMArticle)a._2();
            }

            /** Prefers the first partial result that holds an article with a PMID. */
            public PMArticle merge(PMArticle b1, PMArticle b2) {
                if (b1 != null && b1.getPmid() != null) {
                    return b1;
                }
                return b2;
            }

            public PMArticle finish(PMArticle reduction) {
                return reduction;
            }

            public Encoder<PMArticle> bufferEncoder() {
                return Encoders$.MODULE$.kryo(ClassTag$.MODULE$.apply(PMArticle.class));
            }

            public Encoder<PMArticle> outputEncoder() {
                return Encoders$.MODULE$.kryo(ClassTag$.MODULE$.apply(PMArticle.class));
            }
        };
    }
}

