/*
 * Decompiled with CFR 0.152.
 */
package org.apache.jackrabbit.oak.jcr.observation;

import com.google.common.base.Preconditions;
import com.google.common.collect.ForwardingIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Monitor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jcr.observation.EventListener;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.api.stats.RepositoryStatistics;
import org.apache.jackrabbit.commons.iterator.EventIteratorAdapter;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.jcr.observation.EventQueue;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.observation.filter.ACFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.EventFilter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.security.authorization.permission.PermissionProvider;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimeSeriesMax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link Observer} that converts changes between successive repository roots into
 * observation events and delivers them to the {@link EventListener} tracked by the
 * supplied {@link ListenerTracker}.
 *
 * <p>After {@link #start(Whiteboard)} the processor is wrapped in a
 * {@link BackgroundObserver} with a bounded queue. When a {@link CommitRateLimiter}
 * was supplied, the fill level of that queue is used to delay or block commits.
 *
 * <p>{@link #start(Whiteboard)}, {@link #stop()} and {@link #stopAndWait(int, TimeUnit)}
 * are synchronized on this instance. Event delivery is guarded by a separate
 * {@link Monitor} so that {@link #stopAndWait(int, TimeUnit)} can wait for a delivery
 * that is in progress.
 */
class ChangeProcessor
implements Observer {
    private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);

    /** Queue fill ratio above which commits start being delayed (when a rate limiter is present). */
    public static final double DELAY_THRESHOLD = 0.8;

    /**
     * Scale of the commit delay: the delay passed to the rate limiter grows linearly from 1
     * towards {@code 1 + MAX_DELAY} as the fill ratio goes from {@link #DELAY_THRESHOLD} to 1.
     * NOTE(review): unit is whatever {@code CommitRateLimiter.setDelay} expects, presumably
     * milliseconds - confirm.
     */
    public static final int MAX_DELAY = 10000;

    private final ContentSession contentSession;
    private final NamePathMapper namePathMapper;
    private final PermissionProvider permissionProvider;
    private final ListenerTracker tracker;
    private final EventListener eventListener;
    /** Current filter providers; replaced atomically by {@link #setFilterProvider(List)}. */
    private final AtomicReference<List<FilterProvider>> filterProvider;
    private final AtomicLong eventCount;
    private final AtomicLong eventDuration;
    private final TimeSeriesMax maxQueueLength;
    private final int queueLength;
    /** May be {@code null}, in which case commits are never delayed or blocked. */
    private final CommitRateLimiter commitRateLimiter;
    /** Non-null once {@link #start(Whiteboard)} has been called. */
    private CompositeRegistration registration;
    /** Root seen by the previous {@link #contentChanged} call; {@code null} before the first call. */
    private volatile NodeState previousRoot;
    private final Monitor runningMonitor = new Monitor();
    private final RunningGuard running = new RunningGuard(this.runningMonitor);

    /**
     * @param contentSession     session whose string form is passed to
     *                           {@link FilterProvider#includeCommit}
     * @param namePathMapper     mapper handed to each {@link EventQueue}
     * @param permissionProvider used to build the access control filter for every change
     * @param tracker            supplies the listener to call and its MBean
     * @param filters            initial filter providers
     * @param statisticManager   source of the event counters and the queue length recorder
     * @param queueLength        capacity of the background observer's queue
     * @param commitRateLimiter  limiter used to delay/block commits, or {@code null}
     */
    public ChangeProcessor(ContentSession contentSession, NamePathMapper namePathMapper, PermissionProvider permissionProvider, ListenerTracker tracker, List<FilterProvider> filters, StatisticManager statisticManager, int queueLength, CommitRateLimiter commitRateLimiter) {
        this.contentSession = contentSession;
        this.namePathMapper = namePathMapper;
        this.permissionProvider = permissionProvider;
        this.tracker = tracker;
        this.eventListener = tracker.getTrackedListener();
        this.filterProvider = new AtomicReference<List<FilterProvider>>(filters);
        this.eventCount = statisticManager.getCounter(RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER);
        this.eventDuration = statisticManager.getCounter(RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION);
        this.maxQueueLength = statisticManager.maxQueLengthRecorder();
        this.queueLength = queueLength;
        this.commitRateLimiter = commitRateLimiter;
    }

    /**
     * Replaces the filter providers. The new list is picked up by the next
     * {@link #contentChanged} call.
     *
     * @param filters the new filter providers
     */
    public void setFilterProvider(List<FilterProvider> filters) {
        this.filterProvider.set(filters);
    }

    /**
     * Starts this processor: starts a {@link WhiteboardExecutor}, wraps this instance in a
     * {@link BackgroundObserver} and registers the observer and the listener MBean with the
     * whiteboard.
     *
     * @param whiteboard the whiteboard to register with
     * @throws IllegalStateException if the processor was started before
     */
    public synchronized void start(Whiteboard whiteboard) {
        Preconditions.checkState(this.registration == null, "Change processor started already");
        final WhiteboardExecutor executor = new WhiteboardExecutor();
        executor.start(whiteboard);
        final BackgroundObserver observer = this.createObserver(executor);

        Registration observerRegistration = WhiteboardUtils.registerObserver(whiteboard, observer);
        Registration mbeanRegistration = WhiteboardUtils.registerMBean(
                whiteboard, EventListenerMBean.class, this.tracker.getListenerMBean(),
                "EventListener", this.tracker.toString());
        Registration closeObserver = new Registration() {
            @Override
            public void unregister() {
                observer.close();
            }
        };
        Registration stopExecutor = new Registration() {
            @Override
            public void unregister() {
                executor.stop();
            }
        };
        this.registration = new CompositeRegistration(
                observerRegistration, mbeanRegistration, closeObserver, stopExecutor);
    }

    /**
     * Creates the background observer that feeds this processor and that translates the
     * fill level of its queue into commit throttling.
     */
    private BackgroundObserver createObserver(WhiteboardExecutor executor) {
        return new BackgroundObserver(this, executor, this.queueLength) {
            /** Last delay handed to the rate limiter; 0 means "not delaying". */
            private volatile long delay;
            /**
             * Set once the queue has been seen full.
             * NOTE(review): without a rate limiter this flag is never cleared, so the
             * "will be compacted" warning is logged at most once - confirm this is intended.
             */
            private volatile boolean blocking;

            @Override
            protected void added(int queueSize) {
                ChangeProcessor.this.maxQueueLength.recordValue(queueSize);
                CommitRateLimiter limiter = ChangeProcessor.this.commitRateLimiter;

                if (queueSize == ChangeProcessor.this.queueLength) {
                    // Queue is full: block commits if we can, otherwise just warn (once).
                    if (limiter != null) {
                        if (!this.blocking) {
                            LOG.warn("Revision queue is full. Further commits will be blocked.");
                        }
                        limiter.blockCommits();
                    } else if (!this.blocking) {
                        LOG.warn("Revision queue is full. Further revisions will be compacted.");
                    }
                    this.blocking = true;
                    return;
                }

                double fillRatio = (double) queueSize / (double) ChangeProcessor.this.queueLength;
                if (fillRatio > ChangeProcessor.DELAY_THRESHOLD) {
                    if (limiter != null) {
                        if (this.delay == 0L) {
                            LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
                        }
                        // Linear ramp above the threshold; the delay is only ever increased here.
                        int newDelay = 1 + (int) ((fillRatio - ChangeProcessor.DELAY_THRESHOLD)
                                / (1 - ChangeProcessor.DELAY_THRESHOLD) * ChangeProcessor.MAX_DELAY);
                        if ((long) newDelay > this.delay) {
                            this.delay = newDelay;
                            limiter.setDelay(this.delay);
                        }
                    }
                } else if (limiter != null) {
                    // Back below the threshold: undo any throttling applied earlier.
                    if (this.delay > 0L) {
                        LOG.debug("Revision queue becoming empty. Stop delaying commits.");
                        limiter.setDelay(0L);
                        this.delay = 0L;
                    }
                    if (this.blocking) {
                        LOG.debug("Revision queue becoming empty. Unblocking commits");
                        limiter.unblockCommits();
                        this.blocking = false;
                    }
                }
            }
        };
    }

    /**
     * Stops this processor and waits up to the given time for an event delivery that is in
     * progress to finish before unregistering.
     *
     * <p>NOTE(review): when the wait times out the processor is already marked as stopped but
     * its registration is not unregistered, and a later call returns {@code true} without
     * unregistering either - confirm callers handle this.
     *
     * @param timeOut maximum time to wait for the delivery monitor
     * @param unit    unit of {@code timeOut}
     * @return {@code true} if the processor was unregistered by this call or had been stopped
     *         before; {@code false} if the wait timed out
     * @throws IllegalStateException if the processor was never started
     */
    public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (!this.running.stop()) {
            // Stopped already.
            return true;
        }
        if (!this.runningMonitor.enter(timeOut, unit)) {
            // Timed out waiting for the listener to return.
            return false;
        }
        try {
            this.registration.unregister();
        } finally {
            // Always release the monitor, even if unregistering fails.
            this.runningMonitor.leave();
        }
        return true;
    }

    /**
     * Stops this processor without waiting for an event delivery that may be in progress.
     * Does nothing if the processor was stopped before.
     *
     * @throws IllegalStateException if the processor was never started
     */
    public synchronized void stop() {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (this.running.stop()) {
            // The monitor is intentionally not touched here: it is never entered by this
            // method, and Monitor.leave() must only be called by the thread occupying it.
            this.registration.unregister();
        }
    }

    /**
     * Computes the events between the previously seen root and {@code root} and, if there are
     * any and the processor has not been stopped, delivers them to the listener. The first call
     * only records {@code root}. Exceptions raised while building or delivering events are
     * logged and not propagated; {@code root} is recorded as the new baseline in every case.
     *
     * @param root the new root state
     * @param info commit information, may be {@code null}
     */
    @Override
    public void contentChanged(@Nonnull NodeState root, @Nullable CommitInfo info) {
        NodeState before = this.previousRoot;
        if (before != null) {
            try {
                Iterator<?> events = Iterators.concat(this.collectEventQueues(before, root, info).iterator());
                // enterIf only occupies the monitor while the guard reports "not stopped".
                if (events.hasNext() && this.runningMonitor.enterIf(this.running)) {
                    try {
                        this.eventListener.onEvent(new EventIteratorAdapter(this.statisticProvider(events)));
                    } finally {
                        this.runningMonitor.leave();
                    }
                }
            } catch (Exception e) {
                LOG.warn("Error while dispatching observation events", e);
            }
        }
        this.previousRoot = root;
    }

    /**
     * Builds one event queue per filter provider that includes the given commit. Each queue
     * combines the provider's own filter with an access control filter.
     */
    private List<EventQueue> collectEventQueues(NodeState before, NodeState after, CommitInfo info) {
        List<EventQueue> eventQueues = Lists.newArrayList();
        for (FilterProvider provider : this.filterProvider.get()) {
            if (!provider.includeCommit(this.contentSession.toString(), info)) {
                continue;
            }
            String basePath = provider.getPath();
            EventFilter userFilter = provider.getFilter(before, after);
            ACFilter acFilter = new ACFilter(before, after, this.permissionProvider, basePath);
            eventQueues.add(new EventQueue(this.namePathMapper, info, before, after, basePath, Filters.all(userFilter, acFilter)));
        }
        return eventQueues;
    }

    /**
     * Wraps {@code events} so that every {@code next()} call increments the event counter and
     * adds the elapsed nanoseconds to the duration counter, whether or not the call succeeds.
     */
    private <T> Iterator<T> statisticProvider(final Iterator<T> events) {
        return new ForwardingIterator<T>() {
            @Override
            protected Iterator<T> delegate() {
                return events;
            }

            @Override
            public T next() {
                long t0 = System.nanoTime();
                try {
                    return super.next();
                } finally {
                    ChangeProcessor.this.eventCount.incrementAndGet();
                    ChangeProcessor.this.eventDuration.addAndGet(System.nanoTime() - t0);
                }
            }
        };
    }

    /**
     * Guard that is satisfied until {@link #stop()} has been called.
     */
    private static class RunningGuard
    extends Monitor.Guard {
        // volatile: written by stop() under the ChangeProcessor lock but read by
        // isSatisfied() under the Monitor lock, so no common lock orders the two.
        private volatile boolean stopped;

        public RunningGuard(Monitor monitor) {
            super(monitor);
        }

        @Override
        public boolean isSatisfied() {
            return !this.stopped;
        }

        /**
         * Marks this guard as stopped. The check-then-set is not atomic on its own; the
         * callers in the enclosing class are synchronized.
         *
         * @return {@code true} if this call stopped the guard, {@code false} if it was
         *         stopped before
         */
        public boolean stop() {
            boolean wasStopped = this.stopped;
            this.stopped = true;
            return !wasStopped;
        }
    }
}

