/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.bulktag;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.api.model.EntityCommunities;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.CommunityConfigurationFactory;
import eu.dnetlib.dhp.bulktag.community.Pair;
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
import eu.dnetlib.dhp.bulktag.community.ResultTagger;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that tags graph entities with research-community contexts.
 *
 * <p>Result entities are tagged through {@link ResultTagger} using the community configuration.
 * Organizations, projects and datasources are tagged from the community/entity maps returned by
 * {@link Utils}. Every tagged entity set is first written to {@code outputPath + <entity>} and then
 * copied back over {@code inputPath + <entity>}.
 */
public class SparkBulkTagJob {
    private static final String OPENAIRE_3 = "openaire3.0";
    private static final String OPENAIRE_4 = "openaire-pub_4.0";
    private static final String OPENAIRE_CRIS = "openaire-cris_1.1";
    private static final String OPENAIRE_DATA = "openaire2.0_data";
    /** Compatibility class ids (compared case-insensitively) accepted by {@link #isOKDatasource}. */
    private static final List<String> EOSC_COMPATIBILITIES =
            Arrays.asList(OPENAIRE_3, OPENAIRE_4, OPENAIRE_CRIS, OPENAIRE_DATA);
    /** Id of the "collected from" datasource that marks a datasource as coming from EOSC. */
    private static final String EOSC = "10|openaire____::2e06c1122c7df43765fdcf91080824fa";
    private static final String PARAMETERS_RESOURCE =
            "/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json";
    private static final String PROVENANCE_ACTIONS = "dnet:provenanceActions";
    /** Suffix format of the dumped community configuration file name. */
    private static final DateTimeFormatter CONFIGURATION_DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class);
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Entry point: parses the job arguments, loads the community configuration and runs all tagging steps.
     *
     * @param args command-line arguments described by {@code input_bulkTag_parameters.json}
     * @throws Exception if argument parsing, HDFS access, the community API or a Spark step fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = readClasspathResource(PARAMETERS_RESOURCE);
        // The raw arguments are not logged: they contain the database password.
        log.info("Number of arguments: {}", args.length);
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
        String baseURL = parser.get("baseURL");
        log.info("baseURL: {}", baseURL);
        String protoMappingPath = parser.get("pathMap");
        log.info("pathMap: {}", protoMappingPath);
        String hdfsNameNode = parser.get("nameNode");
        log.info("nameNode: {}", hdfsNameNode);

        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsNameNode);
        FileSystem fs = FileSystem.get(configuration);
        ProtoMap protoMap = readProtoMap(fs, protoMappingPath);
        log.info("pathMap: {}", new Gson().toJson(protoMap));

        String dbUrl = parser.get("dbUrl");
        log.info("dbUrl: {}", dbUrl);
        String dbUser = parser.get("dbUser");
        log.info("dbUser: {}", dbUser);
        // Never log the password.
        String dbPassword = parser.get("dbPassword");
        String hdfsPath = outputPath + "masterDuplicate";
        log.info("hdfsPath: {}", hdfsPath);
        String configurationPath = parser.get("configurationPath");
        SparkConf conf = new SparkConf();

        // An explicit tagging configuration wins; otherwise fetch it from the API and keep a dated copy on HDFS.
        String taggingConf = parser.get("taggingConf");
        CommunityConfiguration cc;
        if (taggingConf != null) {
            cc = CommunityConfigurationFactory.newInstance(taggingConf);
        } else {
            cc = Utils.getCommunityConfiguration(baseURL);
            writeCommunityConfiguration(configurationPath, hdfsNameNode, cc);
        }

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            extendCommunityConfigurationForEOSC(spark, inputPath, cc);
            // Presumably dumps the master/duplicate datasource table to hdfsPath; it is read back below.
            ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, hdfsNameNode);
            execBulkTag(spark, inputPath, outputPath, protoMap, cc);
            execEntityTag(spark, inputPath + "organization", outputPath + "organization",
                    mapWithRepresentativeOrganization(spark, inputPath + "relation",
                            Utils.getOrganizationCommunityMap(baseURL)),
                    Organization.class, "community:organization", "Bulktagging for Community - Organization");
            execEntityTag(spark, inputPath + "project", outputPath + "project",
                    Utils.getProjectCommunityMap(baseURL),
                    Project.class, "community:project", "Bulktagging for Community - Project");
            execEntityTag(spark, inputPath + "datasource", outputPath + "datasource",
                    mapWithMasterDatasource(spark, hdfsPath, Utils.getDatasourceCommunities(baseURL)),
                    Datasource.class, "community:datasource", "Bulktagging for Community - Datasource");
        });
    }

    /** Reads a classpath resource as UTF-8 text, closing the stream afterwards. */
    private static String readClasspathResource(String resource) throws IOException {
        try (InputStream in = SparkBulkTagJob.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("Resource not found on classpath: " + resource);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /** Reads the JSON proto map stored at {@code protoMappingPath} on {@code fs}. */
    private static ProtoMap readProtoMap(FileSystem fs, String protoMappingPath) throws IOException {
        String json;
        try (InputStream in = fs.open(new Path(protoMappingPath))) {
            json = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        log.info("protoMap: {}", json);
        return new Gson().fromJson(json, ProtoMap.class);
    }

    /**
     * Writes every community of {@code cc} as one JSON line to {@code configurationPath + yyyy-MM-dd}.
     *
     * @throws IOException if the HDFS file cannot be created or written
     */
    private static void writeCommunityConfiguration(String configurationPath, String hdfsNameNode,
            CommunityConfiguration cc) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsNameNode);
        FileSystem fileSystem = FileSystem.get(conf);
        Path target = new Path(configurationPath + LocalDate.now().format(CONFIGURATION_DATE_FORMAT));
        FSDataOutputStream fos = fileSystem.create(target);
        // Closing the writer also closes the underlying HDFS stream.
        try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
            for (Community c : cc.getCommunities().values()) {
                writer.write(OBJECT_MAPPER.writeValueAsString(c));
                writer.write("\n");
            }
        }
    }

    /**
     * Re-keys datasource ids that are duplicates onto their master id, using the master/duplicate dump.
     *
     * @return the same (mutated) {@code datasourceCommunityMap}
     */
    private static CommunityEntityMap mapWithMasterDatasource(SparkSession spark, String masterDuplicatePath,
            CommunityEntityMap datasourceCommunityMap) {
        Dataset<Row> masterToDuplicate = spark.read()
                .schema(Encoders.bean(MasterDuplicate.class).schema())
                .json(masterDuplicatePath)
                .selectExpr("masterId as source", "duplicateId as target");
        return remapCommunityEntityMap(datasourceCommunityMap,
                collectKeysToRemap(spark, masterToDuplicate, datasourceCommunityMap));
    }

    /** Returns a mutable snapshot of the entity ids used as keys of the map. */
    private static List<String> entityIdList(CommunityEntityMap datasourceCommunityMap) {
        return new ArrayList<>(datasourceCommunityMap.keySet());
    }

    /**
     * Re-keys organization ids merged into another organization (non-deleted {@code merges} relations)
     * onto the merging organization (the relation source).
     *
     * @return the same (mutated) {@code organizationCommunityMap}
     */
    private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession spark, String relationPath,
            CommunityEntityMap organizationCommunityMap) {
        Dataset<Row> mergesRel = spark.read()
                .schema(Encoders.bean(Relation.class).schema())
                .json(relationPath)
                .filter("datainfo.deletedbyinference != true and relClass = 'merges'")
                .select("source", "target");
        return remapCommunityEntityMap(organizationCommunityMap,
                collectKeysToRemap(spark, mergesRel, organizationCommunityMap));
    }

    /**
     * Collects the (source, target) rows whose {@code target} is a key of {@code entityCommunityMap}.
     *
     * @param sourceTargetPairs dataset with string columns {@code source} (new key) and {@code target} (old key)
     */
    private static List<Row> collectKeysToRemap(SparkSession spark, Dataset<Row> sourceTargetPairs,
            CommunityEntityMap entityCommunityMap) {
        Dataset<String> knownIds = spark.createDataset(entityIdList(entityCommunityMap), Encoders.STRING());
        return sourceTargetPairs
                .join(knownIds, knownIds.col("value").equalTo(sourceTargetPairs.col("target")), "left_semi")
                .select("source", "target")
                .collectAsList();
    }

    /**
     * Moves the communities of every {@code target} key onto its {@code source} key.
     *
     * <p>If the new key already has communities (because it was in the map, or because several old keys map
     * onto it), the lists are merged without duplicates. Previously the last move silently overwrote them.
     *
     * @return the same (mutated) {@code entityCommunityMap}
     */
    private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap entityCommunityMap,
            List<Row> mappedKeys) {
        for (Row mappedEntry : mappedKeys) {
            String oldKey = mappedEntry.getAs("target");
            String newKey = mappedEntry.getAs("source");
            if (!entityCommunityMap.containsKey(oldKey)) {
                continue;
            }
            List<String> moved = entityCommunityMap.remove(oldKey);
            if (moved == null) {
                moved = new ArrayList<>();
            }
            // containsKey rather than get(): the map's get() may not signal absence with null.
            if (entityCommunityMap.containsKey(newKey) && entityCommunityMap.get(newKey) != null) {
                List<String> merged = new ArrayList<>(entityCommunityMap.get(newKey));
                for (String community : moved) {
                    if (!merged.contains(community)) {
                        merged.add(community);
                    }
                }
                entityCommunityMap.put(newKey, merged);
            } else {
                entityCommunityMap.put(newKey, moved);
            }
        }
        return entityCommunityMap;
    }

    /**
     * Adds a bulk-tagging {@link Context} for each community associated with an entity.
     *
     * <p>The result is written to {@code outputPath}, then copied back over {@code inputPath}.
     *
     * @param communityEntity entity id -> community ids
     * @param classID provenance qualifier class id stored on the added contexts
     * @param className provenance qualifier class name stored on the added contexts
     */
    private static <E extends OafEntity> void execEntityTag(SparkSession spark, String inputPath,
            String outputPath, CommunityEntityMap communityEntity, Class<E> entityClass, String classID,
            String className) {
        Dataset<E> entity = readPath(spark, inputPath, entityClass);
        List<EntityCommunities> entityCommunities = communityEntity.keySet()
                .stream()
                .map(k -> EntityCommunities.newInstance(k, communityEntity.get(k)))
                .collect(Collectors.toList());
        Dataset<EntityCommunities> pc = spark.createDataset(entityCommunities, Encoders.bean(EntityCommunities.class));

        // Left join: entities without communities are kept unchanged (their tuple's right side is null).
        mapAs(entity.joinWith(pc, entity.col("id").equalTo(pc.col("entityId")), "left"),
                t2 -> addCommunityContexts(t2._1(), t2._2(), classID, className), entityClass)
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(outputPath);
        // Overwrite the input path with the tagged entities.
        readPath(spark, outputPath, entityClass)
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(inputPath);
    }

    /**
     * Applies a {@link MapFunction}. The input element type is inferred from {@code input}, so the lambda
     * can be fully typed (e.g. over join tuples) without an explicit cast.
     */
    private static <T, U> Dataset<U> mapAs(Dataset<T> input, MapFunction<T, U> mapper, Class<U> outputClass) {
        return input.map(mapper, Encoders.bean(outputClass));
    }

    /**
     * Appends one bulk-tagging context per community id not already present in the entity's contexts.
     *
     * <p>Returns {@code entity} untouched when {@code communities} (or its id list) is null.
     */
    private static <E extends OafEntity> E addCommunityContexts(E entity, EntityCommunities communities,
            String classID, String className) {
        if (communities == null || communities.getCommunitiesId() == null) {
            return entity;
        }
        if (entity.getContext() == null) {
            entity.setContext(new ArrayList<>());
        }
        List<String> present = entity.getContext()
                .stream()
                .map(Context::getId)
                .collect(Collectors.toCollection(ArrayList::new));
        for (String communityId : communities.getCommunitiesId()) {
            if (!present.contains(communityId)) {
                entity.getContext().add(bulkTagContext(communityId, classID, className));
                // Record it so a duplicated community id does not add a second context.
                present.add(communityId);
            }
        }
        return entity;
    }

    /** Builds a context for {@code communityId} carrying bulk-tagging provenance (trust "1"). */
    private static Context bulkTagContext(String communityId, String classID, String className) {
        Context con = new Context();
        con.setId(communityId);
        con.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false, "bulktagging", true, false,
                OafMapperUtils.qualifier(classID, className, PROVENANCE_ACTIONS, PROVENANCE_ACTIONS), "1")));
        return con;
    }

    /**
     * Adds every EOSC-compatible datasource id that is not yet a key of the EOSC datasource map of
     * {@code cc}, with an empty constraint list. NOTE(review): an empty list presumably means "no extra
     * selection constraints" — confirm in ResultTagger.
     */
    private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath,
            CommunityConfiguration cc) {
        Dataset<String> datasources = readPath(spark, inputPath + "datasource", Datasource.class)
                .filter((FilterFunction<Datasource>) SparkBulkTagJob::isOKDatasource)
                .map((MapFunction<Datasource, String>) Datasource::getId, Encoders.STRING());
        Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap();
        for (String datasourceId : datasources.collectAsList()) {
            if (!dsm.containsKey(datasourceId)) {
                dsm.put(datasourceId, new ArrayList<>());
            }
        }
    }

    /**
     * Checks that a datasource has an accepted OpenAIRE compatibility and was collected from EOSC.
     * Missing compatibility / collectedfrom information yields {@code false} instead of an NPE.
     */
    private static boolean isOKDatasource(Datasource ds) {
        Qualifier compatibility = ds.getOpenairecompatibility();
        if (compatibility == null || compatibility.getClassid() == null) {
            return false;
        }
        String classId = compatibility.getClassid();
        boolean compatible = EOSC_COMPATIBILITIES.stream().anyMatch(classId::equalsIgnoreCase);
        return compatible && Optional.ofNullable(ds.getCollectedfrom())
                .map(cfs -> cfs.stream().anyMatch(cf -> cf != null && EOSC.equals(cf.getKey())))
                .orElse(false);
    }

    /**
     * Tags every result entity type in parallel.
     *
     * <p>Each entity type is processed as follows: normalize the records, enrich them through
     * {@link ResultTagger}, write them to {@code outputPath + type}, then copy them back over
     * {@code inputPath + type}.
     */
    private static <R extends Result> void execBulkTag(SparkSession spark, String inputPath, String outputPath,
            ProtoMap protoMappingParams, CommunityConfiguration communityConfiguration) {
        ModelSupport.entityTypes.keySet().parallelStream().filter(ModelSupport::isResult).forEach(e -> {
            String entityName = e.name();
            PropagationConstant.removeOutputDir(spark, outputPath + entityName);
            ResultTagger resultTagger = new ResultTagger();
            // entityTypes maps every result EntityType to its Result subclass.
            @SuppressWarnings("unchecked")
            Class<R> resultClazz = (Class<R>) ModelSupport.entityTypes.get(e);
            readPath(spark, inputPath + entityName, resultClazz)
                    .map(SparkBulkTagJob.<R>patchResult(), Encoders.bean(resultClazz))
                    .filter((FilterFunction<R>) Objects::nonNull)
                    .map((MapFunction<R, R>) value -> resultTagger
                            .enrichContextCriteria(value, communityConfiguration, protoMappingParams),
                            Encoders.bean(resultClazz))
                    .write()
                    .mode(SaveMode.Overwrite)
                    .option("compression", "gzip")
                    .json(outputPath + entityName);
            // Overwrite the input path with the tagged results.
            readPath(spark, outputPath + entityName, resultClazz)
                    .write()
                    .mode(SaveMode.Overwrite)
                    .option("compression", "gzip")
                    .json(inputPath + entityName);
        });
    }

    /**
     * Reads a directory of JSON lines into a typed dataset, deserializing each line with {@link #OBJECT_MAPPER}.
     */
    public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
        return spark.read()
                .textFile(inputPath)
                .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

    /**
     * Normalizes a result before tagging.
     *
     * <ul>
     *   <li>A missing dataInfo gets a default (deletedbyinference = false).</li>
     *   <li>A null deletedbyinference flag is set to false.</li>
     *   <li>A null context list becomes an empty list.</li>
     * </ul>
     */
    private static <R extends Result> MapFunction<R, R> patchResult() {
        return r -> {
            if (Objects.isNull(r.getDataInfo())) {
                r.setDataInfo(OafMapperUtils.dataInfo(false, "", false, false, OafMapperUtils.unknown("", ""), ""));
            } else if (r.getDataInfo().getDeletedbyinference() == null) {
                r.getDataInfo().setDeletedbyinference(false);
            }
            if (Objects.isNull(r.getContext())) {
                r.setContext(new ArrayList<>());
            }
            return r;
        };
    }
}

