/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.index;

import com.google.common.collect.Lists;
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
import eu.dnetlib.functionality.index.solr.feed.SolrServerPool;
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
import eu.dnetlib.miscutils.datetime.HumanTime;
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;

public class IndexFeedMapper
extends Mapper<Text, Text, Text, Text> {
    private InputDocumentFactory documentFactory;
    private SolrServerPool serverPool;
    private String version;
    private String dsId;
    private int shutdownWaitTime = 10000;
    private int bufferFlushThreshold = 100;
    private ApplyXslt dmfToRecord;
    private List<SolrInputDocument> buffer;
    private boolean simulation = false;

    protected void setup(Mapper.Context context) throws IOException, InterruptedException {
        this.logConfiguration(context.getConfiguration());
        this.serverPool = new SolrServerPool(context.getConfiguration().get("index.solr.url.local"), context.getConfiguration().get("index.solr.url.list"), context.getConfiguration().get("index.solr.url.zk"), context.getConfiguration().get("index.solr.collection"), Boolean.parseBoolean(context.getConfiguration().get("index.solr.local.feeding")));
        this.dsId = context.getConfiguration().get("id");
        this.shutdownWaitTime = Integer.parseInt(context.getConfiguration().get("index.shutdown.wait.time"));
        this.bufferFlushThreshold = Integer.parseInt(context.getConfiguration().get("index.buffer.flush.threshold"));
        this.documentFactory = new StreamingInputDocumentFactory();
        this.version = InputDocumentFactory.getParsedDateField((String)context.getConfiguration().get("index.feed.timestamp"));
        this.buffer = Lists.newArrayList();
        this.simulation = Boolean.parseBoolean(context.getConfiguration().get("index.solr.sim.mode"));
        String xslt = new String(Base64.decodeBase64((String)context.getConfiguration().get("index.xslt")));
        System.out.print("got xslt: \n" + xslt + "\ngot version: " + this.version + "\nsimulation: " + this.simulation + "\nbuffer size: " + this.bufferFlushThreshold + "\n\n");
        this.dmfToRecord = new ApplyXslt(xslt);
    }

    protected void map(Text key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        String indexRecord = "";
        SolrInputDocument doc = null;
        try {
            long start = System.currentTimeMillis();
            indexRecord = this.dmfToRecord.evaluate((Object)value.toString());
            doc = this.documentFactory.parseDocument(this.version, indexRecord.replaceAll("&#", "&amp;#"), this.dsId);
            long stop = System.currentTimeMillis() - start;
            System.out.println("parse " + doc.getField("__indexrecordidentifier").getValue() + " : " + HumanTime.exactly((long)stop));
            if (!doc.isEmpty()) {
                this.buffer.add(doc);
                if (this.buffer.size() >= this.bufferFlushThreshold) {
                    this.doAdd(this.buffer, context);
                }
            } else {
                context.getCounter("index", "skipped records").increment(1L);
            }
        }
        catch (Throwable e) {
            context.getCounter("index", e.getClass().toString()).increment(1L);
            context.write((Object)key, (Object)this.printRottenRecord(context.getTaskAttemptID().toString(), value, indexRecord, doc));
            e.printStackTrace(System.err);
        }
    }

    private void doAdd(List<SolrInputDocument> buffer, Mapper.Context context) throws SolrServerException, IOException {
        if (!this.simulation) {
            long start = System.currentTimeMillis();
            UpdateResponse rsp = this.serverPool.addAll(buffer);
            long stop = System.currentTimeMillis() - start;
            System.out.println("feed time for " + buffer.size() + " records : " + HumanTime.exactly((long)stop) + "\n");
            context.getCounter("index", "status code: " + rsp.getStatus()).increment((long)buffer.size());
        }
        buffer.clear();
    }

    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        try {
            if (!this.buffer.isEmpty()) {
                this.doAdd(this.buffer, context);
            }
            System.out.println("\nwaiting " + this.shutdownWaitTime + "ms before shutdown");
            Thread.sleep(this.shutdownWaitTime);
            this.serverPool.shutdownAll();
        }
        catch (SolrServerException e) {
            System.err.println("couldn't shutdown server " + e.getMessage());
        }
    }

    private Text printRottenRecord(String taskid, Text value, String indexRecord, SolrInputDocument doc) {
        return new Text("\n**********************************\ntask: " + taskid + "\n" + this.check("original", value.toString() + this.check("indexRecord", indexRecord) + this.check("solrDoc", doc)));
    }

    private String check(String label, Object value) {
        if (value != null && !value.toString().isEmpty()) {
            return "\n " + label + ":\n" + value + "\n";
        }
        return "\n";
    }

    private void logConfiguration(Configuration conf) {
        System.out.println("job configutation #################");
        for (Map.Entry e : conf) {
            System.out.println("'" + (String)e.getKey() + "' : '" + (String)e.getValue() + "'");
        }
        System.out.println("end of job configutation #################\n");
    }
}

