package gr.uoa.di.madgik.searchlibrary.operatorlibrary.merge;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.stats.StatsContainer;
import java.util.Calendar;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/operatorlibrary-1.3.1-3.4.0.jar:gr/uoa/di/madgik/searchlibrary/operatorlibrary/merge/MergeWorker.class */
/**
 * Background thread that drains an intermediate record queue and writes each
 * record to a {@link RecordWriter}.
 *
 * <p>On start the worker creates one {@code ReaderScan} per reader, starts the
 * scan dispatcher matching the configured {@code OperationMode}, and then blocks
 * until both the writer and the record definition offsets have been supplied via
 * {@link #setWriter(RecordWriter)} and {@link #setRecordDefinitionOffsets(int[])}.
 * The supplying thread must notify on {@link #getWriterInitSyncObject()} after
 * setting them, otherwise this thread keeps waiting.
 *
 * <p>Consumption stops when the writer is closed/disposed or when a poll times
 * out and every reader reports that it has finished. Timing and throughput
 * figures are then reported to the {@code StatsContainer}.
 */
public class MergeWorker extends Thread {
    /** Capacity of the intermediate record queue. */
    private static final int QUEUE_CAPACITY = 100;
    /** Maximum time a single poll on the intermediate queue blocks, in milliseconds. */
    private static final long POLL_TIMEOUT_MILLIS = 500L;

    private final Vector<ReaderHolder> readers;
    private final OperationMode operationMode;
    private final String uid;
    private final long timeout;
    private final TimeUnit timeUnit;
    private final StatsContainer stats;
    private final Logger logger = LoggerFactory.getLogger(MergeWorker.class.getName());
    /** Monitor used to wait for {@link #writer} and {@link #recordDefinitionOffsets}. */
    private final Object synchWriterInit = new Object();
    private RecordWriter<Record> writer = null;
    private int[] recordDefinitionOffsets = null;
    /** Number of records successfully written so far. */
    private int count = 0;
    /** Wall-clock timestamp (ms) taken right before the poll preceding the first written record. */
    private long firststop = 0;

    /**
     * Creates a worker; nothing is started until {@link #start()} is called.
     *
     * @param vector          holders of the readers to merge; also used as the lock guarding reader state
     * @param statsContainer  receiver of the timing/throughput statistics
     * @param operationMode   selects which scan dispatcher is used
     * @param j               timeout passed to the reader scans and to the writer's import call
     * @param timeUnit        unit of {@code j}
     * @param str             identifier of this merge operation, used in log messages
     */
    public MergeWorker(Vector<ReaderHolder> vector, StatsContainer statsContainer, OperationMode operationMode, long j, TimeUnit timeUnit, String str) {
        this.readers = vector;
        this.stats = statsContainer;
        this.operationMode = operationMode;
        this.timeout = j;
        this.timeUnit = timeUnit;
        this.uid = str;
    }

    /**
     * Sets the writer that receives the merged records.
     *
     * @param recordWriter the output writer
     */
    public void setWriter(RecordWriter<Record> recordWriter) {
        this.writer = recordWriter;
    }

    /**
     * Sets the per-reader offsets added to each record's definition index
     * before it is handed to the writer.
     *
     * @param iArr offsets indexed by the id carried in each buffered entry
     */
    public void setRecordDefinitionOffsets(int[] iArr) {
        this.recordDefinitionOffsets = iArr;
    }

    /**
     * Returns the monitor this worker waits on until both the writer and the
     * record definition offsets are non-null.
     *
     * @return the synchronization object to notify after initialization
     */
    public Object getWriterInitSyncObject() {
        return this.synchWriterInit;
    }

    /**
     * Runs the merge: starts the scans, waits for writer initialization, copies
     * records from the intermediate queue to the writer, then shuts everything
     * down and publishes statistics. Any exception triggers a best-effort
     * clean-up of readers, queue and writer.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        final long startMillis = Calendar.getInstance().getTimeInMillis();
        setName("Merge Worker");
        // Raw types are kept on purpose: the element types expected by the
        // project-level constructors below are not visible from this class.
        ArrayBlockingQueue recordQueue = null;
        try {
            this.logger.info("Queue capacity for " + this.uid + " is " + QUEUE_CAPACITY);
            recordQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
            // Queue shared between the reader scans and the event handler.
            ConcurrentLinkedQueue eventQueue = new ConcurrentLinkedQueue();
            ReaderScan[] scans = new ReaderScan[this.readers.size()];
            for (int i = 0; i < this.readers.size(); i++) {
                scans[i] = new ReaderScan(this.readers, i, recordQueue, eventQueue, this.timeout, this.timeUnit, this.uid, this.operationMode);
            }

            ScanDispatcher dispatcher;
            if (this.operationMode == OperationMode.FIFO) {
                dispatcher = new FIFOScanDispatcher(scans);
            } else if (this.operationMode == OperationMode.FirstAvailable) {
                dispatcher = new FirstAvailableScanDispatcher(scans);
            } else {
                dispatcher = new SortScanDispatcher(scans);
                Sorter sorter = new Sorter(scans, recordQueue);
                sorter.setPriority(1);
                sorter.start();
            }
            dispatcher.start();

            awaitWriterInitialization();

            // NOTE(review): the last argument equals the queue capacity; presumably related — confirm.
            EventHandler eventHandler = new EventHandler(this.writer, eventQueue, this.readers.size(), 100);
            this.logger.info("After all readers have initialized, the intermediate buffer of " + this.uid + " contains " + recordQueue.size() + " records");

            boolean allReadersFinished = false;
            while (!allReadersFinished) {
                if (this.count == 0) {
                    // NOTE(review): executed on every iteration until the first record
                    // is written, so the stat is reported repeatedly — confirm intent.
                    this.firststop = Calendar.getInstance().getTimeInMillis();
                    this.stats.timeToFirstInput(this.firststop - startMillis);
                }
                RecordBufferEntry entry = (RecordBufferEntry) recordQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                eventHandler.propagateEvents();
                if (entry == null) {
                    // Poll timed out: stop only once no reader can produce more records.
                    allReadersFinished = countUnfinishedReaders(scans.length) == 0;
                    continue;
                }
                Record record = entry.record;
                IBuffer.Status writerStatus = this.writer.getStatus();
                if (writerStatus == IBuffer.Status.Close || writerStatus == IBuffer.Status.Dispose) {
                    // The consumer went away; nothing more can be written.
                    break;
                }
                int definitionIndex = record.getDefinitionIndex();
                // A definition index of -1 is treated as 0 before applying the reader's offset.
                int targetDefinition = this.recordDefinitionOffsets[entry.id] + (definitionIndex == -1 ? 0 : definitionIndex);
                if (this.writer.importRecord(record, targetDefinition, this.timeout, this.timeUnit)) {
                    this.count++;
                    eventHandler.increaseProducedRecordCount();
                } else if (this.writer.getStatus() == IBuffer.Status.Open) {
                    this.logger.warn("Writer of " + this.uid + " has timed out");
                }
            }

            this.logger.info("Consumer side of " + this.uid + " stopped consumption. Notifying all " + this.readers.size() + " readers to stop.");
            synchronized (this.readers) {
                for (int i = 0; i < this.readers.size(); i++) {
                    this.readers.get(i).setFinished(true);
                    try {
                        this.readers.get(i).getReader().close();
                    } catch (GRS2ReaderException e) {
                        this.logger.warn("Could not close reader #" + i, e);
                    }
                }
            }
            recordQueue.clear();
            dispatcher.join();
            final long endMillis = Calendar.getInstance().getTimeInMillis();
            eventHandler.sendPendingFinalEvents(this.count);
            try {
                this.writer.close();
            } catch (GRS2WriterException e) {
                this.logger.warn("Could not close writer", e);
            }
            // Second close pass over the readers, performed after the dispatcher has terminated.
            for (int i = 0; i < this.readers.size(); i++) {
                try {
                    this.readers.get(i).getReader().close();
                } catch (GRS2ReaderException e) {
                    this.logger.warn("Could not close reader #" + i, e);
                }
            }

            final long totalMillis = endMillis - startMillis;
            final long firstMillis = this.firststop - startMillis;
            final float recordsPerSecond = (this.count / ((float) totalMillis)) * 1000.0f;
            this.stats.timeToComplete(totalMillis);
            this.stats.timeToFirst(firstMillis);
            this.stats.productionRate(recordsPerSecond);
            this.stats.producedResults(this.count);
            this.logger.info("MERGE OPERATOR " + this.uid + ":\nProduced first result in " + firstMillis + " milliseconds\nProduced last result in " + totalMillis + " milliseconds\nProduced " + this.count + " records\nProduction rate was " + recordsPerSecond + " records per second");
        } catch (Exception e) {
            this.logger.error("Could not complete background merging for " + this.uid + ". Closing", e);
            cleanUpAfterFailure(recordQueue);
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag only after clean-up so the close calls are not affected by it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks until both the writer and the record definition offsets are set.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void awaitWriterInitialization() throws InterruptedException {
        synchronized (this.synchWriterInit) {
            // Loop guards against spurious wake-ups and partial initialization.
            while (this.writer == null || this.recordDefinitionOffsets == null) {
                this.synchWriterInit.wait();
            }
        }
    }

    /**
     * Counts, under the readers lock, how many of the first {@code readerCount}
     * readers have not finished yet.
     *
     * @param readerCount number of readers to inspect, starting at index 0
     * @return the number of readers whose {@code hasFinished()} is false
     */
    private int countUnfinishedReaders(int readerCount) {
        int unfinished = 0;
        synchronized (this.readers) {
            for (int i = 0; i < readerCount; i++) {
                if (!this.readers.get(i).hasFinished()) {
                    unfinished++;
                }
            }
        }
        return unfinished;
    }

    /**
     * Best-effort clean-up after a failure in {@link #run()}: marks every reader
     * finished and closes it, empties the intermediate queue and closes the writer.
     *
     * @param recordQueue the intermediate queue, or {@code null} if the failure
     *                    happened before it was created
     */
    private void cleanUpAfterFailure(ArrayBlockingQueue recordQueue) {
        synchronized (this.readers) {
            for (int i = 0; i < this.readers.size(); i++) {
                this.readers.get(i).setFinished(true);
                try {
                    if (this.readers.get(i).getReader().getStatus() != IBuffer.Status.Dispose) {
                        this.readers.get(i).getReader().close();
                    }
                } catch (Exception e) {
                    this.logger.warn("Could not close reader #" + i, e);
                }
            }
            if (recordQueue != null) {
                recordQueue.clear();
            }
            // The writer may never have been supplied, e.g. when interrupted while waiting for it.
            if (this.writer != null) {
                try {
                    if (this.writer.getStatus() != IBuffer.Status.Dispose) {
                        this.writer.close();
                    }
                } catch (Exception e) {
                    this.logger.error("Could not close writer of " + this.uid, e);
                }
            }
        }
    }
}
