/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.streaming.StreamOutManager;
import org.apache.cassandra.streaming.StreamingService;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

public class StreamOut {
    private static Logger logger = Logger.getLogger(StreamOut.class);
    static String TABLE_NAME = "STREAMING-TABLE-NAME";

    private static void updateStatus(String msg) {
        StreamingService.instance.setStatus(msg);
        if (logger.isInfoEnabled() && !"Nothing is happening".equals(msg)) {
            logger.info((Object)msg);
        }
    }

    public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback) {
        assert (ranges.size() > 0);
        logger.debug((Object)("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges, (String)", ")));
        try {
            Table table = Table.open(tableName);
            StreamOut.updateStatus("Flushing memtables for " + tableName + "...");
            for (Future<?> f : table.flush()) {
                try {
                    f.get();
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            StreamOut.updateStatus("Performing anticompaction ...");
            StreamOut.transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName);
        }
        catch (IOException e) {
            throw new IOError(e);
        }
        finally {
            StreamingService.instance.setStatus("Nothing is happening");
        }
        if (callback != null) {
            callback.run();
        }
    }

    public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException {
        Object[] pendingFiles = new PendingFile[3 * sstables.size()];
        int i = 0;
        for (SSTableReader sstable : sstables) {
            for (String filename : sstable.getAllFilenames()) {
                File file = new File(filename);
                pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), file.length(), table);
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Stream context metadata " + StringUtils.join((Object[])pendingFiles, (String)(",  " + sstables.size() + " sstables."))));
        }
        StreamOutManager.get(target).addFilesToStream((PendingFile[])pendingFiles);
        StreamInitiateMessage biMessage = new StreamInitiateMessage((PendingFile[])pendingFiles);
        Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
        message.setHeader(TABLE_NAME, table.getBytes());
        StreamOut.updateStatus("Sending a stream initiate message to " + target + " ...");
        MessagingService.instance.sendOneWay(message, target);
        if (pendingFiles.length > 0) {
            StreamingService.instance.setStatus("Waiting for transfer to " + target + " to complete");
            StreamOutManager.get(target).waitForStreamCompletion();
            StreamOut.updateStatus("Done with transfer to " + target);
        }
    }
}

