/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupRecordFactory;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

public class SparkCreateDedupRecord
extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkCreateDedupRecord.class);
    public static final String ROOT_TRUST = "0.8";

    public SparkCreateDedupRecord(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkCreateSimRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json")));
        parser.parseArgument(args);
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
        new SparkCreateDedupRecord(parser, SparkCreateDedupRecord.getSparkSession(conf)).run(ISLookupClientFactory.getLookUpService((String)parser.get("isLookUpUrl")));
    }

    @Override
    public void run(ISLookUpService isLookUpService) throws ISLookUpException, DocumentException, IOException, SAXException {
        String graphBasePath = this.parser.get("graphBasePath");
        String isLookUpUrl = this.parser.get("isLookUpUrl");
        String actionSetId = this.parser.get("actionSetId");
        String workingPath = this.parser.get("workingPath");
        log.info("graphBasePath: '{}'", (Object)graphBasePath);
        log.info("isLookUpUrl:   '{}'", (Object)isLookUpUrl);
        log.info("actionSetId:   '{}'", (Object)actionSetId);
        log.info("workingPath:   '{}'", (Object)workingPath);
        for (DedupConfig dedupConf : this.getConfigurations(isLookUpService, actionSetId)) {
            String subEntity = dedupConf.getWf().getSubEntityValue();
            log.info("Creating deduprecords for: '{}'", (Object)subEntity);
            String outputPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, subEntity);
            SparkCreateDedupRecord.removeOutputDir(this.spark, outputPath);
            String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
            String entityPath = DedupUtility.createEntityPath(graphBasePath, subEntity);
            Class clazz = (Class)ModelSupport.entityTypes.get(EntityType.valueOf((String)subEntity));
            DataInfo dataInfo = SparkCreateDedupRecord.getDataInfo(dedupConf);
            DedupRecordFactory.createDedupRecord(this.spark, dataInfo, mergeRelPath, entityPath, clazz).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath);
            log.info("Updating mergerels for: '{}'", (Object)subEntity);
            Dataset dedupIds = this.spark.read().schema("`id` STRING, `mergedIds` ARRAY<STRING>").json(outputPath).selectExpr(new String[]{"id as source", "explode(mergedIds) as target"});
            this.spark.read().load(mergeRelPath).where("relClass == 'merges'").join(dedupIds, DHPUtils.toSeq(Arrays.asList("source", "target")).toSeq(), "left_semi").write().mode(SaveMode.Overwrite).option("compression", "gzip").save(workingPath + "/mergerel_filtered");
            Dataset validRels = this.spark.read().load(workingPath + "/mergerel_filtered");
            Dataset filteredMergeRels = validRels.unionByName(validRels.withColumnRenamed("source", "source_tmp").withColumnRenamed("target", "target_tmp").withColumn("relClass", functions.lit((Object)"isMergedIn")).withColumnRenamed("target_tmp", "source").withColumnRenamed("source_tmp", "target"));
            SparkCreateDedupRecord.saveParquet(filteredMergeRels, mergeRelPath, SaveMode.Overwrite);
            SparkCreateDedupRecord.removeOutputDir(this.spark, workingPath + "/mergerel_filtered");
        }
    }

    private static DataInfo getDataInfo(DedupConfig dedupConf) {
        DataInfo info = new DataInfo();
        info.setDeletedbyinference(Boolean.valueOf(false));
        info.setInferred(Boolean.valueOf(true));
        info.setInvisible(Boolean.valueOf(false));
        info.setTrust(ROOT_TRUST);
        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());
        Qualifier provenance = new Qualifier();
        provenance.setClassid("sysimport:dedup");
        provenance.setClassname("sysimport:dedup");
        provenance.setSchemeid("dnet:provenanceActions");
        provenance.setSchemename("dnet:provenanceActions");
        info.setProvenanceaction(provenance);
        return info;
    }
}

