package gr.uoa.di.madgik.searchlibrary.operatorlibrary.datasink.predefined;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.reader.ForwardReader;
import gr.uoa.di.madgik.grs.reader.IRecordReader;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.FileField;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.FieldNaming;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.datasink.DataSink;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.stats.StatsContainer;
import java.io.File;
import java.io.FileInputStream;
import java.net.URI;
import java.util.Calendar;
import java.util.Map;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gr/uoa/di/madgik/searchlibrary/operatorlibrary/datasink/predefined/FTPDataSink.class */
public class FTPDataSink<T extends Record> extends DataSink {
    private static final String PARAMETER_DirectoryName = "directory";
    private static final String PARAMETER_Username = "username";
    private static final String PARAMETER_Password = "password";
    private static final String PARAMETER_Port = "port";
    private static Logger log = LoggerFactory.getLogger(FTPDataSink.class.getName());
    private FTPClient ftpClient;
    private String server;
    private int port;
    private String username;
    private String password;
    private String directory;
    private IRecordReader<T> reader;

    public FTPDataSink(URI uri, String str, Map<String, String> map, StatsContainer statsContainer) throws Exception {
        super(uri, str, map, statsContainer);
        this.ftpClient = new FTPClient();
        this.username = "anonymous";
        this.password = "anonymous";
        this.directory = "";
        this.reader = null;
        this.reader = new ForwardReader(uri);
        RecordDefinition recordDefinition = this.reader.getRecordDefinitions()[0];
        if (recordDefinition.getDefinition(FieldNaming.FTPFieldName.id.name()) < 0 || recordDefinition.getDefinition(FieldNaming.FTPFieldName.bytestream.name()) < 0 || recordDefinition.getDefinition(FieldNaming.FTPFieldName.mimeType.name()) < 0) {
            log.error("FTPDataSink could not be initialized, cause corresponding fields are missing from resultSet");
            throw new Exception("FTPDataSink could not be initialized, cause corresponding fields are missing from resultSet");
        }
        this.server = str;
        this.port = this.ftpClient.getDefaultPort();
        this.directory = "";
        if (map != null && map.size() > 0) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                if (entry.getKey() != null && entry.getKey().trim().length() > 0 && entry.getValue() != null) {
                    if (entry.getKey().equals(PARAMETER_DirectoryName)) {
                        this.directory = entry.getValue();
                        if (!this.directory.endsWith("/")) {
                            this.directory += "/";
                        }
                    } else if (entry.getKey().equals(PARAMETER_Username)) {
                        this.username = entry.getValue();
                    } else if (entry.getKey().equals(PARAMETER_Password)) {
                        this.password = entry.getValue();
                    } else if (entry.getKey().equals(PARAMETER_Port)) {
                        this.port = Integer.parseInt(entry.getValue());
                    }
                }
            }
        }
        log.info("Ininializing ftp data sink at: " + this.username + "@" + this.server + ":" + this.port + "/" + this.directory);
        this.ftpClient.connect(this.server, this.port);
        this.ftpClient.login(this.username, this.password);
        this.ftpClient.setFileType(2);
        log.info("Connected to " + this.server + ".");
        log.info("FTP server replied: " + this.ftpClient.getReplyString());
        int replyCode = this.ftpClient.getReplyCode();
        if (!FTPReply.isPositiveCompletion(replyCode)) {
            log.error("FTP server refused connection. Reply CODE: " + replyCode);
            this.ftpClient.disconnect();
            throw new Exception("FTP server refused connection. Reply CODE: " + replyCode);
        }
        this.output = "ftp://" + this.server + "/" + this.directory;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Record record;
        Thread.currentThread().setName(FTPDataSink.class.getName());
        long timeInMillis = Calendar.getInstance().getTimeInMillis();
        long j = timeInMillis;
        long j2 = timeInMillis;
        int i = 0;
        while (this.reader.getStatus() != IBuffer.Status.Dispose && (this.reader.getStatus() != IBuffer.Status.Close || this.reader.availableRecords() != 0)) {
            try {
                try {
                    try {
                        record = this.reader.get(this.timeout, this.timeUnit);
                    } catch (Exception e) {
                        log.warn("Could not retrieve and store the record. Continuing", e);
                    }
                    if (record == null) {
                        if (this.reader.getStatus() == IBuffer.Status.Open) {
                            log.warn("Producer has timed out");
                        }
                        break;
                    }
                    if (i == 0) {
                        j = Calendar.getInstance().getTimeInMillis();
                    }
                    String str = null;
                    File file = null;
                    try {
                        StringField field = record.getField(FieldNaming.FTPFieldName.id.name());
                        if (field instanceof StringField) {
                            str = field.getPayload();
                        }
                        FileField field2 = record.getField(FieldNaming.FTPFieldName.bytestream.name());
                        if (field2 instanceof FileField) {
                            file = field2.getPayload();
                        }
                        StringField field3 = record.getField(FieldNaming.FTPFieldName.mimeType.name());
                        storeFTPFile(this.directory + (str.hashCode() + (field3 instanceof StringField ? field3.getPayload() : "").replaceAll("/", ".")), file);
                        i++;
                        if (i == 1) {
                            j2 = Calendar.getInstance().getTimeInMillis();
                        }
                    } catch (Exception e2) {
                        log.warn("Could not extract payload from record #" + i + ". Continuing");
                    }
                } catch (Exception e3) {
                    log.error("Error during datasink retrieval. Closing", e3);
                    try {
                        this.reader.close();
                        if (this.ftpClient.isConnected()) {
                            this.ftpClient.disconnect();
                        }
                    } catch (Exception e4) {
                    }
                }
            } catch (Throwable th) {
                try {
                    this.reader.close();
                    if (this.ftpClient.isConnected()) {
                        this.ftpClient.disconnect();
                    }
                } catch (Exception e5) {
                }
                throw th;
            }
        }
        try {
            this.reader.close();
            if (this.ftpClient.isConnected()) {
                this.ftpClient.disconnect();
            }
        } catch (Exception e6) {
        }
        long timeInMillis2 = Calendar.getInstance().getTimeInMillis();
        this.stats.timeToComplete(timeInMillis2 - timeInMillis);
        this.stats.timeToFirstInput(j - timeInMillis);
        this.stats.timeToFirst(j2 - timeInMillis);
        this.stats.producedResults(i);
        this.stats.productionRate((i / ((float) (timeInMillis2 - timeInMillis))) * 1000.0f);
        log.info("DATASINK OPERATOR:Produced first result in " + (j2 - timeInMillis) + " milliseconds\nProduced last result in " + (timeInMillis2 - timeInMillis) + " milliseconds\nProduced " + i + " results\nProduction rate was " + ((i / ((float) (timeInMillis2 - timeInMillis))) * 1000.0f) + " records per second");
    }

    public void storeFTPFile(String str, File file) {
        int i = 0;
        while (true) {
            log.debug("Storing element as " + str);
            try {
                FileInputStream fileInputStream = new FileInputStream(file);
                if (fileInputStream != null) {
                    if (this.ftpClient.storeFile(str, fileInputStream)) {
                        log.debug("Data element stored succesfuly at " + str);
                    } else {
                        log.warn("Data element was not stored succesfuly at " + str);
                    }
                    fileInputStream.close();
                    return;
                }
                return;
            } catch (Exception e) {
                if (i == 3) {
                    log.error("Did not manage to append element in the data sink");
                    return;
                }
                i++;
                log.warn("Did not manage to append element in the data sink, reconnecting and trying again...", e);
                try {
                    reconnect();
                } catch (Exception e2) {
                    log.error("Did not manage to reconnect to the ftp site", e2);
                    return;
                }
            }
        }
    }

    private void reconnect() throws Exception {
        this.ftpClient.connect(this.server, this.port);
        this.ftpClient.login(this.username, this.password);
        this.ftpClient.setFileType(2);
    }

    @Override // gr.uoa.di.madgik.searchlibrary.operatorlibrary.datasink.DataSink
    public String getOutput() {
        return this.output;
    }
}
