package eu.dnetlib.data.collective.harvest.provider;

import eu.dnetlib.data.collective.harvest.provider.fs.ArchiveWalker;
import eu.dnetlib.data.collective.harvest.provider.fs.FileWalker;
import eu.dnetlib.data.collective.harvest.provider.fs.TryFileWalker;
import eu.dnetlib.data.collective.harvest.provider.ftp.FTPFileWalker;
import eu.dnetlib.data.collective.harvest.provider.ftp.FTPFilter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.DirectoryWalker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:eu/dnetlib/data/collective/harvest/provider/DataProvider.class */
/**
 * Reads items of type {@code T} from a local path (a directory or an archive file) or from an
 * FTP/SFTP location and exposes them as a {@link BlockingStream}.
 *
 * <p>Each read request submits a walker task to the single-threaded {@link #producer}. The walker
 * pushes items into the bounded {@link #queue}, and the returned stream consumes from that same
 * queue. All read requests on one instance share the queue, so their items end up in the same
 * stream.
 *
 * @param <T> type of the items the walkers put on the queue
 */
public class DataProvider<T> implements Iterable<T> {

    /** Capacity of the hand-off queue between the producer thread and the consumer. */
    private static final int QUEUE_CAPACITY = 20;

    private static final Log log = LogFactory.getLog(DataProvider.class);

    /**
     * Shared sentinel object.
     * NOTE(review): presumably used by the walkers / {@link BlockingStream} as an end-of-stream
     * marker on the queue; confirm against those classes.
     */
    public static final Object done = new Object();

    /**
     * Runs the walker tasks in the background, one at a time.
     * NOTE(review): nothing in this class shuts this executor down, and the default thread factory
     * creates non-daemon threads, which can keep the JVM alive. Confirm callers handle shutdown.
     */
    protected ExecutorService producer = Executors.newSingleThreadExecutor();

    /** Bounded hand-off queue filled by the walkers and drained by the returned streams. */
    protected BlockingQueue<T> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    /** Location read by {@link #iterator()}. */
    protected URI source;

    /** Kind of files the walkers should extract items from. */
    protected FileType type;

    /** FTP/SFTP user name, used only when the source URI does not carry one. */
    protected String username;

    /** FTP/SFTP password, used only when the source URI does not carry one. */
    protected String password;

    /** Optional filter expression for FTP/SFTP walks; ignored when null or empty. */
    protected String ftpFileFilter;

    /** Kind of files the walkers should extract items from. */
    public enum FileType {
        TEXT,
        PDF,
        DOC
    }

    /**
     * Creates a provider for the given source location.
     *
     * @param fileType kind of files the walkers should extract
     * @param str source location as a URI string (scheme {@code file}, {@code ftp} or {@code sftp})
     * @throws URISyntaxException declared for backward compatibility only; {@link URI#create}
     *     reports malformed input as {@link IllegalArgumentException}
     */
    public DataProvider(FileType fileType, String str) throws URISyntaxException {
        this.type = fileType;
        this.source = URI.create(str);
    }

    /**
     * Starts reading {@link #source} and returns a stream over the produced items.
     *
     * @return a blocking iterator over the items; an empty iterator when no source is set
     * @throws RuntimeException wrapping a {@link FileNotFoundException} when a local source is
     *     missing
     */
    @Override
    public Iterator<T> iterator() {
        if (this.source == null) {
            // Iterable#iterator() must never return null; an absent source simply yields nothing.
            return Collections.emptyIterator();
        }
        try {
            return doReadFilesUnder(this.source);
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the local directory or archive at {@code str}. Only the path component of the URI is
     * used; any scheme is ignored.
     *
     * @param str local path or URI
     * @return a blocking stream over the produced items
     * @throws FileNotFoundException if the path does not exist
     * @throws URISyntaxException declared for backward compatibility only
     */
    public BlockingStream<T> readFilesUnder(String str) throws FileNotFoundException, URISyntaxException {
        return doReadFromFileSystem(getFileSource(URI.create(str)));
    }

    /**
     * Reads from the location at {@code str}, dispatching on its scheme: {@code ftp}/{@code sftp}
     * are walked remotely and {@code file} is walked locally.
     *
     * @param str URI to read from; its host and credentials are taken from this URI
     * @return a blocking stream over the produced items
     * @throws FileNotFoundException if a {@code file} URI points to a missing path
     * @throws URISyntaxException declared for backward compatibility only
     */
    public BlockingStream<T> readFilesUnderFTP(String str) throws FileNotFoundException, URISyntaxException {
        return doReadFilesUnder(URI.create(str));
    }

    /**
     * Resolves the path component of {@code uri} to an existing local file or directory.
     *
     * @throws FileNotFoundException if the URI has no path or the path does not exist
     */
    private File getFileSource(URI uri) throws FileNotFoundException {
        String path = uri.getPath();
        if (path == null) {
            // Opaque URIs (e.g. "mailto:x") have no path; new File(null) would throw an NPE.
            throw new FileNotFoundException("URI " + uri + " has no path component");
        }
        File file = new File(path);
        if (!file.exists()) {
            throw new FileNotFoundException("file " + uri + " doesn't exist");
        }
        return file;
    }

    public String getUsername() {
        return this.username;
    }

    public void setUsername(String str) {
        this.username = str;
    }

    public String getPassword() {
        return this.password;
    }

    public void setPassword(String str) {
        this.password = str;
    }

    public String getFtpFileFilter() {
        return this.ftpFileFilter;
    }

    public void setFtpFileFilter(String str) {
        this.ftpFileFilter = str;
    }

    /** Walks {@code file} as a directory tree, or as an archive when it is a regular file. */
    private BlockingStream<T> doReadFromFileSystem(File file) {
        log.info("reading files under " + file.getAbsolutePath());
        if (file.isDirectory()) {
            doReadFromDirectory(this.queue, file);
        } else {
            doReadFromArchive(this.queue, file);
        }
        return new BlockingStream<>(this.queue);
    }

    /** Dispatches on the URI scheme and starts the matching walker. */
    private BlockingStream<T> doReadFilesUnder(URI uri) throws FileNotFoundException {
        ProtocolType protocol = protocolOf(uri);
        switch (protocol) {
            case ftp:
            case sftp:
                doReadFromFTP(this.queue, uri, protocol);
                return new BlockingStream<>(this.queue);
            case file:
                return doReadFromFileSystem(getFileSource(uri));
            default:
                throw new IllegalArgumentException("Invalid protocol type: " + protocol.name());
        }
    }

    /**
     * Maps the URI scheme to a {@link ProtocolType}.
     *
     * @throws IllegalArgumentException if the scheme is missing or not a known protocol
     */
    private static ProtocolType protocolOf(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null) {
            // The URI itself is not echoed: it may embed credentials.
            throw new IllegalArgumentException("URI has no protocol scheme");
        }
        return ProtocolType.valueOf(scheme);
    }

    /** Starts an asynchronous walk over the archive {@code file}. */
    private void doReadFromArchive(BlockingQueue<T> blockingQueue, File file) {
        final ArchiveWalker archiveWalker = new ArchiveWalker(blockingQueue, this.type, file);
        submitWalk(file.getAbsolutePath(), new Walk() {
            @Override
            public void run() throws IOException {
                archiveWalker.doWalk();
            }
        });
    }

    /**
     * Probes the directory synchronously with a {@link TryFileWalker}, then starts the real walk
     * asynchronously.
     *
     * @throws IllegalStateException if the synchronous probe fails with an I/O error
     */
    private void doReadFromDirectory(BlockingQueue<T> blockingQueue, File directory) {
        // Items from the probe go to a throw-away queue, never to the consumer.
        BlockingQueue<T> probeQueue = new LinkedBlockingQueue<>();
        try {
            new TryFileWalker(probeQueue, this.type, directory).doWalk();
        } catch (DirectoryWalker.CancelException ignored) {
            // Cancellation is treated as a normal early stop of the probe.
            // NOTE(review): presumably TryFileWalker cancels on purpose after a first check; confirm.
        } catch (IOException e) {
            log.info("got an exception while trying to read one file from FileWalker, percolating", e);
            throw new IllegalStateException(e);
        }
        final FileWalker fileWalker = new FileWalker(blockingQueue, this.type, directory);
        submitWalk(directory.getAbsolutePath(), new Walk() {
            @Override
            public void run() throws IOException {
                fileWalker.doWalk();
            }
        });
    }

    /**
     * Starts an asynchronous FTP/SFTP walk of {@code uri}'s path. Host and credentials come from
     * {@code uri}; the configured username/password fill in only what the URI lacks.
     */
    private void doReadFromFTP(BlockingQueue<T> blockingQueue, URI uri, ProtocolType protocol) {
        ItemUtility itemUtility = new ItemUtility(uri);
        if (itemUtility.getPassword() == null && this.password != null) {
            itemUtility.setPassword(this.password);
        }
        if (itemUtility.getUsername() == null && this.username != null) {
            itemUtility.setUsername(this.username);
        }
        FTPFilter ftpFilter = null;
        if (this.ftpFileFilter != null && this.ftpFileFilter.length() > 0) {
            ftpFilter = new FTPFilter(this.ftpFileFilter);
            log.info("FTP FILTER is not null but " + this.ftpFileFilter);
        }
        final FTPFileWalker ftpFileWalker = new FTPFileWalker(blockingQueue, ftpFilter, 0, itemUtility, protocol);
        final String path = uri.getPath();
        submitWalk(path, new Walk() {
            @Override
            public void run() throws IOException {
                ftpFileWalker.doWalk(path);
            }
        });
    }

    /** A unit of walking work that may fail with an I/O error. */
    private interface Walk {
        void run() throws IOException;
    }

    /**
     * Runs {@code walk} on the {@link #producer} thread and logs completion or failure for
     * {@code location}.
     */
    private void submitWalk(final String location, final Walk walk) {
        this.producer.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    walk.run();
                    log.info("finished to iterate under " + location);
                } catch (IOException e) {
                    // Without this log, the failure would only reach the thread's uncaught-exception handler.
                    // NOTE(review): the consumer of the queue is not notified here and may block
                    // forever waiting for more items; confirm how walkers signal end-of-stream.
                    log.error("failed to iterate under " + location, e);
                    throw new IllegalStateException(e);
                }
            }
        });
    }
}
