package eu.dnetlib.enabling.manager.msro.openaire.hbase;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.information.DataSourceResolver;
import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.data.transform.XsltRowTransformer;
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
import eu.dnetlib.enabling.resultset.WorkflowCountingResultSetFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import eu.dnetlib.workflow.AsyncJobNode;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.core.io.Resource;

/* loaded from: input_file:eu/dnetlib/enabling/manager/msro/openaire/hbase/StoreHBaseRecords.class */
/**
 * Workflow node that reads the records of a data source result set, turns each record into rows
 * through an XSLT-based row transformer and stores those rows in an HBase table, writing the
 * {@link Put} operations in batches of {@link #getBatchSize()}.
 *
 * <p>The node reads the {@code dataSource} and {@code hbaseTable} attributes and the transient
 * {@code hbaseConf} attribute from the token environment.
 */
public class StoreHBaseRecords extends AsyncJobNode {
    private static final Log log = LogFactory.getLog(StoreHBaseRecords.class);

    /** Resolves the {@code dataSource} attribute into the source whose records are imported. */
    private DataSourceResolver dataSourceResolver;

    /** Creates the client used to iterate over the records of the result set. */
    private ResultSetClientFactory resultSetClientFactory;

    /** Optional XSLT resource; when unset a {@code null} stylesheet is handed to the transformer factory. */
    private Resource xslt;

    /** Builds the transformer that maps a record to rows. */
    private XsltRowTransformerFactory xsltRowTransformerFactory;

    /** Wraps the source result set into a counting result set bound to the node token. */
    private WorkflowCountingResultSetFactory countingRSFactory;

    /** Number of puts buffered before they are sent to the table. */
    private int batchSize;

    /**
     * Runs the import and completes the token on the default arc; any failure is logged and
     * reported through {@code failed(engine, nodeToken, throwable)}.
     *
     * @param engine the workflow engine driving this node
     * @param nodeToken the token carrying the environment attributes read by this node
     */
    protected void executeAsync(Engine engine, NodeToken nodeToken) {
        try {
            // Environment lookups live inside the try block so that a missing or wrongly typed
            // attribute is reported through failed(...) like every other error.
            String dataSource = nodeToken.getEnv().getAttribute("dataSource");
            String tableName = nodeToken.getEnv().getAttribute("hbaseTable");
            Configuration hbaseConf = (Configuration) nodeToken.getEnv().getTransientAttribute("hbaseConf");

            W3CEndpointReference epr = getCountingRSFactory().createCountingResultSet(
                    getDataSourceResolver().resolve(dataSource).retrieve(), nodeToken);
            XsltRowTransformer transformer = getXsltRowTransformerFactory().getTransformer(xsltString());

            doWrite(epr, transformer, tableName, hbaseConf);
            log.info("finished import to HBase");
            engine.complete(nodeToken, Arc.DEFAULT_ARC);
        } catch (Throwable th) {
            log.error("error: ", th);
            failed(engine, nodeToken, th);
        }
    }

    /**
     * Transforms every record of the result set into rows and writes them to the given table.
     *
     * <p>Puts are buffered and sent whenever the buffer reaches the configured batch size. Whatever
     * is still buffered when the iteration ends (normally or with an exception) is written as well,
     * and the table is always closed afterwards.
     *
     * @param w3CEndpointReference endpoint reference of the result set to read
     * @param xsltRowTransformer transformer applied to each record of the result set
     * @param str name of the target HBase table
     * @param configuration HBase configuration used to open the table
     * @throws IOException if opening, writing to, flushing or closing the table fails
     */
    private void doWrite(W3CEndpointReference w3CEndpointReference, XsltRowTransformer xsltRowTransformer, String str, Configuration configuration) throws IOException {
        final int maxBuffered = getBatchSize();
        if (maxBuffered <= 0) {
            // A non-positive size cannot be used as a modulus (zero would raise ArithmeticException);
            // the size check below then simply sends every put on its own.
            log.warn("non-positive batchSize " + maxBuffered + ": puts are written one at a time");
        }

        HTable table = new HTable(configuration, str);
        try {
            table.setAutoFlush(false, true);
            List<Put> pending = Lists.newArrayList();
            try {
                for (Row row : Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(w3CEndpointReference), xsltRowTransformer))) {
                    log.debug(row);
                    Put put = new Put(Bytes.toBytes(row.getKey()));
                    Iterator it = row.iterator();
                    while (it.hasNext()) {
                        Column column = (Column) it.next();
                        put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes((String) column.getName()), (byte[]) column.getValue());
                    }
                    pending.add(put);
                    // The buffer is emptied after every write, so for a positive batch size this
                    // triggers exactly once every maxBuffered rows.
                    if (pending.size() >= maxBuffered) {
                        table.put(pending);
                        pending.clear();
                    }
                }
            } finally {
                // Send the remaining puts even if the iteration failed part-way.
                if (!pending.isEmpty()) {
                    table.put(pending);
                }
                table.flushCommits();
            }
        } finally {
            // Runs even when the final put/flush above throws, so the table is never leaked.
            table.close();
        }
    }

    /**
     * Reads the configured XSLT resource into a string.
     *
     * @return the stylesheet text, or {@code null} when no XSLT resource is configured
     * @throws IllegalStateException if the resource cannot be read
     */
    private String xsltString() {
        Resource stylesheet = getXslt();
        if (stylesheet == null) {
            return null;
        }
        InputStream in = null;
        try {
            in = stylesheet.getInputStream();
            StringWriter stringWriter = new StringWriter();
            // NOTE(review): this overload decodes with the platform default charset - confirm
            // that this matches the encoding of the stylesheet resources.
            IOUtils.copy(in, stringWriter);
            return stringWriter.toString();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } finally {
            // The stream is opened by this method, so it is closed here.
            IOUtils.closeQuietly(in);
        }
    }

    /** @return the resolver used to look up the data source */
    public DataSourceResolver getDataSourceResolver() {
        return this.dataSourceResolver;
    }

    /** @param dataSourceResolver the resolver used to look up the data source */
    @Required
    public void setDataSourceResolver(DataSourceResolver dataSourceResolver) {
        this.dataSourceResolver = dataSourceResolver;
    }

    /** @return the factory creating result set clients */
    public ResultSetClientFactory getResultSetClientFactory() {
        return this.resultSetClientFactory;
    }

    /** @param resultSetClientFactory the factory creating result set clients */
    @Required
    public void setResultSetClientFactory(ResultSetClientFactory resultSetClientFactory) {
        this.resultSetClientFactory = resultSetClientFactory;
    }

    /** @return the factory creating the row transformer */
    public XsltRowTransformerFactory getXsltRowTransformerFactory() {
        return this.xsltRowTransformerFactory;
    }

    /** @param xsltRowTransformerFactory the factory creating the row transformer */
    @Required
    public void setXsltRowTransformerFactory(XsltRowTransformerFactory xsltRowTransformerFactory) {
        this.xsltRowTransformerFactory = xsltRowTransformerFactory;
    }

    /** @return the number of puts buffered before a write */
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @param i the number of puts buffered before a write */
    @Required
    public void setBatchSize(int i) {
        this.batchSize = i;
    }

    /** @return the XSLT resource, or {@code null} when none is configured */
    public Resource getXslt() {
        return this.xslt;
    }

    /** @param resource the XSLT resource; may be left unset */
    public void setXslt(Resource resource) {
        this.xslt = resource;
    }

    /** @return the factory creating counting result sets */
    public WorkflowCountingResultSetFactory getCountingRSFactory() {
        return this.countingRSFactory;
    }

    /** @param workflowCountingResultSetFactory the factory creating counting result sets */
    @Required
    public void setCountingRSFactory(WorkflowCountingResultSetFactory workflowCountingResultSetFactory) {
        this.countingRSFactory = workflowCountingResultSetFactory;
    }
}
