/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client.api.async.impl;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.sparkproject.guava.annotations.VisibleForTesting;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AMRMClientAsyncImpl<T extends AMRMClient.ContainerRequest>
extends AMRMClientAsync<T> {
    private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class);
    private final HeartbeatThread heartbeatThread;
    private final CallbackHandlerThread handlerThread;
    private final BlockingQueue<AllocateResponse> responseQueue;
    private final Object unregisterHeartbeatLock = new Object();
    private volatile boolean keepRunning = true;
    private volatile float progress;
    private volatile Throwable savedException = null;

    public AMRMClientAsyncImpl(int intervalMs, AMRMClientAsync.CallbackHandler callbackHandler) {
        this(new AMRMClientImpl(), intervalMs, callbackHandler);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs, AMRMClientAsync.CallbackHandler callbackHandler) {
        super(client, intervalMs, callbackHandler);
        this.heartbeatThread = new HeartbeatThread();
        this.handlerThread = new CallbackHandlerThread();
        this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.client.init(conf);
    }

    protected void serviceStart() throws Exception {
        this.handlerThread.setDaemon(true);
        this.handlerThread.start();
        this.client.start();
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        this.keepRunning = false;
        this.heartbeatThread.interrupt();
        try {
            this.heartbeatThread.join();
        }
        catch (InterruptedException ex) {
            LOG.error((Object)"Error joining with heartbeat thread", (Throwable)ex);
        }
        this.client.stop();
        this.handlerThread.interrupt();
        super.serviceStop();
    }

    @Override
    public void setHeartbeatInterval(int interval) {
        this.heartbeatIntervalMs.set(interval);
    }

    @Override
    public List<? extends Collection<T>> getMatchingRequests(Priority priority, String resourceName, Resource capability) {
        return this.client.getMatchingRequests(priority, resourceName, capability);
    }

    @Override
    public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException {
        RegisterApplicationMasterResponse response = this.client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
        this.heartbeatThread.start();
        return response;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) throws YarnException, IOException {
        Object object = this.unregisterHeartbeatLock;
        synchronized (object) {
            this.keepRunning = false;
            this.client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
        }
    }

    @Override
    public void addContainerRequest(T req) {
        this.client.addContainerRequest(req);
    }

    @Override
    public void removeContainerRequest(T req) {
        this.client.removeContainerRequest(req);
    }

    @Override
    public void releaseAssignedContainer(ContainerId containerId) {
        this.client.releaseAssignedContainer(containerId);
    }

    @Override
    public Resource getAvailableResources() {
        return this.client.getAvailableResources();
    }

    @Override
    public int getClusterNodeCount() {
        return this.client.getClusterNodeCount();
    }

    private class CallbackHandlerThread
    extends Thread {
        public CallbackHandlerThread() {
            super("AMRM Callback Handler Thread");
        }

        @Override
        public void run() {
            while (AMRMClientAsyncImpl.this.keepRunning) {
                try {
                    List<Container> allocated;
                    List<ContainerStatus> completed;
                    AllocateResponse response;
                    if (AMRMClientAsyncImpl.this.savedException != null) {
                        LOG.error((Object)"Stopping callback due to: ", AMRMClientAsyncImpl.this.savedException);
                        AMRMClientAsyncImpl.this.handler.onError(AMRMClientAsyncImpl.this.savedException);
                        return;
                    }
                    try {
                        response = (AllocateResponse)AMRMClientAsyncImpl.this.responseQueue.take();
                    }
                    catch (InterruptedException ex) {
                        LOG.info((Object)"Interrupted while waiting for queue", (Throwable)ex);
                        continue;
                    }
                    List<NodeReport> updatedNodes = response.getUpdatedNodes();
                    if (!updatedNodes.isEmpty()) {
                        AMRMClientAsyncImpl.this.handler.onNodesUpdated(updatedNodes);
                    }
                    if (!(completed = response.getCompletedContainersStatuses()).isEmpty()) {
                        AMRMClientAsyncImpl.this.handler.onContainersCompleted(completed);
                    }
                    if (!(allocated = response.getAllocatedContainers()).isEmpty()) {
                        AMRMClientAsyncImpl.this.handler.onContainersAllocated(allocated);
                    }
                    AMRMClientAsyncImpl.this.progress = AMRMClientAsyncImpl.this.handler.getProgress();
                }
                catch (Throwable ex) {
                    AMRMClientAsyncImpl.this.handler.onError(ex);
                    throw new YarnRuntimeException(ex);
                }
            }
            return;
        }
    }

    private class HeartbeatThread
    extends Thread {
        public HeartbeatThread() {
            super("AMRM Heartbeater thread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                AllocateResponse response = null;
                Object object = AMRMClientAsyncImpl.this.unregisterHeartbeatLock;
                synchronized (object) {
                    if (!AMRMClientAsyncImpl.this.keepRunning) {
                        return;
                    }
                    try {
                        response = AMRMClientAsyncImpl.this.client.allocate(AMRMClientAsyncImpl.this.progress);
                    }
                    catch (ApplicationAttemptNotFoundException e) {
                        AMRMClientAsyncImpl.this.handler.onShutdownRequest();
                        LOG.info((Object)"Shutdown requested. Stopping callback.");
                        return;
                    }
                    catch (Throwable ex) {
                        LOG.error((Object)"Exception on heartbeat", ex);
                        AMRMClientAsyncImpl.this.savedException = ex;
                        AMRMClientAsyncImpl.this.handlerThread.interrupt();
                        return;
                    }
                    if (response != null) {
                        while (true) {
                            try {
                                AMRMClientAsyncImpl.this.responseQueue.put(response);
                            }
                            catch (InterruptedException ex) {
                                LOG.debug((Object)"Interrupted while waiting to put on response queue", (Throwable)ex);
                                continue;
                            }
                            break;
                        }
                    }
                }
                try {
                    Thread.sleep(AMRMClientAsyncImpl.this.heartbeatIntervalMs.get());
                    continue;
                }
                catch (InterruptedException ex) {
                    LOG.debug((Object)"Heartbeater interrupted", (Throwable)ex);
                    continue;
                }
                break;
            }
        }
    }
}

