package eu.dnetlib.dhp.actionmanager.partition;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.ISClient;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
import eu.dnetlib.dhp.common.ThrowingSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.mutable.Seq;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest.class */
public class PartitionActionSetsByPayloadTypeJobTest {
    private static Configuration configuration;
    private static SparkSession spark;
    private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final StructType ATOMIC_ACTION_SCHEMA = StructType$.MODULE$.apply(Arrays.asList(StructField$.MODULE$.apply("clazz", DataTypes.StringType, false, Metadata.empty()), StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())));

    @DisplayName("Job")
    @Nested
    /* loaded from: input_file:eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest$Main.class */
    class Main {

        @Mock
        private ISClient isClient;

        Main() {
        }

        @Test
        void shouldPartitionActionSetsByPayloadType(@TempDir Path path) throws Exception {
            Path resolve = path.resolve("input").resolve("action_sets");
            Path resolve2 = path.resolve("output");
            Map createActionSets = PartitionActionSetsByPayloadTypeJobTest.createActionSets(resolve);
            Mockito.when(this.isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn(PartitionActionSetsByPayloadTypeJobTest.this.resolveInputActionSetPaths(resolve));
            PartitionActionSetsByPayloadTypeJob partitionActionSetsByPayloadTypeJob = new PartitionActionSetsByPayloadTypeJob();
            partitionActionSetsByPayloadTypeJob.setIsClient(this.isClient);
            partitionActionSetsByPayloadTypeJob.run(Boolean.FALSE, "", resolve2.toString());
            Files.exists(resolve2, new LinkOption[0]);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Dataset.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Datasource.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Organization.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, OtherResearchProduct.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Project.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Publication.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Result.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Relation.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(resolve2, createActionSets, Software.class);
        }
    }

    @BeforeAll
    public static void beforeAll() throws IOException {
        configuration = Job.getInstance().getConfiguration();
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
        sparkConf.setMaster("local");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        spark = SparkSession.builder().config(sparkConf).getOrCreate();
    }

    @AfterAll
    public static void afterAll() {
        spark.stop();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<String> resolveInputActionSetPaths(Path path) throws IOException {
        return (List) Files.list(getInputActionSetJsonDumpsDir()).map(path2 -> {
            return path.resolve(path2.getFileName().toString()).toString();
        }).collect(Collectors.toCollection(ArrayList::new));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, List<String>> createActionSets(Path path) throws IOException {
        Path inputActionSetJsonDumpsDir = getInputActionSetJsonDumpsDir();
        HashMap hashMap = new HashMap();
        Files.list(inputActionSetJsonDumpsDir).forEach(path2 -> {
            String path2 = path2.getFileName().toString();
            Path resolve = path.resolve(path2);
            org.apache.spark.sql.Dataset cache = readActionsFromJsonDump(path2.toString()).cache();
            writeActionsAsJobInput(cache, path2, resolve.toString());
            Map map = (Map) cache.withColumn("atomic_action", functions.from_json(functions.col("value"), ATOMIC_ACTION_SCHEMA)).select(new Column[]{functions.expr("atomic_action.*")}).groupBy(new Column[]{functions.col("clazz")}).agg(functions.collect_list(functions.col("payload")).as("payload_list"), new Column[0]).collectAsList().stream().map(row -> {
                return new AbstractMap.SimpleEntry(row.getAs("clazz"), JavaConversions.mutableSeqAsJavaList((Seq) row.getAs("payload_list")));
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
            map.keySet().forEach(str -> {
                if (!hashMap.containsKey(str)) {
                    hashMap.put(str, map.get(str));
                    return;
                }
                ArrayList arrayList = new ArrayList();
                arrayList.addAll((Collection) hashMap.get(str));
                arrayList.addAll((Collection) map.get(str));
                hashMap.put(str, arrayList);
            });
        });
        return hashMap;
    }

    private static Path getInputActionSetJsonDumpsDir() {
        return Paths.get(((URL) Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/"))).getFile(), new String[0]);
    }

    private static org.apache.spark.sql.Dataset<String> readActionsFromJsonDump(String str) {
        return spark.read().textFile(str);
    }

    private static void writeActionsAsJobInput(org.apache.spark.sql.Dataset<String> dataset, String str, String str2) {
        dataset.javaRDD().mapToPair(str3 -> {
            return new Tuple2(new Text(str), new Text(str3));
        }).saveAsNewAPIHadoopFile(str2, Text.class, Text.class, SequenceFileOutputFormat.class, configuration);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T extends Oaf> void assertForOafType(Path path, Map<String, List<String>> map, Class<T> cls) {
        Path resolve = path.resolve(String.format("clazz=%s", cls.getCanonicalName()));
        Files.exists(resolve, new LinkOption[0]);
        List collectAsList = readActionPayloadFromJobOutput(resolve.toString(), cls).collectAsList();
        collectAsList.sort(Comparator.comparingInt((v0) -> {
            return v0.hashCode();
        }));
        Assertions.assertIterableEquals((List) map.get(cls.getCanonicalName()).stream().map(str -> {
            return mapToOaf(str, cls);
        }).sorted(Comparator.comparingInt((v0) -> {
            return v0.hashCode();
        })).collect(Collectors.toList()), collectAsList);
    }

    private static <T extends Oaf> org.apache.spark.sql.Dataset<T> readActionPayloadFromJobOutput(String str, Class<T> cls) {
        return spark.read().parquet(str).map(row -> {
            return (Oaf) OBJECT_MAPPER.readValue((String) row.getAs("payload"), cls);
        }, Encoders.bean(cls));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T extends Oaf> T mapToOaf(String str, Class<T> cls) {
        return (T) ThrowingSupport.rethrowAsRuntimeException(() -> {
            return (Oaf) OBJECT_MAPPER.readValue(str, cls);
        }, String.format("failed to map json to class: json=%s, class=%s", str, cls.getCanonicalName()));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1167311898:
                if (implMethodName.equals("lambda$readActionPayloadFromJobOutput$6506563e$1")) {
                    z = false;
                    break;
                }
                break;
            case 1534584601:
                if (implMethodName.equals("lambda$writeActionsAsJobInput$dac10fab$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Lorg/apache/spark/sql/Row;)Leu/dnetlib/dhp/schema/oaf/Oaf;")) {
                    Class cls = (Class) serializedLambda.getCapturedArg(0);
                    return row -> {
                        return (Oaf) OBJECT_MAPPER.readValue((String) row.getAs("payload"), cls);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJobTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Lscala/Tuple2;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return str3 -> {
                        return new Tuple2(new Text(str), new Text(str3));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
