/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.filter.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;

public class HintedHandOffManager {
    public static final HintedHandOffManager instance = new HintedHandOffManager();
    private static final Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
    static final long INTERVAL_IN_MS = 3600000L;
    public static final String HINTS_CF = "HintsColumnFamily";
    private static final int PAGE_SIZE = 10000;
    private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");

    protected HintedHandOffManager() {
        new Thread((Runnable)new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                while (true) {
                    Thread.sleep(3600000L);
                    HintedHandOffManager.deliverAllHints();
                }
            }
        }, "Hint delivery").start();
    }

    private static boolean sendMessage(InetAddress endPoint, String tableName, String key) throws IOException {
        if (!Gossiper.instance.isKnownEndpoint(endPoint)) {
            logger_.warn((Object)("Hints found for endpoint " + endPoint + " which is not part of the gossip network.  discarding."));
            return true;
        }
        if (!FailureDetector.instance.isAlive(endPoint)) {
            return false;
        }
        Table table = Table.open(tableName);
        RowMutation rm = new RowMutation(tableName, key);
        for (ColumnFamilyStore cfstore : table.getColumnFamilyStores()) {
            ColumnFamily cf = cfstore.getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfstore.getColumnFamilyName())));
            if (cf == null) continue;
            rm.add(cf);
        }
        Message message = rm.makeRowMutationMessage();
        WriteResponseHandler responseHandler = new WriteResponseHandler(1, tableName);
        MessagingService.instance.sendRR(message, new InetAddress[]{endPoint}, (IAsyncCallback)responseHandler);
        try {
            responseHandler.get();
        }
        catch (TimeoutException e) {
            return false;
        }
        return true;
    }

    private static void deleteEndPoint(byte[] endpointAddress, String tableName, byte[] key, long timestamp) throws IOException {
        RowMutation rm = new RowMutation("system", tableName);
        rm.delete(new QueryPath(HINTS_CF, key, endpointAddress), timestamp);
        rm.apply();
    }

    private static void deleteHintKey(String tableName, byte[] key) throws IOException {
        RowMutation rm = new RowMutation("system", tableName);
        rm.delete(new QueryPath(HINTS_CF, key, null), System.currentTimeMillis());
        rm.apply();
    }

    private static void deliverAllHints() throws DigestMismatchException, IOException, InvalidRequestException, TimeoutException {
        if (logger_.isDebugEnabled()) {
            logger_.debug((Object)"Started deliverAllHints");
        }
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        for (String tableName : DatabaseDescriptor.getTables()) {
            SliceQueryFilter filter;
            ColumnFamily hintColumnFamily;
            byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
            while (!HintedHandOffManager.pagingFinished(hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, 10000)), Integer.MAX_VALUE), startColumn)) {
                Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
                for (IColumn keyColumn : keys) {
                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
                    String keyStr = new String(keyColumn.name(), "UTF-8");
                    int deleted = 0;
                    for (IColumn endpoint : endpoints) {
                        if (!HintedHandOffManager.sendMessage(InetAddress.getByAddress(endpoint.name()), tableName, keyStr)) continue;
                        HintedHandOffManager.deleteEndPoint(endpoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
                        ++deleted;
                    }
                    if (deleted == endpoints.size()) {
                        HintedHandOffManager.deleteHintKey(tableName, keyColumn.name());
                    }
                    startColumn = keyColumn.name();
                }
            }
        }
        hintStore.forceFlush();
        try {
            CompactionManager.instance.submitMajor(hintStore).get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
        if (logger_.isDebugEnabled()) {
            logger_.debug((Object)"Finished deliverAllHints");
        }
    }

    private static boolean pagingFinished(ColumnFamily hintColumnFamily, byte[] startColumn) {
        return hintColumnFamily == null || hintColumnFamily.getSortedColumns().size() == 1 && hintColumnFamily.getColumn(startColumn) != null;
    }

    private static void deliverHintsToEndpoint(InetAddress endPoint) throws IOException, DigestMismatchException, InvalidRequestException, TimeoutException {
        if (logger_.isDebugEnabled()) {
            logger_.debug((Object)("Started hinted handoff for endPoint " + endPoint));
        }
        byte[] targetEPBytes = endPoint.getAddress();
        ColumnFamilyStore hintStore = Table.open("system").getColumnFamilyStore(HINTS_CF);
        for (String tableName : DatabaseDescriptor.getTables()) {
            SliceQueryFilter filter;
            ColumnFamily hintColumnFamily;
            byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
            while (!HintedHandOffManager.pagingFinished(hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter = new SliceQueryFilter(tableName, new QueryPath(HINTS_CF), startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, false, 10000)), Integer.MAX_VALUE), startColumn)) {
                Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
                for (IColumn keyColumn : keys) {
                    String keyStr = new String(keyColumn.name(), "UTF-8");
                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
                    for (IColumn hintEndPoint : endpoints) {
                        if (!Arrays.equals(hintEndPoint.name(), targetEPBytes) || !HintedHandOffManager.sendMessage(endPoint, tableName, keyStr)) continue;
                        if (endpoints.size() == 1) {
                            HintedHandOffManager.deleteHintKey(tableName, keyColumn.name());
                            break;
                        }
                        HintedHandOffManager.deleteEndPoint(hintEndPoint.name(), tableName, keyColumn.name(), System.currentTimeMillis());
                        break;
                    }
                    startColumn = keyColumn.name();
                }
            }
        }
        if (logger_.isDebugEnabled()) {
            logger_.debug((Object)("Finished hinted handoff for endpoint " + endPoint));
        }
    }

    public void deliverHints(final InetAddress to) {
        WrappedRunnable r = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws Exception {
                HintedHandOffManager.deliverHintsToEndpoint(to);
            }
        };
        this.executor_.submit(r);
    }
}

