package com.netflix.astyanax.recipes.queue;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.netflix.astyanax.recipes.locks.BusyLockException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:WEB-INF/lib/astyanax-queue-2.0.2.jar:com/netflix/astyanax/recipes/queue/MessageQueueDispatcher.class */
public class MessageQueueDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(MessageQueueDispatcher.class);
    public static final int DEFAULT_BATCH_SIZE = 5;
    public static final int DEFAULT_POLLING_INTERVAL = 1000;
    public static final int DEFAULT_THREAD_COUNT = 1;
    public static final int DEFAULT_CONSUMER_COUNT = 1;
    public static final int DEFAULT_ACK_SIZE = 100;
    public static final int DEFAULT_ACK_INTERVAL = 100;
    public static final int DEFAULT_BACKLOG_SIZE = 1000;
    private int processorThreadCount;
    private int batchSize;
    private int consumerCount;
    private int ackSize;
    private long ackInterval;
    private int backlogSize;
    private long pollingInterval;
    private boolean terminate;
    private MessageQueue messageQueue;
    private ExecutorService executor;
    private MessageConsumer ackConsumer;
    private Function<MessageContext, Boolean> callback;
    private MessageHandlerFactory handlerFactory;
    private LinkedBlockingQueue<MessageContext> toAck;
    private LinkedBlockingQueue<MessageContext> toProcess;

    /* loaded from: input_file:WEB-INF/lib/astyanax-queue-2.0.2.jar:com/netflix/astyanax/recipes/queue/MessageQueueDispatcher$Builder.class */
    public static class Builder {
        private final MessageQueueDispatcher dispatcher = new MessageQueueDispatcher();

        public Builder withMessageQueue(MessageQueue messageQueue) {
            this.dispatcher.messageQueue = messageQueue;
            return this;
        }

        public Builder withThreadCount(int i) {
            return withProcessorThreadCount(i);
        }

        public Builder withProcessorThreadCount(int i) {
            this.dispatcher.processorThreadCount = i;
            return this;
        }

        public Builder withBacklogSize(int i) {
            this.dispatcher.backlogSize = i;
            return this;
        }

        public Builder withConsumerCount(int i) {
            this.dispatcher.consumerCount = i;
            return this;
        }

        public Builder withBatchSize(int i) {
            this.dispatcher.batchSize = i;
            return this;
        }

        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$602(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        public com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.Builder withAckInterval(long r7, java.util.concurrent.TimeUnit r9) {
            /*
                r6 = this;
                r0 = r6
                com.netflix.astyanax.recipes.queue.MessageQueueDispatcher r0 = r0.dispatcher
                java.util.concurrent.TimeUnit r1 = java.util.concurrent.TimeUnit.MILLISECONDS
                r2 = r7
                r3 = r9
                long r1 = r1.convert(r2, r3)
                long r0 = com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$602(r0, r1)
                r0 = r6
                return r0
            */
            throw new UnsupportedOperationException("Method not decompiled: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.Builder.withAckInterval(long, java.util.concurrent.TimeUnit):com.netflix.astyanax.recipes.queue.MessageQueueDispatcher$Builder");
        }

        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$702(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        public com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.Builder withPollingInterval(long r7, java.util.concurrent.TimeUnit r9) {
            /*
                r6 = this;
                r0 = r6
                com.netflix.astyanax.recipes.queue.MessageQueueDispatcher r0 = r0.dispatcher
                java.util.concurrent.TimeUnit r1 = java.util.concurrent.TimeUnit.MILLISECONDS
                r2 = r7
                r3 = r9
                long r1 = r1.convert(r2, r3)
                long r0 = com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$702(r0, r1)
                r0 = r6
                return r0
            */
            throw new UnsupportedOperationException("Method not decompiled: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.Builder.withPollingInterval(long, java.util.concurrent.TimeUnit):com.netflix.astyanax.recipes.queue.MessageQueueDispatcher$Builder");
        }

        public Builder withCallback(Function<MessageContext, Boolean> function) {
            this.dispatcher.callback = function;
            return this;
        }

        public Builder withMessageHandlerFactory(MessageHandlerFactory messageHandlerFactory) {
            this.dispatcher.handlerFactory = messageHandlerFactory;
            return this;
        }

        public MessageQueueDispatcher build() {
            Preconditions.checkArgument(this.dispatcher.consumerCount <= this.dispatcher.processorThreadCount, "consumerCounter must be <= threadCount");
            this.dispatcher.initialize();
            return this.dispatcher;
        }
    }

    private MessageQueueDispatcher() {
        this.processorThreadCount = 1;
        this.batchSize = 5;
        this.consumerCount = 1;
        this.ackSize = 100;
        this.ackInterval = 100L;
        this.backlogSize = 1000;
        this.pollingInterval = 1000L;
        this.terminate = false;
        this.toAck = Queues.newLinkedBlockingQueue();
        this.toProcess = Queues.newLinkedBlockingQueue(500);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initialize() {
        Preconditions.checkNotNull(this.messageQueue, "Must specify message queue");
        if (this.handlerFactory == null) {
            this.handlerFactory = new SimpleMessageHandlerFactory();
        }
        this.toProcess = Queues.newLinkedBlockingQueue(this.backlogSize);
    }

    public void start() {
        this.executor = Executors.newScheduledThreadPool(this.processorThreadCount + this.consumerCount + 1);
        startAckThread();
        for (int i = 0; i < this.consumerCount; i++) {
            startConsumer(i);
        }
        for (int i2 = 0; i2 < this.processorThreadCount; i2++) {
            startProcessor(i2);
        }
    }

    public void stop() {
        this.terminate = true;
        this.executor.shutdownNow();
    }

    private void startAckThread() {
        this.ackConsumer = this.messageQueue.createConsumer();
        this.executor.submit(new Runnable() { // from class: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                Thread.currentThread().setName(StringUtils.join(Lists.newArrayList(MessageQueueDispatcher.this.messageQueue.getName(), "Ack"), ":"));
                while (!MessageQueueDispatcher.this.terminate) {
                    try {
                        ArrayList newArrayList = Lists.newArrayList();
                        MessageQueueDispatcher.this.toAck.drainTo(newArrayList);
                        if (!newArrayList.isEmpty()) {
                            try {
                                MessageQueueDispatcher.this.ackConsumer.ackMessages(newArrayList);
                            } catch (MessageQueueException e) {
                                MessageQueueDispatcher.this.toAck.addAll(newArrayList);
                                MessageQueueDispatcher.LOG.warn("Failed to ack consumer", e);
                            }
                        }
                    } catch (Throwable th) {
                        MessageQueueDispatcher.LOG.info("Error acking messages", th);
                    }
                    try {
                        Thread.sleep(MessageQueueDispatcher.this.ackInterval);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
    }

    private void startConsumer(final int i) {
        this.executor.submit(new Runnable() { // from class: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.2
            @Override // java.lang.Runnable
            public void run() {
                Thread.currentThread().setName(StringUtils.join(Lists.newArrayList(MessageQueueDispatcher.this.messageQueue.getName(), "Consumer", Integer.toString(i)), ":"));
                MessageConsumer createConsumer = MessageQueueDispatcher.this.messageQueue.createConsumer();
                while (!MessageQueueDispatcher.this.terminate) {
                    try {
                        List<MessageContext> readMessages = createConsumer.readMessages(MessageQueueDispatcher.this.batchSize);
                        if (readMessages.isEmpty()) {
                            Thread.sleep(MessageQueueDispatcher.this.pollingInterval);
                        } else {
                            Iterator<MessageContext> it = readMessages.iterator();
                            while (it.hasNext()) {
                                MessageQueueDispatcher.this.toProcess.put(it.next());
                            }
                        }
                    } catch (BusyLockException e) {
                        try {
                            Thread.sleep(MessageQueueDispatcher.this.pollingInterval);
                        } catch (InterruptedException e2) {
                            Thread.interrupted();
                            return;
                        }
                    } catch (Throwable th) {
                        MessageQueueDispatcher.LOG.warn("Error consuming messages ", th);
                    }
                }
            }
        });
    }

    private void startProcessor(final int i) {
        this.executor.submit(new Runnable() { // from class: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.3
            @Override // java.lang.Runnable
            public void run() {
                String join = StringUtils.join(Lists.newArrayList(MessageQueueDispatcher.this.messageQueue.getName(), "Processor", Integer.toString(i)), ":");
                Thread.currentThread().setName(join);
                MessageQueueDispatcher.LOG.info("Starting message processor : " + join);
                while (!MessageQueueDispatcher.this.terminate) {
                    try {
                        try {
                            MessageContext messageContext = (MessageContext) MessageQueueDispatcher.this.toProcess.take();
                            if (messageContext != null) {
                                Message message = messageContext.getMessage();
                                try {
                                } catch (Throwable th) {
                                    messageContext.setException(th);
                                    MessageQueueDispatcher.this.toAck.add(messageContext);
                                    MessageQueueDispatcher.LOG.error("Error processing message " + message.getKey(), th);
                                }
                                if (message.getTaskClass() != null) {
                                    if (MessageQueueDispatcher.this.handlerFactory.createInstance(message.getTaskClass()).apply(messageContext).booleanValue()) {
                                        MessageQueueDispatcher.this.toAck.add(messageContext);
                                    }
                                } else if (((Boolean) MessageQueueDispatcher.this.callback.apply(messageContext)).booleanValue()) {
                                    messageContext.setStatus(MessageStatus.DONE);
                                    MessageQueueDispatcher.this.toAck.add(messageContext);
                                }
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    } catch (Throwable th2) {
                        MessageQueueDispatcher.LOG.error("Error running producer : " + join, th2);
                        return;
                    }
                }
            }
        });
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$602(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$602(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.ackInterval = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$602(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher, long):long");
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$702(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$702(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.pollingInterval = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: com.netflix.astyanax.recipes.queue.MessageQueueDispatcher.access$702(com.netflix.astyanax.recipes.queue.MessageQueueDispatcher, long):long");
    }

    static {
    }
}
