/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mdstore.modular.mongodb;

import com.google.common.collect.Lists;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.QueryBuilder;
import com.mongodb.client.FindIterable;
import com.mongodb.client.ListIndexesIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.IndexOptions;
import eu.dnetlib.data.mdstore.modular.RecordParser;
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
import eu.dnetlib.data.mdstore.modular.mongodb.MongoBulkWritesManager;
import eu.dnetlib.data.mdstore.modular.mongodb.MongoResultSetListener;
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
import eu.dnetlib.rmi.data.DocumentNotFoundException;
import eu.dnetlib.rmi.data.MDStoreServiceException;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterators;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Required;

public class MongoMDStore
implements MDStore {
    private static final int BULK_SIZE = 100;
    private static final Log log = LogFactory.getLog(MongoMDStore.class);
    private static List<String> requiredIndices = Arrays.asList("{ \"id\" : 1}", "{ \"timestamp\" : 1}", "{ \"originalId\" : 1}");
    private final boolean discardRecords;
    private String id;
    private MongoDatabase mongoDatabase;
    private MongoCollection<DBObject> collection;
    private MongoCollection<DBObject> discardedCollection;
    private RecordParser recordParser;
    private static List<String> indices = Lists.newArrayList((Object[])new String[]{"id", "timestamp", "originalId"});
    private final IndexOptions options = new IndexOptions().background(true);

    public MongoMDStore(String id, MongoCollection<DBObject> collection, RecordParser recordParser, boolean discardRecords, MongoDatabase mongoDatabase) {
        this.id = id;
        this.mongoDatabase = mongoDatabase;
        this.collection = collection;
        this.discardedCollection = this.mongoDatabase.getCollection("discarded-" + StringUtils.substringBefore((String)id, (String)"_"), DBObject.class);
        this.recordParser = recordParser;
        this.discardRecords = discardRecords;
    }

    @Override
    public int feed(Iterable<String> records, boolean incremental) {
        this.ensureIndices();
        final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
        final Object sentinel = new Object();
        int countStored = 0;
        Callable<Integer> writer = new Callable<Integer>(){

            @Override
            public Integer call() throws Exception {
                MongoBulkWritesManager bulkWritesManager = new MongoBulkWritesManager((MongoCollection<DBObject>)MongoMDStore.this.collection, (MongoCollection<DBObject>)MongoMDStore.this.discardedCollection, 100, MongoMDStore.this.recordParser, MongoMDStore.this.discardRecords);
                int count = 0;
                try {
                    while (true) {
                        Object record;
                        if ((record = queue.take()) == sentinel) {
                            bulkWritesManager.flushBulks();
                            break;
                        }
                        ++count;
                        bulkWritesManager.insert((String)record);
                    }
                }
                catch (InterruptedException e) {
                    log.fatal((Object)"got exception in background thread", (Throwable)e);
                    throw new IllegalStateException(e);
                }
                log.debug((Object)String.format("extracted %s records from feeder queue", count));
                return count;
            }
        };
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> storedCountInt = executorService.submit(writer);
        try {
            log.info((Object)("feeding mdstore " + this.id));
            if (records != null) {
                for (String record : records) {
                    queue.put(record);
                }
            }
            queue.put(sentinel);
            countStored = storedCountInt.get();
        }
        catch (InterruptedException e) {
            log.error((Object)("Error on feeding mdstore with id:" + this.id), (Throwable)e);
            throw new IllegalStateException(e);
        }
        catch (ExecutionException e) {
            log.error((Object)("Error on feeding mdstore with id:" + this.id), (Throwable)e);
            throw new IllegalStateException(e);
        }
        log.info((Object)("finished feeding mdstore " + this.id));
        this.ensureIndices();
        this.collection.createIndex((Bson)new BasicDBObject("id", (Object)1));
        return countStored;
    }

    public void ensureIndices() {
        for (String key : indices) {
            this.collection.createIndex((Bson)new BasicDBObject(key, (Object)1), this.options);
        }
    }

    public boolean isIndexed() {
        ListIndexesIterable listIndexesIterable = this.collection.listIndexes(DBObject.class);
        Stream<DBObject> inputStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(listIndexesIterable.iterator(), 16), false);
        List key = inputStream.map(dbo -> new BasicDBObject(dbo.toMap()).getString("key")).collect(Collectors.toList());
        return requiredIndices.stream().allMatch(key::contains);
    }

    public void replace(String grep, String replace) {
        Pattern regex = Pattern.compile(grep, 8);
        BasicDBObject query = (BasicDBObject)QueryBuilder.start((String)"body").regex(regex).get();
        FindIterable matches = this.collection.find((Bson)query, DBObject.class);
        if (log.isDebugEnabled()) {
            log.debug((Object)("FOUND: " + this.collection.count((Bson)query)));
        }
        for (DBObject match : matches) {
            BasicDBObject o = new BasicDBObject(match.toMap());
            o.put("body", (Object)regex.matcher((String)match.get("body")).replaceAll(replace));
            this.collection.findOneAndReplace((Bson)new BasicDBObject("_id", o.get("_id")), (Object)o);
        }
    }

    @Override
    public ResultSetListener<String> deliver(String from, String until, String recordFilter, boolean noCursorTimeout) {
        return this.deliver(from, until, recordFilter, new SerializeMongoRecord(), noCursorTimeout);
    }

    @Override
    public ResultSetListener<String> deliverIds(String from, String until, String recordFilter) {
        return this.deliver(from, until, recordFilter, new SerializeMongoRecordId(), false);
    }

    public ResultSetListener deliver(String from, String until, String recordFilter, Function<DBObject, String> serializer, boolean noCursorTimeout) {
        try {
            Pattern filter = recordFilter != null && recordFilter.length() > 0 ? Pattern.compile(recordFilter, 8) : null;
            return new MongoResultSetListener(this.collection, this.parseLong(from), this.parseLong(until), filter, serializer, 100, noCursorTimeout);
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    private Long parseLong(String s) throws MDStoreServiceException {
        if (StringUtils.isBlank((CharSequence)s)) {
            return null;
        }
        try {
            Long l = Long.valueOf(s);
            if (l == 0L || l == -1L) {
                return null;
            }
            return l;
        }
        catch (NumberFormatException e) {
            throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", (Throwable)e);
        }
    }

    @Override
    public Iterable<String> iterate() {
        Stream<DBObject> inputStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(this.collection.find().iterator(), 16), false);
        return inputStream.map(it -> (String)it.get("body")).collect(Collectors.toList());
    }

    @Override
    public void deleteRecord(String recordId) {
        this.collection.deleteOne((Bson)new BasicDBObject("id", (Object)recordId));
    }

    @Override
    public String getRecord(String recordId) throws DocumentNotFoundException {
        DBObject obj = (DBObject)this.collection.find((Bson)new BasicDBObject("id", (Object)recordId)).first();
        if (obj == null || !obj.containsField("body")) {
            throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'", recordId, this.id));
        }
        String body = (String)obj.get("body");
        if (body.trim().length() == 0) {
            throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'", recordId, this.id));
        }
        return new SerializeMongoRecord().apply(obj);
    }

    @Override
    public void truncate() {
        this.collection.drop();
        this.discardedCollection.drop();
    }

    public DBObject getMDStoreMetadata() {
        return (DBObject)this.mongoDatabase.getCollection("metadata", DBObject.class).find((Bson)new BasicDBObject("mdId", (Object)this.getId())).first();
    }

    @Override
    public String getFormat() {
        return (String)this.getMDStoreMetadata().get("format");
    }

    @Override
    public String getInterpretation() {
        return (String)this.getMDStoreMetadata().get("interpretation");
    }

    @Override
    public String getLayout() {
        return (String)this.getMDStoreMetadata().get("layout");
    }

    @Override
    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public MongoCollection<DBObject> getCollection() {
        return this.collection;
    }

    public void setCollection(MongoCollection<DBObject> collection) {
        this.collection = collection;
    }

    public RecordParser getRecordParser() {
        return this.recordParser;
    }

    @Required
    public void setRecordParser(RecordParser recordParser) {
        this.recordParser = recordParser;
    }

    @Override
    public int getSize() {
        return (int)this.collection.count();
    }

    public MongoCollection<DBObject> getDiscardedCollection() {
        return this.discardedCollection;
    }

    public void setDiscardedCollection(MongoCollection<DBObject> discardedCollection) {
        this.discardedCollection = discardedCollection;
    }

    private class SerializeMongoRecordId
    implements Function<DBObject, String> {
        private SerializeMongoRecordId() {
        }

        @Override
        public String apply(DBObject arg) {
            return (String)arg.get("id");
        }
    }

    private class SerializeMongoRecord
    implements Function<DBObject, String> {
        private SerializeMongoRecord() {
        }

        @Override
        public String apply(DBObject arg) {
            return (String)arg.get("body");
        }
    }
}

