/*
 * Decompiled with CFR 0.152.
 */
package gr.uoa.di.madgik.searchlibrary.operatorlibrary.duplicateeliminatoroperator;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy;
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.IRecordReader;
import gr.uoa.di.madgik.grs.reader.RandomReader;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.writer.IRecordWriter;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.duplicateeliminatoroperator.ObjectRank;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.stats.StatsContainer;
import java.net.URI;
import java.util.Calendar;
import java.util.Hashtable;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistinctOp
extends Thread {
    private static Logger logger = LoggerFactory.getLogger((String)DistinctOp.class.getName());
    private IRecordReader<Record> reader = null;
    private IRecordWriter<Record> writer = null;
    private int currentResultCountEstimation = 0;
    private int previousResultCountEstimation = 0;
    private boolean finalEventReceived = false;
    private boolean postFinalEstimationUpdate = false;
    private int finalResultCountValue;
    private int previousRatioComputationCheckpoint = 0;
    private int previousEmissionCheckpoint = 0;
    private int emissionStep = 100;
    private Float eliminationRatio = null;
    String objectIdFieldName = null;
    String objectRankFieldName = null;
    private StatsContainer stats = null;
    private long startTime = 0L;
    private long firstInputStop = 0L;
    private long firstOutputStop = 0L;
    private int rc = 0;
    private int rcOut = 0;
    private int uniqueResults = 0;
    public static final long TimeoutDef = 180L;
    public static final TimeUnit TimeUnitDef = TimeUnit.SECONDS;
    public static final int BufferCapacityDef = 100;
    public static final boolean KeepMaximumRankDef = false;
    public static final int SafeNumberOfResultsDef = 100;
    public static final int EliminationRatioComputationStepDef = 100;
    private long timeout;
    private TimeUnit timeUnit;
    private int bufferCapacity;
    private boolean keepMaximumRank = false;
    private int safeNumberOfResults = 100;
    private int eliminationRatioComputationStep = 100;
    private String uid = null;

    public static synchronized URI dispatchNewWorker(URI loc, String objectIdFieldName, String objectRankFieldName, boolean keepMaximumRank, long timeout, TimeUnit timeUnit, int bufferCapacity, StatsContainer stats) throws Exception {
        try {
            String uid = UUID.randomUUID().toString();
            DistinctOp f = new DistinctOp();
            f.objectIdFieldName = objectIdFieldName;
            f.objectRankFieldName = objectRankFieldName;
            logger.trace(uid + ": Initializing reader with locator " + loc);
            f.reader = keepMaximumRank ? new RandomReader(loc, bufferCapacity) : new ForwardReader(loc, bufferCapacity);
            f.writer = new RecordWriter((IWriterProxy)new LocalWriterProxy(), f.reader);
            f.keepMaximumRank = keepMaximumRank;
            f.timeout = timeout;
            f.timeUnit = timeUnit;
            f.bufferCapacity = bufferCapacity;
            f.stats = stats;
            f.stats.timeToInitialize(Calendar.getInstance().getTimeInMillis());
            f.uid = uid;
            f.start();
            logger.trace(uid + ": Returning " + f.writer.getLocator());
            return f.writer.getLocator();
        }
        catch (Exception e) {
            logger.error("Error in method dispatchNewWorker:\n" + e.getMessage());
            throw new Exception(e);
        }
    }

    public static synchronized URI dispatchNewWorker(URI loc, String objectIdFieldName, String objectRankFieldName, boolean keepMaximumRank, long timeout, TimeUnit timeUnit, StatsContainer stats) throws Exception {
        return DistinctOp.dispatchNewWorker(loc, objectIdFieldName, objectRankFieldName, keepMaximumRank, timeout, timeUnit, 100, stats);
    }

    public static synchronized URI dispatchNewWorker(URI loc, String objectIdFieldName, StatsContainer stats) throws Exception {
        return DistinctOp.dispatchNewWorker(loc, objectIdFieldName, null, false, 180L, TimeUnitDef, stats);
    }

    public static synchronized URI dispatchNewWorker(URI loc, String objectIdFieldName, String objectRankFieldName, boolean keepMaximumRank, StatsContainer stats) throws Exception {
        return DistinctOp.dispatchNewWorker(loc, objectIdFieldName, objectRankFieldName, keepMaximumRank, 180L, TimeUnitDef, stats);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            this.startTime = Calendar.getInstance().getTimeInMillis();
            Hashtable<String, Double> distincts = this.getDistincts();
            if (this.keepMaximumRank) {
                this.reader.seek(-this.reader.currentRecord());
                this.rc = 0;
                while (this.reader.getStatus() != IBuffer.Status.Dispose && (this.reader.getStatus() != IBuffer.Status.Close || this.reader.availableRecords() != 0)) {
                    Record rec = this.reader.get(this.timeout, this.timeUnit);
                    if (rec == null) {
                        if (this.reader.getStatus() != IBuffer.Status.Open) break;
                        logger.warn("Producer of " + this.uid + " has timed out");
                        break;
                    }
                    ++this.rc;
                    Double storedRank = null;
                    ObjectRank or = this.extractObjectRank(rec);
                    if (or.objID != null) {
                        storedRank = distincts.get(or.objID);
                    }
                    if (storedRank != null && storedRank.doubleValue() != or.rank.doubleValue()) continue;
                    if (this.writer.getStatus() == IBuffer.Status.Close || this.writer.getStatus() == IBuffer.Status.Dispose) {
                        logger.info("Consumer side of " + this.uid + " stopped consumption. Stopping.");
                        break;
                    }
                    if (!this.writer.importRecord(rec, this.timeout, this.timeUnit)) {
                        if (this.writer.getStatus() != IBuffer.Status.Open) break;
                        logger.warn("Consumer of " + this.uid + " has timed out");
                        break;
                    }
                    ++this.rcOut;
                    if (this.rcOut != 1) continue;
                    this.firstOutputStop = Calendar.getInstance().getTimeInMillis();
                }
            }
            this.emitPendingFinalEvents(this.rcOut);
            long closeStop = Calendar.getInstance().getTimeInMillis();
            this.stats.timeToFirstInput(this.firstInputStop - this.startTime);
            this.stats.timeToFirst(this.firstOutputStop - this.startTime);
            this.stats.timeToComplete(closeStop - this.startTime);
            this.stats.producedResults(this.rc);
            this.stats.productionRate((float)this.rc / (float)(closeStop - this.startTime) * 1000.0f);
            logger.info("DUPLICATE ELIMINATION OPERATOR " + this.uid + ":" + "Produced first result in " + (this.firstOutputStop - this.startTime) + " milliseconds\n" + "Produced last result in " + (closeStop - this.startTime) + " milliseconds\n" + "Produced " + this.rcOut + " results\n" + "Read " + this.rc + " results (" + (this.rc - this.rcOut) + " duplicates)\n" + "Production rate was " + (float)this.rc / (float)(closeStop - this.startTime) * 1000.0f + " records per second");
        }
        catch (Exception e) {
            logger.error("Error in method run for " + this.uid, (Throwable)e);
        }
        finally {
            try {
                this.writer.close();
                this.reader.close();
            }
            catch (Exception exception) {}
        }
    }

    private DistinctOp() {
    }

    private Hashtable<String, Double> getDistincts() throws Exception {
        Hashtable<String, Double> distincts = new Hashtable<String, Double>();
        this.rc = 0;
        this.rcOut = 0;
        try {
            while (this.reader.getStatus() != IBuffer.Status.Dispose && (this.reader.getStatus() != IBuffer.Status.Close || this.reader.availableRecords() != 0)) {
                Record rec = this.reader.get(this.timeout, this.timeUnit);
                this.handleEvents();
                if (rec == null) {
                    if (this.reader.getStatus() != IBuffer.Status.Open) break;
                    logger.warn("Producer of " + this.uid + " has timed out");
                    break;
                }
                ++this.rc;
                if (this.rc == 1) {
                    this.firstInputStop = Calendar.getInstance().getTimeInMillis();
                }
                ObjectRank or = this.extractObjectRank(rec);
                Double storedValue = null;
                if (or.objID != null) {
                    storedValue = distincts.get(or.objID);
                } else {
                    or.objID = UUID.randomUUID().toString();
                }
                if (storedValue == null) {
                    ++this.uniqueResults;
                    distincts.put(or.objID, or.rank);
                    if (this.keepMaximumRank) continue;
                    if (this.writer.getStatus() == IBuffer.Status.Close || this.writer.getStatus() == IBuffer.Status.Dispose) {
                        logger.info("Consumer side of " + this.uid + " stopped consumption. Stopping.");
                        break;
                    }
                    if (!this.writer.importRecord(rec, this.timeout, this.timeUnit)) {
                        if (this.writer.getStatus() != IBuffer.Status.Open) break;
                        logger.warn("Consumer of " + this.uid + " has timed out");
                        break;
                    }
                    ++this.rcOut;
                    if (this.rc != 1) continue;
                    this.firstOutputStop = Calendar.getInstance().getTimeInMillis();
                    continue;
                }
                if (!this.keepMaximumRank || !(storedValue < or.rank)) continue;
                distincts.put(or.objID, or.rank);
            }
            return distincts;
        }
        catch (Exception e) {
            logger.error("Error in method getDistincts for " + this.uid + ":\n" + e);
            throw new Exception(e);
        }
    }

    private ObjectRank extractObjectRank(Record rec) throws Exception {
        ObjectRank or = new ObjectRank();
        if (this.objectIdFieldName != null) {
            or.objID = ((StringField)rec.getField(this.objectIdFieldName)).getPayload();
            if (this.objectRankFieldName != null) {
                or.rank = Double.valueOf(((StringField)rec.getField(this.objectRankFieldName)).getPayload());
            } else {
                if (this.keepMaximumRank) {
                    throw new Exception("Keep maximum rank is enabled, however a rank field could not be found");
                }
                or.rank = 1.0;
            }
        }
        return or;
    }

    private void handleEvents() throws Exception {
        boolean emitStoredFinal = false;
        if (this.rc > this.safeNumberOfResults && this.rc - this.previousRatioComputationCheckpoint > this.eliminationRatioComputationStep) {
            this.previousRatioComputationCheckpoint = this.rc;
            this.eliminationRatio = Float.valueOf((float)this.uniqueResults / (float)this.rc);
            if (this.finalEventReceived && !this.postFinalEstimationUpdate) {
                this.currentResultCountEstimation = (int)Math.floor(this.eliminationRatio.floatValue() * (float)this.finalResultCountValue);
                this.postFinalEstimationUpdate = true;
                emitStoredFinal = true;
            }
        }
        this.previousResultCountEstimation = this.currentResultCountEstimation;
        boolean received = false;
        BufferEvent ev = null;
        while ((ev = this.reader.receive()) != null) {
            if (!(ev instanceof KeyValueEvent)) {
                this.writer.emit(ev);
                continue;
            }
            if (((KeyValueEvent)ev).getKey().equals("resultsNumber")) {
                received = true;
                if (this.finalEventReceived || this.eliminationRatio == null) continue;
                this.currentResultCountEstimation = (int)Math.floor(this.eliminationRatio.floatValue() * (float)Integer.parseInt(((KeyValueEvent)ev).getValue()));
                continue;
            }
            if (((KeyValueEvent)ev).getKey().equals("resultsNumberFinal")) {
                received = true;
                this.finalEventReceived = true;
                this.finalResultCountValue = Integer.parseInt(((KeyValueEvent)ev).getValue());
                if (this.eliminationRatio == null) continue;
                this.currentResultCountEstimation = (int)Math.floor(this.eliminationRatio.floatValue() * (float)this.finalResultCountValue);
                continue;
            }
            this.writer.emit(ev);
        }
        if (received) {
            int val = Math.max(this.currentResultCountEstimation, this.rcOut);
            if (val != this.rcOut || emitStoredFinal) {
                if (this.currentResultCountEstimation != this.previousResultCountEstimation) {
                    this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumber", "" + val));
                }
            } else if (this.rcOut - this.previousEmissionCheckpoint >= this.emissionStep) {
                this.previousEmissionCheckpoint = this.rcOut;
                this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumber", "" + this.rcOut));
            }
        } else if (emitStoredFinal) {
            this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumber", "" + this.currentResultCountEstimation));
        } else if (this.rcOut > this.currentResultCountEstimation && this.rcOut - this.previousEmissionCheckpoint >= this.emissionStep) {
            this.previousEmissionCheckpoint = this.rcOut;
            this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumber", "" + this.rcOut));
        }
    }

    private void emitPendingFinalEvents(int count) throws Exception {
        this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumberFinal", "" + count));
    }
}

