/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.client.loadbalancer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jppf.JPPFException;
import org.jppf.client.AbstractJPPFClientConnection;
import org.jppf.client.JPPFJob;
import org.jppf.client.event.TaskResultEvent;
import org.jppf.client.event.TaskResultListener;
import org.jppf.client.loadbalancer.ClientProportionalBundler;
import org.jppf.client.loadbalancer.TaskWrapper;
import org.jppf.server.protocol.JPPFTask;
import org.jppf.server.protocol.JPPFTaskBundle;
import org.jppf.server.scheduler.bundle.Bundler;
import org.jppf.server.scheduler.bundle.LoadBalancingProfile;
import org.jppf.server.scheduler.bundle.proportional.ProportionalTuneProfile;
import org.jppf.utils.CollectionUtils;
import org.jppf.utils.JPPFConfiguration;
import org.jppf.utils.JPPFThreadFactory;
import org.jppf.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadBalancer {
    private static Logger log = LoggerFactory.getLogger(LoadBalancer.class);
    private boolean debugEnabled = log.isDebugEnabled();
    private static final int LOCAL = 0;
    private static final int REMOTE = 1;
    private boolean localEnabled = JPPFConfiguration.getProperties().getBoolean("jppf.local.execution.enabled", false);
    private ExecutorService threadPool = null;
    private Bundler[] bundlers = null;
    private AtomicBoolean locallyExecuting = new AtomicBoolean(false);

    public LoadBalancer() {
        if (this.localEnabled) {
            int n = Runtime.getRuntime().availableProcessors();
            int poolSize = JPPFConfiguration.getProperties().getInt("jppf.local.execution.threads", n);
            log.info("local execution enabled with " + poolSize + " processing threads");
            LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
            this.threadPool = new ThreadPoolExecutor(poolSize, poolSize, Long.MAX_VALUE, TimeUnit.MICROSECONDS, queue, (ThreadFactory)new JPPFThreadFactory("client processing thread"));
            ProportionalTuneProfile profile = new ProportionalTuneProfile();
            profile.setPerformanceCacheSize(2000);
            profile.setProportionalityFactor(4);
            this.bundlers = new ClientProportionalBundler[2];
            this.bundlers[0] = new ClientProportionalBundler((LoadBalancingProfile)profile);
            this.bundlers[1] = new ClientProportionalBundler((LoadBalancingProfile)profile);
            Bundler[] bundlerArray = this.bundlers;
            int n2 = this.bundlers.length;
            int n3 = 0;
            while (n3 < n2) {
                Bundler b = bundlerArray[n3];
                b.setup();
                ++n3;
            }
        }
    }

    public void stop() {
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void execute(JPPFJob job, AbstractJPPFClientConnection connection) throws Exception {
        boolean count = false;
        List<JPPFTask> tasks = job.getTasks();
        if (this.localEnabled && this.locallyExecuting.compareAndSet(false, true)) {
            try {
                if (connection != null) {
                    int[] bundleSize = new int[2];
                    Bundler[] bundlerArray = this.bundlers;
                    synchronized (this.bundlers) {
                        int diff;
                        int i = 0;
                        while (i <= 1) {
                            ((ClientProportionalBundler)this.bundlers[i]).setMaxSize(tasks.size());
                            bundleSize[i] = this.bundlers[i].getBundleSize();
                            ++i;
                        }
                        // ** MonitorExit[var6_8] (shouldn't be in output)
                        if (bundleSize[0] > tasks.size()) {
                            bundleSize[0] = tasks.size() - 1;
                        }
                        if (this.sum(bundleSize) > tasks.size()) {
                            bundleSize[1] = tasks.size() - bundleSize[0];
                        }
                        if ((diff = tasks.size() - this.sum(bundleSize)) > 0) {
                            i = 0;
                            while (i <= 1) {
                                int n = i++;
                                bundleSize[n] = bundleSize[n] + diff / 2;
                            }
                            if (tasks.size() > this.sum(bundleSize)) {
                                bundleSize[0] = bundleSize[0] + 1;
                            }
                        }
                        if (this.debugEnabled) {
                            log.debug("bundlers[local=" + bundleSize[0] + ", remote=" + bundleSize[1] + "]");
                        }
                        ArrayList<List> list = new ArrayList<List>();
                        int idx = 0;
                        int i2 = 0;
                        while (i2 <= 1) {
                            list.add(CollectionUtils.getAllElements(tasks, (int)idx, (int)bundleSize[i2]));
                            idx += bundleSize[i2];
                            ++i2;
                        }
                        ExecutionThread[] threads = new ExecutionThread[]{new LocalExecutionThread((List)list.get(0), job), new RemoteExecutionThread((List)list.get(1), job, connection)};
                        int i3 = 0;
                        while (i3 <= 1) {
                            threads[i3].setContextClassLoader(Thread.currentThread().getContextClassLoader());
                            ++i3;
                        }
                        i3 = 0;
                        while (i3 <= 1) {
                            threads[i3].start();
                            ++i3;
                        }
                        if (!job.isBlocking()) return;
                        i3 = 0;
                        while (i3 <= 1) {
                            threads[i3].join();
                            ++i3;
                        }
                        i3 = 0;
                        while (i3 <= 1) {
                            if (threads[i3].getException() != null) {
                                throw threads[i3].getException();
                            }
                            ++i3;
                        }
                        return;
                    }
                }
                LocalExecutionThread localThread = new LocalExecutionThread(tasks, job);
                if (!job.isBlocking()) {
                    localThread.setContextClassLoader(Thread.currentThread().getContextClassLoader());
                    localThread.start();
                    return;
                }
                ((ExecutionThread)localThread).run();
                if (localThread.getException() == null) return;
                throw localThread.getException();
            }
            finally {
                this.locallyExecuting.set(false);
            }
        }
        if (connection == null) throw new JPPFException("Null driver connection and local executor is " + (this.localEnabled ? "busy" : "disabled"));
        RemoteExecutionThread remoteThread = new RemoteExecutionThread(tasks, job, connection);
        ((ExecutionThread)remoteThread).run();
        if (remoteThread.getException() == null) return;
        throw remoteThread.getException();
    }

    private int sum(int[] array) {
        int sum = 0;
        int i = 0;
        while (i < array.length) {
            sum += array[i];
            ++i;
        }
        return sum;
    }

    public boolean isLocalEnabled() {
        return this.localEnabled;
    }

    public boolean isLocallyExecuting() {
        return this.locallyExecuting.get();
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public abstract class ExecutionThread
    extends Thread {
        protected List<JPPFTask> tasks = null;
        protected Exception exception = null;
        protected JPPFJob job = null;

        public ExecutionThread(List<JPPFTask> tasks, JPPFJob job) {
            this.tasks = tasks;
            this.job = job;
        }

        @Override
        public abstract void run();

        public Exception getException() {
            return this.exception;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public class LocalExecutionThread
    extends ExecutionThread {
        public LocalExecutionThread(List<JPPFTask> tasks, JPPFJob job) {
            super(tasks, job);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                long start = System.currentTimeMillis();
                ArrayList futures = new ArrayList();
                for (JPPFTask jPPFTask : this.tasks) {
                    jPPFTask.setDataProvider(this.job.getDataProvider());
                    futures.add(LoadBalancer.this.threadPool.submit(new TaskWrapper(jPPFTask)));
                }
                for (Future future : futures) {
                    future.get();
                }
                int n = futures.size();
                if (LoadBalancer.this.debugEnabled) {
                    log.debug("received " + n + " tasks from local executor" + (n > 0 ? ", first position=" + ((JPPFTask)this.tasks.get(0)).getPosition() : ""));
                }
                if (this.job.getResultListener() != null) {
                    TaskResultListener taskResultListener = this.job.getResultListener();
                    synchronized (taskResultListener) {
                        this.job.getResultListener().resultsReceived(new TaskResultEvent(this.tasks));
                    }
                }
                double elapsed = System.currentTimeMillis() - start;
                LoadBalancer.this.bundlers[0].feedback(this.tasks.size(), elapsed);
            }
            catch (Exception e) {
                if (LoadBalancer.this.debugEnabled) {
                    log.debug(e.getMessage(), (Throwable)e);
                }
                this.exception = e;
            }
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public class RemoteExecutionThread
    extends ExecutionThread {
        private AbstractJPPFClientConnection connection;

        public RemoteExecutionThread(List<JPPFTask> tasks, JPPFJob job, AbstractJPPFClientConnection connection) {
            super(tasks, job);
            this.connection = null;
            this.connection = connection;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                long start = System.currentTimeMillis();
                int count = 0;
                boolean completed = false;
                JPPFJob newJob = this.createNewJob(this.job);
                for (JPPFTask task : this.tasks) {
                    int pos = task.getPosition();
                    newJob.addTask(task, new Object[0]);
                    task.setPosition(pos);
                }
                while (!completed) {
                    JPPFTaskBundle bundle = this.createBundle(newJob);
                    this.connection.sendTasks(bundle, newJob);
                    ClassLoader cl = this.connection.getClient().getRequestClassLoader(bundle.getRequestUuid());
                    while (count < this.tasks.size()) {
                        Pair<List<JPPFTask>, Integer> p = this.connection.receiveResults(cl);
                        int n = ((List)p.first()).size();
                        count += n;
                        if (LoadBalancer.this.debugEnabled) {
                            log.debug("received " + n + " tasks from server" + (n > 0 ? ", first position=" + ((JPPFTask)((List)p.first()).get(0)).getPosition() : ""));
                        }
                        if (this.job.getResultListener() == null) continue;
                        TaskResultListener taskResultListener = newJob.getResultListener();
                        synchronized (taskResultListener) {
                            newJob.getResultListener().resultsReceived(new TaskResultEvent((List)p.first()));
                        }
                    }
                    completed = true;
                }
                if (LoadBalancer.this.localEnabled) {
                    double elapsed = System.currentTimeMillis() - start;
                    LoadBalancer.this.bundlers[1].feedback(this.tasks.size(), elapsed);
                }
            }
            catch (Exception e) {
                if (LoadBalancer.this.debugEnabled) {
                    log.debug(e.getMessage(), (Throwable)e);
                }
                this.exception = e;
            }
        }

        private JPPFJob createNewJob(JPPFJob job) {
            JPPFJob newJob = new JPPFJob(job.getJobUuid());
            newJob.setDataProvider(job.getDataProvider());
            newJob.setJobSLA(job.getJobSLA());
            newJob.setJobMetadata(job.getJobMetadata());
            newJob.setBlocking(job.isBlocking());
            newJob.setResultListener(job.getResultListener());
            newJob.setId(job.getId());
            return newJob;
        }

        private JPPFTaskBundle createBundle(JPPFJob job) {
            String requestUuid = job.getJobUuid();
            JPPFTaskBundle bundle = new JPPFTaskBundle();
            bundle.setRequestUuid(requestUuid);
            ClassLoader cl = null;
            Object oldCl = null;
            if (!job.getTasks().isEmpty()) {
                JPPFTask task = job.getTasks().get(0);
                cl = task.getClass().getClassLoader();
                this.connection.getClient().addRequestClassLoader(requestUuid, cl);
                if (log.isDebugEnabled()) {
                    log.debug("adding request class loader=" + cl + " for uuid=" + requestUuid);
                }
            }
            return bundle;
        }
    }
}

