package eu.dnetlib.msro.workflows.sarasvati.loader;

import com.googlecode.sarasvati.Graph;
import com.googlecode.sarasvati.GraphProcess;
import com.googlecode.sarasvati.mem.MemEngine;
import com.googlecode.sarasvati.mem.MemGraphProcess;
import eu.dnetlib.enabling.common.Stoppable;
import eu.dnetlib.enabling.common.StoppableDetails;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import java.io.File;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

/* loaded from: input_file:WEB-INF/lib/dnet-msro-service-3.0.3.jar:eu/dnetlib/msro/workflows/sarasvati/loader/WorkflowExecutor.class */
/**
 * Queues workflow processes and starts them on a Sarasvati {@link MemEngine}.
 *
 * <p>Processes created by {@link #startProcess(String, Map)} are placed in a priority queue.
 * A fixed pool of scheduled consumers (created by {@link #init()}) periodically takes the head of
 * the queue and hands it to the engine. While the executor is paused no new process is accepted
 * and the consumers leave the queue untouched.
 */
public class WorkflowExecutor implements Stoppable {
    private static final Log log = LogFactory.getLog(WorkflowExecutor.class);

    /** Priority assumed for a process whose priority attribute is missing or not a number. */
    private static final int DEFAULT_PRIORITY = 50;

    /** Initial capacity of the pending queue (the queue itself is unbounded). */
    private static final int INITIAL_QUEUE_CAPACITY = 20;

    /** Threshold used by {@link #startProcess(String, Map)} to reject new processes. */
    private static final int MAX_PENDING_PROCS = 100;

    /** Number of scheduled consumers polling the pending queue. */
    private static final int QUEUE_CONSUMERS = 10;

    /** Offset, in seconds, between the first runs of two consecutive consumers. */
    private static final int CONSUMER_STAGGER_SECONDS = 6;

    /** Period, in seconds, between two runs of the same consumer. */
    private static final long CONSUMER_PERIOD_SECONDS = 60L;

    private MemEngine engine;
    private GraphLoader graphLoader;
    private GraphProcessRegistry graphProcessRegistry;
    private ProfileToSarasvatiConverter profileToSarasvatiConverter;
    private ScheduledExecutorService queueConsumers;

    @Resource
    private UniqueServiceLocator serviceLocator;

    /**
     * Written by the caller of stop()/resume()/setPaused() and read by the consumer threads,
     * hence volatile: without it the consumers have no guarantee of ever seeing an update.
     */
    private volatile boolean paused = false;

    /** Pending processes; the head is the process with the lowest numeric priority value. */
    private PriorityBlockingQueue<GraphProcess> pendingProcs = new PriorityBlockingQueue<>(INITIAL_QUEUE_CAPACITY, new Comparator<GraphProcess>() {
        @Override
        public int compare(GraphProcess first, GraphProcess second) {
            // Integer.compare keeps the comparison exact; no detour through floating point.
            return Integer.compare(priorityOf(first), priorityOf(second));
        }
    });

    /**
     * Reads the priority attribute of a process, falling back to {@link #DEFAULT_PRIORITY}
     * when the attribute cannot be parsed as an int.
     */
    private static int priorityOf(GraphProcess process) {
        return NumberUtils.toInt(process.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY), DEFAULT_PRIORITY);
    }

    /**
     * @return true when the executor is paused
     */
    public boolean isPaused() {
        return this.paused;
    }

    /**
     * @param z true to pause the executor, false to let it run
     */
    public void setPaused(boolean z) {
        this.paused = z;
    }

    /**
     * Creates the scheduled thread pool and registers the queue consumers.
     *
     * <p>Each consumer runs every {@link #CONSUMER_PERIOD_SECONDS} seconds; first runs are
     * staggered by {@link #CONSUMER_STAGGER_SECONDS} seconds so that the consumers do not all
     * poll at the same instant.
     */
    public void init() {
        this.queueConsumers = Executors.newScheduledThreadPool(QUEUE_CONSUMERS);
        for (int i = 0; i < QUEUE_CONSUMERS; i++) {
            this.queueConsumers.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    WorkflowExecutor.this.consumeNextProcess();
                }
            }, i * CONSUMER_STAGGER_SECONDS, CONSUMER_PERIOD_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Takes one process from the pending queue, if any, stamps its start date and starts it.
     *
     * <p>Runtime failures are logged and not rethrown: an exception escaping a task scheduled
     * with scheduleAtFixedRate suppresses every subsequent execution of that task, which would
     * silently remove one consumer for the rest of the application's life.
     */
    private void consumeNextProcess() {
        if (isPaused()) {
            return;
        }
        GraphProcess process = this.pendingProcs.poll();
        if (process == null) {
            log.debug("Process queue is empty");
            return;
        }
        try {
            log.info("Starting workflow: " + process);
            long now = DateUtils.now();
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_START_DATE, Long.valueOf(now));
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_START_HUMAN_DATE, DateUtils.calculate_ISO8601(now));
            this.engine.startProcess(process);
        } catch (RuntimeException e) {
            log.error("Error starting workflow: " + process, e);
        }
    }

    /**
     * Queues the workflow with the given profile id, without extra environment attributes.
     *
     * @param str the workflow profile id
     * @return the identifier returned by the process registry
     * @throws Exception see {@link #startProcess(String, Map)}
     */
    public String startProcess(String str) throws Exception {
        return startProcess(str, null);
    }

    /**
     * Builds a process for the workflow with the given profile id and puts it in the pending queue.
     *
     * @param str the workflow profile id
     * @param map additional attributes copied into the process environment; may be null
     * @return the identifier returned by the process registry
     * @throws MSROException if the executor is paused, the workflow is not ready, or the
     *         pending queue holds more than {@link #MAX_PENDING_PROCS} processes
     * @throws IllegalArgumentException if the process cannot be built or registered
     * @throws Exception any other failure raised while resolving the workflow profile
     */
    public String startProcess(String str, Map<String, Object> map) throws Exception {
        WfProfileDescriptor descriptor = this.profileToSarasvatiConverter.getSarasvatiWorkflow(str);
        if (isPaused()) {
            log.warn("Wf " + str + " not launched, because WorkflowExecutor is preparing for shutdown");
            throw new MSROException("WorkflowExecutor is preparing for shutdown");
        }
        if (!descriptor.isReady()) {
            log.warn("Wf " + str + " not launched, because it is not ready to start");
            throw new MSROException("Workflow " + str + " is not ready to start");
        }
        if (this.pendingProcs.size() > MAX_PENDING_PROCS) {
            log.warn("Wf " + str + " not launched, Max number of pending procs reached: " + MAX_PENDING_PROCS);
            throw new MSROException("Max number of pending procs reached: " + MAX_PENDING_PROCS);
        }
        try {
            Graph graph = this.graphLoader.loadGraph(descriptor.getWorkflowXml());
            MemGraphProcess process = new MemGraphProcess(graph);
            String processId = this.graphProcessRegistry.registerProcess(process);
            this.graphProcessRegistry.associateProcessWithResource(process, str);
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROCESS_ID, processId);
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, str);
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_NAME, descriptor.getName());
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_NAME, graph.getName());
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_FAMILY, descriptor.getType());
            process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY, Integer.valueOf(descriptor.getPriority()));
            if (map != null) {
                // Caller-supplied attributes are applied last, so they override the system ones.
                for (Map.Entry<String, Object> entry : map.entrySet()) {
                    process.getEnv().setAttribute(entry.getKey(), entry.getValue());
                }
            }
            log.info("Process " + process + " in queue, priority=" + descriptor.getPriority());
            this.pendingProcs.put(process);
            return processId;
        } catch (Exception e) {
            log.error("Error parsing workflow xml: " + descriptor.getWorkflowXml(), e);
            // Keep the cause so that callers can see what actually went wrong.
            throw new IllegalArgumentException("Error parsing workflow", e);
        }
    }

    /**
     * Queues the workflows referenced by the executable configuration of a meta-workflow.
     *
     * @param str the meta-workflow profile id
     * @param z true to queue every referenced workflow; false to queue only those whose
     *        configuration start mode is "auto"
     * @throws MSROException if the lookup returns no workflow for the meta-workflow
     * @throws Exception any failure raised by the lookup service or by
     *         {@link #startProcess(String)}
     */
    public void startMetaWorkflow(String str, boolean z) throws Exception {
        ISLookUpService lookup = (ISLookUpService) this.serviceLocator.getService(ISLookUpService.class);
        // NOTE(review): the ids are concatenated into the query text; confirm callers never pass untrusted values.
        List<String> workflowIds = lookup.quickSearchProfile("/*[.//RESOURCE_IDENTIFIER/@value='" + str + "']//CONFIGURATION[@status='EXECUTABLE']/WORKFLOW/@id/string()");
        if (workflowIds == null || workflowIds.isEmpty()) {
            throw new MSROException("Metaworkflow " + str + " not launched");
        }
        for (String workflowId : workflowIds) {
            String startModeQuery = "/*[.//RESOURCE_IDENTIFIER/@value='" + workflowId + "']//CONFIGURATION/@start/string()";
            // Constant-first equals: a null lookup result means "not auto" instead of a NullPointerException.
            if (z || "auto".equals(lookup.getResourceProfileByQuery(startModeQuery))) {
                startProcess(workflowId);
            } else {
                log.warn("Worflow " + workflowId + " can not be launched AUTOMATICALLY");
            }
        }
    }

    public GraphLoader getGraphLoader() {
        return this.graphLoader;
    }

    @Required
    public void setGraphLoader(GraphLoader graphLoader) {
        this.graphLoader = graphLoader;
    }

    public MemEngine getEngine() {
        return this.engine;
    }

    @Required
    public void setEngine(MemEngine memEngine) {
        this.engine = memEngine;
    }

    public GraphProcessRegistry getGraphProcessRegistry() {
        return this.graphProcessRegistry;
    }

    @Required
    public void setGraphProcessRegistry(GraphProcessRegistry graphProcessRegistry) {
        this.graphProcessRegistry = graphProcessRegistry;
    }

    public ProfileToSarasvatiConverter getProfileToSarasvatiConverter() {
        return this.profileToSarasvatiConverter;
    }

    @Required
    public void setProfileToSarasvatiConverter(ProfileToSarasvatiConverter profileToSarasvatiConverter) {
        this.profileToSarasvatiConverter = profileToSarasvatiConverter;
    }

    /**
     * Pauses the executor: new processes are rejected and queued ones are not started.
     * The consumer thread pool itself is left running.
     */
    @Override // eu.dnetlib.enabling.common.Stoppable
    public void stop() {
        this.paused = true;
    }

    /** Leaves the paused state so that processes are accepted and started again. */
    @Override // eu.dnetlib.enabling.common.Stoppable
    public void resume() {
        this.paused = false;
    }

    /**
     * Reports the stop status of the executor.
     *
     * @return RUNNING when not paused; when paused, STOPPED if the registry counts no running
     *         workflow and STOPPING otherwise
     */
    @Override // eu.dnetlib.enabling.common.Stoppable
    public StoppableDetails getStopDetails() {
        int runningWfs = this.graphProcessRegistry.countRunningWfs();
        StoppableDetails.StopStatus status = StoppableDetails.StopStatus.RUNNING;
        if (isPaused()) {
            status = runningWfs == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING;
        }
        // NOTE(review): the result is discarded; call kept in case the registry relies on a side effect - confirm and remove.
        this.graphProcessRegistry.listIdentifiers();
        return new StoppableDetails("D-NET workflow manager", "Running workflows: " + runningWfs, status);
    }
}
