/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.TupleWrapper;
import eu.dnetlib.dhp.oa.provision.utils.ContextMapper;
import eu.dnetlib.dhp.oa.provision.utils.XmlRecordFactory;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.utils.ModelHardLimits;
import eu.dnetlib.dhp.schema.solr.SolrRecord;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PayloadConverterJob {
    private static final Logger log = LoggerFactory.getLogger(PayloadConverterJob.class);
    public static final String schemaLocation = "https://www.openaire.eu/schema/1.0/oaf-1.0.xsd";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_EMPTY);

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)PayloadConverterJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_payload_converter.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputPath = parser.get("inputPath");
        log.info("inputPath: {}", (Object)inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        Boolean validateXML = Optional.ofNullable(parser.get("validateXML")).map(Boolean::valueOf).orElse(Boolean.FALSE);
        log.info("validateXML: {}", (Object)validateXML);
        String contextApiBaseUrl = parser.get("contextApiBaseUrl");
        log.info("contextApiBaseUrl: {}", (Object)contextApiBaseUrl);
        String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", (Object)isLookupUrl);
        ISLookUpService isLookup = ISLookupClientFactory.getLookUpService((String)isLookupUrl);
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            PayloadConverterJob.removeOutputDir(spark, outputPath);
            PayloadConverterJob.createPayloads(spark, inputPath, outputPath, ContextMapper.fromAPI(contextApiBaseUrl), VocabularyGroup.loadVocsFromIS((ISLookUpService)isLookup), validateXML);
        });
    }

    private static void createPayloads(SparkSession spark, String inputPath, String outputPath, ContextMapper contextMapper, VocabularyGroup vocabularies, Boolean validateXML) {
        XmlRecordFactory recordFactory = new XmlRecordFactory(PayloadConverterJob.prepareAccumulators(spark.sparkContext()), contextMapper, false, schemaLocation);
        List paths = HdfsSupport.listFiles((String)inputPath, (Configuration)spark.sparkContext().hadoopConfiguration());
        log.info("Found paths: {}", (Object)String.join((CharSequence)",", paths));
        spark.read().load(DHPUtils.toSeq((List)paths).toSeq()).as(Encoders.kryo(JoinedEntity.class)).filter(PayloadConverterJob.filterDeletedByInferenceFn()).map(PayloadConverterJob.pruneRelatedEntitiesFn(), Encoders.kryo(JoinedEntity.class)).map(PayloadConverterJob.mapPayloadTuple(contextMapper, vocabularies, validateXML, recordFactory), Encoders.bean(TupleWrapper.class)).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath);
    }

    private static FilterFunction<JoinedEntity> filterDeletedByInferenceFn() {
        return (FilterFunction & Serializable)je -> Optional.ofNullable(je.getEntity()).map(Oaf::getDataInfo).map(DataInfo::getDeletedbyinference).orElse(false) == false;
    }

    private static MapFunction<JoinedEntity, TupleWrapper> mapPayloadTuple(ContextMapper contextMapper, VocabularyGroup vocabularies, Boolean validateXML, XmlRecordFactory recordFactory) {
        return (MapFunction & Serializable)je -> {
            SolrRecord solrRecord = ProvisionModelSupport.transform(je, contextMapper, vocabularies);
            String xml = recordFactory.build((JoinedEntity)je, validateXML);
            return new TupleWrapper(xml, OBJECT_MAPPER.writeValueAsString((Object)solrRecord));
        };
    }

    private static MapFunction<JoinedEntity, JoinedEntity> pruneRelatedEntitiesFn() {
        return (MapFunction & Serializable)je -> {
            HashMap freqs = Maps.newHashMap();
            ArrayList rew = Lists.newArrayList();
            if (je.getLinks() != null) {
                je.getLinks().forEach(link -> {
                    String relClass = link.getRelation().getRelClass();
                    Long count = freqs.getOrDefault(relClass, 0L);
                    Long max = ModelHardLimits.MAX_RELATIONS_BY_RELCLASS.getOrDefault(relClass, Long.MAX_VALUE);
                    if (count <= max) {
                        rew.add(link);
                        freqs.put(relClass, freqs.getOrDefault(relClass, 0L) + 1L);
                    }
                });
                je.setLinks(rew);
            }
            return je;
        };
    }

    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove((String)path, (Configuration)spark.sparkContext().hadoopConfiguration());
    }

    private static Map<String, LongAccumulator> prepareAccumulators(SparkContext sc) {
        HashMap accumulators = Maps.newHashMap();
        accumulators.put("resultResult_similarity_isAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_isAmongTopNSimilarDocuments"));
        accumulators.put("resultResult_similarity_hasAmongTopNSimilarDocuments", sc.longAccumulator("resultResult_similarity_hasAmongTopNSimilarDocuments"));
        accumulators.put("resultResult_supplement_isSupplementTo", sc.longAccumulator("resultResult_supplement_isSupplementTo"));
        accumulators.put("resultResult_supplement_isSupplementedBy", sc.longAccumulator("resultResult_supplement_isSupplementedBy"));
        accumulators.put("resultResult_dedup_isMergedIn", sc.longAccumulator("resultResult_dedup_isMergedIn"));
        accumulators.put("resultResult_dedup_merges", sc.longAccumulator("resultResult_dedup_merges"));
        accumulators.put("resultResult_publicationDataset_isRelatedTo", sc.longAccumulator("resultResult_publicationDataset_isRelatedTo"));
        accumulators.put("resultResult_relationship_isRelatedTo", sc.longAccumulator("resultResult_relationship_isRelatedTo"));
        accumulators.put("resultProject_outcome_isProducedBy", sc.longAccumulator("resultProject_outcome_isProducedBy"));
        accumulators.put("resultProject_outcome_produces", sc.longAccumulator("resultProject_outcome_produces"));
        accumulators.put("resultOrganization_affiliation_isAuthorInstitutionOf", sc.longAccumulator("resultOrganization_affiliation_isAuthorInstitutionOf"));
        accumulators.put("resultOrganization_affiliation_hasAuthorInstitution", sc.longAccumulator("resultOrganization_affiliation_hasAuthorInstitution"));
        accumulators.put("projectOrganization_participation_hasParticipant", sc.longAccumulator("projectOrganization_participation_hasParticipant"));
        accumulators.put("projectOrganization_participation_isParticipant", sc.longAccumulator("projectOrganization_participation_isParticipant"));
        accumulators.put("organizationOrganization_dedup_isMergedIn", sc.longAccumulator("organizationOrganization_dedup_isMergedIn"));
        accumulators.put("organizationOrganization_dedup_merges", sc.longAccumulator("resultProject_outcome_produces"));
        accumulators.put("datasourceOrganization_provision_isProvidedBy", sc.longAccumulator("datasourceOrganization_provision_isProvidedBy"));
        accumulators.put("datasourceOrganization_provision_provides", sc.longAccumulator("datasourceOrganization_provision_provides"));
        return accumulators;
    }
}

