/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection.plugin.base;

import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.events.EndElement;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BaseCollectorIterator
implements Iterator<String> {
    private String nextElement;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(100);
    private static final Logger log = LoggerFactory.getLogger(BaseCollectorIterator.class);
    private static final String END_ELEM = "__END__";

    public BaseCollectorIterator(FileSystem fs, Path filePath, AggregatorReport report) {
        new Thread(() -> this.importHadoopFile(fs, filePath, report)).start();
        try {
            this.nextElement = this.queue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    protected BaseCollectorIterator(String resourcePath, AggregatorReport report) {
        new Thread(() -> this.importTestFile(resourcePath, report)).start();
        try {
            this.nextElement = this.queue.take();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized boolean hasNext() {
        return this.nextElement != null & !END_ELEM.equals(this.nextElement);
    }

    @Override
    public synchronized String next() {
        try {
            String string = END_ELEM.equals(this.nextElement) ? null : this.nextElement;
            return string;
        }
        finally {
            try {
                this.nextElement = this.queue.take();
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void importHadoopFile(FileSystem fs, Path filePath, AggregatorReport report) {
        log.info("I start to read the TAR stream");
        try (FSDataInputStream origInputStream = fs.open(filePath);
             TarArchiveInputStream tarInputStream = new TarArchiveInputStream((InputStream)origInputStream);){
            this.importTarStream(tarInputStream, report);
        }
        catch (Throwable e) {
            throw new RuntimeException("Error processing BASE records", e);
        }
    }

    private void importTestFile(String resourcePath, AggregatorReport report) {
        try (InputStream origInputStream = BaseCollectorIterator.class.getResourceAsStream(resourcePath);
             TarArchiveInputStream tarInputStream = new TarArchiveInputStream(origInputStream);){
            this.importTarStream(tarInputStream, report);
        }
        catch (Throwable e) {
            throw new RuntimeException("Error processing BASE records", e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void importTarStream(TarArchiveInputStream tarInputStream, AggregatorReport report) {
        long count = 0L;
        XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
        XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newInstance();
        try {
            TarArchiveEntry entry;
            while ((entry = (TarArchiveEntry)tarInputStream.getNextEntry()) != null) {
                String name = entry.getName();
                if (entry.isDirectory() || !name.contains("ListRecords") || !name.endsWith(".bz2")) continue;
                log.info("Processing file (BZIP): " + name);
                byte[] bzipData = new byte[(int)entry.getSize()];
                IOUtils.readFully((InputStream)tarInputStream, (byte[])bzipData);
                try (ByteArrayInputStream bzipIs = new ByteArrayInputStream(bzipData);
                     BufferedInputStream bzipBis = new BufferedInputStream(bzipIs);){
                    CompressorInputStream bzipInput = new CompressorStreamFactory().createCompressorInputStream((InputStream)bzipBis);
                    try {
                        XMLEventReader reader = xmlInputFactory.createXMLEventReader((InputStream)bzipInput);
                        XMLEventWriter eventWriter = null;
                        StringWriter xmlWriter = null;
                        while (reader.hasNext()) {
                            EndElement endElement;
                            StartElement startElement;
                            XMLEvent nextEvent = reader.nextEvent();
                            if (nextEvent.isStartElement() && "record".equals((startElement = nextEvent.asStartElement()).getName().getLocalPart())) {
                                xmlWriter = new StringWriter();
                                eventWriter = xmlOutputFactory.createXMLEventWriter(xmlWriter);
                            }
                            if (eventWriter != null) {
                                eventWriter.add(nextEvent);
                            }
                            if (!nextEvent.isEndElement() || !"record".equals((endElement = nextEvent.asEndElement()).getName().getLocalPart())) continue;
                            eventWriter.flush();
                            eventWriter.close();
                            this.queue.put(xmlWriter.toString());
                            eventWriter = null;
                            xmlWriter = null;
                            ++count;
                        }
                    }
                    finally {
                        if (bzipInput == null) continue;
                        bzipInput.close();
                    }
                }
            }
            this.queue.put(END_ELEM);
            return;
        }
        catch (Throwable e) {
            log.error("Error processing BASE records", e);
            report.put((Object)e.getClass().getName(), (Object)e.getMessage());
            throw new RuntimeException("Error processing BASE records", e);
        }
        finally {
            log.info("Total records (written in queue): " + count);
        }
    }
}

