package eu.dnetlib.lbs.events.manager;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.EventRepository;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.events.output.DispatcherManager;
import eu.dnetlib.lbs.subscriptions.NotificationFrequency;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import eu.dnetlib.lbs.utils.LbsQueue;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/dnetlib/lbs/events/manager/EventManager.class */
/**
 * Worker that drains batches of events from a queue, stores them through the
 * {@link EventRepository} and, for events flagged as instant messages, records and
 * dispatches a notification for every matching subscription whose frequency is
 * {@link NotificationFrequency#realtime}.
 *
 * <p>Intended to be executed on its own thread via {@link #run()}.
 */
public class EventManager implements Runnable {
    private static final Log log = LogFactory.getLog(EventManager.class);

    private final EventRepository eventRepository;
    private final NotificationRepository notificationRepository;
    private final SubscriptionRepository subscriptionRepo;
    private final DispatcherManager dispatcherManager;
    private final LbsQueue<String, Event> queue;

    /**
     * Creates a manager wired to its collaborators.
     *
     * @param eventRepository        repository the drained events are saved to
     * @param notificationRepository repository the generated notifications are saved to
     * @param subscriptionRepository repository queried for the subscriptions of an event topic
     * @param dispatcherManager      component used to dispatch an event to a subscription
     * @param lbsQueue               queue the events are taken from
     */
    public EventManager(EventRepository eventRepository, NotificationRepository notificationRepository, SubscriptionRepository subscriptionRepository, DispatcherManager dispatcherManager, LbsQueue<String, Event> lbsQueue) {
        this.eventRepository = eventRepository;
        this.notificationRepository = notificationRepository;
        this.subscriptionRepo = subscriptionRepository;
        this.dispatcherManager = dispatcherManager;
        this.queue = lbsQueue;
    }

    /**
     * Offers an entry to the underlying queue.
     *
     * @param str the entry to enqueue (presumably a serialized event; confirm against LbsQueue)
     * @return the value returned by the queue's {@code offer} call
     */
    public boolean add(String str) {
        return this.queue.offer(str);
    }

    /**
     * Processes batches taken from the queue until the executing thread is interrupted.
     *
     * <p>A failure while handling one batch is logged and the loop carries on with the
     * next batch, so a transient error in a repository or dispatcher does not stop the
     * worker for good.
     */
    @Override // java.lang.Runnable
    public void run() {
        log.info("Event indexer started: " + Thread.currentThread().getName());
        // Checking the interrupt flag lets the worker be shut down and prevents a tight
        // error loop if the queue keeps failing because the thread was interrupted.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // NOTE(review): takeList() presumably blocks until events are available - confirm.
                processBatch(this.queue.takeList());
            } catch (RuntimeException e) {
                log.error("Error processing event batch", e);
            }
        }
        log.info("Event indexer stopped: " + Thread.currentThread().getName());
    }

    /**
     * Saves a batch of events and notifies the real-time subscribers of its instant messages.
     */
    private void processBatch(List<Event> events) {
        if (events == null || events.isEmpty()) {
            // Nothing to store or dispatch.
            return;
        }
        this.eventRepository.saveAll(events);
        for (Event event : events) {
            if (!event.isInstantMessage()) {
                continue;
            }
            try {
                notifyRealtimeSubscribers(event);
            } catch (RuntimeException e) {
                // Isolate the failure so the remaining events of the batch are still handled.
                log.error("Error notifying subscribers of topic: " + event.getTopic(), e);
            }
        }
    }

    /**
     * Saves and dispatches a notification for each real-time subscription that accepts the event.
     */
    private void notifyRealtimeSubscribers(Event event) {
        StreamSupport.stream(this.subscriptionRepo.findByTopic(event.getTopic()).spliterator(), false)
                .filter(subscription -> subscription.verifyEventConditions(event))
                .filter(subscription -> subscription.getFrequency() == NotificationFrequency.realtime)
                .forEach(subscription -> {
                    this.notificationRepository.save(new Notification(subscription, event));
                    this.dispatcherManager.dispatch(subscription, event);
                });
    }

    /**
     * Returns the queue this manager takes its events from.
     *
     * @return the queue instance supplied at construction time
     */
    public LbsQueue<String, Event> getQueue() {
        return this.queue;
    }
}
