package com.netflix.astyanax.recipes.queue.shard;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.netflix.astyanax.recipes.queue.MessageQueueMetadata;
import com.netflix.astyanax.recipes.queue.MessageQueueShard;
import com.netflix.astyanax.recipes.queue.MessageQueueShardStats;
import com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:astyanax-queue-2.0.2.jar:com/netflix/astyanax/recipes/queue/shard/TimePartitionedShardReaderPolicy.class */
public class TimePartitionedShardReaderPolicy implements ShardReaderPolicy {
    public static final long DEFAULT_POLLING_INTERVAL = 1000;
    public static final long NO_CATCHUP_POLLING_INTERVAL = 0;
    private static final String SEPARATOR = ":";
    private final MessageQueueMetadata settings;
    private final List<MessageQueueShard> shards;
    private final Map<String, MessageQueueShardStats> shardStats;
    private final LinkedBlockingQueue<MessageQueueShard> workQueue;
    private final LinkedBlockingQueue<MessageQueueShard> idleQueue;
    private final long pollingInterval;
    private final long catchupPollingInterval;
    private int currentTimePartition;

    /* loaded from: input_file:astyanax-queue-2.0.2.jar:com/netflix/astyanax/recipes/queue/shard/TimePartitionedShardReaderPolicy$Factory.class */
    public static class Factory implements ShardReaderPolicy.Factory {
        private final Builder builder;

        /* loaded from: input_file:astyanax-queue-2.0.2.jar:com/netflix/astyanax/recipes/queue/shard/TimePartitionedShardReaderPolicy$Factory$Builder.class */
        public static class Builder {
            private long pollingInterval = 1000;
            private long catchupPollingInterval = 0;

            public Builder withPollingInterval(long j, TimeUnit timeUnit) {
                this.pollingInterval = TimeUnit.MILLISECONDS.convert(j, timeUnit);
                return this;
            }

            public Builder withCatchupPollingInterval(long j, TimeUnit timeUnit) {
                this.catchupPollingInterval = TimeUnit.MILLISECONDS.convert(j, timeUnit);
                return this;
            }

            public Factory build() {
                return new Factory(this);
            }
        }

        public static Builder builder() {
            return new Builder();
        }

        public Factory(Builder builder) {
            this.builder = builder;
        }

        @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy.Factory
        public ShardReaderPolicy create(MessageQueueMetadata messageQueueMetadata) {
            return new TimePartitionedShardReaderPolicy(this.builder, messageQueueMetadata);
        }
    }

    private TimePartitionedShardReaderPolicy(Factory.Builder builder, MessageQueueMetadata messageQueueMetadata) {
        this.workQueue = Queues.newLinkedBlockingQueue();
        this.idleQueue = Queues.newLinkedBlockingQueue();
        this.currentTimePartition = -1;
        this.settings = messageQueueMetadata;
        this.pollingInterval = builder.pollingInterval;
        this.catchupPollingInterval = builder.catchupPollingInterval;
        this.shards = Lists.newArrayListWithCapacity(messageQueueMetadata.getPartitionCount() * messageQueueMetadata.getShardCount());
        for (int i = 0; i < messageQueueMetadata.getPartitionCount(); i++) {
            for (int i2 = 0; i2 < messageQueueMetadata.getShardCount(); i2++) {
                this.shards.add(new MessageQueueShard(messageQueueMetadata.getQueueName() + ":" + i + ":" + i2, i, i2));
            }
        }
        ArrayList newArrayList = Lists.newArrayList();
        this.shardStats = Maps.newHashMapWithExpectedSize(this.shards.size());
        for (MessageQueueShard messageQueueShard : this.shards) {
            newArrayList.add(messageQueueShard);
            this.shardStats.put(messageQueueShard.getName(), messageQueueShard);
        }
        Collections.shuffle(newArrayList);
        this.workQueue.addAll(newArrayList);
    }

    private int getCurrentPartitionIndex() {
        if (this.settings.getPartitionCount() <= 1) {
            return 0;
        }
        return (int) ((TimeUnit.MICROSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) / this.settings.getPartitionDuration().longValue()) % this.settings.getPartitionCount());
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public MessageQueueShard nextShard() throws InterruptedException {
        int currentPartitionIndex = getCurrentPartitionIndex();
        if (currentPartitionIndex != this.currentTimePartition) {
            synchronized (this) {
                if (currentPartitionIndex != this.currentTimePartition) {
                    this.currentTimePartition = currentPartitionIndex;
                    ArrayList<MessageQueueShard> newArrayListWithCapacity = Lists.newArrayListWithCapacity(this.idleQueue.size());
                    this.idleQueue.drainTo(newArrayListWithCapacity);
                    for (MessageQueueShard messageQueueShard : newArrayListWithCapacity) {
                        if (messageQueueShard.getPartition() == this.currentTimePartition) {
                            this.workQueue.add(messageQueueShard);
                        } else {
                            this.idleQueue.add(messageQueueShard);
                        }
                    }
                }
            }
        }
        return this.workQueue.take();
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public void releaseShard(MessageQueueShard messageQueueShard, int i) {
        if (messageQueueShard.getPartition() == this.currentTimePartition || i != 0) {
            this.workQueue.add(messageQueueShard);
        } else {
            this.idleQueue.add(messageQueueShard);
        }
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public Collection<MessageQueueShard> listShards() {
        return Collections.unmodifiableList(this.shards);
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public Map<String, MessageQueueShardStats> getShardStats() {
        return this.shardStats;
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public int getWorkQueueDepth() {
        return this.workQueue.size();
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public int getIdleQueueDepth() {
        return this.idleQueue.size();
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public boolean isCatchingUp() {
        return getWorkQueueDepth() > this.settings.getShardCount() * 2;
    }

    @Override // com.netflix.astyanax.recipes.queue.shard.ShardReaderPolicy
    public long getPollInterval() {
        return (!isCatchingUp() || this.catchupPollingInterval == 0) ? this.pollingInterval : this.catchupPollingInterval;
    }
}
