package eu.dnetlib.data.mapreduce.hbase.index;

import com.google.common.collect.Lists;
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.transform.ProtoDocumentMapper;
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
import eu.dnetlib.miscutils.datetime.HumanTime;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.dom4j.DocumentException;

/* loaded from: input_file:eu/dnetlib/data/mapreduce/hbase/index/DedupIndexFeedMapper.class */
public class DedupIndexFeedMapper extends TableMapper<Text, Text> {
    private static final Log log = LogFactory.getLog(DedupIndexFeedMapper.class);
    private CloudSolrServer solrServer;
    private String dsId;
    private String version;
    private List<SolrInputDocument> buffer;
    private static final int MAX_RETRIES = 10;
    private int shutdownWaitTime = 10000;
    private int bufferFlushThreshold = 100;
    private int backoffTimeMs = 5000;
    private boolean simulation = false;
    private String entityType = null;
    private String actionset = null;
    private ProtoDocumentMapper mapper = null;

    protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException {
        logConfiguration(context.getConfiguration());
        this.shutdownWaitTime = Integer.parseInt(context.getConfiguration().get(JobParams.INDEX_SHUTDOWN_WAIT));
        this.bufferFlushThreshold = Integer.parseInt(context.getConfiguration().get(JobParams.INDEX_BUFFER_FLUSH_TRESHOLD));
        this.dsId = context.getConfiguration().get(JobParams.INDEX_DSID);
        this.version = InputDocumentFactory.getParsedDateField(context.getConfiguration().get(JobParams.INDEX_FEED_TIME));
        this.buffer = Lists.newArrayList();
        this.simulation = Boolean.parseBoolean(context.getConfiguration().get(JobParams.INDEX_FEED_SIMULATION_MODE));
        this.entityType = context.getConfiguration().get("entityType");
        this.actionset = context.getConfiguration().get("actionset");
        String str = context.getConfiguration().get("index.fields");
        log.info("got fields: \n" + str);
        log.info("got dsId: " + this.dsId);
        log.info("got version: " + this.version);
        log.info("simulation: " + this.simulation);
        log.info("entityType: " + this.entityType);
        log.info("actionset: " + this.actionset);
        log.info("buffer size: " + this.bufferFlushThreshold);
        try {
            this.mapper = new ProtoDocumentMapper(str);
            String str2 = context.getConfiguration().get(JobParams.INDEX_SOLR_URL);
            log.info("solr server baseURL: " + str2);
            String str3 = context.getConfiguration().get(JobParams.INDEX_SOLR_COLLECTION);
            log.info("solr server collection: " + str3);
            while (true) {
                try {
                    log.info("initializing solr server...");
                    this.solrServer = new CloudSolrServer(str2);
                    this.solrServer.connect();
                    this.solrServer.setParallelUpdates(true);
                    this.solrServer.setDefaultCollection(str3);
                    SolrPingResponse ping = this.solrServer.ping();
                    if (ping.getStatus() != 0) {
                        throw new SolrServerException("bad init status: " + ping.getStatus());
                        break;
                    }
                    return;
                } catch (Throwable th) {
                    if (this.solrServer != null) {
                        this.solrServer.shutdown();
                    }
                    context.getCounter("index init", th.getMessage()).increment(1L);
                    log.info(String.format("failed to init solr client wait %dms", Integer.valueOf(this.backoffTimeMs)));
                    Thread.sleep(this.backoffTimeMs);
                }
            }
        } catch (DocumentException e) {
            log.error("unable to parse fields: " + str);
            throw new IllegalArgumentException((Throwable) e);
        }
    }

    protected void map(ImmutableBytesWritable immutableBytesWritable, Result result, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException {
        SolrInputDocument solrInputDocument = null;
        NavigableMap familyMap = result.getFamilyMap(Bytes.toBytes(this.entityType));
        if (MapUtils.isEmpty(familyMap) || !familyMap.containsKey(DedupUtils.BODY_B)) {
            context.getCounter(this.entityType, "missing body");
            return;
        }
        OafProtos.Oaf parseFrom = OafProtos.Oaf.parseFrom((byte[]) familyMap.get(DedupUtils.BODY_B));
        try {
            solrInputDocument = getDocument(parseFrom);
            int i = 0;
            while (i < MAX_RETRIES) {
                try {
                    addDocument(context, solrInputDocument);
                    return;
                } catch (Throwable th) {
                    i++;
                    context.getCounter("index feed", "retries").increment(1L);
                    handleError(immutableBytesWritable, JsonFormat.printToString(parseFrom), context, solrInputDocument, th);
                    log.info(String.format("failed to feed documents, waiting %dms", Integer.valueOf(this.backoffTimeMs)));
                    Thread.sleep(this.backoffTimeMs);
                }
            }
            if (i >= MAX_RETRIES) {
                throw new IOException("too many retries: " + i);
            }
        } catch (Throwable th2) {
            handleError(immutableBytesWritable, JsonFormat.printToString(parseFrom), context, solrInputDocument, th2);
        }
    }

    private SolrInputDocument getDocument(OafProtos.Oaf oaf) throws DocumentException {
        SolrInputDocument map = this.mapper.map(oaf, this.version, this.dsId, this.actionset);
        map.addField("actionset", this.actionset);
        return map;
    }

    private void addDocument(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context, SolrInputDocument solrInputDocument) throws SolrServerException, IOException {
        if (solrInputDocument.isEmpty()) {
            context.getCounter("index feed", "skipped records").increment(1L);
            return;
        }
        this.buffer.add(solrInputDocument);
        if (this.buffer.size() >= this.bufferFlushThreshold) {
            doAdd(this.buffer, context);
        }
    }

    private void doAdd(List<SolrInputDocument> list, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws SolrServerException, IOException {
        if (!this.simulation) {
            long currentTimeMillis = System.currentTimeMillis();
            UpdateResponse add = this.solrServer.add(list);
            log.info("feed time for " + list.size() + " records : " + HumanTime.exactly(System.currentTimeMillis() - currentTimeMillis) + "\n");
            int status = add.getStatus();
            context.getCounter("index feed", "status code: " + status).increment(list.size());
            if (status != 0) {
                throw new SolrServerException("bad status: " + status);
            }
        }
        list.clear();
    }

    protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        try {
            if (!this.buffer.isEmpty()) {
                doAdd(this.buffer, context);
            }
            log.info("\nwaiting " + this.shutdownWaitTime + "ms before shutdown");
            Thread.sleep(this.shutdownWaitTime);
            this.solrServer.shutdown();
        } catch (SolrServerException e) {
            System.err.println("couldn't shutdown server " + e.getMessage());
        }
    }

    private void handleError(ImmutableBytesWritable immutableBytesWritable, String str, Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context, SolrInputDocument solrInputDocument, Throwable th) throws IOException, InterruptedException {
        context.getCounter("index feed", th.getClass().getName()).increment(1L);
        context.write(new Text(immutableBytesWritable.copyBytes()), printRottenRecord(context.getTaskAttemptID().toString(), str, solrInputDocument));
    }

    private Text printRottenRecord(String str, String str2, SolrInputDocument solrInputDocument) {
        return new Text("\n**********************************\ntask: " + str + "\n" + check("original", str2.toString() + check("solrDoc", solrInputDocument)));
    }

    private String check(String str, Object obj) {
        return (obj == null || obj.toString().isEmpty()) ? "\n" : "\n " + str + ":\n" + obj + "\n";
    }

    private void logConfiguration(Configuration configuration) {
        log.info("job configutation #################");
        Iterator it = configuration.iterator();
        while (it.hasNext()) {
            Map.Entry entry = (Map.Entry) it.next();
            log.info("'" + ((String) entry.getKey()) + "' : '" + ((String) entry.getValue()) + "'");
        }
        log.info("end of job configutation #################\n\n");
    }

    protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
        map((ImmutableBytesWritable) obj, (Result) obj2, (Mapper<ImmutableBytesWritable, Result, Text, Text>.Context) context);
    }
}
