package gr.uoa.di.madgik.workflow.adaptor.search;

import gr.uoa.di.madgik.commons.channel.proxy.tcp.TCPServerNozzleConfig;
import gr.uoa.di.madgik.commons.infra.HostingNode;
import gr.uoa.di.madgik.commons.infra.nodeassignmentpolicy.LocalOnlyPolicy;
import gr.uoa.di.madgik.commons.infra.nodeassignmentpolicy.NodeAssignmentPolicy;
import gr.uoa.di.madgik.commons.infra.nodeassignmentpolicy.SingleRemoteNodePolicy;
import gr.uoa.di.madgik.commons.infra.nodeselection.NodeSelector;
import gr.uoa.di.madgik.commons.infra.nodeselection.random.RandomNodeSelector;
import gr.uoa.di.madgik.commons.infra.nodeselection.ru.LRUNodeSelector;
import gr.uoa.di.madgik.environment.hint.EnvHintCollection;
import gr.uoa.di.madgik.execution.datatype.NamedDataType;
import gr.uoa.di.madgik.execution.engine.ExecutionEngine;
import gr.uoa.di.madgik.execution.engine.ExecutionEngineConfig;
import gr.uoa.di.madgik.execution.engine.ExecutionHandle;
import gr.uoa.di.madgik.execution.exception.ExecutionException;
import gr.uoa.di.madgik.execution.exception.ExecutionRunTimeException;
import gr.uoa.di.madgik.execution.exception.ExecutionSerializationException;
import gr.uoa.di.madgik.execution.exception.ExecutionValidationException;
import gr.uoa.di.madgik.execution.plan.ExecutionPlan;
import gr.uoa.di.madgik.execution.plan.element.BoundaryPlanElement;
import gr.uoa.di.madgik.execution.plan.element.FlowPlanElement;
import gr.uoa.di.madgik.execution.plan.element.IPlanElement;
import gr.uoa.di.madgik.execution.plan.element.SequencePlanElement;
import gr.uoa.di.madgik.execution.plan.element.invocable.BoundaryConfig;
import gr.uoa.di.madgik.execution.plan.element.variable.VariableCollection;
import gr.uoa.di.madgik.execution.utils.BoundaryIsolationInfo;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import gr.uoa.di.madgik.rr.ResourceRegistry;
import gr.uoa.di.madgik.rr.ResourceRegistryException;
import gr.uoa.di.madgik.rr.element.execution.ExecutionServer;
import gr.uoa.di.madgik.rr.element.execution.RRExecutionServer2HnAdapter;
import gr.uoa.di.madgik.rr.element.query.QueryHelper;
import gr.uoa.di.madgik.rr.element.search.index.DataSourceService;
import gr.uoa.di.madgik.rr.element.search.index.FTIndexService;
import gr.uoa.di.madgik.rr.element.search.index.OpenSearchDataSourceService;
import gr.uoa.di.madgik.rr.element.search.index.SruConsumerService;
import gr.uoa.di.madgik.workflow.adaptor.IWorkflowAdaptor;
import gr.uoa.di.madgik.workflow.adaptor.search.analyzer.SearchPlanAnalyzer;
import gr.uoa.di.madgik.workflow.adaptor.search.nodeassignment.DataSourceNodeAssignmentNode;
import gr.uoa.di.madgik.workflow.adaptor.search.nodeassignment.NodeAssignmentNode;
import gr.uoa.di.madgik.workflow.adaptor.search.nodeassignment.NodeAssignmentTree;
import gr.uoa.di.madgik.workflow.adaptor.search.nodeassignment.OperatorNodeAssignmentNode;
import gr.uoa.di.madgik.workflow.adaptor.search.rewriter.SearchPlanRewriter;
import gr.uoa.di.madgik.workflow.adaptor.search.searchsystemplan.DataSourceNode;
import gr.uoa.di.madgik.workflow.adaptor.search.searchsystemplan.OperatorNode;
import gr.uoa.di.madgik.workflow.adaptor.search.searchsystemplan.PlanNode;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.NodeExecutionInfo;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.OutputVariableNode;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.WrapperNode;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.elementconstructors.processing.ExceptElementConstructor;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.elementconstructors.processing.JoinElementConstructor;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.elementconstructors.processing.MergeElementConstructor;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.elementconstructors.processing.ProcessingElementConstructor;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.wrappers.FunctionalityWrapper;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.wrappers.datasource.DataSourceWrapper;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.wrappers.datasource.DataSourceWrapperFactory;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.wrappers.datasource.DataSourceWrapperFactoryConfig;
import gr.uoa.di.madgik.workflow.adaptor.search.utils.wrappers.processing.ProcessingWrapper;
import gr.uoa.di.madgik.workflow.adaptor.utils.IAdaptorResources;
import gr.uoa.di.madgik.workflow.adaptor.utils.IOutputResource;
import gr.uoa.di.madgik.workflow.directory.ExecutionDirectory;
import gr.uoa.di.madgik.workflow.directory.ExecutionObserver;
import gr.uoa.di.madgik.workflow.exception.WorkflowEnvironmentException;
import gr.uoa.di.madgik.workflow.exception.WorkflowInternalErrorException;
import gr.uoa.di.madgik.workflow.exception.WorkflowSerializationException;
import gr.uoa.di.madgik.workflow.exception.WorkflowValidationException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.derby.iapi.services.classfile.VMDescriptor;
import org.apache.lucene.util.packed.PackedInts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/workflowsearchadaptor-1.9.0-3.10.1.jar:gr/uoa/di/madgik/workflow/adaptor/search/WorkflowSearchAdaptor.class */
public class WorkflowSearchAdaptor implements IWorkflowAdaptor, Serializable {
    private static final long serialVersionUID = 1;
    /** Environment hints handed to the constructor; consulted by {@code ParseHints()}. */
    private EnvHintCollection hints;
    /** Execution plan that {@code CreatePlan()} populates and {@code ExecutePlan()} submits. */
    private ExecutionPlan Plan;
    /** Factory for data source wrappers; transient, so it is not part of the Java-serialized form. */
    private transient DataSourceWrapperFactory datasourceWrapperFactory;

    // Names of the environment hints recognised by ParseHints().
    public static final String DatasourceNodeSelectorHintName = "DatasourceNodeSelector";
    public static final String DataSourceNodeSelectorTieBreakerHintName = "DataSourceNodeSelectorTieBreaker";
    public static final String OperatorNodeSelectorHintName = "OperatorNodeSelector";
    public static final String OperatorNodeSelectorTieBreakerHintName = "OperatorNodeSelectorTieBreaker";
    public static final String OperatorNodeSelectorThresholdHintName = "OperatorNodeSelectorThreshold";
    public static final String MaxCollocationCostHintName = "MaxCollocationCost";
    public static final String ExcludeLocalHintName = "ExcludeLocal";
    public static final String NodeAssignmentPolicyHintName = "NodeAssignmentPolicy";
    public static final String ComplexPlanLevelsHintName = "ComplexPlanLevels";
    public static final String ComplexPlanNumNodesHintName = "ComplexPlanNumNodes";

    // Defaults used when the corresponding hint is absent.
    // NOTE(review): ExcludeLocalDef is not referenced in the visible code; the excludeLocal
    // field below is initialised with a literal instead.
    private static final boolean ExcludeLocalDef = true;
    private static Logger logger = LoggerFactory.getLogger(WorkflowSearchAdaptor.class);
    // NOTE(review): PackedInts.COMPACT looks like a decompiler substitution for a plain float
    // literal -- confirm the intended default threshold value.
    private static final Float OperatorNodeSelectorThresholdDef = Float.valueOf(PackedInts.COMPACT);
    private static final Float MaxCollocationCostDef = Float.valueOf(30.0f);
    private static final Integer ComplexPlanLevelsDef = 3;
    private static final Integer ComplexPlanNumNodesDef = 20;
    private static final String OperatorNodeSelectorDef = RandomNodeSelector.class.getName();
    private static final String NodeAssignmentPolicyDef = SingleRemoteNodePolicy.class.getName();
    private static final String DataSourceNodeSelectorDef = LRUNodeSelector.class.getName();

    /** Search plan to translate; set through {@code SetInputPlan} and rewritten by {@code CreatePlan()}. */
    private PlanNode searchPlan = null;
    /** Execution identifier; set by {@code SetExecutionId} and overwritten in {@code ExecutePlan()}. */
    private String executionId = null;
    /** Root of the wrapper tree produced by {@code CreatePlan()}; its wrapper supplies the output variable. */
    private WrapperNode wrapperTree = null;
    /** Cached output variable tree; when null, {@code getOutputVariables()} builds one from the wrapper tree. */
    private OutputVariableNode ioVariableNodes = null;
    /** Handle of the submitted plan; assigned in {@code ExecutePlan()}, null before that. */
    private transient ExecutionHandle Handle = null;
    // The two selectors/policies below are transient: they are not written by Java serialization.
    private transient NodeAssignmentPolicy nodeAssignmentPolicy = new SingleRemoteNodePolicy(new RandomNodeSelector());
    private transient NodeSelector datasourceNodeSelector = new LRUNodeSelector();
    // Effective tuning values; start at the defaults and may be overridden by hints.
    private float operatorNodeSelectorThreshold = OperatorNodeSelectorThresholdDef.floatValue();
    private float maxCollocationCost = MaxCollocationCostDef.floatValue();
    private int complexPlanLevels = ComplexPlanLevelsDef.intValue();
    private int complexPlanNumNodes = ComplexPlanNumNodesDef.intValue();
    /** Whether the local node is removed from the candidate execution nodes in {@code CreatePlan()}. */
    private boolean excludeLocal = true;

    public WorkflowSearchAdaptor(EnvHintCollection envHintCollection) throws Exception {
        this.hints = null;
        this.Plan = null;
        this.datasourceWrapperFactory = null;
        this.Plan = new ExecutionPlan();
        this.Plan.EnvHints.AddHint(envHintCollection.GetHint("GCubeActionScope"));
        this.hints = envHintCollection;
        ParseHints();
        this.datasourceWrapperFactory = new DataSourceWrapperFactory();
        ExecutionEngine.Init(new ExecutionEngineConfig(0));
        ResourceRegistry.startBridging();
    }

    public WorkflowSearchAdaptor(DataSourceWrapperFactoryConfig dataSourceWrapperFactoryConfig, EnvHintCollection envHintCollection) throws Exception {
        this.hints = null;
        this.Plan = null;
        this.datasourceWrapperFactory = null;
        this.Plan = new ExecutionPlan();
        this.Plan.EnvHints.AddHint(envHintCollection.GetHint("GCubeActionScope"));
        this.hints = envHintCollection;
        ParseHints();
        this.datasourceWrapperFactory = new DataSourceWrapperFactory(dataSourceWrapperFactoryConfig);
        ExecutionEngine.Init(new ExecutionEngineConfig(0));
        ResourceRegistry.startBridging();
    }

    /**
     * Applies the environment hints in {@code hints} to this adaptor's tuning fields.
     *
     * <p>Numeric hints (operator selector threshold, complex plan levels / node count, maximum
     * collocation cost) overwrite the corresponding fields. Selector and policy hints carry
     * class names that are instantiated reflectively: a selector is built through its
     * {@code (NodeSelector)} constructor when a tie breaker was hinted and through its no-arg
     * constructor otherwise; a policy is built through {@code (NodeSelector, Float)} when the
     * threshold was hinted and through {@code (NodeSelector)} otherwise.
     *
     * @throws Exception if a hint payload cannot be parsed or a hinted class cannot be
     *                   loaded or instantiated
     */
    private void ParseHints() throws Exception {
        boolean thresholdHinted = false;
        if (this.hints.HintExists(OperatorNodeSelectorThresholdHintName)) {
            this.operatorNodeSelectorThreshold = Float.parseFloat(this.hints.GetHint(OperatorNodeSelectorThresholdHintName).Hint.Payload);
            logger.info("Using operator node selector threshold: " + this.operatorNodeSelectorThreshold);
            thresholdHinted = true;
        } else {
            logger.info("Using default operator node selector threshold: " + this.operatorNodeSelectorThreshold);
        }

        if (this.hints.HintExists(ComplexPlanLevelsHintName)) {
            this.complexPlanLevels = Integer.parseInt(this.hints.GetHint(ComplexPlanLevelsHintName).Hint.Payload);
            logger.info("Using complex plan levels: " + this.complexPlanLevels);
        } else {
            logger.info("Using default complex plan levels: " + this.complexPlanLevels);
        }

        if (this.hints.HintExists(ComplexPlanNumNodesHintName)) {
            this.complexPlanNumNodes = Integer.parseInt(this.hints.GetHint(ComplexPlanNumNodesHintName).Hint.Payload);
            logger.info("Using complex plan node number: " + this.complexPlanNumNodes);
        } else {
            logger.info("Using default complex plan node number: " + this.complexPlanNumNodes);
        }

        if (this.hints.HintExists(MaxCollocationCostHintName)) {
            this.maxCollocationCost = Float.parseFloat(this.hints.GetHint(MaxCollocationCostHintName).Hint.Payload);
            logger.info("Using maximum collocation cost: " + this.maxCollocationCost);
        } else {
            logger.info("Using default maximum collocation cost: " + this.maxCollocationCost);
        }

        // ---- operator node selector / node assignment policy ----
        NodeSelector operatorTieBreaker = null;
        if (this.hints.HintExists(OperatorNodeSelectorTieBreakerHintName)) {
            String tieBreakerClass = this.hints.GetHint(OperatorNodeSelectorTieBreakerHintName).Hint.Payload;
            operatorTieBreaker = (NodeSelector) Class.forName(tieBreakerClass).newInstance();
            logger.info("Using operator node selector tie breaker: " + tieBreakerClass);
        }

        NodeSelector operatorSelector = null;
        // True only when the hinted selector differs from the default one.
        boolean customOperatorSelector = false;
        if (this.hints.HintExists(OperatorNodeSelectorHintName)) {
            String selectorClass = this.hints.GetHint(OperatorNodeSelectorHintName).Hint.Payload;
            customOperatorSelector = !selectorClass.equals(OperatorNodeSelectorDef);
            if (operatorTieBreaker != null) {
                operatorSelector = (NodeSelector) Class.forName(selectorClass).getConstructor(NodeSelector.class).newInstance(operatorTieBreaker);
            } else {
                operatorSelector = (NodeSelector) Class.forName(selectorClass).newInstance();
            }
            logger.info("Using operator node selector: " + selectorClass);
        }

        if (this.hints.HintExists(NodeAssignmentPolicyHintName)) {
            String policyClass = this.hints.GetHint(NodeAssignmentPolicyHintName).Hint.Payload;
            Constructor<?> policyConstructor = thresholdHinted
                    ? Class.forName(policyClass).getConstructor(NodeSelector.class, Float.class)
                    : Class.forName(policyClass).getConstructor(NodeSelector.class);
            if (operatorSelector == null) {
                // No selector was hinted. Previously a tie-breaker hint without a selector hint
                // left the selector null and the policy was constructed around null; always
                // fall back to the default selector instead.
                if (operatorTieBreaker != null) {
                    logger.warn("Operator node selector tie breaker is ignored because no operator node selector was specified");
                }
                operatorSelector = (NodeSelector) Class.forName(OperatorNodeSelectorDef).newInstance();
            }
            if (thresholdHinted) {
                this.nodeAssignmentPolicy = (NodeAssignmentPolicy) policyConstructor.newInstance(operatorSelector, Float.valueOf(this.operatorNodeSelectorThreshold));
            } else {
                this.nodeAssignmentPolicy = (NodeAssignmentPolicy) policyConstructor.newInstance(operatorSelector);
            }
            logger.info("Using node assignment policy: " + policyClass);
        } else if (customOperatorSelector) {
            // No policy hint: wrap the non-default selector in the default policy class.
            if (thresholdHinted) {
                this.nodeAssignmentPolicy = (NodeAssignmentPolicy) Class.forName(NodeAssignmentPolicyDef).getConstructor(NodeSelector.class, Float.class).newInstance(operatorSelector, Float.valueOf(this.operatorNodeSelectorThreshold));
            } else {
                this.nodeAssignmentPolicy = (NodeAssignmentPolicy) Class.forName(NodeAssignmentPolicyDef).getConstructor(NodeSelector.class).newInstance(operatorSelector);
            }
        }

        // ---- data source node selector ----
        NodeSelector dataSourceTieBreaker = null;
        if (this.hints.HintExists(DataSourceNodeSelectorTieBreakerHintName)) {
            String tieBreakerClass = this.hints.GetHint(DataSourceNodeSelectorTieBreakerHintName).Hint.Payload;
            dataSourceTieBreaker = (NodeSelector) Class.forName(tieBreakerClass).newInstance();
            logger.info("Using data source node selector tie breaker: " + tieBreakerClass);
        }
        if (this.hints.HintExists(DatasourceNodeSelectorHintName)) {
            String selectorClass = this.hints.GetHint(DatasourceNodeSelectorHintName).Hint.Payload;
            // The field already holds a default selector; replace it only when the hint asks
            // for a different class or a tie breaker has to be injected.
            if (!selectorClass.equals(DataSourceNodeSelectorDef) || dataSourceTieBreaker != null) {
                if (dataSourceTieBreaker != null) {
                    this.datasourceNodeSelector = (NodeSelector) Class.forName(selectorClass).getConstructor(NodeSelector.class).newInstance(dataSourceTieBreaker);
                } else {
                    this.datasourceNodeSelector = (NodeSelector) Class.forName(selectorClass).newInstance();
                }
                logger.info("Using data source node selector: " + selectorClass);
            }
        }

        if (this.hints.HintExists(ExcludeLocalHintName)) {
            this.excludeLocal = Boolean.parseBoolean(this.hints.GetHint(ExcludeLocalHintName).Hint.Payload);
        }
        logger.info((this.excludeLocal ? "E" : "Not e") + "xcluding local node");
    }

    /**
     * Stores the search plan that {@code CreatePlan()} will translate.
     *
     * @param planNode root of the search plan
     */
    public void SetInputPlan(PlanNode planNode) {
        searchPlan = planNode;
    }

    /**
     * Records the identifier of this execution.
     *
     * @param str the execution identifier
     */
    @Override // gr.uoa.di.madgik.workflow.adaptor.IWorkflowAdaptor
    public void SetExecutionId(String str) {
        executionId = str;
    }

    /**
     * Translates the search plan set through {@code SetInputPlan} into the execution plan.
     *
     * <p>Steps: load the execution servers and adapt them to hosting nodes; drop the local node
     * from the candidates when {@code excludeLocal} is set or the plan is complex; rewrite the
     * search plan using a collocation cost scaled by the operator cost per candidate node; pick
     * a node assignment policy; build the node assignment tree and the workflow; finally wrap
     * the root in a boundary element when the root operator is assigned to a non-local node.
     *
     * <p>Every failure inside the method, including the validation error raised for a missing
     * search plan, reaches the caller wrapped in a {@link WorkflowInternalErrorException}.
     *
     * @throws WorkflowInternalErrorException if the workflow could not be constructed
     */
    @Override // gr.uoa.di.madgik.workflow.adaptor.IWorkflowAdaptor
    public void CreatePlan() throws WorkflowValidationException, WorkflowSerializationException, WorkflowInternalErrorException, WorkflowEnvironmentException {
        try {
            // Validate before touching the registry or analysing the plan.
            if (this.searchPlan == null) {
                throw new WorkflowValidationException("No search plan specified");
            }

            // One registry snapshot feeds both the candidate list and the id -> server map, so
            // every candidate node is guaranteed to have a map entry for the port lookup below.
            List<ExecutionServer> executionServers = ExecutionServer.getAll(true);
            logger.info("Calling adaptall on CreatePlan on " + executionServers.size() + " execution nodes");
            List<HostingNode> candidateNodes = new RRExecutionServer2HnAdapter().adaptAll(executionServers);
            logger.debug("found execution nodes to execute the complex plan");
            logger.debug("---------------------------------------------------");
            for (HostingNode candidate : candidateNodes) {
                logger.debug(candidate.toXML());
            }
            logger.debug("---------------------------------------------------");
            if (candidateNodes.isEmpty()) {
                throw new WorkflowInternalErrorException("No execution nodes were found in the infrastructure");
            }

            // Position of the first local node among the candidates, or -1 when there is none.
            int localIndex = -1;
            int position = 0;
            for (HostingNode candidate : candidateNodes) {
                if (candidate.isLocal()) {
                    localIndex = position;
                    break;
                }
                position++;
            }
            boolean localFound = localIndex >= 0;

            SearchPlanAnalyzer analyzer = new SearchPlanAnalyzer(this.complexPlanLevels, this.complexPlanNumNodes);
            boolean isComplex = analyzer.isComplex(this.searchPlan);
            if (localFound && (this.excludeLocal || isComplex)) {
                logger.info("excludeLocal is " + this.excludeLocal + ", isComplex is " + isComplex + ". Removing local from candidates");
                candidateNodes.remove(localIndex);
            }

            Map<String, ExecutionServer> serversByNodeId = new HashMap<>();
            for (ExecutionServer server : executionServers) {
                serversByNodeId.put(server.getHostingNode().getID(), server);
            }

            // Raise the collocation cost handed to the rewriter when the operator cost per
            // candidate node exceeds the configured maximum.
            // NOTE(review): candidateNodes may be empty here if the local node was the only
            // candidate; the float division then yields Infinity/NaN -- confirm this is intended.
            float rewriteCollocationCost = this.maxCollocationCost;
            float costRatio = (analyzer.calculateOperatorCost(this.searchPlan) / this.maxCollocationCost) / candidateNodes.size();
            if (costRatio > 1.0f) {
                rewriteCollocationCost = this.maxCollocationCost * costRatio;
            }
            this.searchPlan = new SearchPlanRewriter(rewriteCollocationCost).rewrite(this.searchPlan);

            NodeAssignmentPolicy effectivePolicy = this.nodeAssignmentPolicy;
            if (isComplex) {
                logger.info("Complex plan, policy: " + this.nodeAssignmentPolicy.getClass().getName() + (this.excludeLocal ? ", local node excluded" : ""));
            } else if (!localFound || this.excludeLocal) {
                effectivePolicy = new SingleRemoteNodePolicy(new RandomNodeSelector());
                effectivePolicy.reset();
                if (this.excludeLocal) {
                    logger.info("Non-complex plan, local node excluded. Executing in single remote node");
                } else if (!localFound) {
                    logger.info("Non-complex plan but could not determine which node is local (or local node is not registered as execution node). Executing in single remote node");
                }
            } else {
                effectivePolicy = new LocalOnlyPolicy();
                logger.info("Non-complex plan, executing locally");
            }

            NodeAssignmentTree assignmentTree = new NodeAssignmentTree(candidateNodes, effectivePolicy, this.datasourceNodeSelector, this.maxCollocationCost);
            NodeAssignmentNode assignmentRoot = assignmentTree.build(this.searchPlan);
            NodeExecutionInfo rootInfo = ConstructWorkflow(assignmentRoot, serversByNodeId, assignmentTree.getUtilizationFactors());

            if (rootInfo.wrapperNode.wrapper instanceof ProcessingWrapper) {
                ProcessingWrapper rootWrapper = (ProcessingWrapper) rootInfo.wrapperNode.wrapper;
                rootWrapper.elevate();
                ((SequencePlanElement) rootInfo.element).ElementCollection.add(rootWrapper.getElevationElement());
                this.Plan.Variables.Add(rootWrapper.getElevationVariable());
            }

            if (assignmentRoot instanceof OperatorNodeAssignmentNode) {
                OperatorNodeAssignmentNode rootOperator = (OperatorNodeAssignmentNode) assignmentRoot;
                if (!rootOperator.element.assignedNode.isLocal()) {
                    // The root operator runs on another node: wrap the whole workflow in a boundary.
                    logger.info("Boundary cross from local to " + rootOperator.element.assignedNode.getId() + VMDescriptor.METHOD + rootOperator.element.assignedNode.getPropertyByName(gr.uoa.di.madgik.rr.element.infra.HostingNode.HostnameProperty) + VMDescriptor.ENDMETHOD);
                    BoundaryConfig boundaryConfig = new BoundaryConfig();
                    boundaryConfig.HostName = rootOperator.element.assignedNode.getPropertyByName(gr.uoa.di.madgik.rr.element.infra.HostingNode.HostnameProperty);
                    boundaryConfig.Port = Integer.parseInt(serversByNodeId.get(rootOperator.element.assignedNode.getId()).getPort());
                    boundaryConfig.NozzleConfig = new TCPServerNozzleConfig(false, 0);

                    BoundaryPlanElement boundary = new BoundaryPlanElement();
                    boundary.SetName(boundaryConfig.HostName);
                    boundary.Config = boundaryConfig;
                    boundary.Isolation = new BoundaryIsolationInfo();
                    boundary.Isolation.CleanUp = true;
                    boundary.Isolation.Isolate = false;
                    boundary.Root = rootInfo.element;
                    rootInfo.element = boundary;
                }
            }

            this.Plan.Root = rootInfo.element;
            this.wrapperTree = rootInfo.wrapperNode;

            logger.trace("****************************************");
            try {
                logger.trace(this.Plan.Serialize());
            } catch (ExecutionSerializationException e) {
                // Serialization here is purely diagnostic; a failure must not abort plan creation.
                logger.trace("error while serializing plan.", e);
            }
            logger.trace("****************************************");
        } catch (Exception e2) {
            throw new WorkflowInternalErrorException("Could not construct workflow", e2);
        }
    }

    /**
     * Replaces the wrapper tree held by this adaptor.
     *
     * @param wrapperNode the new root of the wrapper tree
     */
    public void setWrapperTree(WrapperNode wrapperNode) {
        wrapperTree = wrapperNode;
    }

    /**
     * @return the root of the wrapper tree currently held by this adaptor (may be null)
     */
    public WrapperNode getWrapperTree() {
        return wrapperTree;
    }

    /**
     * Serializes an object with Java serialization and returns the bytes as Base64 text.
     *
     * @param serializable the object to serialize
     * @return the Base64 encoding of the serialized form
     * @throws IOException if the object cannot be written
     */
    public static String serializePlan(Serializable serializable) throws IOException {
        // try-with-resources closes both streams on success and on failure alike.
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                ObjectOutputStream objectOut = new ObjectOutputStream(buffer)) {
            objectOut.writeObject(serializable);
            // Push any data buffered by the object stream into the byte buffer before reading it.
            objectOut.flush();
            return Base64.encodeBase64String(buffer.toByteArray());
        }
    }

    /**
     * Decodes Base64 text and reconstructs the object it contains via Java serialization.
     *
     * <p>SECURITY NOTE(review): this uses native Java deserialization. It must only be fed data
     * from a trusted origin; if {@code str} can come from an untrusted party, an
     * {@code ObjectInputFilter} or a different wire format should be used.
     *
     * @param str Base64 text, such as the output of {@code serializePlan}
     * @return the deserialized object
     * @throws IOException if the stream header or contents cannot be read
     * @throws ClassNotFoundException if a class of the serialized object cannot be found
     */
    public static Object deserializePlan(String str) throws IOException, ClassNotFoundException {
        byte[] rawBytes = Base64.decodeBase64(str);
        // try-with-resources also closes the stream when readObject() throws.
        try (ObjectInputStream objectIn = new ObjectInputStream(new ByteArrayInputStream(rawBytes))) {
            return objectIn.readObject();
        }
    }

    /**
     * @return the variable collection of the current execution plan
     */
    public VariableCollection getVariableCollection() {
        VariableCollection planVariables = Plan.Variables;
        return planVariables;
    }

    /**
     * @return the execution plan held by this adaptor
     */
    @Override // gr.uoa.di.madgik.workflow.adaptor.IWorkflowAdaptor
    public ExecutionPlan GetCreatedPlan() {
        return Plan;
    }

    /**
     * Replaces the execution plan held by this adaptor.
     *
     * @param executionPlan the plan to use from now on
     */
    public void setCreatedPlan(ExecutionPlan executionPlan) {
        Plan = executionPlan;
    }

    /**
     * @return the current execution identifier, or null when none has been assigned
     */
    public String GetExecutionID() {
        return executionId;
    }

    /**
     * Returns the output variable tree.
     *
     * @return the cached tree when one is set; otherwise a tree constructed from the
     *         current wrapper tree
     */
    public OutputVariableNode getOutputVariables() {
        if (this.ioVariableNodes == null) {
            return ConstructOutputVariableTree(this.wrapperTree);
        }
        return this.ioVariableNodes;
    }

    /**
     * Submits the created plan to the execution engine, blocks until the registered observer
     * reports completion, and returns the string value of the root wrapper's output variable.
     *
     * <p>If the calling thread is interrupted while waiting, the wait continues until the
     * plan completes and the thread's interrupt status is restored before returning.
     *
     * @return the string value of the output variable (logged as the grs2 locator)
     * @throws WorkflowValidationException if the plan has no root element
     * @throws ExecutionException if the engine rejects or fails to execute the plan
     */
    public String ExecutePlan() throws WorkflowValidationException, ExecutionException {
        // Validate first so a missing plan is reported as such, not as a serialization failure.
        if (this.Plan.Root == null) {
            throw new WorkflowValidationException("No execution plan has been created");
        }
        logger.trace("******************");
        logger.trace(this.Plan.Serialize());
        logger.trace("******************");

        logger.trace("Submitting plan...");
        this.Handle = ExecutionEngine.Submit(this.Plan);
        logger.trace("Submitting plan...OK");

        Object completionLock = new Object();
        this.executionId = ExecutionDirectory.ReserveKey();
        logger.trace("Registering observer...");
        ExecutionObserver observer = new ExecutionObserver(this.executionId, ExecutionDirectory.DirectoryEntryType.Generic, -1L, this.Handle, this, false, completionLock);
        ExecutionDirectory.Register(observer);
        this.Handle.RegisterObserver(observer);
        logger.trace("Registering observer...OK");

        logger.trace("Executing plan " + this.executionId);
        ExecutionEngine.Execute(this.Handle);

        logger.trace("Before synch");
        boolean interrupted = false;
        synchronized (completionLock) {
            while (!observer.IsCompleted()) {
                try {
                    completionLock.wait();
                } catch (InterruptedException e) {
                    // Keep waiting for completion, but remember the interrupt. Re-asserting it
                    // here would make every subsequent wait() throw immediately.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        logger.trace("After synch");
        logger.trace("Executing plan " + this.executionId + " OK");

        if (!this.Handle.IsCompleted()) {
            logger.warn("Not completed! Why am I here?");
        } else if (this.Handle.IsCompletedWithSuccess()) {
            logger.info("Plan successfully completed");
        } else if (this.Handle.IsCompletedWithError()) {
            ExecutionException completionError = this.Handle.GetCompletionError();
            String message = "Plan unsuccessfully completed with error";
            if (completionError instanceof ExecutionRunTimeException) {
                message = message + " of cause " + ((ExecutionRunTimeException) completionError).GetCauseFullName();
            }
            logger.info(message, completionError);
        } else {
            logger.warn("Completed but neither with success or failure!");
        }

        String locator = this.wrapperTree.wrapper.getOutputVariable().Value.GetStringValue();
        logger.info("Returning grs2 locator: " + locator);
        return locator;
    }

    /**
     * Returns the completion error recorded on the execution handle.
     *
     * @return the completion error reported by the handle
     * @throws WorkflowValidationException if no plan exists, or if the plan has not been
     *                                     submitted yet so that no execution handle is available
     */
    public ExecutionException GetCompletionError() throws WorkflowValidationException {
        if (this.Plan == null) {
            throw new WorkflowValidationException("No execution plan has been created");
        }
        // The handle is only assigned by ExecutePlan(); without this guard the call below
        // would fail with a NullPointerException.
        if (this.Handle == null) {
            throw new WorkflowValidationException("The execution plan has not been submitted for execution");
        }
        return this.Handle.GetCompletionError();
    }

    /**
     * Recursively turns a node assignment (sub)tree into plan elements.
     *
     * <p>A data source node is delegated to {@code ConstructDataSourcePlanElement}. For an
     * operator node, every child is built first; a child operator assigned to a different node
     * than its parent gets its output elevated and is wrapped in a boundary element targeting
     * the child's node. The children are then placed in a flow element, which is followed in a
     * sequence element by the operator's own processing element.
     *
     * @param nodeAssignmentNode root of the assignment (sub)tree to translate
     * @param map execution servers keyed by node id; used to look up the boundary port
     * @param map2 utilization factors keyed by node id, forwarded to the processing element builder
     * @return the plan element together with the wrapper node for this subtree
     * @throws Exception if the node type is not recognised or a child cannot be constructed
     */
    private NodeExecutionInfo ConstructWorkflow(NodeAssignmentNode nodeAssignmentNode, Map<String, ExecutionServer> map, Map<String, Float> map2) throws Exception {
        if (!(nodeAssignmentNode instanceof OperatorNodeAssignmentNode)) {
            if (nodeAssignmentNode instanceof DataSourceNodeAssignmentNode) {
                DataSourceNodeAssignmentNode dataSourceNode = (DataSourceNodeAssignmentNode) nodeAssignmentNode;
                logger.info("In WorkflowSearchAdaptor planNode : " + dataSourceNode.toXML());
                return ConstructDataSourcePlanElement(dataSourceNode);
            }
            throw new Exception("Unrecognized search plan node type");
        }

        List<IPlanElement> childElements = new ArrayList<>();
        List<WrapperNode> childWrappers = new ArrayList<>();
        for (NodeAssignmentNode child : ((OperatorNodeAssignmentNode) nodeAssignmentNode).getChildren()) {
            NodeExecutionInfo childInfo = ConstructWorkflow(child, map, map2);
            if (child instanceof OperatorNodeAssignmentNode) {
                OperatorNodeAssignmentNode childOperator = (OperatorNodeAssignmentNode) child;
                if (!nodeAssignmentNode.element.assignedNode.getId().equals(childOperator.element.assignedNode.getId())) {
                    // Parent and child run on different nodes: elevate the child's output and
                    // execute the child subtree behind a boundary on the child's node.
                    logger.info("Boundary cross from " + nodeAssignmentNode.element.assignedNode.getId() + VMDescriptor.METHOD + nodeAssignmentNode.element.assignedNode.getPropertyByName(gr.uoa.di.madgik.rr.element.infra.HostingNode.HostnameProperty) + ") to " + childOperator.element.assignedNode.getId() + VMDescriptor.METHOD + childOperator.element.assignedNode.getPropertyByName(gr.uoa.di.madgik.rr.element.infra.HostingNode.HostnameProperty) + VMDescriptor.ENDMETHOD);
                    ProcessingWrapper childWrapper = (ProcessingWrapper) childInfo.wrapperNode.wrapper;
                    childWrapper.elevate();
                    ((SequencePlanElement) childInfo.element).ElementCollection.add(childWrapper.getElevationElement());
                    this.Plan.Variables.Add(childWrapper.getElevationVariable());

                    BoundaryConfig boundaryConfig = new BoundaryConfig();
                    boundaryConfig.HostName = childOperator.element.assignedNode.getPropertyByName(gr.uoa.di.madgik.rr.element.infra.HostingNode.HostnameProperty);
                    boundaryConfig.Port = Integer.parseInt(map.get(childOperator.element.assignedNode.getId()).getPort());
                    boundaryConfig.NozzleConfig = new TCPServerNozzleConfig(false, 0);

                    BoundaryPlanElement boundary = new BoundaryPlanElement();
                    boundary.SetName(boundaryConfig.HostName);
                    boundary.Config = boundaryConfig;
                    boundary.Isolation = new BoundaryIsolationInfo();
                    boundary.Isolation.CleanUp = true;
                    boundary.Isolation.Isolate = false;
                    boundary.Root = childInfo.element;
                    childInfo.element = boundary;
                }
            }
            childElements.add(childInfo.element);
            childWrappers.add(childInfo.wrapperNode);
        }

        FlowPlanElement childFlow = new FlowPlanElement();
        childFlow.ElementCollection = childElements;
        NodeExecutionInfo processingInfo = ConstructProcessingPlanElement((OperatorNodeAssignmentNode) nodeAssignmentNode, childWrappers, map2);

        // Children first (as a flow), then the operator that consumes their outputs.
        SequencePlanElement sequence = new SequencePlanElement();
        sequence.ElementCollection.add(childFlow);
        sequence.ElementCollection.add(processingInfo.element);
        return new NodeExecutionInfo(sequence, processingInfo.wrapperNode);
    }

    /**
     * Builds the processing plan element for an operator node.
     *
     * <p>The output variables of the child wrappers become the operator's inputs. The element
     * constructor is chosen by the operator's functionality: {@code merge} and {@code fuse} use
     * the merge constructor, {@code join} the join constructor and {@code except} the except
     * constructor (case-insensitive). When the utilization factor of the assigned node exceeds
     * 1, a buffer capacity limit derived from it is passed to the constructor.
     *
     * @param operatorNodeAssignmentNode the operator node with its assigned hosting node
     * @param list wrapper nodes of the operator's children, in child order
     * @param map utilization factors keyed by node id
     * @return the processing element and a wrapper node linking it to its children
     * @throws Exception if the functionality is not recognised or element construction fails
     */
    private NodeExecutionInfo ConstructProcessingPlanElement(OperatorNodeAssignmentNode operatorNodeAssignmentNode, List<WrapperNode> list, Map<String, Float> map) throws Exception {
        List<NamedDataType> inputVariables = new ArrayList<>();
        for (WrapperNode childWrapper : list) {
            inputVariables.add(childWrapper.wrapper.getOutputVariable());
        }

        String functionality = ((OperatorNode) operatorNodeAssignmentNode.element.processingNode).getFunctionality();
        ProcessingElementConstructor elementConstructor;
        if ("merge".equalsIgnoreCase(functionality) || "fuse".equalsIgnoreCase(functionality)) {
            elementConstructor = new MergeElementConstructor();
        } else if ("join".equalsIgnoreCase(functionality)) {
            elementConstructor = new JoinElementConstructor();
        } else if ("except".equalsIgnoreCase(functionality)) {
            elementConstructor = new ExceptElementConstructor();
        } else {
            // Previously this fell through with a null constructor and failed later with an
            // uninformative NullPointerException.
            throw new Exception("Unrecognized operator functionality: " + functionality);
        }

        Integer bufferLimit = null;
        // A node without a recorded utilization factor is treated as not overcommitted.
        Float utilization = map.get(operatorNodeAssignmentNode.element.assignedNode.getId());
        if (utilization != null && utilization.floatValue() > 1.0f) {
            bufferLimit = Integer.valueOf(limitBufferCapacity(utilization.floatValue()));
            logger.info("Node " + operatorNodeAssignmentNode.element.assignedNode.getId() + VMDescriptor.METHOD + operatorNodeAssignmentNode.element.assignedNode.getPropertyByName(gr.uoa.di.madgik.rr.element.infra.HostingNode.HostnameProperty) + VMDescriptor.ENDMETHOD + " is overcommitted (factor:" + utilization.floatValue() + "). Limiting buffer to: " + bufferLimit);
        }

        // Diagnostic state dump; this is not an error condition, so it is logged at debug level.
        logger.debug("planNode : " + operatorNodeAssignmentNode);
        logger.debug("planNode.element : " + operatorNodeAssignmentNode.element);
        logger.debug("planNode.element.processingNode : " + operatorNodeAssignmentNode.element.processingNode);
        logger.debug("elementConstructor : " + elementConstructor);

        NodeExecutionInfo constructed = elementConstructor.constructPlanElement(operatorNodeAssignmentNode.element.processingNode.getFunctionalArgs(), inputVariables, bufferLimit);
        FunctionalityWrapper operatorWrapper = constructed.wrapperNode.wrapper;
        operatorWrapper.addVariablesToPlan(this.Plan);
        return new NodeExecutionInfo(constructed.element, new WrapperNode(operatorWrapper, list));
    }

    /**
     * Builds the plan element that queries a single data source.
     *
     * <p>The data source service is looked up by the node's instance id, a wrapper matching the
     * service's concrete type (full-text index, OpenSearch or SRU consumer) is created for its
     * endpoint, and the wrapper is configured with the node's CQL input and the service id before
     * its first plan element is taken.
     *
     * @param dataSourceNodeAssignmentNode the data source node to construct a plan element for
     * @return the first plan element produced by the wrapper, paired with a wrapper node that has
     *         no children
     * @throws Exception if no service exists for the instance id, the service is of an unsupported
     *                   type, or the wrapper produces no plan element
     */
    private NodeExecutionInfo ConstructDataSourcePlanElement(DataSourceNodeAssignmentNode dataSourceNodeAssignmentNode) throws WorkflowValidationException, ExecutionValidationException, ExecutionSerializationException, ResourceRegistryException, Exception {
        String instanceId = dataSourceNodeAssignmentNode.instanceId;
        DataSourceService service = QueryHelper.GetSourceServiceById(instanceId);
        if (service == null) {
            throw new Exception("Datasource service with id " + instanceId + " does not exist");
        }

        String endpoint = service.getEndpoint();
        String id = service.getID();
        logger.info("serviceEndpoint : " + endpoint);
        logger.info("resourceKey     : " + id);
        logger.info("in WorkflowSearchAdaptor:ConstructDataSourcePlanElement serviceEndpoint : " + endpoint);

        // Pick the wrapper by the concrete service type; anything else is rejected.
        DataSourceWrapper dataSourceWrapper;
        if (service instanceof FTIndexService) {
            dataSourceWrapper = this.datasourceWrapperFactory.newFullTextIndexNodeWrapper(endpoint, this.hints);
        } else if (service instanceof OpenSearchDataSourceService) {
            dataSourceWrapper = this.datasourceWrapperFactory.newOpenSearchDataSourceServiceWrapper(endpoint, this.hints);
        } else if (service instanceof SruConsumerService) {
            dataSourceWrapper = this.datasourceWrapperFactory.newSruConsumerServiceWrapper(endpoint, this.hints);
        } else {
            throw new Exception("Unrecognized datasource type retrieved from registry: " + service.getClass().getName());
        }

        dataSourceWrapper.setQuery(((DataSourceNode) dataSourceNodeAssignmentNode.element.processingNode).getCqlInput());
        dataSourceWrapper.setResourceKey(id);

        IPlanElement[] planElements = dataSourceWrapper.constructPlanElements();
        if (planElements == null || planElements.length == 0) {
            // Report the offending data source instead of failing on the array access.
            throw new Exception("Datasource wrapper for service with id " + instanceId + " produced no plan element");
        }
        IPlanElement planElement = planElements[0];
        dataSourceWrapper.addVariablesToPlan(this.Plan);
        return new NodeExecutionInfo(planElement, new WrapperNode(dataSourceWrapper, null));
    }

    /**
     * Computes a reduced buffer capacity for the given factor.
     *
     * <p>For factors up to and including 1.0 the default record writer capacity is returned as is.
     * Above that, the default capacity is scaled by {@code exp(1 - f)} and rounded down, with 10 as
     * the lowest value ever returned.
     *
     * @param f the factor to scale the default capacity by
     * @return the default capacity, or the scaled capacity clamped to at least 10
     */
    private int limitBufferCapacity(float f) {
        if (f <= 1.0f) {
            return RecordWriter.DefaultBufferCapacity;
        }
        double decay = Math.exp(1.0f - f);
        int scaledCapacity = (int) Math.floor(RecordWriter.DefaultBufferCapacity * decay);
        return Math.max(scaledCapacity, 10);
    }

    /**
     * Recursively mirrors a wrapper node tree as a tree of output variable names.
     *
     * @param wrapperNode the root of the wrapper (sub)tree to convert
     * @return a node carrying the name of the wrapper's output variable; its child list is
     *         {@code null} when the wrapper node has no children collection, otherwise it holds
     *         one converted node per child, in iteration order
     */
    private OutputVariableNode ConstructOutputVariableTree(WrapperNode wrapperNode) {
        if (wrapperNode.children == null) {
            return new OutputVariableNode(wrapperNode.wrapper.getOutputVariable().Name, null);
        }
        // Typed list instead of a raw ArrayList so element types are checked at compile time.
        ArrayList<OutputVariableNode> childTrees = new ArrayList<>();
        for (WrapperNode child : wrapperNode.children) {
            childTrees.add(ConstructOutputVariableTree(child));
        }
        return new OutputVariableNode(wrapperNode.wrapper.getOutputVariable().Name, childTrees);
    }

    /**
     * Intentionally does nothing: the body is empty, so the supplied resources are ignored and
     * the declared {@code WorkflowValidationException} is never thrown by this implementation.
     *
     * @param iAdaptorResources the adaptor resources; unused here
     */
    @Override // gr.uoa.di.madgik.workflow.adaptor.IWorkflowAdaptor
    public void SetAdaptorResources(IAdaptorResources iAdaptorResources) throws WorkflowValidationException {
    }

    /**
     * This implementation exposes no output resources.
     *
     * @return always {@code null} (not an empty set), so callers must null-check the result
     */
    @Override // gr.uoa.di.madgik.workflow.adaptor.IWorkflowAdaptor
    public Set<IOutputResource> GetOutput() {
        return null;
    }
}
