/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import org.apache.cassandra.streaming.CompletedFileStatus;
import org.apache.cassandra.streaming.IStreamComplete;
import org.apache.cassandra.streaming.PendingFile;
import org.apache.cassandra.streaming.StreamInManager;
import org.apache.cassandra.streaming.StreamingService;
import org.apache.log4j.Logger;

public class IncomingStreamReader {
    private static Logger logger = Logger.getLogger(IncomingStreamReader.class);
    private PendingFile pendingFile;
    private CompletedFileStatus streamStatus;
    private SocketChannel socketChannel;

    public IncomingStreamReader(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
        InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
        this.pendingFile = StreamInManager.getStreamContext(remoteAddress.getAddress());
        StreamInManager.activeStreams.put((Object)remoteAddress.getAddress(), (Object)this.pendingFile);
        assert (this.pendingFile != null);
        this.streamStatus = StreamInManager.getStreamStatus(remoteAddress.getAddress());
        assert (this.streamStatus != null);
    }

    public void read() throws IOException {
        long bytesRead;
        StreamingService.instance.setStatus("Receiving stream");
        InetSocketAddress remoteAddress = (InetSocketAddress)this.socketChannel.socket().getRemoteSocketAddress();
        if (logger.isDebugEnabled()) {
            logger.debug((Object)("Creating file for " + this.pendingFile.getTargetFile()));
        }
        FileOutputStream fos = new FileOutputStream(this.pendingFile.getTargetFile(), true);
        FileChannel fc = fos.getChannel();
        try {
            for (bytesRead = 0L; bytesRead < this.pendingFile.getExpectedBytes(); bytesRead += fc.transferFrom(this.socketChannel, bytesRead, 0x2000000L)) {
                this.pendingFile.update(bytesRead);
            }
            StreamingService.instance.setStatus("Receiving stream: finished reading chunk, awaiting more");
        }
        catch (IOException ex) {
            this.streamStatus.setAction(CompletedFileStatus.StreamCompletionAction.STREAM);
            this.handleStreamCompletion(remoteAddress.getAddress());
            File file = new File(this.pendingFile.getTargetFile());
            file.delete();
            StreamingService.instance.setStatus("Receiving stream: recovering from IO error");
            throw ex;
        }
        finally {
            StreamInManager.activeStreams.remove((Object)remoteAddress.getAddress(), (Object)this.pendingFile);
        }
        if (bytesRead == this.pendingFile.getExpectedBytes()) {
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Removing stream context " + this.pendingFile));
            }
            fc.close();
            StreamingService.instance.setStatus("Nothing is happening");
            this.handleStreamCompletion(remoteAddress.getAddress());
        }
    }

    private void handleStreamCompletion(InetAddress remoteHost) throws IOException {
        IStreamComplete streamComplete = StreamInManager.getStreamCompletionHandler(remoteHost);
        if (streamComplete != null) {
            streamComplete.onStreamCompletion(remoteHost, this.pendingFile, this.streamStatus);
        }
    }
}

