package gr.uoa.di.madgik.workflow.adaptor.datatransformation.library.merge;

import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.gcube.datatransformation.datatransformationlibrary.dataelements.DTSExceptionWrapper;
import org.gcube.datatransformation.datatransformationlibrary.dataelements.DataElement;
import org.gcube.datatransformation.datatransformationlibrary.dataelements.impl.LocalFileDataElement;
import org.gcube.datatransformation.datatransformationlibrary.datahandlers.DataSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/workflowdtsadaptor-1.1.4-3.7.0.jar:gr/uoa/di/madgik/workflow/adaptor/datatransformation/library/merge/MergeWorker.class */
public class MergeWorker extends Thread {
    private Vector<ReaderHolder> readers;
    private DataSink sink;
    private Object synchDispatcher;
    private String uid;
    private LocatorReader inputRetriever;
    private Logger log = LoggerFactory.getLogger(MergeWorker.class.getName());
    private Object synchWriterInit = new Object();
    private AtomicInteger count = new AtomicInteger(0);
    private long firststop = 0;
    private SynchFinished synchFinished = new SynchFinished();
    private boolean finished = false;

    public MergeWorker(Vector<ReaderHolder> vector, DataSink dataSink, String str, LocatorReader locatorReader, Object obj) {
        this.readers = null;
        this.sink = null;
        this.uid = null;
        this.readers = vector;
        this.sink = dataSink;
        this.uid = str;
        this.inputRetriever = locatorReader;
        this.synchDispatcher = obj;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        InterruptedException interruptedException;
        long j;
        long currentTimeMillis = System.currentTimeMillis();
        setName("Merge Worker");
        BlockingQueue blockingQueue = null;
        try {
            try {
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
                this.log.debug("Queue capacity for " + this.uid + " is 100");
                Vector vector = new Vector(this.readers.size());
                new FirstAvailableScanDispatcher(vector, this.readers, arrayBlockingQueue, null, this.uid, this.synchDispatcher, this.synchWriterInit, this.synchFinished).start();
                synchronized (this.synchWriterInit) {
                    while (vector.size() == 0 && !this.synchFinished.isFinished()) {
                        this.synchWriterInit.wait();
                    }
                }
                int i = 1;
                while (!this.finished) {
                    if (this.count.get() == 0) {
                        this.firststop = System.currentTimeMillis();
                        this.log.debug("First stop: " + (this.firststop - currentTimeMillis));
                    } else if (this.count.get() == 1) {
                        synchronized (this.count) {
                            this.count.notify();
                        }
                    }
                    if (i < 500) {
                        int i2 = i * 2;
                        i = i2;
                        j = i2;
                    } else {
                        j = 500;
                    }
                    DataElement dataElement = (DataElement) arrayBlockingQueue.poll(j, TimeUnit.MILLISECONDS);
                    if (dataElement == null) {
                        int i3 = 0;
                        for (int i4 = 0; i4 < vector.size(); i4++) {
                            if (!this.readers.get(i4).hasFinished()) {
                                i3++;
                            }
                        }
                        if (i3 == 0 && this.inputRetriever.hasFinished()) {
                            this.finished = true;
                        }
                    } else if (dataElement instanceof DTSExceptionWrapper) {
                        this.log.error("received execption", ((DTSExceptionWrapper) dataElement).getThrowable());
                        this.sink.append(dataElement);
                    } else {
                        this.sink.append(dataElement);
                        this.count.incrementAndGet();
                        dataElement.destroy();
                        if (dataElement instanceof LocalFileDataElement) {
                            ((LocalFileDataElement) dataElement).getFileContent().delete();
                        }
                    }
                    if (this.sink.isClosed()) {
                        this.finished = true;
                    }
                }
                this.synchFinished.setFinished(true);
                synchronized (this.readers) {
                    for (int i5 = 0; i5 < this.readers.size(); i5++) {
                        this.readers.get(i5).setFinished(true);
                    }
                }
                for (int i6 = 0; i6 < vector.size(); i6++) {
                    do {
                        try {
                            ((ReaderScan) vector.get(i6)).join();
                            interruptedException = null;
                        } catch (InterruptedException e) {
                            interruptedException = e;
                            this.log.error("received exception", (Throwable) interruptedException);
                            this.sink.append(new DTSExceptionWrapper(interruptedException));
                        }
                    } while (interruptedException != null);
                }
                synchronized (this.count) {
                    this.count.notify();
                }
                arrayBlockingQueue.clear();
                this.sink.close();
                for (int i7 = 0; i7 < this.readers.size(); i7++) {
                    if (this.readers.get(i7).getReader() != null && !this.readers.get(i7).getReader().isClosed()) {
                        this.log.info("Closing GRS2DataSource #" + i7);
                        this.readers.get(i7).getReader().close();
                    }
                }
                synchronized (this.synchDispatcher) {
                    this.synchDispatcher.notify();
                }
                synchronized (this.synchWriterInit) {
                    this.synchWriterInit.notify();
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                this.log.info("MERGE OPERATOR " + this.uid + ":\nProduced first result in " + (this.firststop - currentTimeMillis) + " milliseconds\nProduced last result in " + (currentTimeMillis2 - currentTimeMillis) + " milliseconds\nProduced " + this.count.get() + " data elements\nProduction rate was " + ((this.count.get() / ((float) (currentTimeMillis2 - currentTimeMillis))) * 1000.0f) + " records per second");
            } catch (Exception e2) {
                this.log.error("Could not complete background merging for " + this.uid + ". Closing", (Throwable) e2);
                this.sink.append(new DTSExceptionWrapper(e2));
                synchronized (this.count) {
                    this.count.notify();
                    blockingQueue.clear();
                    this.sink.close();
                    for (int i8 = 0; i8 < this.readers.size(); i8++) {
                        if (this.readers.get(i8).getReader() != null && !this.readers.get(i8).getReader().isClosed()) {
                            this.log.info("Closing GRS2DataSource #" + i8);
                            this.readers.get(i8).getReader().close();
                        }
                    }
                    synchronized (this.synchDispatcher) {
                        this.synchDispatcher.notify();
                        synchronized (this.synchWriterInit) {
                            this.synchWriterInit.notify();
                            long currentTimeMillis3 = System.currentTimeMillis();
                            this.log.info("MERGE OPERATOR " + this.uid + ":\nProduced first result in " + (this.firststop - currentTimeMillis) + " milliseconds\nProduced last result in " + (currentTimeMillis3 - currentTimeMillis) + " milliseconds\nProduced " + this.count.get() + " data elements\nProduction rate was " + ((this.count.get() / ((float) (currentTimeMillis3 - currentTimeMillis))) * 1000.0f) + " records per second");
                        }
                    }
                }
            }
        } catch (Throwable th) {
            synchronized (this.count) {
                this.count.notify();
                blockingQueue.clear();
                this.sink.close();
                for (int i9 = 0; i9 < this.readers.size(); i9++) {
                    if (this.readers.get(i9).getReader() != null && !this.readers.get(i9).getReader().isClosed()) {
                        this.log.info("Closing GRS2DataSource #" + i9);
                        this.readers.get(i9).getReader().close();
                    }
                }
                synchronized (this.synchDispatcher) {
                    this.synchDispatcher.notify();
                    synchronized (this.synchWriterInit) {
                        this.synchWriterInit.notify();
                        long currentTimeMillis4 = System.currentTimeMillis();
                        this.log.info("MERGE OPERATOR " + this.uid + ":\nProduced first result in " + (this.firststop - currentTimeMillis) + " milliseconds\nProduced last result in " + (currentTimeMillis4 - currentTimeMillis) + " milliseconds\nProduced " + this.count.get() + " data elements\nProduction rate was " + ((this.count.get() / ((float) (currentTimeMillis4 - currentTimeMillis))) * 1000.0f) + " records per second");
                        throw th;
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AtomicInteger getCounter() {
        return this.count;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean hasFinished() {
        return this.finished;
    }
}
