/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.cassandra.cache.ICacheExpungeHook;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.IResponseResolver;
import org.apache.cassandra.service.ReadResponseResolver;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

class ConsistencyChecker
implements Runnable {
    private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class);
    private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap(DatabaseDescriptor.getRpcTimeout());
    private final String table_;
    private final Row row_;
    protected final List<InetAddress> replicas_;
    private final ReadCommand readCommand_;

    public ConsistencyChecker(String table, Row row, List<InetAddress> endpoints, ReadCommand readCommand) {
        this.table_ = table;
        this.row_ = row;
        this.replicas_ = endpoints;
        this.readCommand_ = readCommand;
    }

    @Override
    public void run() {
        ReadCommand readCommandDigestOnly = this.constructReadMessage(true);
        try {
            Message message = readCommandDigestOnly.makeReadMessage();
            if (logger_.isDebugEnabled()) {
                logger_.debug((Object)("Reading consistency digest for " + this.readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(this.replicas_, (String)", ") + "]"));
            }
            MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
            for (InetAddress endpoint : this.replicas_) {
                if (endpoint.equals(FBUtilities.getLocalAddress())) continue;
                MessagingService.instance.sendOneWay(message, endpoint);
            }
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }

    private ReadCommand constructReadMessage(boolean isDigestQuery) {
        ReadCommand readCommand = this.readCommand_.copy();
        readCommand.setDigestQuery(isDigestQuery);
        return readCommand;
    }

    static class DataRepairHandler
    implements IAsyncCallback,
    ICacheExpungeHook<String, String> {
        private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
        private final IResponseResolver<Row> readResponseResolver_;
        private final int majority_;

        DataRepairHandler(int responseCount, IResponseResolver<Row> readResponseResolver) {
            this.readResponseResolver_ = readResponseResolver;
            this.majority_ = responseCount / 2 + 1;
        }

        public DataRepairHandler(Row localRow, int responseCount, IResponseResolver<Row> readResponseResolver) throws IOException {
            this(responseCount, readResponseResolver);
            ReadResponse readResponse = new ReadResponse(localRow);
            DataOutputBuffer out = new DataOutputBuffer();
            ReadResponse.serializer().serialize(readResponse, out);
            byte[] bytes = new byte[out.getLength()];
            System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
            this.responses_.add(new Message(FBUtilities.getLocalAddress(), "RESPONSE-STAGE", StorageService.Verb.READ_RESPONSE, bytes));
        }

        @Override
        public synchronized void response(Message message) {
            if (logger_.isDebugEnabled()) {
                logger_.debug((Object)("Received responses in DataRepairHandler : " + message.toString()));
            }
            this.responses_.add(message);
            if (this.responses_.size() == this.majority_) {
                String messageId = message.getMessageId();
                readRepairTable_.put(messageId, messageId, this);
            }
        }

        @Override
        public void callMe(String key, String value) {
            try {
                this.readResponseResolver_.resolve(this.responses_);
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    class DigestResponseHandler
    implements IAsyncCallback {
        private boolean repairInvoked;

        DigestResponseHandler() {
        }

        @Override
        public synchronized void response(Message response) {
            if (this.repairInvoked) {
                return;
            }
            try {
                byte[] body = response.getMessageBody();
                ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                byte[] digest = result.digest();
                if (!Arrays.equals(ColumnFamily.digest(((ConsistencyChecker)ConsistencyChecker.this).row_.cf), digest)) {
                    ReadResponseResolver readResponseResolver = new ReadResponseResolver(ConsistencyChecker.this.table_, ConsistencyChecker.this.replicas_.size());
                    DataRepairHandler responseHandler = ConsistencyChecker.this.replicas_.contains(FBUtilities.getLocalAddress()) ? new DataRepairHandler(ConsistencyChecker.this.row_, ConsistencyChecker.this.replicas_.size(), readResponseResolver) : new DataRepairHandler(ConsistencyChecker.this.replicas_.size(), readResponseResolver);
                    ReadCommand readCommand = ConsistencyChecker.this.constructReadMessage(false);
                    Message message = readCommand.makeReadMessage();
                    if (logger_.isDebugEnabled()) {
                        logger_.debug((Object)("Performing read repair for " + ((ConsistencyChecker)ConsistencyChecker.this).readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(ConsistencyChecker.this.replicas_, (String)", ") + "]"));
                    }
                    MessagingService.instance.addCallback(responseHandler, message.getMessageId());
                    for (InetAddress endpoint : ConsistencyChecker.this.replicas_) {
                        if (endpoint.equals(FBUtilities.getLocalAddress())) continue;
                        MessagingService.instance.sendOneWay(message, endpoint);
                    }
                    this.repairInvoked = true;
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error handling responses for " + ConsistencyChecker.this.row_, e);
            }
        }
    }
}

