/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.client.concurrent;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jppf.JPPFException;
import org.jppf.client.JPPFJob;
import org.jppf.client.concurrent.FutureResultCollector;
import org.jppf.client.concurrent.JPPFExecutorService;
import org.jppf.client.concurrent.JPPFTaskFuture;
import org.jppf.server.protocol.JPPFTask;
import org.jppf.utils.Pair;
import org.jppf.utils.ThreadSynchronization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Accumulates tasks submitted through a {@link JPPFExecutorService} into jobs and
 * hands each job to the executor once a batching criterion is met.
 * <p>
 * Tasks are always added to the "next" job. That job is promoted to "current" (and a
 * fresh "next" job is created) when any of the following holds while it has at least
 * one task:
 * <ul>
 * <li>a batch timeout is configured and has elapsed since the last reset,</li>
 * <li>a batch size is configured and the job holds at least that many tasks,</li>
 * <li>neither a batch size nor a batch timeout is configured.</li>
 * </ul>
 * The {@link #run()} loop waits for a current job and submits it.
 * <p>
 * All job bookkeeping ({@code currentJobRef}, {@code nextJobRef}, {@code start},
 * {@code elapsed}) is guarded by {@code lock}.
 */
public class BatchHandler
extends ThreadSynchronization
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(BatchHandler.class);
    private static final boolean debugEnabled = log.isDebugEnabled();
    /** Counter used to build a distinct id for every job created by any handler. */
    private static final AtomicLong jobCount = new AtomicLong(0L);
    /**
     * Number of tasks that triggers a submission; values &lt;= 0 disable size-based batching.
     * Volatile because it is written through the synchronized setter but read under {@code lock}.
     */
    private volatile int batchSize = 0;
    /**
     * Time in milliseconds that triggers a submission; values &lt;= 0 disable time-based batching.
     * Volatile for the same reason as {@code batchSize}.
     */
    private volatile long batchTimeout = 0L;
    /** The executor that jobs are submitted to and that listens to their result collectors. */
    private final JPPFExecutorService executor;
    /** The job that is ready to be submitted, or {@code null} when there is none. */
    private final AtomicReference<JPPFJob> currentJobRef = new AtomicReference<JPPFJob>(null);
    /** The job that new tasks are added to. */
    private final AtomicReference<JPPFJob> nextJobRef = new AtomicReference<JPPFJob>(null);
    /** Timestamp (ms) of the last reset of the batch timer. Guarded by {@code lock}. */
    private long start = 0L;
    /** Milliseconds elapsed since {@code start}, as last computed. Guarded by {@code lock}. */
    private long elapsed = 0L;
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled when a job has been promoted to current, and when the handler is closed. */
    private final Condition jobReady = this.lock.newCondition();

    /**
     * Creates a handler bound to the given executor and prepares the first empty job.
     *
     * @param executor the executor that jobs are submitted to.
     */
    BatchHandler(JPPFExecutorService executor) {
        this.executor = executor;
        this.nextJobRef.set(this.createJob());
    }

    /**
     * @return the configured batch size.
     */
    synchronized int getBatchSize() {
        return this.batchSize;
    }

    /**
     * @param batchSize the batch size to use for subsequent batching decisions.
     */
    synchronized void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * @return the configured batch timeout in milliseconds.
     */
    synchronized long getBatchTimeout() {
        return this.batchTimeout;
    }

    /**
     * @param batchTimeout the batch timeout in milliseconds to use for subsequent batching decisions.
     */
    synchronized void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    /**
     * Waits for a job to become current and submits it, until the handler is stopped
     * or the running thread is interrupted.
     */
    @Override
    public void run() {
        // start is shared with updateNextJob(), so initialize it under the same lock.
        this.lock.lock();
        try {
            this.start = System.currentTimeMillis();
        }
        finally {
            this.lock.unlock();
        }
        while (!this.isStopped()) {
            try {
                this.lock.lock();
                try {
                    while (!this.isStopped() && this.currentJobRef.get() == null) {
                        long timeout = this.batchTimeout;
                        if (timeout > 0L) {
                            // Only wait for what is left of the current timeout window.
                            long remaining = timeout - this.elapsed;
                            if (remaining > 0L) {
                                this.jobReady.await(remaining, TimeUnit.MILLISECONDS);
                            }
                        } else {
                            this.jobReady.await();
                        }
                        // No signal needed: this thread is the one that would be woken up.
                        this.updateNextJob(false);
                    }
                    if (this.isStopped()) {
                        break;
                    }
                    JPPFJob job = this.currentJobRef.get();
                    if (debugEnabled) {
                        log.debug("submitting job " + job.getId() + " with " + job.getTasks().size() + " tasks");
                    }
                    FutureResultCollector collector = (FutureResultCollector)job.getResultListener();
                    collector.setTaskCount(job.getTasks().size());
                    this.executor.submitJob(job);
                    this.currentJobRef.set(null);
                    this.elapsed = System.currentTimeMillis() - this.start;
                }
                finally {
                    this.lock.unlock();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt status and leave the loop: continuing would make
                // every subsequent await() fail immediately.
                Thread.currentThread().interrupt();
                if (debugEnabled) {
                    log.debug("batch handler thread interrupted, exiting", e);
                }
                break;
            }
            catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Re-evaluates the batching criteria for the next job and, when one is met,
     * promotes it to current job and replaces it with a new empty job.
     * When the next job is empty, only the timeout window is restarted if it has expired.
     *
     * @param sendSignal whether to signal {@code jobReady} after promoting a job.
     */
    private void updateNextJob(boolean sendSignal) {
        this.lock.lock();
        try {
            // Snapshot the settings once so that a concurrent setter call cannot
            // change them in the middle of the decision.
            long timeout = this.batchTimeout;
            int maxSize = this.batchSize;
            JPPFJob job = this.nextJobRef.get();
            List<JPPFTask> tasks = job.getTasks();
            int size = tasks == null ? 0 : tasks.size();
            if (timeout > 0L) {
                this.elapsed = System.currentTimeMillis() - this.start;
            }
            boolean timedOut = timeout > 0L && this.elapsed >= timeout;
            if (size == 0) {
                // Nothing to submit: just restart the timeout window if it has expired.
                if (timedOut) {
                    this.start = System.currentTimeMillis();
                    this.elapsed = 0L;
                }
                return;
            }
            boolean sizeReached = maxSize > 0 && size >= maxSize;
            boolean unbatched = maxSize <= 0 && timeout <= 0L;
            if (timedOut || sizeReached || unbatched) {
                this.currentJobRef.set(job);
                this.nextJobRef.set(this.createJob());
                this.start = System.currentTimeMillis();
                this.elapsed = 0L;
                if (sendSignal) {
                    this.jobReady.signal();
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Adds a JPPF task to the next job.
     *
     * @param task the task to add.
     * @return a future bound to the task's position in its job, or {@code null}
     * if adding the task raised a {@link JPPFException} (which is logged).
     */
    Future<Object> addTask(JPPFTask task) {
        if (debugEnabled) {
            log.debug("submitting one JPPFTask");
        }
        JPPFTaskFuture future = null;
        // The lock keeps the job from being promoted and submitted between the moment
        // it is read from nextJobRef and the moment the task is added to it.
        this.lock.lock();
        try {
            JPPFJob job = this.nextJobRef.get();
            try {
                FutureResultCollector collector = (FutureResultCollector)job.getResultListener();
                job.addTask(task, new Object[0]);
                future = new JPPFTaskFuture(collector, task.getPosition());
            }
            catch (JPPFException e) {
                log.error(e.getMessage(), e);
            }
            this.updateNextJob(true);
        }
        finally {
            this.lock.unlock();
        }
        return future;
    }

    /**
     * Adds a callable to the next job.
     *
     * @param <T> the type of result produced by the callable.
     * @param task the callable to add.
     * @return a future bound to the position of the resulting JPPF task in its job, or
     * {@code null} if adding the task raised a {@link JPPFException} (which is logged).
     */
    <T> Future<T> addTask(Callable<T> task) {
        if (debugEnabled) {
            log.debug("submitting one Callable Task");
        }
        JPPFTaskFuture future = null;
        // Same locking rationale as addTask(JPPFTask).
        this.lock.lock();
        try {
            JPPFJob job = this.nextJobRef.get();
            try {
                FutureResultCollector collector = (FutureResultCollector)job.getResultListener();
                JPPFTask jppfTask = job.addTask(task, new Object[0]);
                future = new JPPFTaskFuture(collector, jppfTask.getPosition());
            }
            catch (JPPFException e) {
                log.error(e.getMessage(), e);
            }
            this.updateNextJob(true);
        }
        finally {
            this.lock.unlock();
        }
        return future;
    }

    /**
     * Adds a collection of callables to the next job.
     * If adding one of them raises a {@link JPPFException}, the error is logged and the
     * remaining callables are not added.
     *
     * @param <T> the type of result produced by the callables.
     * @param tasks the callables to add.
     * @return a pair made of the job's result collector and the number of tasks the job
     * held before this call, i.e. the position of the first task added here.
     */
    <T> Pair<FutureResultCollector, Integer> addTasks(Collection<? extends Callable<T>> tasks) {
        if (debugEnabled) {
            log.debug("submitting " + tasks.size() + " Callable Tasks");
        }
        // Same locking rationale as addTask(JPPFTask): all tasks must land in the same,
        // not yet submitted, job.
        this.lock.lock();
        try {
            JPPFJob job = this.nextJobRef.get();
            FutureResultCollector collector = (FutureResultCollector)job.getResultListener();
            int firstPosition = 0;
            try {
                List<JPPFTask> jobTasks = job.getTasks();
                firstPosition = jobTasks == null ? 0 : jobTasks.size();
                for (Callable<T> task : tasks) {
                    job.addTask(task, new Object[0]);
                }
            }
            catch (JPPFException e) {
                log.error(e.getMessage(), e);
            }
            Pair<FutureResultCollector, Integer> pair =
                new Pair<FutureResultCollector, Integer>(collector, Integer.valueOf(firstPosition));
            this.updateNextJob(true);
            return pair;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Creates a new non-blocking job with a unique id and a fresh result collector
     * that the executor is registered on as a listener.
     *
     * @return the newly created, empty job.
     */
    private JPPFJob createJob() {
        JPPFJob job = new JPPFJob();
        job.setId(this.getClass().getSimpleName() + " job " + jobCount.incrementAndGet());
        FutureResultCollector collector = new FutureResultCollector(0, job.getJobUuid());
        job.setResultListener(collector);
        job.setBlocking(false);
        collector.addListener(this.executor);
        return job;
    }

    /**
     * Marks this handler as stopped and wakes up the {@link #run()} loop so it can exit.
     */
    void close() {
        this.setStopped(true);
        this.lock.lock();
        try {
            this.jobReady.signalAll();
        }
        finally {
            this.lock.unlock();
        }
    }
}

