/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.CompactionIterator;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.io.IndexSummary;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.Pair;
import org.apache.log4j.Logger;

public class AntiEntropyService {
    private static final Logger logger = Logger.getLogger(AntiEntropyService.class);
    public static final long TREE_STORE_TIMEOUT = 600000L;
    public static final long NATURAL_REPAIR_FREQUENCY = 3600000L;
    public static final AntiEntropyService instance = new AntiEntropyService();
    private final ConcurrentMap<CFPair, Long> naturalRepairs = new ConcurrentHashMap<CFPair, Long>();
    private final Map<CFPair, ExpiringMap<InetAddress, TreePair>> trees = new HashMap<CFPair, ExpiringMap<InetAddress, TreePair>>();

    protected AntiEntropyService() {
    }

    private ExpiringMap<InetAddress, TreePair> rendezvousPairs(CFPair cf) {
        ExpiringMap<InetAddress, TreePair> ctrees = this.trees.get(cf);
        if (ctrees == null) {
            ctrees = new ExpiringMap(600000L);
            this.trees.put(cf, ctrees);
        }
        return ctrees;
    }

    public static Set<InetAddress> getNeighbors(String table) {
        StorageService ss = StorageService.instance;
        HashSet<InetAddress> neighbors = new HashSet<InetAddress>();
        Map<Range, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
        for (Range range : ss.getLocalRanges(table)) {
            neighbors.addAll((Collection<InetAddress>)replicaSets.get(range));
        }
        neighbors.remove(FBUtilities.getLocalAddress());
        return neighbors;
    }

    private void rendezvous(CFPair cf, InetAddress endpoint, MerkleTree tree) {
        InetAddress LOCAL = FBUtilities.getLocalAddress();
        ExpiringMap<InetAddress, TreePair> ctrees = this.rendezvousPairs(cf);
        ArrayList<Differencer> differencers = new ArrayList<Differencer>();
        if (LOCAL.equals(endpoint)) {
            for (InetAddress neighbor : AntiEntropyService.getNeighbors((String)cf.left)) {
                TreePair waiting = ctrees.remove(neighbor);
                if (waiting != null && waiting.right != null) {
                    differencers.add(new Differencer(cf, LOCAL, neighbor, tree, (MerkleTree)waiting.right));
                    continue;
                }
                ctrees.put(neighbor, new TreePair(tree, null));
                logger.debug((Object)("Stored local tree for " + cf + " to wait for " + neighbor));
            }
        } else {
            TreePair waiting = ctrees.remove(endpoint);
            if (waiting != null && waiting.left != null) {
                differencers.add(new Differencer(cf, LOCAL, endpoint, (MerkleTree)waiting.left, tree));
            } else {
                ctrees.put(endpoint, new TreePair(null, tree));
                logger.debug((Object)("Stored remote tree for " + cf + " from " + endpoint));
            }
        }
        for (Differencer differencer : differencers) {
            logger.info((Object)("Queueing comparison " + differencer));
            StageManager.getStage("AE-SERVICE-STAGE").execute(differencer);
        }
    }

    void notifyNeighbors(Validator validator, InetAddress local, Collection<InetAddress> neighbors) {
        MessagingService ms = MessagingService.instance;
        try {
            Message message = TreeResponseVerbHandler.makeVerb(local, validator);
            logger.info((Object)("Sending AEService tree for " + validator.cf + " to: " + neighbors));
            for (InetAddress neighbor : neighbors) {
                ms.sendOneWay(message, neighbor);
            }
        }
        catch (Exception e) {
            logger.error((Object)("Could not send valid tree to endpoints: " + neighbors), (Throwable)e);
        }
    }

    TreePair getRendezvousPair_TestsOnly(String table, String cf, InetAddress remote) {
        return this.rendezvousPairs(new CFPair(table, cf)).get(remote);
    }

    void clearNaturalRepairs_TestsOnly() {
        this.naturalRepairs.clear();
    }

    private boolean shouldRunNaturally(CFPair cf) {
        Long curtime = System.currentTimeMillis();
        Long pretime = this.naturalRepairs.putIfAbsent(cf, curtime);
        if (pretime != null) {
            if (pretime < curtime - 3600000L) {
                return this.naturalRepairs.replace(cf, pretime, curtime);
            }
            logger.debug((Object)("Skipping natural repair: last occurred " + (curtime - pretime) + "ms ago."));
            return false;
        }
        return true;
    }

    public IValidator getValidator(String table, String cf, InetAddress initiator, boolean major) {
        if (!major || table.equals("system")) {
            return new NoopValidator();
        }
        if (StorageService.instance.getTokenMetadata().sortedTokens().size() < 1) {
            return new NoopValidator();
        }
        CFPair cfpair = new CFPair(table, cf);
        if (initiator == null && !this.shouldRunNaturally(cfpair)) {
            return new NoopValidator();
        }
        return new Validator(cfpair);
    }

    static class TreePair
    extends Pair<MerkleTree, MerkleTree> {
        public TreePair(MerkleTree local, MerkleTree remote) {
            super(local, remote);
            assert (local != null ^ remote != null);
        }
    }

    static class CFPair
    extends Pair<String, String> {
        public CFPair(String table, String cf) {
            super(table, cf);
            assert (table != null && cf != null);
        }
    }

    public static class TreeResponseVerbHandler
    implements IVerbHandler,
    ICompactSerializer<Validator> {
        public static final TreeResponseVerbHandler SERIALIZER = new TreeResponseVerbHandler();

        static Message makeVerb(InetAddress local, Validator validator) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                SERIALIZER.serialize(validator, dos);
                return new Message(local, "AE-SERVICE-STAGE", StorageService.Verb.TREE_RESPONSE, bos.toByteArray());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void serialize(Validator v, DataOutputStream dos) throws IOException {
            TreeRequestVerbHandler.SERIALIZER.serialize(v.cf, dos);
            ObjectOutputStream oos = new ObjectOutputStream(dos);
            oos.writeObject(v.tree);
            oos.flush();
        }

        @Override
        public Validator deserialize(DataInputStream dis) throws IOException {
            CFPair cf = TreeRequestVerbHandler.SERIALIZER.deserialize(dis);
            ObjectInputStream ois = new ObjectInputStream(dis);
            try {
                return new Validator(cf, (MerkleTree)ois.readObject());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void doVerb(Message message) {
            byte[] bytes = message.getMessageBody();
            ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
            try {
                Validator rvalidator = this.deserialize(new DataInputStream(buffer));
                instance.rendezvous(rvalidator.cf, message.getFrom(), rvalidator.tree);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    public static class TreeRequestVerbHandler
    implements IVerbHandler,
    ICompactSerializer<CFPair> {
        public static final TreeRequestVerbHandler SERIALIZER = new TreeRequestVerbHandler();

        static Message makeVerb(String table, String cf) {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos);
                SERIALIZER.serialize(new CFPair(table, cf), dos);
                return new Message(FBUtilities.getLocalAddress(), "AE-SERVICE-STAGE", StorageService.Verb.TREE_REQUEST, bos.toByteArray());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void serialize(CFPair treerequest, DataOutputStream dos) throws IOException {
            dos.writeUTF((String)treerequest.left);
            dos.writeUTF((String)treerequest.right);
        }

        @Override
        public CFPair deserialize(DataInputStream dis) throws IOException {
            return new CFPair(dis.readUTF(), dis.readUTF());
        }

        @Override
        public void doVerb(Message message) {
            byte[] bytes = message.getMessageBody();
            ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
            try {
                CFPair request = this.deserialize(new DataInputStream(buffer));
                logger.debug((Object)("Queueing readonly compaction for request from " + message.getFrom() + " for " + request));
                Table table = Table.open((String)request.left);
                CompactionManager.instance.submitReadonly(table.getColumnFamilyStore((String)request.right), message.getFrom());
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    public static class Differencer
    implements Runnable {
        public final CFPair cf;
        public final InetAddress local;
        public final InetAddress remote;
        public final MerkleTree ltree;
        public final MerkleTree rtree;
        public final List<MerkleTree.TreeRange> differences;

        public Differencer(CFPair cf, InetAddress local, InetAddress remote, MerkleTree ltree, MerkleTree rtree) {
            this.cf = cf;
            this.local = local;
            this.remote = remote;
            this.ltree = ltree;
            this.rtree = rtree;
            this.differences = new ArrayList<MerkleTree.TreeRange>();
        }

        @Override
        public void run() {
            StorageService ss = StorageService.instance;
            if (this.ltree.partitioner() == null) {
                this.ltree.partitioner(ss.getPartitioner());
            }
            if (this.rtree.partitioner() == null) {
                this.rtree.partitioner(ss.getPartitioner());
            }
            HashSet<Range> interesting = new HashSet<Range>(ss.getRangesForEndPoint((String)this.cf.left, this.local));
            interesting.retainAll(ss.getRangesForEndPoint((String)this.cf.left, this.remote));
            block2: for (MerkleTree.TreeRange diff : MerkleTree.difference(this.ltree, this.rtree)) {
                for (Range localrange : interesting) {
                    if (!diff.intersects(localrange)) continue;
                    this.differences.add(diff);
                    continue block2;
                }
            }
            float difference = this.differenceFraction();
            try {
                if ((double)difference == 0.0) {
                    logger.debug((Object)("Endpoints " + this.local + " and " + this.remote + " are consistent for " + this.cf));
                    return;
                }
                this.performStreamingRepair();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        float differenceFraction() {
            double fraction = 0.0;
            for (MerkleTree.TreeRange diff : this.differences) {
                fraction += 1.0 / Math.pow(2.0, diff.depth);
            }
            return (float)fraction;
        }

        void performStreamingRepair() throws IOException {
            logger.info((Object)("Performing streaming repair of " + this.differences.size() + " ranges to " + this.remote + " for " + this.cf));
            ColumnFamilyStore cfstore = Table.open((String)this.cf.left).getColumnFamilyStore((String)this.cf.right);
            try {
                ArrayList<Range> ranges = new ArrayList<Range>(this.differences);
                List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, this.remote).get();
                StreamOut.transferSSTables(this.remote, sstables, (String)this.cf.left);
            }
            catch (Exception e) {
                throw new IOException("Streaming repair failed.", e);
            }
            logger.debug((Object)("Finished streaming repair to " + this.remote + " for " + this.cf));
        }

        public String toString() {
            return "#<Differencer " + this.cf + " local=" + this.local + " remote=" + this.remote + ">";
        }
    }

    public static class NoopValidator
    implements IValidator {
        @Override
        public void prepare() {
        }

        @Override
        public void add(CompactionIterator.CompactedRow row) {
        }

        @Override
        public void complete() {
        }
    }

    public static class Validator
    implements IValidator,
    Callable<Object> {
        public final CFPair cf;
        public final MerkleTree tree;
        private transient List<MerkleTree.RowHash> minrows;
        private transient Token mintoken;
        private transient long validated;
        private transient MerkleTree.TreeRange range;
        private transient MerkleTree.TreeRangeIterator ranges;
        public static final MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);

        Validator(CFPair cf) {
            this(cf, new MerkleTree(DatabaseDescriptor.getPartitioner(), 126, (int)Math.pow(2.0, 15.0)));
        }

        Validator(CFPair cf, MerkleTree tree) {
            assert (cf != null && tree != null);
            this.cf = cf;
            this.tree = tree;
            this.minrows = new ArrayList<MerkleTree.RowHash>();
            this.mintoken = null;
            this.validated = 0L;
            this.range = null;
            this.ranges = null;
        }

        @Override
        public void prepare() {
            ColumnFamilyStore cfs;
            ArrayList<DecoratedKey> keys = new ArrayList<DecoratedKey>();
            try {
                cfs = Table.open((String)this.cf.left).getColumnFamilyStore((String)this.cf.right);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
            if (cfs != null) {
                for (IndexSummary.KeyPosition info : cfs.allIndexPositions()) {
                    keys.add(info.key);
                }
            }
            if (keys.isEmpty()) {
                this.tree.init();
            } else {
                DecoratedKey dk;
                int numkeys = keys.size();
                Random random = new Random();
                do {
                    dk = (DecoratedKey)keys.get(random.nextInt(numkeys));
                } while (this.tree.split((Token)dk.token));
            }
            logger.debug((Object)("Prepared AEService tree of size " + this.tree.size() + " for " + this.cf));
            this.mintoken = this.tree.partitioner().getMinimumToken();
            this.ranges = this.tree.invalids(new Range(this.mintoken, this.mintoken));
        }

        @Override
        public void add(CompactionIterator.CompactedRow row) {
            if (this.mintoken != null) {
                assert (this.ranges != null) : "Validator was not prepared()";
                if (((Token)row.key.token).compareTo(this.mintoken) == 0) {
                    this.minrows.add(this.rowHash(row));
                    return;
                }
                this.mintoken = null;
            }
            if (this.range == null) {
                this.range = (MerkleTree.TreeRange)this.ranges.next();
            }
            while (!this.range.contains((Token)row.key.token)) {
                this.range.addHash(EMPTY_ROW);
                this.range = (MerkleTree.TreeRange)this.ranges.next();
            }
            this.range.addHash(this.rowHash(row));
        }

        private MerkleTree.RowHash rowHash(CompactionIterator.CompactedRow row) {
            ++this.validated;
            byte[] rowhash = FBUtilities.hash("SHA-256", row.key.key.getBytes(), row.buffer.getData());
            return new MerkleTree.RowHash((Token)row.key.token, rowhash);
        }

        @Override
        public void complete() {
            assert (this.ranges != null) : "Validator was not prepared()";
            if (this.range != null) {
                this.range.addHash(EMPTY_ROW);
            }
            while (this.ranges.hasNext()) {
                this.range = (MerkleTree.TreeRange)this.ranges.next();
                this.range.addHash(EMPTY_ROW);
            }
            if (!this.minrows.isEmpty()) {
                for (MerkleTree.RowHash minrow : this.minrows) {
                    this.range.addHash(minrow);
                }
            }
            StageManager.getStage("AE-SERVICE-STAGE").submit(this);
            logger.debug((Object)("Validated " + this.validated + " rows into AEService tree for " + this.cf));
        }

        @Override
        public Object call() throws Exception {
            AntiEntropyService aes = instance;
            InetAddress local = FBUtilities.getLocalAddress();
            Set<InetAddress> neighbors = AntiEntropyService.getNeighbors((String)this.cf.left);
            aes.rendezvous(this.cf, local, this.tree);
            aes.notifyNeighbors(this, local, neighbors);
            return AntiEntropyService.class;
        }
    }

    public static interface IValidator {
        public void prepare();

        public void add(CompactionIterator.CompactedRow var1);

        public void complete();
    }
}

