/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.resolver.mdstore.plugin;

import com.mongodb.BasicDBObject;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ListIndexesIterable;
import com.mongodb.client.MongoCollection;
import eu.dnetlib.data.mdstore.modular.MDStoreFeeder;
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
import eu.dnetlib.data.mdstore.modular.action.FailedCallback;
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
import eu.dnetlib.rmi.data.MDStoreServiceException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;

public class DLIMergeRecord
implements MDStorePlugin {
    private static final Log log = LogFactory.getLog(DLIMergeRecord.class);
    @Autowired
    private MDStoreTransactionManagerImpl transactionManager;
    @Autowired
    private MDStoreFeeder mdStoreFeeder;

    public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback, FailedCallback failedCallback) {
        String id = params.get("mdStoreId");
        String host = params.get("mongoHost");
        String nsPrefix = params.get("nsPrefix");
        String sparkPath = params.get("sparkPath");
        String sparkJobPath = params.get("sparkJobPath");
        String sparkApplicationName = params.get("sparkApplicationName");
        String mongoDBName = params.get("mongoDBName");
        String number_of_core = params.get("numExecutor");
        if (StringUtils.isNotBlank((CharSequence)id) && StringUtils.isNotBlank((CharSequence)host) && StringUtils.isNotBlank((CharSequence)nsPrefix) && StringUtils.isNotBlank((CharSequence)sparkJobPath) && StringUtils.isNotBlank((CharSequence)sparkPath)) {
            try {
                String line;
                log.debug((Object)"starting spark job");
                String mdStoreCollection = this.transactionManager.getMDStoreCollection(id);
                String[] command = new String[]{sparkPath + "bin/spark-submit", sparkJobPath, host, this.transactionManager.getDb().getName(), mdStoreCollection, nsPrefix, number_of_core, sparkApplicationName};
                ProcessBuilder builder = new ProcessBuilder(command);
                Process p = builder.start();
                BufferedReader reader = new BufferedReader(new InputStreamReader(p.getErrorStream()));
                while ((line = reader.readLine()) != null) {
                    log.info((Object)line);
                }
                p.waitFor();
                if (p.exitValue() != 0) {
                    throw new MDStoreServiceException("The spark job exit with error");
                }
                log.info((Object)"Merging complete... creating index in the new collection");
                ListIndexesIterable documents = this.transactionManager.getDb().getCollection(mdStoreCollection).listIndexes();
                MongoCollection outMdStore = this.transactionManager.getDb().getCollection("out" + mdStoreCollection);
                for (Document d : documents) {
                    ((Map)d.get((Object)"key", Map.class)).keySet().forEach(it -> {
                        String s = it.toString();
                        outMdStore.createIndex((Bson)new BasicDBObject(s, (Object)1));
                    });
                }
                log.info((Object)"index Created, dropping old collection and rename the new one");
                this.transactionManager.getDb().getCollection(mdStoreCollection).drop();
                int size = (int)outMdStore.count();
                outMdStore.renameCollection(new MongoNamespace(mongoDBName, mdStoreCollection));
                this.mdStoreFeeder.touch(id, size);
                doneCallback.call(params);
            }
            catch (Throwable e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new RuntimeException("missing one of the following parameters {mdStoreId,mongoHost,nsPrefix,sparkPath,sparkJobPath}");
        }
    }

    public String getStatus() {
        return "30/100";
    }
}

