/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.commit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobCommitEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterJobSetupEvent;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterTaskAbortEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobAbortCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobSetupFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;

public class CommitterEventHandler
extends AbstractService
implements EventHandler<CommitterEvent> {
    private static final Log LOG = LogFactory.getLog(CommitterEventHandler.class);
    private final AppContext context;
    private final OutputCommitter committer;
    private ThreadPoolExecutor launcherPool;
    private Thread eventHandlingThread;
    private BlockingQueue<CommitterEvent> eventQueue = new LinkedBlockingQueue<CommitterEvent>();
    private final AtomicBoolean stopped;
    private Thread jobCommitThread = null;
    private int commitThreadCancelTimeoutMs;

    public CommitterEventHandler(AppContext context, OutputCommitter committer) {
        super("CommitterEventHandler");
        this.context = context;
        this.committer = committer;
        this.stopped = new AtomicBoolean(false);
    }

    public void init(Configuration conf) {
        super.init(conf);
        this.commitThreadCancelTimeoutMs = conf.getInt("yarn.app.mapreduce.am.job.committer.cancel-timeout", 60000);
    }

    public void start() {
        ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("CommitterEvent Processor #%d").build();
        this.launcherPool = new ThreadPoolExecutor(5, 5, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
        this.eventHandlingThread = new Thread(new Runnable(){

            @Override
            public void run() {
                CommitterEvent event = null;
                while (!CommitterEventHandler.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    try {
                        event = (CommitterEvent)((Object)CommitterEventHandler.this.eventQueue.take());
                    }
                    catch (InterruptedException e) {
                        if (!CommitterEventHandler.this.stopped.get()) {
                            LOG.error((Object)("Returning, interrupted : " + e));
                        }
                        return;
                    }
                    CommitterEventHandler.this.launcherPool.execute(new EventProcessor(event));
                }
            }
        });
        this.eventHandlingThread.setName("CommitterEvent Handler");
        this.eventHandlingThread.start();
        super.start();
    }

    public void handle(CommitterEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnException((Throwable)e);
        }
    }

    public void stop() {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        this.eventHandlingThread.interrupt();
        this.launcherPool.shutdown();
        super.stop();
    }

    private synchronized void jobCommitStarted() throws IOException {
        if (this.jobCommitThread != null) {
            throw new IOException("Commit while another commit thread active: " + this.jobCommitThread.toString());
        }
        this.jobCommitThread = Thread.currentThread();
    }

    private synchronized void jobCommitEnded() {
        if (this.jobCommitThread == Thread.currentThread()) {
            this.jobCommitThread = null;
            ((Object)((Object)this)).notifyAll();
        }
    }

    private synchronized void cancelJobCommit() {
        Thread threadCommitting = this.jobCommitThread;
        if (threadCommitting != null && threadCommitting.isAlive()) {
            LOG.info((Object)"Canceling commit");
            threadCommitting.interrupt();
            long now = this.context.getClock().getTime();
            long timeoutTimestamp = now + (long)this.commitThreadCancelTimeoutMs;
            try {
                while (this.jobCommitThread == threadCommitting && now > timeoutTimestamp) {
                    ((Object)((Object)this)).wait(now - timeoutTimestamp);
                    now = this.context.getClock().getTime();
                }
            }
            catch (InterruptedException e) {
                // empty catch block
            }
        }
    }

    private class EventProcessor
    implements Runnable {
        private CommitterEvent event;

        EventProcessor(CommitterEvent event) {
            this.event = event;
        }

        @Override
        public void run() {
            LOG.info((Object)("Processing the event " + this.event.toString()));
            switch ((CommitterEventType)this.event.getType()) {
                case JOB_SETUP: {
                    this.handleJobSetup((CommitterJobSetupEvent)this.event);
                    break;
                }
                case JOB_COMMIT: {
                    this.handleJobCommit((CommitterJobCommitEvent)this.event);
                    break;
                }
                case JOB_ABORT: {
                    this.handleJobAbort((CommitterJobAbortEvent)this.event);
                    break;
                }
                case TASK_ABORT: {
                    this.handleTaskAbort((CommitterTaskAbortEvent)this.event);
                    break;
                }
                default: {
                    throw new YarnException("Unexpected committer event " + this.event.toString());
                }
            }
        }

        protected void handleJobSetup(CommitterJobSetupEvent event) {
            try {
                CommitterEventHandler.this.committer.setupJob(event.getJobContext());
                CommitterEventHandler.this.context.getEventHandler().handle((Event)new JobSetupCompletedEvent(event.getJobID()));
            }
            catch (Exception e) {
                LOG.warn((Object)"Job setup failed", (Throwable)e);
                CommitterEventHandler.this.context.getEventHandler().handle((Event)new JobSetupFailedEvent(event.getJobID(), StringUtils.stringifyException((Throwable)e)));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void handleJobCommit(CommitterJobCommitEvent event) {
            try {
                CommitterEventHandler.this.jobCommitStarted();
                CommitterEventHandler.this.committer.commitJob(event.getJobContext());
                CommitterEventHandler.this.context.getEventHandler().handle((Event)new JobCommitCompletedEvent(event.getJobID()));
            }
            catch (Exception e) {
                LOG.error((Object)"Could not commit job", (Throwable)e);
                CommitterEventHandler.this.context.getEventHandler().handle((Event)new JobCommitFailedEvent(event.getJobID(), StringUtils.stringifyException((Throwable)e)));
            }
            finally {
                CommitterEventHandler.this.jobCommitEnded();
            }
        }

        protected void handleJobAbort(CommitterJobAbortEvent event) {
            CommitterEventHandler.this.cancelJobCommit();
            try {
                CommitterEventHandler.this.committer.abortJob(event.getJobContext(), event.getFinalState());
            }
            catch (Exception e) {
                LOG.warn((Object)"Could not abort job", (Throwable)e);
            }
            CommitterEventHandler.this.context.getEventHandler().handle((Event)new JobAbortCompletedEvent(event.getJobID(), event.getFinalState()));
        }

        protected void handleTaskAbort(CommitterTaskAbortEvent event) {
            try {
                CommitterEventHandler.this.committer.abortTask(event.getAttemptContext());
            }
            catch (Exception e) {
                LOG.warn((Object)("Task cleanup failed for attempt " + event.getAttemptID()), (Throwable)e);
            }
            CommitterEventHandler.this.context.getEventHandler().handle((Event)new TaskAttemptEvent(event.getAttemptID(), TaskAttemptEventType.TA_CLEANUP_DONE));
        }
    }
}

