package eu.dnetlib.iis.importer.logs;

import eu.dnetlib.iis.core.java.HadoopContext;
import eu.dnetlib.iis.core.java.PortBindings;
import eu.dnetlib.iis.core.java.Process;
import eu.dnetlib.iis.core.java.io.DataStore;
import eu.dnetlib.iis.core.java.io.FileSystemPath;
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
import eu.dnetlib.iis.core.java.porttype.PortType;
import eu.dnetlib.iis.websiteusage.schemas.LogEntry;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.InvalidParameterException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/* loaded from: input_file:eu/dnetlib/iis/importer/logs/PiwikLogsImporter.class */
/**
 * Process that reads tab-separated log files from a file system location and
 * writes them as {@link LogEntry} records to the {@code log_entries} output port.
 *
 * <p>The input location is given by the {@link #PARAM_PIWIK_LOGS_PATH} parameter and may
 * point to a single file or to a directory, which is traversed recursively. Each input
 * line is expected to hold exactly five tab-separated fields, in order: timestamp,
 * action, user, session and data. Lines with a different number of fields are logged and
 * skipped, as are header lines whose first field equals {@code "timestamp"}.
 */
public class PiwikLogsImporter implements Process {
    /** Literal marking an absent value in the input files. */
    private static final String NULL_VALUE = "NULL";
    /** Content of the first field of a header line. */
    private static final String TIMESTAMP_HEADER = "timestamp";
    private static final String PORT_OUT_LOG_ENTRIES = "log_entries";
    /** Separator between the fields of a single input line. */
    private static final String FIELD_SEPARATOR = "\t";
    /** Number of fields a valid input line consists of. */
    private static final int EXPECTED_FIELDS_COUNT = 5;
    /** Name of the parameter holding the path of the log file or directory to import. */
    public static final String PARAM_PIWIK_LOGS_PATH = "import.piwik.logs.path";
    /** Output port definitions, built once and shared read-only by all instances. */
    private static final Map<String, PortType> outputPorts = createOutputPorts();
    private final Logger log = Logger.getLogger(getClass());

    /** Creates the importer; port definitions are static and need no per-instance setup. */
    public PiwikLogsImporter() {
    }

    /** Builds the unmodifiable output port map holding the single log entries port. */
    private static Map<String, PortType> createOutputPorts() {
        Map<String, PortType> ports = new HashMap<String, PortType>();
        ports.put(PORT_OUT_LOG_ENTRIES, new AvroPortType(LogEntry.SCHEMA$));
        return Collections.unmodifiableMap(ports);
    }

    /**
     * @return an empty map, this process declares no input ports
     */
    public Map<String, PortType> getInputPorts() {
        return Collections.emptyMap();
    }

    /**
     * @return unmodifiable map with the single {@code log_entries} avro port
     */
    public Map<String, PortType> getOutputPorts() {
        return outputPorts;
    }

    /**
     * Imports all log files found under the configured path into the output datastore.
     *
     * @param portBindings bindings providing the output location of the {@code log_entries} port
     * @param hadoopContext context supplying the configuration used to obtain the file system
     * @param map process parameters, must define {@link #PARAM_PIWIK_LOGS_PATH}
     * @throws InvalidParameterException when the logs path parameter is absent or null
     * @throws Exception when reading the input or writing the output fails
     */
    public void run(PortBindings portBindings, HadoopContext hadoopContext, Map<String, String> map) throws Exception {
        FileSystem fileSystem = FileSystem.get(hadoopContext.getConfiguration());
        // A key mapped to null is treated like a missing key: it would otherwise fail later
        // in the Path constructor with a message unrelated to the parameter name.
        String logsPath = map.get(PARAM_PIWIK_LOGS_PATH);
        if (logsPath == null) {
            throw new InvalidParameterException("required parameter '" + PARAM_PIWIK_LOGS_PATH + "' is missing!");
        }
        FileSystemPath outputLocation =
                new FileSystemPath(fileSystem, (Path) portBindings.getOutput().get(PORT_OUT_LOG_ENTRIES));
        // try-with-resources keeps the primary failure when close() fails as well.
        try (DataFileWriter<LogEntry> dataFileWriter = DataStore.create(outputLocation, LogEntry.SCHEMA$)) {
            processNode(fileSystem, new Path(logsPath), dataFileWriter);
        }
    }

    /**
     * Processes a single file system node: directories are traversed recursively, files
     * are read line by line and every valid line is appended to the writer.
     *
     * @param fileSystem file system the path belongs to
     * @param path file or directory to be processed
     * @param dataFileWriter destination of the produced records, not closed by this method
     * @throws Exception when listing, reading or appending fails
     */
    protected void processNode(FileSystem fileSystem, Path path, DataFileWriter<LogEntry> dataFileWriter) throws Exception {
        if (fileSystem.isDirectory(path)) {
            for (FileStatus fileStatus : fileSystem.listStatus(path)) {
                processNode(fileSystem, fileStatus.getPath(), dataFileWriter);
            }
            return;
        }
        // Explicit charset so decoding does not depend on the JVM default.
        // NOTE(review): assumes the log files are UTF-8 encoded - confirm with the producer.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(fileSystem.open(path), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Negative limit preserves trailing empty fields so the count stays reliable.
                String[] fields = line.split(FIELD_SEPARATOR, -1);
                if (fields.length != EXPECTED_FIELDS_COUNT) {
                    this.log.warn("skipping invalid line:\n" + line + "\ncontaining " + fields.length
                            + " tokens in file: " + path.toString());
                } else if (TIMESTAMP_HEADER.equals(fields[0])) {
                    this.log.debug("skipping file header in path: " + path.toString());
                } else {
                    dataFileWriter.append(buildLogEntry(fields));
                }
            }
        }
    }

    /**
     * Converts the fields of a single valid line into a record. Timestamp and action are
     * always set, the remaining fields only when they carry a value.
     */
    private static LogEntry buildLogEntry(String[] fields) {
        LogEntry.Builder builder = LogEntry.newBuilder();
        builder.setTimestamp(fields[0]);
        builder.setAction(fields[1]);
        if (!isEmpty(fields[2])) {
            builder.setUser(fields[2]);
        }
        if (!isEmpty(fields[3])) {
            builder.setSession(fields[3]);
        }
        if (!isEmpty(fields[4])) {
            builder.setData(fields[4]);
        }
        return builder.build();
    }

    /**
     * Tells whether a field carries no value: it is null, blank after trimming, or exactly
     * the {@code "NULL"} marker.
     */
    private static boolean isEmpty(String str) {
        return str == null || str.trim().isEmpty() || NULL_VALUE.equals(str);
    }
}
