/*
 * Decompiled with CFR 0.152.
 */
package com.coxautodata.utils;

import com.coxautodata.SparkDistCPOptions;
import com.coxautodata.objects.CopyDefinitionWithDependencies;
import com.coxautodata.objects.Logging;
import com.coxautodata.objects.SerializableFileStatus;
import com.coxautodata.objects.SerializableFileStatus$;
import com.coxautodata.objects.SingleCopyDefinition;
import com.coxautodata.utils.FileListUtils;
import com.coxautodata.utils.PathUtils$;
import java.io.FileNotFoundException;
import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.rdd.RDD$;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.BuildFrom$;
import scala.collection.Iterable;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.Seq;
import scala.collection.SeqOps;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.Await$;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.Duration$;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.java8.JFunction0;
import scala.util.Try$;
import scala.util.matching.Regex;

public final class FileListUtils$
implements Logging {
    public static final FileListUtils$ MODULE$ = new FileListUtils$();
    private static Logger com$coxautodata$objects$Logging$$log;

    static {
        Logging.$init$(MODULE$);
    }

    @Override
    public String logName() {
        return Logging.logName$(this);
    }

    @Override
    public void setLogLevel(Level level) {
        Logging.setLogLevel$(this, level);
    }

    @Override
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    @Override
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    @Override
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    @Override
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    @Override
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    @Override
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    @Override
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    @Override
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    @Override
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    @Override
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public Logger com$coxautodata$objects$Logging$$log() {
        return com$coxautodata$objects$Logging$$log;
    }

    @Override
    public final void com$coxautodata$objects$Logging$_setter_$com$coxautodata$objects$Logging$$log_$eq(Logger x$1) {
        com$coxautodata$objects$Logging$$log = x$1;
    }

    public <T> FileListUtils.ScalaRemoteIterator<T> com$coxautodata$utils$FileListUtils$$ScalaRemoteIterator(RemoteIterator<T> underlying) {
        return new FileListUtils.ScalaRemoteIterator<T>(underlying);
    }

    public scala.collection.immutable.Seq<Tuple2<SerializableFileStatus, scala.collection.immutable.Seq<SerializableFileStatus>>> listFiles(FileSystem fs, Path path, int threads, boolean includePathRootInDependents, List<Regex> filterNot) {
        Predef$.MODULE$.assert(threads > 0, (Function0 & Serializable)() -> "Number of threads must be positive");
        FileStatus fileStatus = fs.getFileStatus(path);
        None$ maybePathRoot = includePathRootInDependents && fileStatus.isDirectory() ? new Some((Object)SerializableFileStatus$.MODULE$.apply(fileStatus)) : None$.MODULE$;
        LinkedBlockingQueue processed = new LinkedBlockingQueue(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)Option$.MODULE$.option2Iterable(maybePathRoot.map((Function1 & Serializable)x$1 -> new Tuple2(x$1, (Object)package$.MODULE$.Seq().empty()))).toSeq()).asJava());
        LinkedBlockingDeque toProcess = new LinkedBlockingDeque(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)new Tuple2((Object)path, (Object)Option$.MODULE$.option2Iterable((Option)maybePathRoot).toSeq()), (List)Nil$.MODULE$)).asJava());
        ConcurrentLinkedQueue exceptions = new ConcurrentLinkedQueue();
        ConcurrentHashMap threadsWorking = new ConcurrentHashMap();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> "Beginning recursive list of [" + path + "]");
        List tasks = ((List)package$.MODULE$.List().fill(threads, (Function0 & Serializable)() -> {
            public class Com_coxautodata_utils_FileListUtils$FileLister$1
            implements Runnable {
                private final FileSystem localFS;
                private final UUID uuid;
                private final ConcurrentHashMap threadsWorking$1;
                private final LinkedBlockingDeque toProcess$1;
                private final List filterNot$1;
                private final LinkedBlockingQueue processed$1;
                private final ConcurrentLinkedQueue exceptions$1;

                private FileSystem localFS() {
                    return this.localFS;
                }

                private UUID uuid() {
                    return this.uuid;
                }

                public void run() {
                    while (this.threadsWorking$1.containsValue(BoxesRunTime.boxToBoolean((boolean)true))) {
                        Option option = Try$.MODULE$.apply((Function0 & Serializable)() -> Option$.MODULE$.apply($this.toProcess$1.pollFirst(50L, TimeUnit.MILLISECONDS))).toOption().flatten((.less.colon.less)$less$colon$less$.MODULE$.refl());
                        if (None$.MODULE$.equals(option)) {
                            this.threadsWorking$1.put(this.uuid(), BoxesRunTime.boxToBoolean((boolean)false));
                            continue;
                        }
                        if (option instanceof Some) {
                            Object object;
                            Some some = (Some)option;
                            Tuple2 p = (Tuple2)some.value();
                            FileListUtils$.MODULE$.logDebug((Function0<String>)(Function0 & Serializable)() -> "Thread [" + this.uuid() + "] searching [" + p._1() + "], waiting to process depth [" + $this.toProcess$1.size() + "]");
                            this.threadsWorking$1.put(this.uuid(), BoxesRunTime.boxToBoolean((boolean)true));
                            try {
                                FileListUtils$.MODULE$.com$coxautodata$utils$FileListUtils$$ScalaRemoteIterator(this.localFS().listLocatedStatus((Path)p._1())).foreach((Function1 & Serializable)x0$1 -> {
                                    LocatedFileStatus locatedFileStatus = x0$1;
                                    if (locatedFileStatus.isSymlink()) {
                                        throw new RuntimeException("Link [" + locatedFileStatus + "] is not supported");
                                    }
                                    if (locatedFileStatus.isDirectory()) {
                                        if (!$this.filterNot$1.exists((Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)Com_coxautodata_utils_FileListUtils$FileLister$1.$anonfun$run$4(locatedFileStatus, x$2)))) {
                                            SerializableFileStatus s = SerializableFileStatus$.MODULE$.apply((FileStatus)locatedFileStatus);
                                            $this.toProcess$1.addFirst(new Tuple2((Object)locatedFileStatus.getPath(), ((SeqOps)p._2()).$colon$plus((Object)s)));
                                            return BoxesRunTime.boxToBoolean((boolean)$this.processed$1.add(new Tuple2((Object)s, p._2())));
                                        }
                                        return BoxedUnit.UNIT;
                                    }
                                    if (locatedFileStatus.isFile()) {
                                        if (!$this.filterNot$1.exists((Function1 & Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)Com_coxautodata_utils_FileListUtils$FileLister$1.$anonfun$run$5(locatedFileStatus, x$3)))) {
                                            return BoxesRunTime.boxToBoolean((boolean)$this.processed$1.add(new Tuple2((Object)SerializableFileStatus$.MODULE$.apply((FileStatus)locatedFileStatus), p._2())));
                                        }
                                        return BoxedUnit.UNIT;
                                    }
                                    throw new MatchError((Object)locatedFileStatus);
                                });
                                object = BoxedUnit.UNIT;
                            }
                            catch (FileNotFoundException nf) {
                                FileListUtils$.MODULE$.logError((Function0<String>)(Function0 & Serializable)() -> "Skip path " + p._1() + "]: does no longer exist");
                                object = BoxedUnit.UNIT;
                            }
                            catch (Exception e) {
                                object = BoxesRunTime.boxToBoolean((boolean)this.exceptions$1.add(e));
                            }
                            continue;
                        }
                        throw new MatchError((Object)option);
                    }
                }

                public static final /* synthetic */ boolean $anonfun$run$4(LocatedFileStatus x1$1, Regex x$2) {
                    return x$2.findFirstIn((CharSequence)x1$1.getPath().toString()).isDefined();
                }

                public static final /* synthetic */ boolean $anonfun$run$5(LocatedFileStatus x1$1, Regex x$3) {
                    return x$3.findFirstIn((CharSequence)x1$1.getPath().toString()).isDefined();
                }

                public Com_coxautodata_utils_FileListUtils$FileLister$1(ConcurrentHashMap threadsWorking$1, FileSystem fs$1, LinkedBlockingDeque toProcess$1, List filterNot$1, LinkedBlockingQueue processed$1, ConcurrentLinkedQueue exceptions$1) {
                    this.threadsWorking$1 = threadsWorking$1;
                    this.toProcess$1 = toProcess$1;
                    this.filterNot$1 = filterNot$1;
                    this.processed$1 = processed$1;
                    this.exceptions$1 = exceptions$1;
                    this.localFS = FileSystem.get((URI)fs$1.getUri(), (Configuration)fs$1.getConf());
                    this.uuid = UUID.randomUUID();
                    threadsWorking$1.put(this.uuid(), BoxesRunTime.boxToBoolean((boolean)true));
                }

                private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                    return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$run$1(com.coxautodata.utils.FileListUtils$FileLister$1 ), $anonfun$run$2(com.coxautodata.utils.FileListUtils$FileLister$1 scala.Tuple2 ), $anonfun$run$3(com.coxautodata.utils.FileListUtils$FileLister$1 scala.Tuple2 org.apache.hadoop.fs.LocatedFileStatus ), $anonfun$run$4$adapted(org.apache.hadoop.fs.LocatedFileStatus scala.util.matching.Regex ), $anonfun$run$5$adapted(org.apache.hadoop.fs.LocatedFileStatus scala.util.matching.Regex ), $anonfun$run$6(scala.Tuple2 )}, serializedLambda);
                }
            }
            return new Com_coxautodata_utils_FileListUtils$FileLister$1(threadsWorking, fs, toProcess, filterNot, processed, exceptions);
        })).map((Function1 & Serializable)x$1 -> pool.submit((Runnable)x$1)).map((Function1 & Serializable)j -> Future$.MODULE$.apply((Function0)(JFunction0.mcV.sp & Serializable)() -> j.get(), (ExecutionContext)ExecutionContext$.MODULE$.global()));
        Await$.MODULE$.result((Awaitable)Future$.MODULE$.sequence((IterableOnce)tasks, BuildFrom$.MODULE$.buildFromIterableOps(), ExecutionContext.Implicits$.MODULE$.global()), (Duration)Duration$.MODULE$.Inf());
        pool.shutdown();
        if (!toProcess.isEmpty()) {
            throw new RuntimeException("Exception listing files, toProcess queue was not empty");
        }
        if (!exceptions.isEmpty()) {
            List collectedExceptions = CollectionConverters$.MODULE$.IteratorHasAsScala(exceptions.iterator()).asScala().toList();
            collectedExceptions.foreach((Function1 & Serializable)e -> {
                FileListUtils$.MODULE$.logError((Function0<String>)(Function0 & Serializable)() -> "Exception during file listing", e);
                return BoxedUnit.UNIT;
            });
            throw (Throwable)collectedExceptions.head();
        }
        this.logInfo((Function0<String>)(Function0 & Serializable)() -> "Finished recursive list of [" + path + "]");
        return CollectionConverters$.MODULE$.IteratorHasAsScala(processed.iterator()).asScala().toSeq();
    }

    public RDD<Tuple2<URI, CopyDefinitionWithDependencies>> getSourceFiles(SparkContext sparkContext, scala.collection.immutable.Seq<URI> sourceURIs, URI destinationURI, boolean updateOverwritePathBehaviour, int numListstatusThreads, List<Regex> filterNot) {
        RDD sourceRDD = ((RDD)((IterableOnceOps)sourceURIs.map((Function1 & Serializable)sourceURI -> {
            FileSystem sourceFS = new Path(sourceURI).getFileSystem(sparkContext.hadoopConfiguration());
            return sparkContext.parallelize(MODULE$.listFiles(sourceFS, new Path(sourceURI), numListstatusThreads, !updateOverwritePathBehaviour, filterNot), sparkContext.parallelize$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).map((Function1 & Serializable)x0$1 -> {
                Tuple2 tuple2 = x0$1;
                if (tuple2 != null) {
                    SerializableFileStatus f = (SerializableFileStatus)tuple2._1();
                    scala.collection.immutable.Seq d = (scala.collection.immutable.Seq)tuple2._2();
                    scala.collection.immutable.Seq dependentFolders = (scala.collection.immutable.Seq)d.map((Function1 & Serializable)dl -> {
                        URI udl = PathUtils$.MODULE$.sourceURIToDestinationURI(dl.uri(), (URI)sourceURI, destinationURI, updateOverwritePathBehaviour);
                        return new SingleCopyDefinition((SerializableFileStatus)dl, udl);
                    });
                    URI fu = PathUtils$.MODULE$.sourceURIToDestinationURI(f.uri(), (URI)sourceURI, destinationURI, updateOverwritePathBehaviour);
                    return new CopyDefinitionWithDependencies(f, fu, (scala.collection.immutable.Seq<SingleCopyDefinition>)dependentFolders);
                }
                throw new MatchError((Object)tuple2);
            }, ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class));
        })).reduce((Function2 & Serializable)(x$4, x$5) -> x$4.union(x$5))).map((Function1 & Serializable)x$6 -> x$6.toKeyedDefinition(), ClassTag$.MODULE$.apply(Tuple2.class));
        this.handleSourceCollisions((RDD<Tuple2<URI, CopyDefinitionWithDependencies>>)sourceRDD);
        this.handleDestCollisions((RDD<Tuple2<URI, CopyDefinitionWithDependencies>>)sourceRDD);
        return sourceRDD;
    }

    public RDD<Tuple2<URI, SerializableFileStatus>> getDestinationFiles(SparkContext sparkContext, Path destinationPath, SparkDistCPOptions options) {
        FileSystem destinationFS = destinationPath.getFileSystem(sparkContext.hadoopConfiguration());
        return sparkContext.parallelize(this.listFiles(destinationFS, destinationPath, options.numListstatusThreads(), false, (List<Regex>)package$.MODULE$.List().empty()), sparkContext.parallelize$default$2(), ClassTag$.MODULE$.apply(Tuple2.class)).map((Function1 & Serializable)x0$1 -> {
            Tuple2 tuple2 = x0$1;
            if (tuple2 != null) {
                SerializableFileStatus f = (SerializableFileStatus)tuple2._1();
                return new Tuple2((Object)f.getPath().toUri(), (Object)f);
            }
            throw new MatchError((Object)tuple2);
        }, ClassTag$.MODULE$.apply(Tuple2.class));
    }

    public void handleSourceCollisions(RDD<Tuple2<URI, CopyDefinitionWithDependencies>> source) {
        RDD collisions = RDD$.MODULE$.rddToPairRDDFunctions(source, ClassTag$.MODULE$.apply(URI.class), ClassTag$.MODULE$.apply(CopyDefinitionWithDependencies.class), Ordering$.MODULE$.ordered(Predef$.MODULE$.$conforms())).groupByKey().filter((Function1 & Serializable)x$7 -> BoxesRunTime.boxToBoolean((boolean)FileListUtils$.$anonfun$handleSourceCollisions$1(x$7)));
        collisions.foreach((Function1 & Serializable)x0$1 -> {
            FileListUtils$.$anonfun$handleSourceCollisions$2(x0$1);
            return BoxedUnit.UNIT;
        });
        if (!collisions.isEmpty()) {
            throw new RuntimeException("Collisions found where multiple source files lead to the same destination location; check executor logs for specific collision detail.");
        }
    }

    public void handleDestCollisions(RDD<Tuple2<URI, CopyDefinitionWithDependencies>> source) {
        RDD collisions = source.collect((PartialFunction)new Serializable(){
            private static final long serialVersionUID = 0L;

            public final <A1 extends Tuple2<URI, CopyDefinitionWithDependencies>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                CopyDefinitionWithDependencies copyDefinitionWithDependencies;
                A1 A1 = x1;
                if (A1 != null && (copyDefinitionWithDependencies = (CopyDefinitionWithDependencies)A1._2()) != null) {
                    SerializableFileStatus s = copyDefinitionWithDependencies.source();
                    URI d = copyDefinitionWithDependencies.destination();
                    URI uRI = s.uri();
                    URI uRI2 = d;
                    if (!(uRI != null ? !((Object)uRI).equals(uRI2) : uRI2 != null)) {
                        return (B1)d;
                    }
                }
                return (B1)function1.apply(x1);
            }

            public final boolean isDefinedAt(Tuple2<URI, CopyDefinitionWithDependencies> x1) {
                CopyDefinitionWithDependencies copyDefinitionWithDependencies;
                Tuple2<URI, CopyDefinitionWithDependencies> tuple2 = x1;
                if (tuple2 != null && (copyDefinitionWithDependencies = (CopyDefinitionWithDependencies)tuple2._2()) != null) {
                    SerializableFileStatus s = copyDefinitionWithDependencies.source();
                    URI d = copyDefinitionWithDependencies.destination();
                    URI uRI = s.uri();
                    URI uRI2 = d;
                    if (!(uRI != null ? !((Object)uRI).equals(uRI2) : uRI2 != null)) {
                        return true;
                    }
                }
                return false;
            }
        }, ClassTag$.MODULE$.apply(URI.class));
        collisions.foreach((Function1 & Serializable)d -> {
            FileListUtils$.MODULE$.logError((Function0<String>)(Function0 & Serializable)() -> "The following file has the same source and destination location: [" + d + "]");
            return BoxedUnit.UNIT;
        });
        if (!collisions.isEmpty()) {
            throw new RuntimeException("Collisions found where a file has the same source and destination location; check executor logs for specific collision detail.");
        }
    }

    public static final /* synthetic */ boolean $anonfun$handleSourceCollisions$1(Tuple2 x$7) {
        return ((IterableOnceOps)x$7._2()).size() > 1;
    }

    public static final /* synthetic */ void $anonfun$handleSourceCollisions$2(Tuple2 x0$1) {
        Tuple2 tuple2 = x0$1;
        if (tuple2 != null) {
            URI f = (URI)tuple2._1();
            Iterable l = (Iterable)tuple2._2();
            MODULE$.logError((Function0<String>)(Function0 & Serializable)() -> "The following files will collide on destination file [" + f + "]: " + ((IterableOnceOps)l.map((Function1 & Serializable)x$8 -> x$8.source().getPath())).mkString(", "));
            return;
        }
        throw new MatchError((Object)tuple2);
    }

    private FileListUtils$() {
    }
}

