/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.partition;

import eu.dnetlib.dhp.actionmanager.ISClient;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that resolves the latest raw action set paths for the given action set ids, reads the
 * actions stored in those Hadoop sequence files and rewrites them as Parquet partitioned by the
 * {@code clazz} column.
 */
public class PartitionActionSetsByPayloadTypeJob {
    private static final Logger logger = LoggerFactory.getLogger(PartitionActionSetsByPayloadTypeJob.class);

    /** Classpath location of the argument definitions consumed by {@link ArgumentApplicationParser}. */
    private static final String INPUT_PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json";

    /**
     * Schema applied when parsing the JSON text held in the value of each sequence-file record:
     * two non-nullable string columns, {@code clazz} and {@code payload}.
     */
    private static final StructType ATOMIC_ACTION_SCHEMA = DataTypes.createStructType(
        Arrays.asList(
            DataTypes.createStructField("clazz", DataTypes.StringType, false),
            DataTypes.createStructField("payload", DataTypes.StringType, false)));

    private ISClient isClient;

    /**
     * Creates a job that resolves action set paths through an {@link ISClient} bound to the given URL.
     *
     * @param isLookupUrl URL passed to the {@link ISClient} constructor
     */
    public PartitionActionSetsByPayloadTypeJob(String isLookupUrl) {
        this.isClient = new ISClient(isLookupUrl);
    }

    /**
     * Creates a job without an {@link ISClient}; one must be supplied via {@link #setIsClient}
     * before {@link #run} is invoked.
     */
    public PartitionActionSetsByPayloadTypeJob() {
    }

    /**
     * Command-line entry point: parses the arguments {@code isSparkSessionManaged} (defaults to
     * {@code true}), {@code inputActionSetIds}, {@code outputPath} and {@code isLookupUrl}, then runs
     * the job.
     *
     * @param args command-line arguments, interpreted according to the bundled JSON definition
     * @throws Exception if the argument definition cannot be read or argument parsing fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream and decode it explicitly as UTF-8 rather than the platform charset.
        try (InputStream in =
                PartitionActionSetsByPayloadTypeJob.class.getResourceAsStream(INPUT_PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource: " + INPUT_PARAMETERS_RESOURCE);
            }
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputActionSetIds = parser.get("inputActionSetIds");
        logger.info("inputActionSetIds: {}", inputActionSetIds);

        String outputPath = parser.get("outputPath");
        logger.info("outputPath: {}", outputPath);

        String isLookupUrl = parser.get("isLookupUrl");
        logger.info("isLookupUrl: {}", isLookupUrl);

        new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath);
    }

    /**
     * Resolves the input paths via the configured {@link ISClient}, deletes {@code outputPath} and
     * writes the partitioned actions there inside a Spark session.
     *
     * @param isSparkSessionManaged forwarded to {@link SparkSessionSupport#runWithSparkSession}
     * @param inputActionSetIds action set ids passed to {@code ISClient.getLatestRawsetPaths}
     * @param outputPath destination directory; removed before writing
     * @throws IllegalStateException if no {@link ISClient} has been configured
     */
    protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) {
        ISClient client = this.getIsClient();
        if (client == null) {
            throw new IllegalStateException(
                "ISClient is not configured: use the isLookupUrl constructor or call setIsClient first");
        }
        List<String> inputActionSetPaths = client.getLatestRawsetPaths(inputActionSetIds);
        logger.info("inputActionSetPaths: {}", String.join(",", inputActionSetPaths));

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            readAndWriteActionSetsFromPaths(spark, inputActionSetPaths, outputPath);
        });
    }

    /** Removes {@code path} through {@link HdfsSupport} using the session's Hadoop configuration. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /**
     * Reads every input path that exists, unions the resulting datasets and saves the union to
     * {@code outputPath}. Paths that do not exist are skipped; if none exist, nothing is written.
     */
    private static void readAndWriteActionSetsFromPaths(
        SparkSession spark, List<String> inputActionSetPaths, String outputPath) {
        // Hoisted: the same Hadoop configuration is used for every existence check.
        Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
        Optional<Dataset<Row>> allActions = inputActionSetPaths.stream()
            .filter(path -> HdfsSupport.exists(path, hadoopConf))
            .map(path -> readActionSetFromPath(spark, path))
            .reduce(Dataset::union);
        if (allActions.isPresent()) {
            saveActions(allActions.get(), outputPath);
        } else {
            logger.warn("None of the input action set paths exist; nothing written to: {}", outputPath);
        }
    }

    /**
     * Reads a {@code Text}/{@code Text} sequence file and parses each record's value as JSON using
     * {@link #ATOMIC_ACTION_SCHEMA}.
     */
    private static Dataset<Row> readActionSetFromPath(SparkSession spark, String path) {
        logger.info("Reading actions from path: {}", path);
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        // Convert the value to String immediately: Hadoop record readers may reuse Writable instances.
        JavaRDD<String> rdd = sc.sequenceFile(path, Text.class, Text.class)
            .map(record -> record._2().toString());
        // NOTE(review): json(JavaRDD<String>) is deprecated since Spark 2.2 in favour of json(Dataset<String>).
        return spark.read().schema(ATOMIC_ACTION_SCHEMA).json(rdd);
    }

    /** Writes the actions as Parquet to {@code path}, partitioned by {@code clazz}, overwriting existing data. */
    private static void saveActions(Dataset<Row> actionDS, String path) {
        logger.info("Saving actions to path: {}", path);
        actionDS.write().partitionBy("clazz").mode(SaveMode.Overwrite).parquet(path);
    }

    /** @return the {@link ISClient} used to resolve action set paths, or {@code null} if none was set */
    public ISClient getIsClient() {
        return this.isClient;
    }

    /** @param isClient the {@link ISClient} used to resolve action set paths */
    public void setIsClient(ISClient isClient) {
        this.isClient = isClient;
    }
}

