package org.gcube.socialnetworking.socialdataindexer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.NoNodeAvailableException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.gcube.portal.databook.server.DBCassandraAstyanaxImpl;
import org.gcube.portal.databook.server.DatabookStore;
import org.gcube.portal.databook.shared.Comment;
import org.gcube.portal.databook.shared.EnhancedFeed;
import org.gcube.portal.databook.shared.Feed;
import org.gcube.socialnetworking.social_data_indexing_common.ex.BulkInsertionFailedException;
import org.gcube.socialnetworking.social_data_indexing_common.utils.ElasticSearchRunningCluster;
import org.gcube.socialnetworking.social_data_indexing_common.utils.IndexFields;
import org.gcube.vremanagement.executor.plugin.Plugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/gcube/socialnetworking/socialdataindexer/SocialDataIndexerPlugin.class */
/**
 * Executor plugin that synchronizes the social feeds held in the databook store
 * with an Elasticsearch index.
 *
 * <p>A run ({@link #launch(Map)}) performs three steps:
 * <ol>
 *   <li>connect to the Elasticsearch cluster described by {@link ElasticSearchRunningCluster};</li>
 *   <li>for every VRE, (re)index all of its feeds, stamping each document with the
 *       time at which the run started;</li>
 *   <li>delete every document whose timestamp is older than the start of the run,
 *       i.e. documents that were not re-indexed by this run.</li>
 * </ol>
 * The Elasticsearch client is released when the run ends, whatever its outcome.
 */
public class SocialDataIndexerPlugin extends Plugin<SocialDataIndexerPluginDeclaration> {
    private static final Logger logger = LoggerFactory.getLogger(SocialDataIndexerPlugin.class);

    /** Shared JSON serializer; avoids building a new mapper for every feed. */
    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

    /** Number of index actions after which a bulk request is sent. */
    private static final int BULK_ACTIONS = 1000;
    /** Maximum time to wait for pending bulk requests when closing the bulk processor. */
    private static final long BULK_CLOSE_TIMEOUT_MILLIS = 60000L;
    /** Keep-alive of the scroll context used while removing stale documents. */
    private static final long SCROLL_KEEP_ALIVE_MILLIS = 60000L;
    /** Number of hits fetched per scroll page while removing stale documents. */
    private static final int SCROLL_PAGE_SIZE = 100;

    private String clusterName;
    private List<String> hostsToContact;
    private List<Integer> portNumbers;
    private TransportClient client;
    private DatabookStore store;
    /** Number of documents handed to the bulk processor during the current run. */
    private int count;

    /**
     * Creates the plugin.
     *
     * @param socialDataIndexerPluginDeclaration the declaration this plugin instance belongs to
     */
    public SocialDataIndexerPlugin(SocialDataIndexerPluginDeclaration socialDataIndexerPluginDeclaration) {
        super(socialDataIndexerPluginDeclaration);
        this.count = 0;
        logger.debug("Constructor");
    }

    /**
     * Runs one full synchronization between the databook store and the index.
     *
     * <p>Failures are logged and never propagated to the caller. The Elasticsearch
     * client opened by this method is always closed before returning.
     *
     * @param map launch inputs; the optional {@code "scope"} entry is passed to the
     *            store and to the cluster discovery
     */
    @Override // org.gcube.vremanagement.executor.plugin.Plugin
    public void launch(Map<String, Object> map) {
        // Start every run from zero so the "Inserted N docs" message is per-run.
        this.count = 0;
        try {
            String scope = null;
            if (map.containsKey("scope")) {
                scope = (String) map.get("scope");
            } else {
                logger.error("Scope variable is not set. The context will be evaluated later...");
            }
            this.store = new DBCassandraAstyanaxImpl(scope);

            ElasticSearchRunningCluster cluster = new ElasticSearchRunningCluster(scope);
            this.clusterName = cluster.getClusterName();
            this.hostsToContact = cluster.getHosts();
            this.portNumbers = cluster.getPorts();

            if (connectToCluster() == 0) {
                logger.error("Unable to reach elasticsearch cluster. Exiting ...");
                return;
            }
            logger.debug("Connection to ElasticSearch cluster done. Synchronization starts running...");

            long startMillis = System.currentTimeMillis();
            for (String vreId : this.store.getAllVREIds()) {
                if (stopRequested()) {
                    return;
                }
                try {
                    List<Feed> feeds = this.store.getAllFeedsByVRE(vreId);
                    addEnhancedFeedsInBulk(feeds, startMillis);
                    logger.debug("Number of indexed feeds is " + feeds.size() + " for vre " + vreId);
                } catch (Exception e) {
                    // One failing VRE must not prevent the others from being indexed.
                    logger.error("Exception while saving feeds/comments into the index for vre " + vreId, e);
                }
            }
            // Purging after an interrupted (hence partial) re-index would delete
            // documents that are still valid but were not re-stamped yet.
            if (stopRequested()) {
                return;
            }
            logger.debug("Inserted " + this.count + " docs");

            // Make the freshly indexed documents visible to the search below.
            this.client.admin().indices().prepareRefresh(new String[0]).execute().actionGet();
            deleteDocumentsWithTimestampLowerThan(startMillis);

            long elapsedMillis = System.currentTimeMillis() - startMillis;
            logger.debug("Synchronization thread ends running. It took " + elapsedMillis
                    + " milliseconds  that is " + (elapsedMillis / 60000.0d) + " minutes.");
        } catch (Exception e) {
            logger.error("Error while synchronizing data.", e);
        } finally {
            closeClient();
        }
    }

    /**
     * Builds the transport client and registers every resolvable host on it.
     *
     * @return the number of transport addresses successfully added
     */
    private int connectToCluster() {
        logger.debug("Creating elasticsearch client connection for hosts = " + this.hostsToContact
                + ", ports = " + this.portNumbers + " and  cluster's name = " + this.clusterName);
        Settings settings = Settings.settingsBuilder()
                .put(ClusterName.SETTING, this.clusterName)
                .put("client.transport.sniff", true)
                .build();
        this.client = TransportClient.builder().settings(settings).build();

        // Hosts and ports are parallel lists; only pair up the entries both lists have.
        int pairs = Math.min(this.hostsToContact.size(), this.portNumbers.size());
        if (pairs != this.hostsToContact.size() || pairs != this.portNumbers.size()) {
            logger.warn("Hosts and ports lists have different sizes ({} vs {}); using the first {} pairs",
                    new Object[]{this.hostsToContact.size(), this.portNumbers.size(), pairs});
        }
        int added = 0;
        for (int i = 0; i < pairs; i++) {
            String host = this.hostsToContact.get(i);
            Integer port = this.portNumbers.get(i);
            try {
                this.client.addTransportAddress(
                        new InetSocketTransportAddress(InetAddress.getByName(host), port.intValue()));
                added++;
            } catch (UnknownHostException e) {
                logger.error("Error while adding " + host + ":" + port + " as host to be contacted.", e);
            }
        }
        return added;
    }

    /**
     * Tells whether the current thread has been interrupted, logging the fact.
     * The interrupt flag is left untouched.
     */
    private boolean stopRequested() {
        if (Thread.currentThread().isInterrupted()) {
            logger.warn("Synchronization interrupted: skipping the remaining work, stale documents are not removed");
            return true;
        }
        return false;
    }

    /** Closes the Elasticsearch client, if any, and forgets it so it is closed only once. */
    private void closeClient() {
        TransportClient current = this.client;
        if (current == null) {
            return;
        }
        this.client = null;
        logger.debug("Closing connection to elasticsearch cluster. " + current.toString());
        try {
            current.close();
        } catch (RuntimeException e) {
            logger.warn("Error while closing connection to elasticsearch cluster", e);
        }
    }

    /**
     * Last-resort cleanup. {@link #launch(Map)} already closes the client when it
     * ends; this only covers a client that is somehow still open at collection time.
     */
    @Override
    protected void finalize() {
        closeClient();
    }

    /**
     * Indexes the given feeds through a bulk processor, stamping every document
     * with {@code j}.
     *
     * <p>A feed that cannot be enhanced or serialized is logged and skipped. The
     * method waits (up to one minute) for the pending bulk requests before returning.
     *
     * @param list feeds to index
     * @param j    timestamp, in milliseconds, assigned to every indexed document
     * @throws BulkInsertionFailedException declared for callers; not raised by the current implementation
     */
    private void addEnhancedFeedsInBulk(List<Feed> list, long j) throws BulkInsertionFailedException {
        logger.debug("Starting bulk insert enhanced feeds operation");
        BulkProcessor bulkProcessor = BulkProcessor.builder(this.client, new LoggingBulkListener())
                .setBulkActions(BULK_ACTIONS)
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5L))
                .setConcurrentRequests(0)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50L), 8))
                .build();

        String timestamp = String.valueOf(j);
        for (Feed feed : list) {
            String feedId = null;
            try {
                String json = enhanceAndConvertToJson(feed);
                feedId = feed.getKey();
                bulkProcessor.add(new IndexRequest(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, feedId)
                        .timestamp(timestamp)
                        .source(json));
                this.count++;
            } catch (Exception e) {
                logger.error("Skip inserting feed with id " + feedId, e);
            }
        }

        try {
            if (!bulkProcessor.awaitClose(BULK_CLOSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                logger.warn("Some bulk requests were still pending after {} ms", BULK_CLOSE_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            logger.debug("Interrupted while waiting for awaitClose()", e);
            // Restore the flag so the caller can notice the interruption and stop.
            Thread.currentThread().interrupt();
        }
    }

    /** Bulk listener that only reports the progress and the failures of each bulk request. */
    private static final class LoggingBulkListener implements BulkProcessor.Listener {
        @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
            logger.debug("Going to execute new bulk composed of {} actions",
                    Integer.valueOf(bulkRequest.numberOfActions()));
        }

        @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            logger.debug("Executed bulk composed of {} actions", Integer.valueOf(bulkRequest.numberOfActions()));
            if (!bulkResponse.hasFailures()) {
                return;
            }
            // The placeholder is required: without it SLF4J drops the failure message.
            logger.warn("There were failures while executing bulk: {}", bulkResponse.buildFailureMessage());
            if (!logger.isDebugEnabled()) {
                return;
            }
            for (BulkItemResponse item : bulkResponse.getItems()) {
                if (item.isFailed()) {
                    logger.debug("Error for {}/{}/{} for {} operation: {}", new Object[]{
                            item.getIndex(), item.getType(), item.getId(),
                            item.getOpType(), item.getFailureMessage()});
                }
            }
        }

        @Override // org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable th) {
            logger.error("Error executing bulk", th);
            if (th instanceof NoNodeAvailableException) {
                // Keep the original failure as cause so the stack trace is not lost.
                throw new RuntimeException("No node available. Exiting...", th);
            }
        }
    }

    /**
     * Builds the {@link EnhancedFeed} for the given feed (with its comments and,
     * for multi-file uploads, its attachments) and serializes it to JSON.
     *
     * @param feed the feed to enhance
     * @return the JSON representation of the enhanced feed
     * @throws Exception if the store lookup or the JSON serialization fails
     */
    // Raw ArrayList is kept on purpose: the attachment element type is not imported by this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private String enhanceAndConvertToJson(Feed feed) throws Exception {
        String feedId = feed.getKey();
        ArrayList attachments = new ArrayList();
        if (feed.isMultiFileUpload()) {
            logger.debug("Retrieving attachments for feed with id=" + feedId);
            // Copy instead of down-casting: the store is only known to return a List.
            List fetched = this.store.getAttachmentsByFeedId(feedId);
            if (fetched != null) {
                attachments.addAll(fetched);
            }
        }
        EnhancedFeed enhanced = new EnhancedFeed(feed, false, false, getAllCommentsByFeed(feedId), attachments);
        return JSON_MAPPER.writeValueAsString(enhanced);
    }

    /**
     * Retrieves the comments of a feed, sorted by their natural order.
     *
     * @param str the feed identifier
     * @return a new, sorted list of comments (empty when the store returns none)
     */
    private ArrayList<Comment> getAllCommentsByFeed(String str) {
        List<Comment> fetched = this.store.getAllCommentByFeed(str);
        // Copy instead of down-casting: the store is only known to return a List.
        ArrayList<Comment> comments = new ArrayList<Comment>();
        if (fetched != null) {
            comments.addAll(fetched);
        }
        Collections.sort(comments);
        return comments;
    }

    /**
     * Deletes from the feeds index every document whose {@code _timestamp} is in
     * {@code [0, j)}, scrolling through the matches page by page.
     *
     * @param j exclusive upper bound of the timestamps to remove, in milliseconds
     */
    public void deleteDocumentsWithTimestampLowerThan(long j) {
        logger.debug("Removing docs with timestamp lower than " + j);
        BoolQueryBuilder query = QueryBuilders.boolQuery();
        query.must(QueryBuilders.matchAllQuery());
        query.filter(QueryBuilders.rangeQuery("_timestamp").gte(0).lt(j));

        SearchResponse page = this.client.prepareSearch(IndexFields.INDEX_NAME)
                .setSize(SCROLL_PAGE_SIZE)
                .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                .setQuery(query)
                .execute()
                .actionGet();

        int deleted = 0;
        while (page.getHits().getHits().length != 0) {
            for (SearchHit hit : page.getHits().getHits()) {
                DeleteResponse response = this.client
                        .prepareDelete(IndexFields.INDEX_NAME, IndexFields.EF_FEEDS_TABLE, hit.getId())
                        .get();
                if (response.isFound()) {
                    deleted++;
                }
            }
            page = this.client.prepareSearchScroll(page.getScrollId())
                    .setScroll(new TimeValue(SCROLL_KEEP_ALIVE_MILLIS))
                    .execute()
                    .actionGet();
        }
        logger.debug("No more hits to delete");
        logger.debug("Number of delete documents is " + deleted);
    }

    /**
     * Stop hook: sets the interrupt flag of the thread that invokes it.
     * NOTE(review): this only stops a running synchronization if the executor
     * calls {@code onStop()} on the same thread that runs {@link #launch(Map)} - confirm.
     */
    @Override // org.gcube.vremanagement.executor.plugin.Plugin
    protected void onStop() throws Exception {
        logger.debug("onStop()");
        Thread.currentThread().interrupt();
    }
}
