package eu.dnetlib.data.mapreduce.wf.dataimport;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.Node;
import com.googlecode.sarasvati.NodeToken;
import com.mongodb.DB;
import com.mongodb.DBObject;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileWriterFactory;
import eu.dnetlib.data.mapreduce.wf.HdfsJobNode;
import eu.dnetlib.enabling.inspector.msro.progress.ProgressProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.springframework.beans.factory.annotation.Required;

/* loaded from: input_file:eu/dnetlib/data/mapreduce/wf/dataimport/MongodbBatchExporter.class */
public class MongodbBatchExporter extends HdfsJobNode implements ProgressProvider {
    private static final Log log = LogFactory.getLog(MongodbBatchExporter.class);
    private DB db;
    private String format;
    private String layout;
    private String interpretation;
    private String hdfsDestinationDirectory;
    private int currentValue;
    private List<String> mdStoreIds;

    @Resource
    private SequenceFileWriterFactory sequenceFileWriterFactory;
    private int totalValue = -1;
    private final Function<DBObject, String> mdIdFunction = new Function<DBObject, String>() { // from class: eu.dnetlib.data.mapreduce.wf.dataimport.MongodbBatchExporter.1
        public String apply(DBObject dBObject) {
            return (String) dBObject.get("mdId");
        }
    };

    public void init() {
        this.currentValue = 0;
        this.totalValue = -1;
    }

    public void executeAsync(Engine engine, NodeToken nodeToken) {
        init();
        Configuration configuration = (Configuration) nodeToken.getEnv().getTransientAttribute("hbaseConf");
        Path path = new Path(getFilePath());
        nodeToken.getEnv().setAttribute("mongo_db", getDb().getName());
        nodeToken.getEnv().setAttribute("sequenceFile", path.toString());
        log.info("exporting mdstore records to: " + path.toString());
        try {
            this.mdStoreIds = getMDStoreIds();
            this.totalValue = sumMdStoreSizes();
            setProgressProvider(this);
            deleteHdfsFile(configuration, path);
            write(configuration, path);
            engine.complete(nodeToken, Arc.DEFAULT_ARC);
        } catch (Throwable th) {
            failed(engine, nodeToken, th);
        }
    }

    private String getFilePath() {
        return getHdfsDestinationDirectory() + "/mdstores_" + getFormat() + "-" + getLayout() + "-" + getInterpretation() + ".seq";
    }

    private void write(Configuration configuration, Path path) throws IOException {
        SequenceFile.Writer sequenceFileWriter = this.sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, configuration, path);
        for (String str : this.mdStoreIds) {
            log.info("exporting mdStore: " + str);
            Iterator it = getDb().getCollection(str).find().iterator();
            while (it.hasNext()) {
                DBObject dBObject = (DBObject) it.next();
                Text text = getText(dBObject, "id");
                Text text2 = getText(dBObject, "body");
                if (text == null || text2 == null) {
                    log.warn("invalid record!\n" + dBObject.toMap());
                } else {
                    sequenceFileWriter.append(text, text2);
                    this.currentValue++;
                }
            }
        }
        sequenceFileWriter.close();
        log.info("written " + this.totalValue + " records in sequence file: " + path.toString());
    }

    public boolean isInaccurate() {
        return false;
    }

    public int getTotalValue(Node node, NodeToken nodeToken) {
        if (this.totalValue < 0) {
            this.totalValue = sumMdStoreSizes();
        }
        return this.totalValue;
    }

    private int sumMdStoreSizes() {
        int i = 0;
        Iterator<String> it = this.mdStoreIds.iterator();
        while (it.hasNext()) {
            i = (int) (i + getDb().getCollection(it.next()).count());
        }
        return i;
    }

    public int getCurrentValue(Node node, NodeToken nodeToken) {
        return this.currentValue;
    }

    private ArrayList<String> getMDStoreIds() {
        return Lists.newArrayList(Iterables.transform(Iterables.filter(getDb().getCollection("metadata").find(), new Predicate<DBObject>() { // from class: eu.dnetlib.data.mapreduce.wf.dataimport.MongodbBatchExporter.2
            public boolean apply(DBObject dBObject) {
                return dBObject.get("format").toString().equals(MongodbBatchExporter.this.getFormat()) && dBObject.get("layout").toString().equals(MongodbBatchExporter.this.getLayout()) && dBObject.get("interpretation").toString().equals(MongodbBatchExporter.this.getInterpretation());
            }
        }), this.mdIdFunction));
    }

    private Text getText(DBObject dBObject, String str) {
        String str2 = (String) dBObject.get(str);
        if (str2 == null || str2.isEmpty()) {
            return null;
        }
        return new Text(str2);
    }

    public DB getDb() {
        return this.db;
    }

    @Required
    public void setDb(DB db) {
        this.db = db;
    }

    public String getHdfsDestinationDirectory() {
        return this.hdfsDestinationDirectory;
    }

    @Required
    public void setHdfsDestinationDirectory(String str) {
        this.hdfsDestinationDirectory = str;
    }

    public String getFormat() {
        return this.format;
    }

    @Required
    public void setFormat(String str) {
        this.format = str;
    }

    public String getLayout() {
        return this.layout;
    }

    @Required
    public void setLayout(String str) {
        this.layout = str;
    }

    public String getInterpretation() {
        return this.interpretation;
    }

    @Required
    public void setInterpretation(String str) {
        this.interpretation = str;
    }
}
