/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.util;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.neo4j.api.core.NeoService;
import org.neo4j.api.core.Node;
import org.neo4j.api.core.Transaction;
import org.neo4j.impl.transaction.DeadlockDetectedException;
import org.neo4j.impl.transaction.UserTransactionImpl;
import org.neo4j.util.DeadlockCapsule;
import org.neo4j.util.NeoTransactionQueue;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Background thread that drains a {@link NeoTransactionQueue}.
 *
 * <p>The thread itself (see {@link #run()}) periodically looks at the
 * per-transaction queues exposed by {@link NeoTransactionQueue#getQueues()}
 * and hands each one that is not already being worked on to a
 * {@code Consumer} running on an internal thread pool, never tracking more
 * than {@code maxConsumers} consumers at a time. A consumer peeks up to
 * {@code batchSize} entries, passes every entry to
 * {@link #handleEntry(Map)} and then removes the peeked entries from its
 * queue.
 *
 * <p>Life cycle: construct, {@link #startUp()}, optionally
 * {@link #setPaused(boolean)} / {@link #setFallThrough(boolean)},
 * finally {@link #shutDown()}.
 *
 * <p>Threading: the control flags are {@code volatile} because they are
 * written by caller threads and read by the worker and consumer threads.
 */
public abstract class NeoTransactionQueueWorker
extends Thread {
    /** Number of attempts {@link #add(Map)} makes when deadlocks are detected. */
    private static final int MAX_ADD_ATTEMPTS = 10;
    /** Pause between two {@link #add(Map)} attempts. */
    private static final long ADD_RETRY_DELAY_MILLIS = 20L;
    /** Number of attempts a consumer makes to handle a single entry. */
    private static final int MAX_HANDLE_ATTEMPTS = 10;
    /** Pause between two attempts to handle the same entry. */
    private static final long HANDLE_RETRY_DELAY_MILLIS = 500L;
    /** How long a consumer sleeps before re-checking the paused flag. */
    private static final long PAUSED_POLL_MILLIS = 1000L;

    private final NeoService neo;
    private final NeoTransactionQueue workQueue;
    private final int maxConsumers;
    private final int batchSize;
    /**
     * Transaction ids that currently have a consumer. Compound operations
     * lock on the set itself, which is the lock the synchronized wrapper
     * uses for its own single operations.
     */
    private final Set<Integer> consumerTxIds = Collections.synchronizedSet(new HashSet<Integer>());
    /** Consumer pool; {@code null} until {@link #startUp()} has been called. */
    private volatile ExecutorService consumers;
    private volatile boolean halted;
    private volatile boolean paused;
    private volatile boolean fallThrough;

    /**
     * Creates a worker with a batch size of 1.
     *
     * @param neo the neo service used for transactions
     * @param rootNode the node passed to {@link #createQueue(Node)}
     * @param maxConsumers maximum number of concurrently running consumers
     */
    public NeoTransactionQueueWorker(NeoService neo, Node rootNode, int maxConsumers) {
        this(neo, rootNode, maxConsumers, 1);
    }

    /**
     * Creates a worker. The worker does nothing until {@link #startUp()}
     * is called.
     *
     * <p>Note that {@link #createQueue(Node)} is invoked from this
     * constructor, i.e. before a subclass constructor body has run.
     *
     * @param neo the neo service used for transactions
     * @param rootNode the node passed to {@link #createQueue(Node)}
     * @param maxConsumers maximum number of concurrently running consumers
     * @param batchSize maximum number of entries a consumer peeks per batch
     */
    public NeoTransactionQueueWorker(NeoService neo, Node rootNode, int maxConsumers, int batchSize) {
        super(NeoTransactionQueueWorker.class.getSimpleName());
        this.neo = neo;
        this.maxConsumers = maxConsumers;
        this.workQueue = this.createQueue(rootNode);
        this.batchSize = batchSize;
    }

    /**
     * Adds an entry to the queue under the id returned by
     * {@code UserTransactionImpl.getEventIdentifier()}.
     *
     * <p>If a {@link DeadlockDetectedException} is thrown the call is
     * retried after a short sleep, up to {@value #MAX_ADD_ATTEMPTS} attempts
     * in total. When every attempt fails the entry is dropped; no exception
     * is thrown, but the drop is reported on standard out so that it does
     * not go unnoticed.
     *
     * @param values the entry to enqueue
     */
    public void add(Map<String, Object> values) {
        DeadlockDetectedException lastDeadlock = null;
        for (int attempt = 0; attempt < MAX_ADD_ATTEMPTS; ++attempt) {
            try {
                this.getQueue().add(this.findTxId(), values);
                return;
            }
            catch (DeadlockDetectedException e) {
                lastDeadlock = e;
                NeoTransactionQueueWorker.sleepQuietly(ADD_RETRY_DELAY_MILLIS);
            }
        }
        // Callers have never had to handle an exception from this method, so
        // the failure is reported rather than thrown.
        System.out.println("Error adding entry to queue, giving up after " + MAX_ADD_ATTEMPTS + " attempts:" + lastDeadlock);
    }

    /**
     * Returns the event identifier used as queue key in {@link #add(Map)}.
     * NOTE(review): presumably this identifies the calling thread's current
     * transaction - confirm against {@code UserTransactionImpl}.
     */
    private int findTxId() {
        return new UserTransactionImpl(this.neo).getEventIdentifier();
    }

    /**
     * @return the queue created by {@link #createQueue(Node)} at construction
     */
    protected NeoTransactionQueue getQueue() {
        return this.workQueue;
    }

    /**
     * Factory for the underlying queue; called once from the constructor.
     *
     * @param rootNode the node handed to the constructor
     * @return the queue this worker operates on
     */
    protected NeoTransactionQueue createQueue(Node rootNode) {
        return new NeoTransactionQueue(this.neo, rootNode);
    }

    /**
     * Pauses or resumes processing. While paused the worker thread does not
     * start new consumers and running consumers poll this flag instead of
     * processing batches.
     *
     * @param paused {@code true} to pause, {@code false} to resume
     */
    public void setPaused(boolean paused) {
        this.paused = paused;
    }

    /**
     * @return whether processing is currently paused
     */
    public boolean isPaused() {
        return this.paused;
    }

    /**
     * Enables or disables fall-through mode. While enabled, entries are
     * taken off the queue as usual but {@link #handleEntry(Map)} is not
     * called for them.
     *
     * @param fallThrough {@code true} to skip handling of entries
     */
    public void setFallThrough(boolean fallThrough) {
        this.fallThrough = fallThrough;
    }

    /**
     * @return whether fall-through mode is enabled
     */
    public boolean isFallThrough() {
        return this.fallThrough;
    }

    /**
     * Creates the consumer thread pool and starts this worker thread.
     */
    public void startUp() {
        this.consumers = new ThreadPoolExecutor(this.maxConsumers, this.maxConsumers, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(this.maxConsumers), new ThreadFactory(){
            private int counter = 1;

            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "SearchUpdateWorker Consumer[" + this.counter++ + "]");
            }
        });
        this.start();
    }

    /**
     * Signals the worker thread and all consumers to stop, then waits up to
     * 15 seconds for the consumer pool to terminate. Calling this method
     * before {@link #startUp()} only sets the halted flag.
     *
     * <p>An interrupt while waiting ends the wait early; the interrupt
     * status is left cleared, as before.
     */
    public void shutDown() {
        this.halted = true;
        this.wakeUp();
        ExecutorService pool = this.consumers;
        if (pool == null) {
            // startUp() was never called, so there is no pool to stop.
            return;
        }
        pool.shutdown();
        try {
            pool.awaitTermination(15L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    /**
     * Hook executed once at the beginning of {@link #run()}; the default
     * implementation sleeps for two seconds.
     */
    protected void waitBeforeRun() {
        NeoTransactionQueueWorker.sleepQuietly(2000L);
    }

    /**
     * Worker loop: after {@link #waitBeforeRun()}, repeatedly distributes
     * queues to consumers (unless paused) and then waits for a wake-up or
     * the balancing timeout, until {@link #shutDown()} is called.
     */
    @Override
    public void run() {
        this.waitBeforeRun();
        while (!this.halted) {
            try {
                if (!this.isPaused()) {
                    this.balanceQueue();
                }
            }
            catch (DeadlockDetectedException ignored) {
                // Nothing to do: balancing is simply attempted again on the
                // next round of the loop.
            }
            catch (Throwable e) {
                // Keep the worker thread alive whatever goes wrong.
                System.out.println("Error in balance queue:" + e);
            }
            this.waitForChange();
        }
    }

    /**
     * @return {@code true} if no consumer is currently registered
     */
    public boolean isIdle() {
        return this.numberOfConsumers() == 0;
    }

    /**
     * Wakes the worker thread if it is blocked in {@link #waitForChange()}.
     */
    synchronized void wakeUp() {
        this.notify();
    }

    /** Submits a consumer to the pool. */
    private void addConsumer(Consumer consumer) {
        this.consumers.submit(consumer);
    }

    /**
     * Called by a consumer when it ends: frees its slot and wakes the worker
     * thread so that it can re-balance right away.
     */
    private void consumerDone(int txId) {
        this.consumerTxIds.remove(txId);
        this.wakeUp();
    }

    /** Delegates to {@link #handleEntry(Map)} unless fall-through is enabled. */
    private void doHandleEntry(Map<String, Object> entry) {
        if (!this.isFallThrough()) {
            this.handleEntry(entry);
        }
    }

    /**
     * Hook called by a consumer after peeking a batch and before its first
     * entry is handled. Does nothing by default.
     */
    protected void beforeBatch() {
    }

    /**
     * Hook called by a consumer after the last entry of a batch has been
     * handled and before the batch is removed from the queue. Does nothing
     * by default.
     */
    protected void afterBatch() {
    }

    /**
     * Processes one queue entry. Runs on a consumer thread. If it throws an
     * {@link Exception} the call is retried (up to
     * {@value #MAX_HANDLE_ATTEMPTS} attempts); after that the entry is added
     * to the queue again.
     *
     * @param var1 the entry to process
     */
    protected abstract void handleEntry(Map<String, Object> var1);

    /**
     * @return the maximum time in milliseconds the worker thread waits
     *         between two balancing rounds; 2000 by default
     */
    protected long getWaitTimeoutBetweenBalancing() {
        return 2000L;
    }

    /**
     * Blocks the worker thread until {@link #wakeUp()} is called or the
     * balancing timeout elapses.
     */
    private synchronized void waitForChange() {
        if (this.halted) {
            // shutDown() sets the flag before it notifies. Checking it while
            // holding the monitor means a notification sent just before we
            // got here cannot leave us waiting for the whole timeout.
            return;
        }
        try {
            this.wait(this.getWaitTimeoutBetweenBalancing());
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    /** @return the number of transaction ids that currently have a consumer */
    private int numberOfConsumers() {
        return this.consumerTxIds.size();
    }

    /**
     * Starts a consumer for every queue that does not have one yet, stopping
     * as soon as the worker is halted or {@code maxConsumers} consumers are
     * registered.
     */
    private void balanceQueue() {
        Map<Integer, NeoTransactionQueue.TxQueue> queues = this.getQueue().getQueues();
        // Iterate over a snapshot of the ids taken up front.
        for (Integer txId : new HashSet<Integer>(queues.keySet())) {
            if (this.halted || this.numberOfConsumers() >= this.maxConsumers) {
                break;
            }
            // Check-then-act must be atomic with respect to consumerDone().
            // Holding the lock also keeps a consumer that finishes at once
            // from removing its id before it has been added below.
            synchronized (this.consumerTxIds) {
                if (!this.consumerTxIds.contains(txId)) {
                    this.addConsumer(new Consumer(queues.get(txId)));
                    this.consumerTxIds.add(txId);
                }
            }
        }
    }

    /**
     * Sleeps for the given time. An interrupt only cuts the sleep short: it
     * is not propagated and the interrupt status is left cleared, which is
     * what all sleeping call sites of this class have always done.
     */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    /**
     * Removes a number of entries from the head of a queue inside a
     * transaction of its own. Extends {@link DeadlockCapsule};
     * NOTE(review): presumably the capsule re-runs {@link #tryOnce()} when a
     * deadlock is detected - confirm against {@code DeadlockCapsule}.
     */
    private static class EntryRemover
    extends DeadlockCapsule<Object> {
        private final NeoService neo;
        private final NeoTransactionQueue.TxQueue queue;
        private final int size;

        EntryRemover(NeoService neo, NeoTransactionQueue.TxQueue queue, int size) {
            super("EntryRemover");
            this.neo = neo;
            this.queue = queue;
            this.size = size;
        }

        /**
         * Removes {@code size} entries from the queue in one transaction.
         *
         * @return always {@code null}
         */
        @Override
        public Object tryOnce() {
            Transaction tx = this.neo.beginTx();
            try {
                this.queue.remove(this.size);
                tx.success();
                return null;
            }
            finally {
                tx.finish();
            }
        }
    }

    /**
     * Task that works off the queue of a single transaction id until the
     * worker is halted or an error ends it.
     */
    private class Consumer
    implements Runnable {
        private final NeoTransactionQueue.TxQueue updateQueue;
        private final int txId;

        Consumer(NeoTransactionQueue.TxQueue updateQueue) {
            this.updateQueue = updateQueue;
            this.txId = updateQueue.getTxId();
        }

        /**
         * Processes batches until the worker is halted, idling while it is
         * paused. Always deregisters itself when it ends.
         */
        @Override
        public void run() {
            try {
                while (!NeoTransactionQueueWorker.this.halted) {
                    if (NeoTransactionQueueWorker.this.isPaused()) {
                        this.sleepSomeTime(PAUSED_POLL_MILLIS);
                        continue;
                    }
                    this.processBatch();
                }
            }
            catch (Throwable ignored) {
                // Intentionally silent, as before. NOTE(review): the loop has
                // no other exit for an exhausted queue, so an exception from
                // the queue may well be the regular way a consumer ends;
                // confirm against NeoTransactionQueue.TxQueue before adding
                // any reporting here.
            }
            finally {
                NeoTransactionQueueWorker.this.consumerDone(this.txId);
            }
        }

        /**
         * Peeks one batch, handles each of its entries between the batch
         * hooks and finally removes the batch from the queue.
         */
        private void processBatch() {
            Collection<Map<String, Object>> entries = this.updateQueue.peek(NeoTransactionQueueWorker.this.batchSize);
            NeoTransactionQueueWorker.this.beforeBatch();
            for (Map<String, Object> entry : entries) {
                this.doOne(entry);
            }
            NeoTransactionQueueWorker.this.afterBatch();
            new EntryRemover(NeoTransactionQueueWorker.this.neo, this.updateQueue, entries.size()).run();
        }

        private void sleepSomeTime(long millis) {
            NeoTransactionQueueWorker.sleepQuietly(millis);
        }

        /**
         * Handles one entry, retrying after a pause when the handler throws.
         * If no attempt succeeds - including the case where the worker is
         * halted before the first attempt - the entry is queued again.
         */
        private void doOne(Map<String, Object> entry) {
            Exception lastFailure = null;
            for (int attempt = 0; !NeoTransactionQueueWorker.this.halted && attempt < MAX_HANDLE_ATTEMPTS; ++attempt) {
                try {
                    NeoTransactionQueueWorker.this.doHandleEntry(entry);
                    return;
                }
                catch (Exception e) {
                    lastFailure = e;
                    this.sleepSomeTime(HANDLE_RETRY_DELAY_MILLIS);
                }
            }
            this.handleEntryError(entry, lastFailure);
        }

        /**
         * Puts an entry that could not be handled back on the queue, in a
         * transaction of its own.
         *
         * @param entry the entry to queue again
         * @param exception the last failure, or {@code null} if handling was
         *        never attempted because the worker was halted
         */
        private void handleEntryError(Map<String, Object> entry, Exception exception) {
            if (exception != null) {
                // Without this the reason for the re-queue would be lost.
                System.out.println("Error handling queue entry, adding it to the queue again:" + exception);
            }
            Transaction tx = NeoTransactionQueueWorker.this.neo.beginTx();
            try {
                NeoTransactionQueueWorker.this.add(entry);
                tx.success();
            }
            finally {
                tx.finish();
            }
        }
    }
}

