package eu.dnetlib.validator.service.impls.listeners;

import eu.dnetlib.api.enabling.ResultSetService;
import eu.dnetlib.domain.EPR;
import eu.dnetlib.domain.functionality.validator.StoredJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.utils.EPRUtils;
import eu.dnetlib.validator.engine.ValidatorException;
import eu.dnetlib.validator.engine.execution.CompletedTask;
import eu.dnetlib.validator.engine.execution.JobListener;
import eu.dnetlib.validator.service.impl.ValidatorManager;
import gr.uoa.di.driver.util.ServiceLocator;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeConstants;
import org.springframework.core.task.TaskExecutor;

/* loaded from: input_file:WEB-INF/classes/eu/dnetlib/validator/service/impls/listeners/DnetListener.class */
public class DnetListener implements JobListener {
    private static Logger logger = Logger.getLogger(DnetListener.class);
    private RecordXMLBuilder xmlBuilder;
    private BlackboardJob job;
    private BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler;
    private TaskExecutor rsExecutor;
    private ResultSetService resultSetService;
    private String outputResultSetID;
    private List<String> outputDiskBuffer;
    private EPR outputEpr;
    private int jobStatusUpdateInterval;
    private BlockingQueue<String> queue;
    private int validationJobId;
    private ValidatorManager validatorManager;
    private String groupBy;
    private final AtomicInteger activeThreads = new AtomicInteger(0);
    private Object allThreadsFinished = new Object();
    private ServiceLocator<ResultSetService> resultSetServiceLocator = null;
    private int internalJobsFinished = 0;
    private int internalJobsSum = 1;
    private int objsValidated = 0;
    private boolean enableOutputToRS = false;
    private boolean enableOutputToDisk = false;
    private BufferedWriter bw = null;

    public void initOutputs() {
        logger.debug("initializing outputs");
        if (this.enableOutputToRS) {
            initEprOutput();
            this.job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(this.outputEpr));
            logger.debug("output epr id successfully set: " + EPRUtils.eprToXml(this.outputEpr));
        }
        if (!this.enableOutputToDisk) {
            logger.debug("initializing disk disabled");
        } else {
            initDiskOutput();
            logger.debug("disk output ok");
        }
    }

    @Override // eu.dnetlib.validator.engine.execution.JobListener
    public synchronized void currentResults(List<CompletedTask> list, int i, Object obj, Map<String, Object> map, Throwable th) throws ValidatorException {
        try {
            this.blackboardHandler.getBlackboardHandler().failed(this.job, th);
        } catch (Exception e) {
            logger.error("Error while proccessing record results");
            throw new ValidatorException("Error while proccessing record results", e);
        }
    }

    @Override // eu.dnetlib.validator.engine.execution.JobListener
    public synchronized void currentResults(List<CompletedTask> list, int i, Object obj, Map<String, Object> map) throws ValidatorException {
        try {
            if (this.enableOutputToRS) {
                sendToQueue(this.xmlBuilder.buildXml((List) map.get("veloList"), obj, (Map) map.get("recordValidationResult")));
            }
            if (this.enableOutputToDisk && ((Integer) map.get("recordBlacklistScore")).intValue() < 100) {
                this.bw.write(list.get(0).valobjId);
                this.bw.newLine();
            }
            this.objsValidated++;
            if (this.objsValidated % this.jobStatusUpdateInterval == 0) {
                this.job.getParameters().put("recordsTested", Integer.toString(this.objsValidated));
                this.blackboardHandler.getBlackboardHandler().ongoing(this.job);
            }
        } catch (Exception e) {
            logger.error("Error while proccessing record results");
            throw new ValidatorException("Error while proccessing record results", e);
        }
    }

    @Override // eu.dnetlib.validator.engine.execution.JobListener
    public synchronized void finished(int i, Map<String, Object> map) {
        try {
            logger.debug("Job " + i + " finished");
            this.internalJobsFinished++;
            if (this.internalJobsFinished == this.internalJobsSum) {
                logger.debug("internalJobsFinished == internalJobsSum");
                if (this.enableOutputToRS) {
                    try {
                        this.queue.put("finished");
                    } catch (InterruptedException e) {
                        logger.error("Error finalizing queue", e);
                    }
                    if (this.queue.size() > 0) {
                        long timeInMillis = Calendar.getInstance().getTimeInMillis();
                        this.activeThreads.getAndIncrement();
                        this.rsExecutor.execute(new RSTask(this.resultSetService, this.outputResultSetID, this.queue, this.activeThreads, this.allThreadsFinished));
                        logger.debug("Populating RS took " + (Calendar.getInstance().getTimeInMillis() - timeInMillis) + " milli seconds");
                    }
                    logger.debug("active threads to finish: " + this.activeThreads.get());
                    logger.debug("trying to close result set");
                    while (this.activeThreads.get() > 0) {
                        logger.debug("waiting active threads to finish. Remaining: " + this.activeThreads.get());
                        synchronized (this.allThreadsFinished) {
                            this.allThreadsFinished.wait();
                        }
                        logger.debug("retrying to finish. Remaining: " + this.activeThreads.get());
                    }
                    logger.debug("closing result set");
                    this.resultSetService.closeRS(this.outputResultSetID);
                }
                if (this.enableOutputToDisk) {
                    this.bw.write("FINISHED");
                    this.bw.close();
                }
                this.job.getParameters().put("jobId", Integer.toString(this.validationJobId));
                this.job.getParameters().put("recordsTested", Integer.toString(this.objsValidated));
                logger.info("Getting stored job");
                StoredJob storedJob = this.validatorManager.getStoredJob(i, this.groupBy);
                logger.info("Score: " + storedJob.getContentJobScore());
                this.job.getParameters().put("score", storedJob.getContentJobScore() + "");
                this.blackboardHandler.getBlackboardHandler().done(this.job);
            } else {
                logger.debug("Waiting " + (this.internalJobsSum - this.internalJobsFinished) + " job(s) to finish");
            }
        } catch (Exception e2) {
            logger.error("Error while finalizing successfull workflow job", e2);
        }
    }

    @Override // eu.dnetlib.validator.engine.execution.JobListener
    public synchronized void failed(int i, Map<String, Object> map, Throwable th) {
        try {
            this.internalJobsFinished++;
            if (this.internalJobsFinished == this.internalJobsSum) {
                if (this.enableOutputToRS) {
                    this.resultSetService.closeRS(this.outputResultSetID);
                }
                if (this.enableOutputToDisk) {
                    this.outputDiskBuffer.clear();
                    this.bw.close();
                    File file = new File("/tmp/validator-wf/" + this.validationJobId);
                    if (file.exists()) {
                        file.delete();
                    }
                }
                this.job.getParameters().put("jobId", Integer.toString(this.validationJobId));
                this.blackboardHandler.getBlackboardHandler().failed(this.job, th);
            } else {
                logger.debug("Waiting " + (this.internalJobsSum - this.internalJobsFinished) + " job(s) to finish");
            }
        } catch (Exception e) {
            logger.error("Error while finalizing failed workflow job", e);
        }
    }

    private void sendToQueue(String str) {
        logger.debug("received passed XMLresult");
        try {
            this.queue.put(str);
        } catch (InterruptedException e) {
            logger.error("Error putting in queue.", e);
        }
        if (this.queue.size() > 50) {
            long timeInMillis = Calendar.getInstance().getTimeInMillis();
            RSTask rSTask = new RSTask(this.resultSetService, this.outputResultSetID, this.queue, this.activeThreads, this.allThreadsFinished);
            this.activeThreads.getAndIncrement();
            this.rsExecutor.execute(rSTask);
            logger.debug("Populating RS took " + (Calendar.getInstance().getTimeInMillis() - timeInMillis) + " milli seconds");
        }
    }

    private void initEprOutput() {
        try {
            logger.debug("Initializing ResultSetService.");
            this.resultSetService = this.resultSetServiceLocator.getService();
            this.outputEpr = this.resultSetService.createPushRS(DateTimeConstants.SECONDS_PER_DAY, 0);
            this.outputResultSetID = this.outputEpr.getParameter("ResourceIdentifier");
            this.queue = new LinkedBlockingQueue();
        } catch (Exception e) {
            logger.error("Error initializing ResultSetService.", e);
            this.blackboardHandler.getBlackboardHandler().failed(this.job, e);
        }
    }

    private void initDiskOutput() {
        try {
            logger.debug("Initializing FileOutputStream.");
            String str = this.job.getParameters().get("datasourceId") != null ? this.job.getParameters().get("datasourceId") : this.job.getParameters().get("datasourceID") != null ? this.job.getParameters().get("datasourceID") : "unknownId";
            File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + str);
            logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + str);
            if (file.exists()) {
                logger.debug("File will be replaced");
                file.delete();
            }
            file.createNewFile();
            this.bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
            this.outputDiskBuffer = new ArrayList();
        } catch (IOException e) {
            logger.error("Error initializing FileOutputStream.", e);
            this.blackboardHandler.getBlackboardHandler().failed(this.job, e);
        }
    }

    public RecordXMLBuilder getXmlBuilder() {
        return this.xmlBuilder;
    }

    public void setXmlBuilder(RecordXMLBuilder recordXMLBuilder) {
        this.xmlBuilder = recordXMLBuilder;
    }

    public BlackboardJob getJob() {
        return this.job;
    }

    public void setJob(BlackboardJob blackboardJob) {
        this.job = blackboardJob;
    }

    public ServiceLocator<ResultSetService> getResultSetServiceLocator() {
        return this.resultSetServiceLocator;
    }

    public void setResultSetServiceLocator(ServiceLocator<ResultSetService> serviceLocator) {
        this.resultSetServiceLocator = serviceLocator;
    }

    public BlackboardNotificationHandler<BlackboardServerHandler> getBlackboardHandler() {
        return this.blackboardHandler;
    }

    public void setBlackboardHandler(BlackboardNotificationHandler<BlackboardServerHandler> blackboardNotificationHandler) {
        this.blackboardHandler = blackboardNotificationHandler;
    }

    public TaskExecutor getRsExecutor() {
        return this.rsExecutor;
    }

    public void setRsExecutor(TaskExecutor taskExecutor) {
        this.rsExecutor = taskExecutor;
    }

    public int getInternalJobsSum() {
        return this.internalJobsSum;
    }

    public void setInternalJobsSum(int i) {
        this.internalJobsSum = i;
    }

    public int getValidationJobId() {
        return this.validationJobId;
    }

    public void setValidationJobId(int i) {
        this.validationJobId = i;
    }

    public int getJobStatusUpdateInterval() {
        return this.jobStatusUpdateInterval;
    }

    public void setJobStatusUpdateInterval(int i) {
        this.jobStatusUpdateInterval = i;
    }

    public boolean isEnableOutputToDisk() {
        return this.enableOutputToDisk;
    }

    public void setEnableOutputToDisk(boolean z) {
        this.enableOutputToDisk = z;
    }

    public boolean isEnableOutputToRS() {
        return this.enableOutputToRS;
    }

    public void setEnableOutputToRS(boolean z) {
        this.enableOutputToRS = z;
    }

    public ValidatorManager getValidatorManager() {
        return this.validatorManager;
    }

    public void setValidatorManager(ValidatorManager validatorManager) {
        this.validatorManager = validatorManager;
    }

    public String getGroupBy() {
        return this.groupBy;
    }

    public void setGroupBy(String str) {
        this.groupBy = str;
    }
}
