/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.datacite;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.datacite.DataciteAPIImporter;
import eu.dnetlib.dhp.datacite.DataciteType;
import eu.dnetlib.dhp.datacite.ImportDatacite$;
import eu.dnetlib.dhp.datacite.ImportDatacite$$anon$1$;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions$;
import org.json4s.AsJsonInput$;
import org.json4s.DefaultFormats$;
import org.json4s.ExtractableJsonAstNode$;
import org.json4s.Formats;
import org.json4s.JValue;
import org.json4s.MonadicJValue$;
import org.json4s.jackson.JsonMethods$;
import org.json4s.package$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.StringOps$;
import scala.collection.immutable.Seq;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.reflect.Manifest;
import scala.reflect.ManifestFactory$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;
import scala.runtime.ScalaRunTime$;

public final class ImportDatacite$ {
    // Singleton instance of this object; instance methods such as main() and log() are invoked through it.
    public static final ImportDatacite$ MODULE$ = new ImportDatacite$();
    // Logger named after the singleton's runtime class. Declared after MODULE$ because its
    // initializer dereferences MODULE$.
    private static final Logger log = LoggerFactory.getLogger(MODULE$.getClass());

    /**
     * Accessor for the logger shared by this singleton.
     *
     * @return the static SLF4J logger created for this class
     */
    public Logger log() {
        return ImportDatacite$.log;
    }

    /**
     * Converts one raw Datacite API record (a JSON string) into a {@link DataciteType}.
     *
     * <p>The following fields are read from the record's {@code attributes} object:
     * <ul>
     *   <li>{@code doi} - extracted as a string and lower-cased with {@code Locale.ROOT} so the
     *       result does not depend on the JVM's default locale;</li>
     *   <li>{@code isActive} - extracted as a boolean;</li>
     *   <li>{@code updated} - parsed with {@link DateTimeFormatter#ISO_DATE_TIME} and converted
     *       to whole seconds since the epoch, treating the parsed local date-time as UTC.</li>
     * </ul>
     * The unmodified {@code input} string is passed through as the fourth constructor argument.
     *
     * @param input JSON text of a single Datacite record
     * @return the item built from the doi, the epoch-second timestamp, the active flag and the raw input
     * @throws java.time.format.DateTimeParseException if {@code attributes.updated} is not an ISO date-time
     */
    public DataciteType convertAPIStringToDataciteItem(String input) {
        LazyRef formats$lzy = new LazyRef();
        LazyRef json$lzy = new LazyRef();
        // Resolve the "attributes" node once; all three extracted fields live under it.
        JValue attributes = MonadicJValue$.MODULE$.$bslash$extension(package$.MODULE$.jvalue2monadic(ImportDatacite$.json$1(json$lzy, input)), "attributes");
        Formats formats = (Formats)ImportDatacite$.formats$1(formats$lzy);
        // Locale.ROOT keeps the DOI normalisation identical on every machine (the no-arg
        // toLowerCase() follows the default locale, e.g. Turkish dotless i).
        String doi = ((String)ExtractableJsonAstNode$.MODULE$.extract$extension(package$.MODULE$.jvalue2extractable(MonadicJValue$.MODULE$.$bslash$extension(package$.MODULE$.jvalue2monadic(attributes), "doi")), formats, ManifestFactory$.MODULE$.classType(String.class))).toLowerCase(java.util.Locale.ROOT);
        boolean isActive = BoxesRunTime.unboxToBoolean((Object)ExtractableJsonAstNode$.MODULE$.extract$extension(package$.MODULE$.jvalue2extractable(MonadicJValue$.MODULE$.$bslash$extension(package$.MODULE$.jvalue2monadic(attributes), "isActive")), formats, (Manifest)ManifestFactory$.MODULE$.Boolean()));
        String timestamp_string = (String)ExtractableJsonAstNode$.MODULE$.extract$extension(package$.MODULE$.jvalue2extractable(MonadicJValue$.MODULE$.$bslash$extension(package$.MODULE$.jvalue2monadic(attributes), "updated")), formats, ManifestFactory$.MODULE$.classType(String.class));
        // NOTE(review): parsing into LocalDateTime discards any zone offset present in the text and
        // the value is then interpreted as UTC - confirm the API always reports UTC timestamps.
        LocalDateTime dt = LocalDateTime.parse(timestamp_string, DateTimeFormatter.ISO_DATE_TIME);
        return new DataciteType(doi, dt.toInstant(ZoneOffset.UTC).toEpochMilli() / 1000L, isActive, input);
    }

    /**
     * Job entry point: incrementally refreshes a stored Datacite dump with records fetched from
     * the Datacite API.
     *
     * <p>Steps, as performed below:
     * <ol>
     *   <li>Parse the arguments described by the classpath resource
     *       {@code /eu/dnetlib/dhp/datacite/import_from_api.json} ({@code master}, {@code namenode},
     *       {@code targetPath}, {@code dataciteDumpPath}, {@code blocksize}, {@code skipImport}).</li>
     *   <li>Load the existing dump and take the maximum value of its {@code timestamp} column.</li>
     *   <li>Unless {@code skipImport} equals "true" (case-insensitive), download newer records into a
     *       sequence file at {@code targetPath} via {@code writeSequenceFile}.</li>
     *   <li>If at least one record is reported, convert the sequence file to a dataset, union it with
     *       the dump, keep one record per DOI (the one with the greatest timestamp), write the result
     *       to {@code dataciteDumpPath + "_updated"} and then replace the dump with it.</li>
     * </ol>
     *
     * @param args command-line arguments handed to {@link ArgumentApplicationParser}
     */
    public void main(String[] args) {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(Source$.MODULE$.fromInputStream(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/datacite/import_from_api.json"), Codec$.MODULE$.fallbackSystemCodec()).mkString());
        parser.parseArgument(args);
        String master = parser.get("master");
        String hdfsuri = parser.get("namenode");
        this.log().info("namenode is " + hdfsuri);
        String targetPath = parser.get("targetPath");
        this.log().info("targetPath is " + targetPath);
        String dataciteDump = parser.get("dataciteDumpPath");
        this.log().info("dataciteDump is " + dataciteDump);
        Path hdfsTargetPath = new Path(targetPath);
        this.log().info("hdfsTargetPath is " + hdfsTargetPath);
        // Block size handed to the API importer; falls back to 100 when the argument is absent.
        int bs = parser.get("blocksize") == null ? 100 : StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(parser.get("blocksize")));
        String spkipImport = parser.get("skipImport");
        this.log().info("skipImport is " + spkipImport);
        SparkSession spark = SparkSession$.MODULE$.builder().appName(this.getClass().getSimpleName()).master(master).getOrCreate();
        // Hadoop configuration used only by writeSequenceFile; the default file system is the given namenode.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsuri);
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", LocalFileSystem.class.getName());
        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("ERROR");
        // Aggregator that collapses a group of DataciteType records into the one with the greatest
        // timestamp. The buffer starts as null, and null operands are ignored by reduce/merge.
        Aggregator<DataciteType, DataciteType, DataciteType> dataciteAggregator = new Aggregator<DataciteType, DataciteType, DataciteType>(spark){
            private final SparkSession spark$1;

            // Initial (empty) buffer value.
            public DataciteType zero() {
                return null;
            }

            // Returns whichever argument is non-null; when both are present, returns `a` only if its
            // timestamp is strictly greater, otherwise `b` (so ties resolve to `b`).
            public DataciteType reduce(DataciteType a, DataciteType b) {
                if (b == null) {
                    return a;
                }
                if (a == null) {
                    return b;
                }
                if (a.timestamp() > b.timestamp()) {
                    return a;
                }
                return b;
            }

            // Combining two partial buffers uses the same rule as reduce.
            public DataciteType merge(DataciteType a, DataciteType b) {
                return this.reduce(a, b);
            }

            // Product encoder for DataciteType, built from a TypeTag; the local TypeCreator class
            // appears to be compiler-generated scaffolding surfaced by the decompiler.
            public Encoder<DataciteType> bufferEncoder() {
                JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(anon.1.class.getClassLoader());
                public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$1
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                    }

                    public Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$1(anon.1 $outer) {
                    }
                }
                return (Encoder)Predef$.MODULE$.implicitly((Object)this.spark$1.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$1(null))));
            }

            // Same encoder construction as bufferEncoder, used for the aggregation output.
            public Encoder<DataciteType> outputEncoder() {
                JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(anon.1.class.getClassLoader());
                public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$2
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                    }

                    public Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$2(anon.1 $outer) {
                    }
                }
                return (Encoder)Predef$.MODULE$.implicitly((Object)this.spark$1.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$2(null))));
            }

            // The final result is the buffer itself.
            public DataciteType finish(DataciteType reduction) {
                return reduction;
            }
            {
                this.spark$1 = spark$1;
            }
        };
        JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator5$3
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
            }

            public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator5$3() {
            }
        }
        // Existing dump, read as a typed Dataset of DataciteType.
        Dataset dump = spark.read().load(dataciteDump).as(spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator5$3())));
        // Greatest timestamp already present in the dump; the API import starts from here.
        long ts = ((Row)dump.select((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.max("timestamp")})).first()).getLong(0);
        Predef$.MODULE$.println((Object)("last Timestamp is " + ts));
        // When skipImport is "true" the download is bypassed and cnt is forced to 1, so the merge
        // branch below still runs against whatever is already at targetPath.
        long cnt = "true".equalsIgnoreCase(spkipImport) ? 1L : this.writeSequenceFile(hdfsTargetPath, ts, conf, bs);
        Predef$.MODULE$.println((Object)("Imported from Datacite API " + cnt + " documents"));
        if (cnt > 0L) {
            // Read the sequence file, keep only the Text value of each pair and convert it to a DataciteType.
            // NOTE(review): the key class is given as Integer.TYPE although the writer uses IntWritable;
            // only the value is consumed here - confirm this is intentional.
            RDD inputRdd = sc.sequenceFile(targetPath, Integer.TYPE, Text.class).map((Function1 & Serializable)s -> ((Text)s._2()).toString(), ClassTag$.MODULE$.apply(String.class)).map((Function1 & Serializable)s -> MODULE$.convertAPIStringToDataciteItem((String)s), ClassTag$.MODULE$.apply(DataciteType.class));
            JavaUniverse $u2 = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m2 = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator10$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                }

                public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator10$1() {
                }
            }
            // Persist the converted records next to the sequence file (overwriting any previous run).
            spark.createDataset(inputRdd, spark.implicits().newProductEncoder(((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator10$1()))).write().mode(SaveMode.Overwrite).save(targetPath + "_dataset");
            JavaUniverse $u3 = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m3 = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator15$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                }

                public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator15$1() {
                }
            }
            // Reload the just-written dataset from storage.
            Dataset ds = spark.read().load(targetPath + "_dataset").as(spark.implicits().newProductEncoder(((TypeTags)$u3).TypeTag().apply((Mirror)$m3, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator15$1())));
            JavaUniverse $u4 = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m4 = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator24$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                }

                public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator24$1() {
                }
            }
            // Merge old and new records: group by DOI, keep the record chosen by the aggregator,
            // repartition to 4000 partitions and write to "<dump>_updated".
            dump.union(ds).groupByKey((Function1 & Serializable)x$1 -> x$1.doi(), spark.implicits().newStringEncoder()).agg(dataciteAggregator.toColumn()).map((Function1 & Serializable)s -> (DataciteType)s._2(), spark.implicits().newProductEncoder(((TypeTags)$u4).TypeTag().apply((Mirror)$m4, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator24$1()))).repartition(4000).write().mode(SaveMode.Overwrite).save(dataciteDump + "_updated");
            // Swap the refreshed dump into place: recursively delete the old dump, then rename the
            // "_updated" directory to the original path. This uses the Spark context's Hadoop
            // configuration rather than the `conf` object built above.
            FileSystem fs = FileSystem.get((Configuration)sc.hadoopConfiguration());
            fs.delete(new Path(String.valueOf(dataciteDump)), true);
            fs.rename(new Path(dataciteDump + "_updated"), new Path(String.valueOf(dataciteDump)));
            return;
        }
    }

    /**
     * Downloads Datacite records newer than {@code timestamp} and appends them to a Hadoop
     * sequence file ({@code IntWritable} key = running record index, {@code Text} value = record).
     *
     * <p>The interval from {@code timestamp * 1000} up to the current time in milliseconds is walked
     * in fixed windows of {@code 100000000}; one {@link DataciteAPIImporter} is created per window
     * and drained completely. Every appended record is followed by an {@code hflush()}, and a
     * progress line is printed every 1000 records.
     *
     * <p>The import is best-effort: any failure is logged (with its stack trace) and the number of
     * records appended so far is still returned.
     *
     * @param hdfsTargetPath destination of the sequence file
     * @param timestamp      lower bound of the import; multiplied by 1000 before use (the caller
     *                       passes the maximum {@code timestamp} column value of the existing dump)
     * @param conf           Hadoop configuration used to create the writer
     * @param bs             block size forwarded to {@link DataciteAPIImporter}
     * @return the number of records appended to the sequence file
     */
    private long writeSequenceFile(Path hdfsTargetPath, long timestamp, Configuration conf, int bs) {
        final long delta = 100000000L;
        final long now = System.currentTimeMillis();
        int i = 0;
        try (SequenceFile.Writer writer = SequenceFile.createWriter((Configuration)conf, (SequenceFile.Writer.Option[])new SequenceFile.Writer.Option[]{SequenceFile.Writer.file((Path)hdfsTargetPath), SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class)});){
            try {
                long start = System.currentTimeMillis();
                // Writable holders are reused for every record; append() serialises their current content.
                IntWritable key = new IntWritable();
                Text value = new Text();
                for (long from = timestamp * 1000L; from < now; from += delta) {
                    DataciteAPIImporter client = new DataciteAPIImporter(from, bs, from + delta);
                    while (client.hasNext()) {
                        key.set(i++);
                        value.set(client.next());
                        writer.append((Writable)key, (Writable)value);
                        writer.hflush();
                        if (i % 1000 != 0) continue;
                        long end = System.currentTimeMillis();
                        float time = (float)(end - start) / 1000.0f;
                        Predef$.MODULE$.println((Object)("Imported " + i + " in " + time + " seconds"));
                        start = System.currentTimeMillis();
                    }
                    Predef$.MODULE$.println((Object)("updating from value: " + from + "  -> " + (from + delta)));
                }
            }
            catch (Throwable e) {
                // Deliberately not rethrown: a failed download keeps what was already written.
                // Logged through the logger so the stack trace is preserved.
                this.log().error("Error", e);
            }
        }
        catch (Throwable e) {
            // Failure while creating or closing the writer.
            this.log().error("Error", e);
        }
        return i;
    }

    /**
     * Slow path of the lazy {@code formats} value: under the holder's monitor, returns the stored
     * value if one is already present, otherwise stores and returns {@code DefaultFormats$.MODULE$}.
     *
     * @param formats$lzy$1 holder of the lazily initialised value
     * @return the value held after this call
     */
    private static final /* synthetic */ DefaultFormats$ formats$lzycompute$1(LazyRef formats$lzy$1) {
        synchronized (formats$lzy$1) {
            if (formats$lzy$1.initialized()) {
                return (DefaultFormats$)formats$lzy$1.value();
            }
            return (DefaultFormats$)formats$lzy$1.initialize((Object)DefaultFormats$.MODULE$);
        }
    }

    /**
     * Fast path of the lazy {@code formats} value: reads the holder directly when it is already
     * initialised and otherwise delegates to the synchronised initialiser.
     *
     * @param formats$lzy$1 holder of the lazily initialised value
     * @return the json4s default formats instance kept in the holder
     */
    private static final DefaultFormats$ formats$1(LazyRef formats$lzy$1) {
        return formats$lzy$1.initialized()
                ? (DefaultFormats$)formats$lzy$1.value()
                : ImportDatacite$.formats$lzycompute$1(formats$lzy$1);
    }

    /**
     * Slow path of the lazy {@code json} value: under the holder's monitor, returns the stored
     * value if one is already present, otherwise parses {@code input$1} with the parser's default
     * options, stores the result and returns it.
     *
     * @param json$lzy$1 holder of the lazily parsed JSON tree
     * @param input$1    JSON text to parse on first use
     * @return the parsed JSON value held after this call
     */
    private static final /* synthetic */ JValue json$lzycompute$1(LazyRef json$lzy$1, String input$1) {
        synchronized (json$lzy$1) {
            if (json$lzy$1.initialized()) {
                return (JValue)json$lzy$1.value();
            }
            return (JValue)json$lzy$1.initialize((Object)JsonMethods$.MODULE$.parse((Object)input$1, JsonMethods$.MODULE$.parse$default$2(), JsonMethods$.MODULE$.parse$default$3(), AsJsonInput$.MODULE$.stringAsJsonInput()));
        }
    }

    /**
     * Fast path of the lazy {@code json} value: reads the holder directly when it is already
     * initialised and otherwise delegates to the synchronised initialiser.
     *
     * @param json$lzy$1 holder of the lazily parsed JSON tree
     * @param input$1    JSON text to parse if the holder is still empty
     * @return the parsed JSON value
     */
    private static final JValue json$1(LazyRef json$lzy$1, String input$1) {
        return json$lzy$1.initialized()
                ? (JValue)json$lzy$1.value()
                : ImportDatacite$.json$lzycompute$1(json$lzy$1, input$1);
    }

    /** Private constructor: the only instance is the one stored in {@code MODULE$}. */
    private ImportDatacite$() {
        super();
    }
}

