/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.CompletedFileStatus;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.streaming.StreamCompletionHandler;
import org.apache.cassandra.streaming.StreamInManager;
import org.apache.cassandra.streaming.StreamInitiateMessage;
import org.apache.cassandra.streaming.StreamOut;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.log4j.Logger;

/**
 * Handles an incoming stream-initiate message.
 *
 * <p>The message body is deserialized into a {@link StreamInitiateMessage} whose
 * stream context lists the files the sender intends to stream. Each file is given a
 * new local target name, registered with {@link StreamInManager}, and a
 * {@code STREAM_INITIATE_DONE} message is sent back to the sender.
 */
public class StreamInitiateVerbHandler implements IVerbHandler {
    private static final Logger logger = Logger.getLogger(StreamInitiateVerbHandler.class);

    /**
     * Processes one stream-initiate message.
     *
     * <p>If the sender announces no files, the sender is removed from the bootstrap
     * sources (only when this node is in bootstrap mode) and the method returns without
     * sending a reply. Otherwise every announced file is renamed to a local target,
     * registered as stream context for the sender, a completion handler is registered,
     * and a {@code STREAM_INITIATE_DONE} message is sent one-way to the sender.
     *
     * @param message the received message; its body carries the serialized
     *                {@link StreamInitiateMessage}
     * @throws IOError if deserialization or file-name generation raises an {@link IOException}
     * @throws IllegalArgumentException if an announced file name cannot be parsed
     */
    @Override
    public void doVerb(Message message) {
        byte[] body = message.getMessageBody();
        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
        if (logger.isDebugEnabled()) {
            logger.debug(String.format("StreamInitiateVerbeHandler.doVerb %s %s %s", message.getVerb(), message.getMessageId(), message.getMessageType()));
        }
        try {
            StreamInitiateMessage biMsg = StreamInitiateMessage.serializer().deserialize(new DataInputStream(bufIn));
            PendingFile[] pendingFiles = biMsg.getStreamContext();
            if (pendingFiles.length == 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("no data needed from " + message.getFrom());
                }
                if (StorageService.instance.isBootstrapMode()) {
                    // NOTE(review): the header bytes are decoded with the platform default
                    // charset; presumably this matches how the sender encoded the table
                    // name -- confirm before switching to an explicit charset.
                    StorageService.instance.removeBootstrapSource(message.getFrom(), new String(message.getHeader(StreamOut.TABLE_NAME)));
                }
                return;
            }

            // One generated file name and one data directory per distinct
            // "<table>-<cf>-<sstable number>" key.
            Map<String, String> fileNames = this.getNewNames(pendingFiles);
            Map<String, String> pathNames = new HashMap<String, String>();
            for (String ssName : fileNames.keySet()) {
                pathNames.put(ssName, DatabaseDescriptor.getNextAvailableDataLocation());
            }

            for (PendingFile pendingFile : pendingFiles) {
                // The status must be built BEFORE the target file is rewritten below:
                // it is keyed on the name the sender announced.
                CompletedFileStatus streamStatus = new CompletedFileStatus(pendingFile.getTargetFile(), pendingFile.getExpectedBytes());
                String file = this.getNewFileNameFromOldContextAndNames(fileNames, pathNames, pendingFile);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received Data from  : " + message.getFrom() + " " + pendingFile.getTargetFile() + " " + file);
                }
                pendingFile.setTargetFile(file);
                this.addStreamContext(message.getFrom(), pendingFile, streamStatus);
            }

            StreamInManager.registerStreamCompletionHandler(message.getFrom(), new StreamCompletionHandler());
            if (logger.isDebugEnabled()) {
                logger.debug("Sending a stream initiate done message ...");
            }
            Message doneMessage = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.STREAM_INITIATE_DONE, new byte[0]);
            MessagingService.instance.sendOneWay(doneMessage, message.getFrom());
        }
        catch (IOException ex) {
            throw new IOError(ex);
        }
    }

    /**
     * Builds the local target path for a file announced by the sender.
     *
     * <p>The announced file name is split on {@code "-"} into column family, sstable
     * number and file type. The first two, prefixed with the pending file's table, form
     * the key used to look up the generated name and the data directory. The literal
     * {@code "Data.db"} in the generated name is replaced with the file type.
     *
     * @param fileNames   generated file names keyed by {@code "<table>-<cf>-<sstable number>"}
     * @param pathNames   data directories keyed the same way as {@code fileNames}
     * @param pendingFile the file announced by the sender
     * @return {@code <path>/<table>/<generated name with the file type substituted>}
     * @throws IllegalArgumentException if the announced name has fewer than three
     *         {@code "-"}-separated pieces, or either map has no entry for the key
     */
    public String getNewFileNameFromOldContextAndNames(Map<String, String> fileNames, Map<String, String> pathNames, PendingFile pendingFile) {
        String sourceName = new File(pendingFile.getTargetFile()).getName();
        String[] piece = FBUtilities.strip(sourceName, "-");
        if (piece.length < 3) {
            throw new IllegalArgumentException("Cannot parse streamed file name '" + sourceName + "': expected at least three '-' separated pieces");
        }
        String cfName = piece[0];
        String ssTableNum = piece[1];
        String typeOfFile = piece[2];

        String key = pendingFile.getTable() + "-" + cfName + "-" + ssTableNum;
        String newFileNameExpanded = fileNames.get(key);
        if (newFileNameExpanded == null) {
            throw new IllegalArgumentException("No generated file name for " + key);
        }
        String path = pathNames.get(key);
        if (path == null) {
            // Without this check the result would silently start with the text "null".
            throw new IllegalArgumentException("No data location for " + key);
        }

        // Presumably the generated name is that of the data component ("...Data.db");
        // swap in the component actually being streamed.
        String newFileName = newFileNameExpanded.replace("Data.db", typeOfFile);
        return path + File.separator + pendingFile.getTable() + File.separator + newFileName;
    }

    /**
     * Generates one new file name per distinct
     * {@code "<table>-<cf>-<sstable number>"} combination among the pending files.
     *
     * <p>The table is taken from {@link PendingFile#getTable()} and the column family
     * from the announced file name; the composite key is never split back apart, so a
     * table name containing {@code "-"} is handled correctly.
     *
     * @param pendingFiles the files announced by the sender
     * @return generated names (from {@code ColumnFamilyStore.getTempSSTableFileName()})
     *         keyed by {@code "<table>-<cf>-<sstable number>"}
     * @throws IOException if opening a table fails
     * @throws IllegalArgumentException if an announced file name has fewer than two
     *         {@code "-"}-separated pieces, or the column family store is not found
     */
    public Map<String, String> getNewNames(PendingFile[] pendingFiles) throws IOException {
        Map<String, String> fileNames = new HashMap<String, String>();
        for (PendingFile pendingFile : pendingFiles) {
            String sourceName = new File(pendingFile.getTargetFile()).getName();
            String[] pieces = FBUtilities.strip(sourceName, "-");
            if (pieces.length < 2) {
                throw new IllegalArgumentException("Cannot parse streamed file name '" + sourceName + "': expected at least two '-' separated pieces");
            }
            String tableName = pendingFile.getTable();
            String cfName = pieces[0];
            String distinctEntry = tableName + "-" + cfName + "-" + pieces[1];
            if (fileNames.containsKey(distinctEntry)) {
                // Several files of the same sstable share a single generated name.
                continue;
            }

            Table table = Table.open(tableName);
            ColumnFamilyStore cfStore = table.getColumnFamilyStore(cfName);
            if (cfStore == null) {
                // Defensive: fail with a descriptive message rather than an NPE below.
                throw new IllegalArgumentException("No column family store for " + distinctEntry);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Generating file name for " + distinctEntry + " ...");
            }
            fileNames.put(distinctEntry, cfStore.getTempSSTableFileName());
        }
        return fileNames;
    }

    /**
     * Registers a pending file and its completion status with {@link StreamInManager}
     * for the given host, logging the registration at debug level.
     *
     * @param host         the host the file will be streamed from
     * @param pendingFile  the file, already pointing at its local target
     * @param streamStatus the completion status created for the file
     */
    private void addStreamContext(InetAddress host, PendingFile pendingFile, CompletedFileStatus streamStatus) {
        if (logger.isDebugEnabled()) {
            logger.debug("Adding stream context " + pendingFile + " for " + host + " ...");
        }
        StreamInManager.addStreamContext(host, pendingFile, streamStatus);
    }
}

