/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.pid.resolver.mdstore.plugin;

import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOptions;
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
import eu.dnetlib.pid.resolver.PIDResolver;
import eu.dnetlib.pid.resolver.mdstore.plugin.RecordResolver;
import eu.dnetlib.pid.resolver.mdstore.plugin.RecordResolverFactory;
import eu.dnetlib.pid.resolver.mdstore.plugin.ResolverSerializer;
import eu.dnetlib.rmi.data.MDStoreServiceException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;

public class ResolverMDStorePlugin
implements MDStorePlugin {
    private static final Log log = LogFactory.getLog(ResolverMDStorePlugin.class);
    public static DBObject DONE = new BasicDBObject();
    @Autowired
    private List<PIDResolver> pluginResolver;
    @Autowired
    private ResolverSerializer resolverSerializer;
    @Autowired
    private RecordResolverFactory recordResolverFactory;
    @Autowired
    private MDStoreTransactionManagerImpl transactionManager;

    public static void save(MongoCollection<DBObject> collection, DBObject document) {
        Object id = document.get("_id");
        if (id == null) {
            collection.insertOne((Object)document);
        } else {
            collection.replaceOne(Filters.eq((String)"_id", (Object)id), (Object)document, new UpdateOptions().upsert(true));
        }
    }

    public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException {
        try {
            String id = params.get("mdStoreId");
            boolean offline = "true".equals(params.get("offline"));
            int numberOfThreads = 4;
            String numOfThreadsString = params.get("numberOfThreads");
            String resolvingMode = params.get("collectionMode");
            String lastResolveDate = params.get("lastResolveDate");
            try {
                if (!StringUtils.isEmpty((CharSequence)numOfThreadsString)) {
                    numberOfThreads = Integer.parseInt(numOfThreadsString);
                }
            }
            catch (Throwable e) {
                log.error((Object)"Number of threads Param is not an int value it will apply by default 4");
            }
            boolean refresh = "refresh".equalsIgnoreCase(resolvingMode);
            String internalId = this.transactionManager.readMdStore(id);
            MongoDatabase db = this.transactionManager.getDb();
            MongoCollection currentMdStoreCollection = db.getCollection(internalId, DBObject.class);
            MongoCollection resolvedRecord = db.getCollection("resolved_" + StringUtils.substringBefore((String)id, (String)"_"), DBObject.class);
            BasicDBObject idx = new BasicDBObject();
            idx.put((Object)"resolved_ts", (Object)1);
            resolvedRecord.createIndex((Bson)idx);
            if (refresh) {
                resolvedRecord.drop();
            }
            if (!"INCREMENTAL".equalsIgnoreCase(resolvingMode)) {
                this.upsertResolved((MongoCollection<DBObject>)currentMdStoreCollection, (MongoCollection<DBObject>)resolvedRecord, 0L);
            }
            FindIterable mdstoreRecords = "refresh".equalsIgnoreCase(resolvingMode) ? currentMdStoreCollection.find() : currentMdStoreCollection.find(this.dateQuery(lastResolveDate == null ? 0L : Long.parseLong(lastResolveDate), null));
            mdstoreRecords.noCursorTimeout(true);
            ArrayBlockingQueue<DBObject> queue = new ArrayBlockingQueue<DBObject>(100);
            ArrayList<Future<Boolean>> responses = new ArrayList<Future<Boolean>>();
            ExecutorService executor = Executors.newFixedThreadPool(100);
            long total = "refresh".equalsIgnoreCase(resolvingMode) ? currentMdStoreCollection.count() : currentMdStoreCollection.count(this.dateQuery(lastResolveDate == null ? 0L : Long.parseLong(lastResolveDate), null));
            int previousPrintValue = -1;
            long ts = System.currentTimeMillis();
            Collections.sort(this.pluginResolver);
            for (int i = 0; i < numberOfThreads; ++i) {
                RecordResolver resolver = this.recordResolverFactory.createResolver(ts, queue, (MongoCollection<DBObject>)resolvedRecord, this.resolverSerializer, this.pluginResolver, offline, false);
                responses.add(executor.submit(resolver));
            }
            int parsed = 0;
            for (DBObject dBObject : mdstoreRecords) {
                int currentPerc;
                queue.put(dBObject);
                if ((currentPerc = Math.round((float)(++parsed) / (float)total * 100.0f)) == previousPrintValue) continue;
                log.info((Object)("Resolving process " + currentPerc + " %"));
                previousPrintValue = currentPerc;
            }
            queue.put(DONE);
            for (Future future : responses) {
                future.get();
            }
            this.upsertResolved((MongoCollection<DBObject>)currentMdStoreCollection, (MongoCollection<DBObject>)resolvedRecord, ts - 1L);
            doneCallback.call(params);
        }
        catch (Throwable e) {
            log.error((Object)e);
            throw new MDStoreServiceException("Error on resolving records ", e);
        }
    }

    private Bson dateQuery(Long from, Long until) {
        if (from != null & until != null) {
            return Filters.and((Bson[])new Bson[]{Filters.gt((String)"timestamp", (Object)from), Filters.lt((String)"timestamp", (Object)until)});
        }
        if (from != null) {
            return Filters.gt((String)"timestamp", (Object)from);
        }
        if (until != null) {
            return Filters.lt((String)"timestamp", (Object)until);
        }
        return null;
    }

    private void upsertResolved(MongoCollection<DBObject> currentMdStoreCollection, MongoCollection<DBObject> resolvedRecord, long timestamp) {
        log.info((Object)"Updating resolved objects");
        Bson queryByTs = Filters.gte((String)"resolved_ts", (Object)timestamp);
        int i = 0;
        FindIterable dbObjects = timestamp == 0L ? resolvedRecord.find() : resolvedRecord.find(queryByTs);
        UpdateOptions f = new UpdateOptions().upsert(true);
        ArrayList<ReplaceOneModel> upsertList = new ArrayList<ReplaceOneModel>();
        BulkWriteOptions writeOptions = new BulkWriteOptions().ordered(false);
        int bulkSize = 1000;
        long validOpCounter = 0L;
        for (DBObject object : dbObjects) {
            if (!StringUtils.isNotBlank((CharSequence)object.get("id").toString())) continue;
            DBObject replacedObj = BasicDBObjectBuilder.start().add("body", (Object)object.get("body").toString()).add("id", (Object)object.get("id").toString()).add("resolved_ts", object.get("resolved_ts")).get();
            upsertList.add(new ReplaceOneModel((Bson)new BasicDBObject("id", (Object)object.get("id").toString()), (Object)replacedObj, f));
            if (++validOpCounter % 1000L != 0L || validOpCounter == 0L) continue;
            currentMdStoreCollection.bulkWrite(upsertList, writeOptions);
            upsertList.clear();
            log.info((Object)("Transaction commit: Upserting: " + validOpCounter));
        }
        if (upsertList.size() > 0) {
            currentMdStoreCollection.bulkWrite(upsertList, writeOptions);
        }
        log.info((Object)("Updated " + i));
    }
}

