/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.promote;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.promote.MergeAndGet;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests for {@link PromoteActionPayloadForGraphTableJob}.
 *
 * <p>Each test copies JSON fixtures from the test classpath into a fresh temporary working
 * directory (a gzip-compressed JSON graph table and a parquet action payload table), runs the
 * job's {@code main} against them with an externally managed Spark session, and inspects the
 * graph table the job writes.
 */
public class PromoteActionPayloadForGraphTableJobTest {
    private static final ClassLoader cl = PromoteActionPayloadForGraphTableJobTest.class.getClassLoader();

    /** Lenient mapper: properties present in the fixtures but unknown to the model are ignored. */
    private static final ObjectMapper OBJECT_MAPPER =
        new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private static SparkSession spark;

    private Path workingDir;
    private Path inputDir;
    private Path inputGraphRootDir;
    private Path inputActionPayloadRootDir;
    private Path outputDir;

    /** Starts the local Spark session shared by all tests, with Kryo configured for the Oaf model classes. */
    @BeforeAll
    public static void beforeAll() {
        SparkConf conf = new SparkConf();
        conf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
        conf.setMaster("local");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
        spark = SparkSession.builder().config(conf).getOrCreate();
    }

    /**
     * Creates a new temporary working directory and derives the input/output locations from it.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @BeforeEach
    public void beforeEach() throws IOException {
        workingDir = Files.createTempDirectory(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
        inputDir = workingDir.resolve("input");
        inputGraphRootDir = inputDir.resolve("graph");
        inputActionPayloadRootDir = inputDir.resolve("action_payload");
        outputDir = workingDir.resolve("output");
    }

    /**
     * Removes the working directory created for the test that just ran.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @AfterEach
    public void afterEach() throws IOException {
        FileUtils.deleteDirectory(workingDir.toFile());
    }

    /** Stops the shared Spark session. */
    @AfterAll
    public static void afterAll() {
        // Guarded so that a failure in beforeAll() is not masked by a NullPointerException here.
        if (spark != null) {
            spark.stop();
        }
    }

    /**
     * Promotes a {@link Result} action payload onto the {@link Publication} graph table and checks
     * that one specific publication ends up with a non-empty list of measures.
     */
    @Test
    void shouldPromoteActionPayload_custom() throws Exception {
        Class<Publication> rowClazz = Publication.class;
        Class<Result> actionPayloadClazz = Result.class;
        MergeAndGet.Strategy strategy = MergeAndGet.Strategy.MERGE_FROM_AND_GET;
        String publicationId = "50|4ScienceCRIS::6a67ed3daba1c380bf9de3c13ed9c879";

        Path inputGraphTableDir = createGraphTable(inputGraphRootDir, rowClazz);
        Path inputActionPayloadDir = createActionPayload(inputActionPayloadRootDir, rowClazz, actionPayloadClazz);
        Path outputGraphTableDir = outputDir.resolve("graph").resolve(rowClazz.getSimpleName().toLowerCase());

        PromoteActionPayloadForGraphTableJob.main(
            jobArgs(
                inputGraphTableDir.toString(),
                rowClazz,
                inputActionPayloadDir.toString(),
                actionPayloadClazz,
                outputGraphTableDir.toString(),
                strategy));

        Assertions.assertTrue(Files.exists(outputGraphTableDir));

        // The hash-code ordering is kept so that the same row is picked should the id ever repeat.
        Publication promoted = readGraphTableFromJobOutput(outputGraphTableDir.toString(), rowClazz)
            .collectAsList()
            .stream()
            .sorted(Comparator.comparingInt(Object::hashCode))
            .filter(row -> publicationId.equals(row.getId()))
            .findFirst()
            .orElseThrow(() -> new AssertionError("publication not found in job output: " + publicationId));

        Assertions.assertNotNull(promoted.getMeasures());
        Assertions.assertTrue(promoted.getMeasures().size() > 0);
    }

    /**
     * Supplies (strategy, graph table class, action payload class) triples for
     * {@code Main#shouldPromoteActionPayloadForGraphTable}.
     */
    public static Stream<Arguments> promoteJobTestParams() {
        MergeAndGet.Strategy strategy = MergeAndGet.Strategy.MERGE_FROM_AND_GET;
        return Stream.of(
            Arguments.arguments(strategy, Dataset.class, Dataset.class),
            Arguments.arguments(strategy, Dataset.class, Result.class),
            Arguments.arguments(strategy, Datasource.class, Datasource.class),
            Arguments.arguments(strategy, Organization.class, Organization.class),
            Arguments.arguments(strategy, OtherResearchProduct.class, OtherResearchProduct.class),
            Arguments.arguments(strategy, OtherResearchProduct.class, Result.class),
            Arguments.arguments(strategy, Project.class, Project.class),
            Arguments.arguments(strategy, Publication.class, Publication.class),
            Arguments.arguments(strategy, Publication.class, Result.class),
            Arguments.arguments(strategy, Relation.class, Relation.class),
            Arguments.arguments(strategy, Software.class, Software.class),
            Arguments.arguments(strategy, Software.class, Result.class));
    }

    /**
     * Builds the command line passed to {@link PromoteActionPayloadForGraphTableJob#main}. The
     * Spark session is always declared as not managed by the job and grouping by id is always on.
     */
    private static String[] jobArgs(
        String inputGraphTablePath,
        Class<?> rowClazz,
        String inputActionPayloadPath,
        Class<?> actionPayloadClazz,
        String outputGraphTablePath,
        MergeAndGet.Strategy strategy) {
        return new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-inputGraphTablePath", inputGraphTablePath,
            "-graphTableClassName", rowClazz.getCanonicalName(),
            "-inputActionPayloadPath", inputActionPayloadPath,
            "-actionPayloadClassName", actionPayloadClazz.getCanonicalName(),
            "-outputGraphTablePath", outputGraphTablePath,
            "-mergeAndGetStrategy", strategy.name(),
            "--shouldGroupById", "true"
        };
    }

    /**
     * Resolves a classpath resource to a file system path.
     *
     * <p>Goes through {@code URL.toURI()} rather than {@code URL.getFile()}: the latter returns
     * the percent-encoded form, which is not a valid path when the location contains spaces or
     * other escaped characters.
     *
     * @throws NullPointerException if the resource is not on the classpath
     * @throws IllegalStateException if the resource URL cannot be converted to a URI
     */
    private static Path resourcePath(String location) {
        try {
            return Paths.get(
                Objects.requireNonNull(cl.getResource(location), "test resource not found on classpath: " + location)
                    .toURI());
        } catch (URISyntaxException e) {
            throw new IllegalStateException("test resource has a malformed URL: " + location, e);
        }
    }

    /**
     * Writes the JSON fixture for {@code rowClazz} as the job's input graph table and returns the
     * directory it was written to ({@code inputGraphRootDir/<lower-cased simple class name>}).
     */
    private static <G extends Oaf> Path createGraphTable(Path inputGraphRootDir, Class<G> rowClazz) {
        Path inputGraphTableJsonDumpFile = resourcePath(inputGraphTableJsonDumpLocation(rowClazz));
        org.apache.spark.sql.Dataset<G> rowDS =
            readGraphTableFromJsonDump(inputGraphTableJsonDumpFile.toString(), rowClazz);
        Path inputGraphTableDir = inputGraphRootDir.resolve(rowClazz.getSimpleName().toLowerCase());
        writeGraphTableAaJobInput(rowDS, inputGraphTableDir.toString());
        return inputGraphTableDir;
    }

    /** Classpath location of the input graph table fixture for {@code rowClazz}. */
    private static String inputGraphTableJsonDumpLocation(Class<? extends Oaf> rowClazz) {
        return String.format(
            "%s/%s.json",
            "eu/dnetlib/dhp/actionmanager/promote/input/graph",
            rowClazz.getSimpleName().toLowerCase());
    }

    /** Reads a text file of JSON documents, one per line, into a dataset of {@code rowClazz} beans. */
    private static <G extends Oaf> org.apache.spark.sql.Dataset<G> readGraphTable(String path, Class<G> rowClazz) {
        return spark
            .read()
            .textFile(path)
            .map((MapFunction<String, G>) json -> OBJECT_MAPPER.readValue(json, rowClazz), Encoders.bean(rowClazz));
    }

    /** Reads a graph table fixture (input or expected output) from its JSON dump. */
    private static <G extends Oaf> org.apache.spark.sql.Dataset<G> readGraphTableFromJsonDump(
        String path, Class<G> rowClazz) {
        return readGraphTable(path, rowClazz);
    }

    /** Writes the rows as gzip-compressed JSON, the form in which the job is given its graph table here. */
    private static <G extends Oaf> void writeGraphTableAaJobInput(org.apache.spark.sql.Dataset<G> rowDS, String path) {
        rowDS.write().option("compression", "gzip").json(path);
    }

    /**
     * Writes the action payload fixture for the given (graph table, action payload) pair as the
     * job's input and returns the directory it was written to
     * ({@code inputActionPayloadRootDir/<lower-cased simple name of actionPayloadClazz>}).
     */
    private static <G extends Oaf, A extends Oaf> Path createActionPayload(
        Path inputActionPayloadRootDir, Class<G> rowClazz, Class<A> actionPayloadClazz) {
        Path inputActionPayloadJsonDumpFile =
            resourcePath(inputActionPayloadJsonDumpLocation(rowClazz, actionPayloadClazz));
        org.apache.spark.sql.Dataset<String> actionPayloadDS =
            readActionPayloadFromJsonDump(inputActionPayloadJsonDumpFile.toString());
        Path inputActionPayloadDir =
            inputActionPayloadRootDir.resolve(actionPayloadClazz.getSimpleName().toLowerCase());
        writeActionPayloadAsJobInput(actionPayloadDS, inputActionPayloadDir.toString());
        return inputActionPayloadDir;
    }

    /** Classpath location of the action payload fixture for the given class pair. */
    private static String inputActionPayloadJsonDumpLocation(
        Class<? extends Oaf> rowClazz, Class<? extends Oaf> actionPayloadClazz) {
        return String.format(
            "eu/dnetlib/dhp/actionmanager/promote/input/action_payload/%s_table/%s.json",
            rowClazz.getSimpleName().toLowerCase(),
            actionPayloadClazz.getSimpleName().toLowerCase());
    }

    /** Reads the action payload fixture as raw text lines; the payload is not parsed by the test. */
    private static org.apache.spark.sql.Dataset<String> readActionPayloadFromJsonDump(String path) {
        return spark.read().textFile(path);
    }

    /** Writes the payload lines as parquet with the single column renamed from {@code value} to {@code payload}. */
    private static void writeActionPayloadAsJobInput(org.apache.spark.sql.Dataset<String> actionPayloadDS, String path) {
        actionPayloadDS.withColumnRenamed("value", "payload").write().parquet(path);
    }

    /** Reads the graph table produced by the job back into {@code rowClazz} beans. */
    private static <G extends Oaf> org.apache.spark.sql.Dataset<G> readGraphTableFromJobOutput(
        String path, Class<G> rowClazz) {
        return readGraphTable(path, rowClazz);
    }

    /** Classpath location of the expected output fixture for the given strategy and class pair. */
    private static String resultFileLocation(
        MergeAndGet.Strategy strategy, Class<? extends Oaf> rowClazz, Class<? extends Oaf> actionPayloadClazz) {
        return String.format(
            "eu/dnetlib/dhp/actionmanager/promote/output/graph/%s/%s/%s_action_payload/result.json",
            strategy.name().toLowerCase(),
            rowClazz.getSimpleName().toLowerCase(),
            actionPayloadClazz.getSimpleName().toLowerCase());
    }

    /**
     * Collects the dataset, resets every row's last-update timestamp to {@code 0} so it does not
     * take part in the comparison, and orders the rows by hash code.
     *
     * <p>NOTE(review): ordering by {@code hashCode} only lines up expected and actual rows if the
     * model classes derive their hash code from content rather than identity; confirm against the
     * Oaf model.
     */
    private static <G extends Oaf> List<G> collectComparableRows(org.apache.spark.sql.Dataset<G> rows) {
        List<G> collected = rows.collectAsList();
        for (G row : collected) {
            row.setLastupdatetimestamp(0L);
        }
        return collected
            .stream()
            .sorted(Comparator.comparingInt(Object::hashCode))
            .collect(Collectors.toList());
    }

    @DisplayName("Job")
    @Nested
    class Main {

        /**
         * The job must reject a graph table class ({@link Relation}) that is not a subclass of
         * the action payload class ({@link OafEntity}).
         */
        @Test
        void shouldThrowWhenGraphTableClassIsNotASubClassOfActionPayloadClass() {
            Class<Relation> rowClazz = Relation.class;
            Class<OafEntity> actionPayloadClazz = OafEntity.class;

            RuntimeException exception = Assertions.assertThrows(
                RuntimeException.class,
                () -> PromoteActionPayloadForGraphTableJob.main(
                    jobArgs("", rowClazz, "", actionPayloadClazz, "", MergeAndGet.Strategy.SELECT_NEWER_AND_GET)));

            String msg = String.format(
                "graph table class is not a subclass of action payload class: graph=%s, action=%s",
                rowClazz.getCanonicalName(),
                actionPayloadClazz.getCanonicalName());
            Assertions.assertTrue(exception.getMessage().contains(msg));
        }

        /**
         * Runs the job for one (strategy, graph table, action payload) combination and compares
         * the produced graph table with the expected output fixture.
         */
        @ParameterizedTest(name = "strategy: {0}, graph table: {1}, action payload: {2}")
        @MethodSource("eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest#promoteJobTestParams")
        void shouldPromoteActionPayloadForGraphTable(
            MergeAndGet.Strategy strategy, Class<? extends Oaf> rowClazz, Class<? extends Oaf> actionPayloadClazz)
            throws Exception {
            Path inputGraphTableDir = createGraphTable(inputGraphRootDir, rowClazz);
            Path inputActionPayloadDir = createActionPayload(inputActionPayloadRootDir, rowClazz, actionPayloadClazz);
            Path outputGraphTableDir = outputDir.resolve("graph").resolve(rowClazz.getSimpleName().toLowerCase());

            PromoteActionPayloadForGraphTableJob.main(
                jobArgs(
                    inputGraphTableDir.toString(),
                    rowClazz,
                    inputActionPayloadDir.toString(),
                    actionPayloadClazz,
                    outputGraphTableDir.toString(),
                    strategy));

            Assertions.assertTrue(Files.exists(outputGraphTableDir));

            List<? extends Oaf> actualOutputRows =
                collectComparableRows(readGraphTableFromJobOutput(outputGraphTableDir.toString(), rowClazz));

            Path expectedOutputGraphTableJsonDumpFile =
                resourcePath(resultFileLocation(strategy, rowClazz, actionPayloadClazz));
            List<? extends Oaf> expectedOutputRows =
                collectComparableRows(
                    readGraphTableFromJsonDump(expectedOutputGraphTableJsonDumpFile.toString(), rowClazz));

            Assertions.assertIterableEquals(expectedOutputRows, actualOutputRows);
        }
    }
}

