package eu.dnetlib.msro.dli.workflows.nodes.resolver;

import com.google.common.collect.Lists;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
import eu.dnetlib.enabling.resultset.client.IterableResultSetClient;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
import eu.dnetlib.msro.dli.workflows.nodes.transform.DOIResolverWorker;
import eu.dnetlib.msro.workflows.dli.model.DLIObject;
import eu.dnetlib.msro.workflows.dli.resolver.DOIResolver;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Resource;
import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/* loaded from: input_file:eu/dnetlib/msro/dli/workflows/nodes/resolver/ResolvingHarvestedPidJobNode.class */
public class ResolvingHarvestedPidJobNode extends SimpleJobNode {
    private static final Log log = LogFactory.getLog(ResolvingHarvestedPidJobNode.class);
    private static final DLIObject END_QUEUE = new DLIObject();
    private String inputEprParam;
    private String outputEprParam;

    @Autowired
    private ResultSetClientFactory resultSetClientFactory;

    @Autowired
    List<DOIResolver> doiResolvers;
    private StringTemplate xmlTemplate;

    @Resource(name = "iterableResultSetFactory")
    private IterableResultSetFactory resultSetFactory;
    private int numberOfThread = 4;

    protected String execute(NodeToken nodeToken) throws Exception {
        final IterableResultSetClient client = this.resultSetClientFactory.getClient(new EPRUtils().getEpr(nodeToken.getEnv().getAttribute(getInputEprParam())));
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(getNumberOfThread());
        final ArrayList newArrayList = Lists.newArrayList();
        final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1000);
        final ArrayBlockingQueue arrayBlockingQueue2 = new ArrayBlockingQueue(10000);
        for (int i = 0; i < this.numberOfThread; i++) {
            newArrayList.add(newFixedThreadPool.submit(new DOIResolverWorker(arrayBlockingQueue, arrayBlockingQueue2, this.doiResolvers, "END_QUEUE")));
        }
        new Thread(new Runnable() { // from class: eu.dnetlib.msro.dli.workflows.nodes.resolver.ResolvingHarvestedPidJobNode.1
            @Override // java.lang.Runnable
            public void run() {
                Iterator it = client.iterator();
                while (it.hasNext()) {
                    try {
                        arrayBlockingQueue.put((String) it.next());
                    } catch (InterruptedException e) {
                        ResolvingHarvestedPidJobNode.log.error("Error on inserting element on queue", e);
                    }
                }
                try {
                    arrayBlockingQueue.put("END_QUEUE");
                } catch (InterruptedException e2) {
                    ResolvingHarvestedPidJobNode.log.error("Error on inserting element on queue", e2);
                }
                try {
                    Iterator it2 = newArrayList.iterator();
                    while (it2.hasNext()) {
                        ResolvingHarvestedPidJobNode.log.info("Thread terminated correctly" + ((Future) it2.next()).get());
                    }
                    arrayBlockingQueue2.put(ResolvingHarvestedPidJobNode.END_QUEUE);
                } catch (Exception e3) {
                    ResolvingHarvestedPidJobNode.log.error(e3);
                }
            }
        }).start();
        nodeToken.getEnv().setAttribute(getOutputEprParam(), this.resultSetFactory.createIterableResultSet(new Iterable<String>() { // from class: eu.dnetlib.msro.dli.workflows.nodes.resolver.ResolvingHarvestedPidJobNode.2
            @Override // java.lang.Iterable
            public Iterator<String> iterator() {
                return new DLIObjectIteratorQueue(arrayBlockingQueue2, ResolvingHarvestedPidJobNode.END_QUEUE, ResolvingHarvestedPidJobNode.this.xmlTemplate);
            }
        }).toString());
        return Arc.DEFAULT_ARC;
    }

    public String getInputEprParam() {
        return this.inputEprParam;
    }

    public void setInputEprParam(String str) {
        this.inputEprParam = str;
    }

    public String getOutputEprParam() {
        return this.outputEprParam;
    }

    public void setOutputEprParam(String str) {
        this.outputEprParam = str;
    }

    @Required
    public void setTemplate(org.springframework.core.io.Resource resource) throws IOException {
        this.xmlTemplate = new StringTemplate(IOUtils.toString(resource.getInputStream()));
    }

    public int getNumberOfThread() {
        return this.numberOfThread;
    }

    public void setNumberOfThread(int i) {
        this.numberOfThread = i;
    }
}
