/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.solr;

import com.clearspring.analytics.util.Lists;
import eu.dnetlib.dhp.solr.CacheCloudSolrClient;
import eu.dnetlib.dhp.solr.CloudClientParams;
import eu.dnetlib.dhp.solr.mapping.RowToSolrInputDocumentMapper;
import eu.dnetlib.dhp.utils.SparkSessionSupport;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads JSON records with Spark and indexes them into a Solr collection in batches.
 *
 * <p>Each Spark partition accumulates documents into a batch and sends it through a
 * {@link CloudSolrClient} obtained from {@link CacheCloudSolrClient}. Failed batches are
 * retried with an exponentially growing delay; when the failure is rooted in an expired or
 * timed-out ZooKeeper session the client is invalidated and the batch is re-sent with a
 * newly obtained one.
 */
public class RecordImporter
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(RecordImporter.class);

    /** Suggested number of documents per update request; not used by this class itself. */
    public static final int BATCH_SIZE = 1000;

    /** Maximum number of times a single batch is sent through one client before giving up. */
    private static final int MAX_RETRIES = 3;

    /** Initial delay in milliseconds between two attempts; doubled after every failure. */
    public static final int RETRY_DELAY = 3000;

    /** Schema applied when reading the input JSON: two string columns, {@code xml} and {@code json}. */
    private static final StructType PAYLOAD_SCHEMA = StructType.fromDDL("xml STRING, json STRING");

    /**
     * Indexes the JSON records found under {@code path} into {@code collection} and
     * optionally issues a commit once all partitions have been processed.
     *
     * @param conf                   Spark configuration handed to {@link SparkSessionSupport}
     * @param zkHost                 ZooKeeper connection string used to build the client parameters
     * @param collection             target Solr collection
     * @param path                   location of the JSON input, read with {@link #PAYLOAD_SCHEMA}
     * @param batchSize              number of documents per update request
     * @param shouldCommit           whether to commit on the collection after indexing
     * @param shouldFilterXmlPayload forwarded as-is to {@link RowToSolrInputDocumentMapper#map}
     */
    public static void importRecords(SparkConf conf, String zkHost, String collection, String path, int batchSize, boolean shouldCommit, boolean shouldFilterXmlPayload) {
        // NOTE(review): the literal 'true' is presumably the "session is managed" flag of
        // SparkSessionSupport - confirm against its signature.
        SparkSessionSupport.runWithSparkSession(conf, true, spark -> {
            CloudClientParams params = new CloudClientParams(zkHost, collection);
            Dataset<Row> records = spark.read().schema(PAYLOAD_SCHEMA).json(path);
            RecordImporter.indexDocs(params, batchSize, records, shouldFilterXmlPayload);
            log.info("record import completed");
            if (shouldCommit) {
                CloudSolrClient client = CacheCloudSolrClient.getCachedCloudClient(params);
                UpdateResponse commitRsp = client.commit(collection);
                if (commitRsp.getStatus() != 0) {
                    Exception commitError = commitRsp.getException();
                    log.error("got exception during commit operation", commitError);
                    if (commitError == null) {
                        // A non-zero status without an attached exception would otherwise
                        // surface as an uninformative NullPointerException from 'throw null'.
                        throw new IllegalStateException("Solr commit on collection " + collection + " failed with status " + commitRsp.getStatus());
                    }
                    throw commitError;
                }
                log.info("commit done");
            }
        });
    }

    /**
     * Maps every row of {@code docs} to a {@link SolrInputDocument} and sends the documents
     * to Solr in batches, partition by partition.
     *
     * @param params                 identifies the Solr client and the target collection
     * @param batchSize              number of documents per update request
     * @param docs                   rows to index
     * @param shouldFilterXmlPayload forwarded as-is to {@link RowToSolrInputDocumentMapper#map}
     */
    private static void indexDocs(CloudClientParams params, int batchSize, Dataset<Row> docs, boolean shouldFilterXmlPayload) {
        // The explicit cast selects the Java-friendly overload of Dataset.foreachPartition.
        docs.foreachPartition((ForeachPartitionFunction<Row>)solrDocs -> {
            try {
                CloudSolrClient client = CacheCloudSolrClient.getCachedCloudClient(params);
                List<SolrInputDocument> batch = Lists.newArrayList();
                while (solrDocs.hasNext()) {
                    SolrInputDocument doc = RowToSolrInputDocumentMapper.map(solrDocs.next(), shouldFilterXmlPayload);
                    // Flush before adding, so a batch never grows beyond batchSize documents.
                    if (RecordImporter.wouldBatchBeFull(batch.size(), batchSize)) {
                        RecordImporter.sendBatchToSolrWithRetry(params, client, batch);
                        batch.clear();
                    }
                    batch.add(doc);
                }
                // Send whatever is left once the partition is exhausted.
                if (!batch.isEmpty()) {
                    RecordImporter.sendBatchToSolrWithRetry(params, client, batch);
                    batch.clear();
                }
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }

    /**
     * Sends a batch and, if that fails because the ZooKeeper session expired or timed out,
     * invalidates the cached client and repeats the whole retry cycle once with a new one.
     *
     * @param params     identifies the Solr client and the target collection
     * @param solrClient client used for the first retry cycle
     * @param batch      documents to index
     * @throws ExecutionException if the batch could not be indexed
     */
    private static void sendBatchToSolrWithRetry(CloudClientParams params, SolrClient solrClient, List<SolrInputDocument> batch) throws ExecutionException {
        try {
            RecordImporter.sendBatchToSolr(params, solrClient, batch, 1, RETRY_DELAY);
        }
        catch (Exception e) {
            Throwable rootCause = SolrException.getRootCause(e);
            boolean zkSessionLost = rootCause instanceof KeeperException.SessionExpiredException
                || rootCause instanceof KeeperException.OperationTimeoutException;
            if (!zkSessionLost) {
                // Not recoverable by swapping the client: propagate without double wrapping.
                if (e instanceof ExecutionException) {
                    throw (ExecutionException)e;
                }
                throw new ExecutionException(e.getMessage(), e);
            }
            log.error("Error indexing batch to collection {} ; will retry ... \n\nERROR: {}", params.getCollection(), e.toString());
            CacheCloudSolrClient.invalidateCachedClient(params);
            CloudSolrClient newClient = CacheCloudSolrClient.getCachedCloudClient(params);
            // When this second cycle succeeds the method returns normally; a failure
            // propagates as the ExecutionException raised by sendBatchToSolr.
            RecordImporter.sendBatchToSolr(params, newClient, batch, 1, RETRY_DELAY);
        }
    }

    /**
     * Sends a batch to Solr, retrying with exponential back-off until {@link #MAX_RETRIES}
     * attempts have been made.
     *
     * @param params     supplies the target collection name
     * @param solrClient client used to execute the update request
     * @param batch      documents to index
     * @param attempt    1-based number of the first attempt to perform
     * @param retryDelay delay in milliseconds before the next attempt; doubled after each failure
     * @throws ExecutionException if all attempts fail (the last failure is kept as the cause)
     *                            or if the thread is interrupted while waiting to retry
     */
    private static void sendBatchToSolr(CloudClientParams params, SolrClient solrClient, List<SolrInputDocument> batch, int attempt, int retryDelay) throws ExecutionException {
        Exception lastError = null;
        int delay = retryDelay;
        for (int current = attempt; current <= MAX_RETRIES; current++) {
            UpdateRequest req = new UpdateRequest();
            req.setParam("collection", params.getCollection());
            long initialTime = System.currentTimeMillis();
            log.info("Sending batch of {} to collection {} attempt {}", batch.size(), params.getCollection(), current);
            req.add(batch);
            try {
                solrClient.request(req);
                double timeTaken = (double)(System.currentTimeMillis() - initialTime) / 1000.0;
                log.info("Took '{}' secs to index {} documents", timeTaken, batch.size());
                return;
            }
            catch (Exception e) {
                lastError = e;
                if (current >= MAX_RETRIES) {
                    // No attempt left: skip the pointless sleep and fail right away.
                    log.error("Error indexing batch to collection {} ; attempt {} ; giving up ... \n\nERROR: {}", params.getCollection(), current, e.toString());
                    break;
                }
                log.error("Error indexing batch to collection {} ; attempt {} ; will retry ... \n\nERROR: {}", params.getCollection(), current, e.toString());
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException ie) {
                    // Restore the interrupt flag and stop retrying so the task can shut down.
                    Thread.currentThread().interrupt();
                    ie.addSuppressed(e);
                    throw new ExecutionException("Interrupted while waiting to retry indexing to collection " + params.getCollection(), ie);
                }
                delay *= 2;
            }
        }
        String msg = String.format("Reached max number of allowed retries %d, failing...", MAX_RETRIES);
        log.error(msg);
        // Keep the last real failure as the cause so that callers inspecting the root cause
        // (e.g. for ZooKeeper session problems) can still see it.
        throw new ExecutionException(msg, lastError != null ? lastError : new RuntimeException(msg));
    }

    /**
     * Tells whether the current batch must be flushed before another document is added.
     *
     * @param numDocsInBatch number of documents currently in the batch
     * @param batchSize      configured batch size
     * @return {@code true} when the batch is non-empty and has reached {@code batchSize}
     */
    private static boolean wouldBatchBeFull(int numDocsInBatch, int batchSize) {
        return numDocsInBatch > 0 && numDocsInBatch >= batchSize;
    }
}

