/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.graph.raw;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark application that copies the records of HDFS-backed mdstores into a gzip-compressed
 * Hadoop {@code SequenceFile} output.
 *
 * <p>The mdstore paths are obtained from {@code mdstorePaths(...)} using the mdstore manager URL
 * and the requested format / layout / interpretation. Each record body is parsed as XML and its
 * header element is enriched with {@code dri:objIdentifier}, {@code dri:dateOfCollection} and
 * {@code dri:dateOfTransformation}. The result is written with a
 * {@code <random UUID>:<format>-<layout>-<interpretation>} key.
 */
public class MigrateHdfsMdstoresApplication
extends AbstractMigrationApplication {
    private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class);

    private static final Namespace DRI_NS_PREFIX = new Namespace("dri", "http://www.driver-repository.eu/namespace/dri");

    /** Classpath resource describing the command-line arguments accepted by {@link #main}. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json";

    /**
     * Format of the injected dates. The pattern and JVM-default time zone are the same as the
     * former per-record SimpleDateFormat. DateTimeFormatter is immutable and thread-safe, so one
     * shared instance replaces a new formatter per record.
     */
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");

    /**
     * Entry point: parses the arguments, resolves the mdstore paths, clears the output path and
     * runs the migration.
     *
     * @param args command-line arguments as described by {@link #PARAMETERS_RESOURCE}
     * @throws Exception if argument parsing, path resolution or the Spark job fails
     */
    public static void main(final String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the classpath stream and read it as UTF-8 rather than the platform charset.
        try (InputStream in = MigrateHdfsMdstoresApplication.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(in, "parameters resource not found: " + PARAMETERS_RESOURCE),
                StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
        log.info("mdstoreManagerUrl: {}", mdstoreManagerUrl);

        final String mdFormat = parser.get("mdFormat");
        log.info("mdFormat: {}", mdFormat);

        final String mdLayout = parser.get("mdLayout");
        log.info("mdLayout: {}", mdLayout);

        final String mdInterpretation = parser.get("mdInterpretation");
        log.info("mdInterpretation: {}", mdInterpretation);

        final String hdfsPath = parser.get("hdfsPath");
        log.info("hdfsPath: {}", hdfsPath);

        final Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);

        final SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // The output is rewritten from scratch on every run.
            HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration());
            processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation));
        });
    }

    /**
     * Reads the parquet mdstores found under {@code paths}, enriches each record and saves the
     * result as a gzip-compressed sequence file of {@code Text} key/value pairs.
     *
     * <p>Paths that do not exist on HDFS are skipped. When none exist, an empty output is still
     * written, so {@code outputPath} is always produced. Records that cannot be enriched are
     * logged and dropped.
     *
     * @param spark      active Spark session
     * @param outputPath destination of the sequence file output
     * @param paths      candidate mdstore paths
     * @param type       label appended to every record key after a random UUID
     */
    public static void processPaths(final SparkSession spark, final String outputPath, final Set<String> paths, final String type) {
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        log.info("Found {} not empty mdstores", paths.size());
        paths.forEach(log::info);

        final String[] validPaths = paths
            .stream()
            .filter(p -> HdfsSupport.exists(p, sc.hadoopConfiguration()))
            .toArray(String[]::new);
        log.info("Processing existing paths {}", Arrays.asList(validPaths));

        final JavaPairRDD<Text, Text> records;
        if (validPaths.length > 0) {
            records = spark
                .read()
                .parquet(validPaths)
                // Explicit casts select the Java functional overloads; without them map() is
                // ambiguous with the scala.Function1 overload on Scala 2.12+ builds of Spark.
                .map((MapFunction<Row, String>) MigrateHdfsMdstoresApplication::enrichRecord, Encoders.STRING())
                // enrichRecord returns null for records it could not patch.
                .filter((FilterFunction<String>) Objects::nonNull)
                .toJavaRDD()
                // The key is a random UUID plus the type label; it is not derived from the record.
                .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)));
        } else {
            records = spark
                .emptyDataFrame()
                .toJavaRDD()
                .mapToPair(row -> new Tuple2<>(new Text(), new Text()));
        }
        records.saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
    }

    /**
     * Parses the {@code body} column of {@code r} as XML. Appends the record id and the formatted
     * collection and transformation dates to the header element, then serializes the document.
     *
     * <p>This is best effort. Any failure (missing or null column, malformed XML, DOCTYPE
     * declaration, missing header element) is logged with its cause and {@code null} is returned,
     * so the caller can drop the record instead of failing the whole task.
     *
     * @param r row with {@code body}, {@code id}, {@code dateOfCollection} and
     *          {@code dateOfTransformation} columns (the dates as epoch milliseconds)
     * @return the enriched XML, or {@code null} if the record could not be patched
     */
    private static String enrichRecord(final Row r) {
        final String xml = r.getAs("body");
        try {
            // Inside the try so a null or missing timestamp skips this record rather than
            // failing the whole Spark task.
            final String collDate = formatTimestamp(r.getAs("dateOfCollection"));
            final String tranDate = formatTimestamp(r.getAs("dateOfTransformation"));

            final SAXReader reader = new SAXReader();
            // Reject DOCTYPE declarations to prevent XXE / entity-expansion attacks.
            reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            final Document doc = reader.read(new StringReader(xml));

            final Element head = (Element) doc.selectSingleNode("//*[local-name() = 'header']");
            if (head == null) {
                throw new IllegalArgumentException("header element not found");
            }
            head.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(r.<String>getAs("id"));
            head.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collDate);
            head.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(tranDate);
            return doc.asXML();
        } catch (final Exception e) {
            // Keep the cause: previously only the record body was logged.
            log.error("Error patching record: {}", xml, e);
            return null;
        }
    }

    /**
     * Formats epoch milliseconds with {@link #DATE_FORMAT} in the JVM default time zone.
     *
     * @param epochMillis timestamp in milliseconds since the epoch
     * @return the formatted date, e.g. {@code 2020-01-31T10:15:30+0000} in a UTC JVM
     * @throws NullPointerException if {@code epochMillis} is null
     */
    private static String formatTimestamp(final Long epochMillis) {
        Objects.requireNonNull(epochMillis, "timestamp is null");
        return DATE_FORMAT.format(Instant.ofEpochMilli(epochMillis).atZone(ZoneId.systemDefault()));
    }
}

