/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.broker.events.output;

import com.google.common.base.Throwables;
import eu.dnetlib.broker.common.elasticsearch.Event;
import eu.dnetlib.broker.common.subscriptions.Subscription;
import eu.dnetlib.broker.events.output.NotificationDispatcher;
import eu.dnetlib.broker.utils.LbsQueue;
import eu.dnetlib.broker.utils.QueueManager;
import eu.dnetlib.broker.utils.ThreadManager;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractNotificationDispatcher<T>
implements NotificationDispatcher,
BeanNameAware,
Runnable {
    private String dispatcherName;
    @Autowired
    private QueueManager queueManager;
    @Autowired
    private ThreadManager threadManager;
    private LbsQueue<T, T> queue;
    private final AtomicLong count = new AtomicLong(0L);
    private final AtomicLong countErrors = new AtomicLong(0L);
    private String lastError = "";
    private static final Log log = LogFactory.getLog(AbstractNotificationDispatcher.class);

    @PostConstruct
    public void init() {
        this.queue = this.queueManager.newQueue(this.dispatcherName + "-queue");
        this.threadManager.newThread(this.dispatcherName, (Runnable)this);
    }

    @Override
    public void run() {
        block2: while (true) {
            Iterator iterator = this.queue.takeList().iterator();
            while (true) {
                if (!iterator.hasNext()) continue block2;
                Object message = iterator.next();
                if (message == null) continue;
                try {
                    this.performAction(message);
                    this.count.incrementAndGet();
                    continue;
                }
                catch (Throwable e) {
                    log.error((Object)"Error sending notification", e);
                    this.countErrors.incrementAndGet();
                    this.lastError = e.getMessage() + "\nStacktrave:\n" + Throwables.getStackTraceAsString((Throwable)e);
                    continue;
                }
                break;
            }
            break;
        }
    }

    public void sendNotification(Subscription subscription, Event ... events) {
        try {
            this.queue.offer(this.prepareAction(subscription, events));
        }
        catch (Exception e) {
            log.error((Object)"Error sending notification", (Throwable)e);
        }
    }

    public void sendNotification(Subscription subscription, Map<String, Object> params) {
        try {
            this.queue.offer(this.prepareAction(subscription, params));
        }
        catch (Exception e) {
            log.error((Object)"Error sending notification", (Throwable)e);
        }
    }

    protected abstract T prepareAction(Subscription var1, Map<String, Object> var2) throws Exception;

    protected abstract T prepareAction(Subscription var1, Event ... var2) throws Exception;

    protected abstract void performAction(T var1) throws Exception;

    public String getDispatcherName() {
        return this.dispatcherName;
    }

    public void setDispatcherName(String dispatcherName) {
        this.dispatcherName = dispatcherName;
    }

    public long count() {
        return this.count.get();
    }

    public long countErrors() {
        return this.countErrors.get();
    }

    public String lastError() {
        return this.lastError;
    }

    public void resetCount() {
        this.count.set(0L);
        this.countErrors.set(0L);
        this.lastError = "";
    }

    public void setBeanName(String name) {
        if (StringUtils.isBlank((CharSequence)this.getDispatcherName())) {
            this.setDispatcherName(name);
        }
    }
}

