/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection.orcid;

import eu.dnetlib.dhp.application.AbstractScalaApplication;
import eu.dnetlib.dhp.collection.orcid.SparkApplyUpdate$;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005ma\u0001B\u0001\u0003\u00015\u0011\u0001c\u00159be.\f\u0005\u000f\u001d7z+B$\u0017\r^3\u000b\u0005\r!\u0011!B8sG&$'BA\u0003\u0007\u0003)\u0019w\u000e\u001c7fGRLwN\u001c\u0006\u0003\u000f!\t1\u0001\u001a5q\u0015\tI!\"A\u0004e]\u0016$H.\u001b2\u000b\u0003-\t!!Z;\u0004\u0001M\u0011\u0001A\u0004\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0003#\u0019\t1\"\u00199qY&\u001c\u0017\r^5p]&\u00111\u0003\u0005\u0002\u0019\u0003\n\u001cHO]1diN\u001b\u0017\r\\1BaBd\u0017nY1uS>t\u0007\"C\u000b\u0001\u0005\u0003\u0005\u000b\u0011\u0002\f!\u00031\u0001(o\u001c9feRL\b+\u0019;i!\t9RD\u0004\u0002\u001975\t\u0011DC\u0001\u001b\u0003\u0015\u00198-\u00197b\u0013\ta\u0012$\u0001\u0004Qe\u0016$WMZ\u0005\u0003=}\u0011aa\u0015;sS:<'B\u0001\u000f\u001a\u0013\t)\"\u0003C\u0005#\u0001\t\u0005\t\u0015!\u0003$M\u0005!\u0011M]4t!\rABEF\u0005\u0003Ke\u0011Q!\u0011:sCfL!A\t\n\t\u0011!\u0002!\u0011!Q\u0001\n%\n1\u0001\\8h!\tQs&D\u0001,\u0015\taS&A\u0003tY\u001a$$NC\u0001/\u0003\ry'oZ\u0005\u0003a-\u0012a\u0001T8hO\u0016\u0014\b\"\u0002\u001a\u0001\t\u0003\u0019\u0014A\u0002\u001fj]&$h\b\u0006\u00035m]B\u0004CA\u001b\u0001\u001b\u0005\u0011\u0001\"B\u000b2\u0001\u00041\u0002\"\u0002\u00122\u0001\u0004\u0019\u0003\"\u0002\u00152\u0001\u0004I\u0003\"\u0002\u001e\u0001\t\u0003Z\u0014a\u0001:v]R\tA\b\u0005\u0002\u0019{%\u0011a(\u0007\u0002\u0005+:LG\u000fC\u0003A\u0001\u0011%\u0011)A\u0005n_Z,G+\u00192mKR!AHQ'P\u0011\u0015\u0019u\b1\u0001E\u0003\u0015\u0019\b/\u0019:l!\t)5*D\u0001G\u0015\t9\u0005*A\u0002tc2T!aQ%\u000b\u0005)k\u0013AB1qC\u000eDW-\u0003\u0002M\r\na1\u000b]1sWN+7o]5p]\")aj\u0010a\u0001-\u0005IqM]1qQB\u000bG\u000f\u001b\u0005\u0006!~\u0002\rAF\u0001\u000bkB$\u0017\r^3QCRD\u0007\"\u0002*\u0001\t\u0013\u0019\u0016!D;qI\u0006$X\rR1uCN,G\u000fF\u0003=)\"TG\u000eC\u0003V#\u0002\u0007a+\u0001\u0007j]B,H\u000fR1uCN,G\u000f\u0005\u0002XK:\u0011\u0001l\u0019\b\u00033\nt!AW1\u000f\u0005m\u0003gB\u0001/`\u001b\u0005i&B\u00010\r\u0003\u0019a$o\\8u}%\ta&\u0003\u0002K[%\u00111)S\u0005\u0003\u000f\"K!\u0001\u001a$\u0002\u000fA\f7m[1hK&\u0011am\u001a\u0002\n\t\u0006$\u0018M\u0012:b[\u0016T!\u0001\u001a$\t\u000b%\f\u0006\u0019\u0001,\u0002\u0011%$W\u000b\u001d3bi\u0016DQa[)A\u0002Y\u000bq\"\u001e9eCR,G)\u0019;bMJ\fW.\u001a\u0005\u0006[F\u0003\rAF\u0001\u000bi\u0006\u0014x-\u001a;QCRD\u0007\"B8\u0001\t\u0013\u0001\u0018aC2iK\u000e\\W\u000b\u001d3bi\u0016$B\u0001P9sg\")1I\u001ca\u0001\t\")aJ\u001ca\u0001-!)\u0001K\u001ca\u0001-!)Q\u000f\u0001C\u0005m\u0006\u0001\u0012\r\u001d9msR\u000b'\r\\3Va\u0012\fG/\u001a\u000b\u0006y]D\u0018P\u001f\u0005\u0006\u0007R\u0004\r\u0001\u0012\u0005\u0006\u001dR\u0004\rA\u0006\u0005\u0006!R\u0004\rA\u0006\u0005\u0006[R\u0004\rAF\u0004\u0006y\nA\t!`\u0001\u0011'B\f'o[!qa2LX\u000b\u001d3bi\u0016\u0004\"!\u000e@\u0007\u000b\u0005\u0011\u0001\u0012A@\u0014\u0007y\f\t\u0001E\u0002\u0019\u0003\u0007I1!!\u0002\u001a\u0005\u0019\te.\u001f*fM\"1!G C\u0001\u0003\u0013!\u0012! \u0005\tQy\u0014\r\u0011\"\u0001\u0002\u000eU\t\u0011\u0006C\u0004\u0002\u0012y\u0004\u000b\u0011B\u0015\u0002\t1|w\r\t\u0005\b\u0003+qH\u0011AA\f\u0003\u0011i\u0017-\u001b8\u0015\u0007q\nI\u0002\u0003\u0004#\u0003'\u0001\ra\t")
public class SparkApplyUpdate
extends AbstractScalaApplication {
    private final Logger log;

    public static void main(String[] stringArray) {
        SparkApplyUpdate$.MODULE$.main(stringArray);
    }

    public void run() {
        String graphPath = this.parser().get("graphPath");
        this.log.info("found parameters graphPath: {}", new Object[]{graphPath});
        String updatePath = this.parser().get("updatePath");
        this.log.info("found parameters updatePath: {}", new Object[]{updatePath});
        String targetPath = this.parser().get("targetPath");
        this.log.info("found parameters targetPath: {}", new Object[]{targetPath});
        this.applyTableUpdate(this.spark(), graphPath, updatePath, targetPath);
        this.checkUpdate(this.spark(), graphPath, targetPath);
        this.moveTable(this.spark(), graphPath, targetPath);
    }

    private void moveTable(SparkSession spark, String graphPath, String updatePath) {
        spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).repartition(1000).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath})));
        spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).repartition(1000).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath})));
        spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).repartition(1000).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath})));
    }

    private void updateDataset(Dataset<Row> inputDataset, Dataset<Row> idUpdate, Dataset<Row> updateDataframe, String targetPath) {
        inputDataset.join(idUpdate, inputDataset.apply("orcid").equalTo((Object)idUpdate.apply("orcid")), "leftanti").select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{inputDataset.apply("*")})).unionByName(updateDataframe).write().mode(SaveMode.Overwrite).save(targetPath);
    }

    private void checkUpdate(SparkSession spark, String graphPath, String updatePath) {
        long totalOriginalAuthors = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath}))).count();
        long totalOriginalWorks = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath}))).count();
        long totalOriginalEmployments = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath}))).count();
        long totalUpdateAuthors = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).count();
        long totalUpdateWorks = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).count();
        long totalUpdateEmployments = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).count();
        this.log.info("totalOriginalAuthors: {}", (Object)BoxesRunTime.boxToLong((long)totalOriginalAuthors));
        this.log.info("totalOriginalWorks: {}", (Object)BoxesRunTime.boxToLong((long)totalOriginalWorks));
        this.log.info("totalOriginalEmployments: {}", (Object)BoxesRunTime.boxToLong((long)totalOriginalEmployments));
        this.log.info("totalUpdateAuthors: {}", (Object)BoxesRunTime.boxToLong((long)totalUpdateAuthors));
        this.log.info("totalUpdateWorks: {}", (Object)BoxesRunTime.boxToLong((long)totalUpdateWorks));
        this.log.info("totalUpdateEmployments: {}", (Object)BoxesRunTime.boxToLong((long)totalUpdateEmployments));
        if (totalUpdateAuthors < totalOriginalAuthors || totalUpdateEmployments < totalOriginalEmployments || totalUpdateWorks < totalOriginalWorks) {
            throw new RuntimeException("The updated Graph contains less elements of the original one");
        }
    }

    private void applyTableUpdate(SparkSession spark, String graphPath, String updatePath, String targetPath) {
        Dataset orcidIDUpdate = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))).select("orcid", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[0]));
        this.updateDataset((Dataset<Row>)spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath}))), (Dataset<Row>)orcidIDUpdate, (Dataset<Row>)spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Authors"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        this.updateDataset((Dataset<Row>)spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath}))), (Dataset<Row>)orcidIDUpdate, (Dataset<Row>)spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Employments"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        this.updateDataset((Dataset<Row>)spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{graphPath}))), (Dataset<Row>)orcidIDUpdate, (Dataset<Row>)spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{updatePath}))), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/Works"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
    }

    public SparkApplyUpdate(String propertyPath, String[] args, Logger log) {
        this.log = log;
        super(propertyPath, args, log);
    }
}

