/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.net;

import java.io.DataOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ServerSocketChannel;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.AsyncResult;
import org.apache.cassandra.net.FileStreamTask;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.IncomingTcpConnection;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDeliveryTask;
import org.apache.cassandra.net.OutboundTcpConnection;
import org.apache.cassandra.net.OutboundTcpConnectionPool;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ExpiringMap;
import org.apache.cassandra.utils.GuidGenerator;
import org.apache.log4j.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

public class MessagingService
implements IFailureDetectionEventListener {
    // Protocol version; packIt/constructStreamHeader shift it left by 8 bits into the header word.
    private static int version_ = 1;
    // Serializer type whose ordinal is OR-ed into the header word.
    private static SerializerType serializerType_ = SerializerType.BINARY;
    // Magic number that opens every header; packIt/constructStreamHeader write this value
    // and validateMagic rejects any other value.
    public static final int PROTOCOL_MAGIC = -900387334;
    // Callbacks keyed by message id (or group id), registered through addCallback.
    // Created in the constructor with 2 * DatabaseDescriptor.getRpcTimeout().
    private static ExpiringMap<String, IAsyncCallback> callbackMap_;
    // Pending async results keyed by message id; filled by sendRR(Message, InetAddress)
    // and removed by getAsyncResult. Created with the same 2 * rpc-timeout argument.
    private static ExpiringMap<String, IAsyncResult> taskCompletionMap_;
    // Verb -> handler registry populated by registerVerbHandlers.
    private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
    // Executor exposed via getDeserializationExecutor; receive() also uses it when no stage exists.
    private static ExecutorService messageDeserializerExecutor_;
    // Executor that runs the FileStreamTask instances submitted by stream().
    private static ExecutorService streamExecutor_;
    // One outbound connection pool per remote endpoint, created lazily by getConnectionPool.
    private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_;
    private static Logger logger_;
    // Singleton, created in the static initializer.
    public static final MessagingService instance;
    // Accept thread started by listen(); stays null until listen() has been called.
    private SocketThread socketThread;

    /**
     * Cloning is not supported for this class.
     *
     * @return never returns normally
     * @throws CloneNotSupportedException always
     */
    @Override
    public Object clone() throws CloneNotSupportedException {
        throw new CloneNotSupportedException();
    }

    protected MessagingService() {
        verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();
        callbackMap_ = new ExpiringMap(2L * DatabaseDescriptor.getRpcTimeout());
        taskCompletionMap_ = new ExpiringMap(2L * DatabaseDescriptor.getRpcTimeout());
        messageDeserializerExecutor_ = new JMXEnabledThreadPoolExecutor(1, Runtime.getRuntime().availableProcessors(), Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("MESSAGE-DESERIALIZER-POOL"));
        streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
    }

    /**
     * Computes a message digest of {@code data}.
     *
     * @param type algorithm name passed to {@link MessageDigest#getInstance(String)}
     * @param data bytes to digest
     * @return the digest bytes
     * @throws RuntimeException wrapping any exception raised while obtaining the
     *         digest or computing it (for example an unknown algorithm name)
     */
    public byte[] hash(String type, byte[] data) {
        try {
            return MessageDigest.getInstance(type).digest(data);
        }
        catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Failure-detector callback: resets the outbound connection pool of the
     * given endpoint (the pool is created first if it does not exist yet).
     *
     * @param ep endpoint that was convicted
     */
    @Override
    public void convict(InetAddress ep) {
        logger_.trace((Object)("Resetting pool for " + ep));
        OutboundTcpConnectionPool pool = MessagingService.getConnectionPool(ep);
        pool.reset();
    }

    /**
     * Opens a server socket on {@code localEp} at the configured storage port and
     * starts the accept thread for it.
     *
     * @param localEp local address to bind to
     * @throws IOException if the channel cannot be opened, configured or bound; in
     *         that case the freshly opened channel is closed before the exception
     *         propagates, so no file descriptor is leaked
     */
    public void listen(InetAddress localEp) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        boolean ready = false;
        try {
            ServerSocket ss = serverChannel.socket();
            ss.setReuseAddress(true);
            ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
            this.socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
            ready = true;
        }
        finally {
            if (!ready) {
                // Setup failed: release the channel, but let the original failure propagate.
                try {
                    serverChannel.close();
                }
                catch (IOException ignored) {
                    // Best effort only; the exception that aborted setup is the one worth reporting.
                }
            }
        }
        this.socketThread.start();
    }

    /**
     * Returns the outbound connection pool for an endpoint, creating and
     * registering one on first use.
     *
     * @param to remote endpoint
     * @return the pool stored in the map for {@code to}
     */
    public static OutboundTcpConnectionPool getConnectionPool(InetAddress to) {
        OutboundTcpConnectionPool existing = connectionManagers_.get(to);
        if (existing != null) {
            return existing;
        }
        // putIfAbsent keeps whichever pool was stored first; re-read the map so the
        // caller gets the stored pool rather than the candidate created here.
        connectionManagers_.putIfAbsent(to, new OutboundTcpConnectionPool(to));
        return connectionManagers_.get(to);
    }

    /**
     * Picks the outbound connection to use for {@code msg} from the pool of {@code to}.
     *
     * @param to  remote endpoint
     * @param msg message about to be sent; passed to the pool to select the connection
     * @return the connection chosen by the endpoint's pool
     */
    public static OutboundTcpConnection getConnection(InetAddress to, Message msg) {
        OutboundTcpConnectionPool pool = MessagingService.getConnectionPool(to);
        return pool.getConnection(msg);
    }

    /**
     * Registers the handler for a verb. With assertions enabled, registering a
     * verb that already has a handler fails the assertion.
     *
     * @param verb        verb to handle
     * @param verbHandler handler to associate with the verb
     */
    public void registerVerbHandlers(StorageService.Verb verb, IVerbHandler verbHandler) {
        assert !verbHandlers_.containsKey(verb);
        verbHandlers_.put(verb, verbHandler);
    }

    /**
     * Looks up the handler registered for a verb.
     *
     * @param type verb to look up
     * @return the registered handler, or {@code null} if the map has no entry for it
     */
    public IVerbHandler getVerbHandler(StorageService.Verb type) {
        IVerbHandler handler = verbHandlers_.get(type);
        return handler;
    }

    /**
     * Sends the same message to several endpoints, registering one callback
     * under the message's id before anything is sent.
     *
     * @param message message to send
     * @param to      endpoints to send to
     * @param cb      callback registered under the message id
     * @return the message id the callback was registered under
     */
    public String sendRR(Message message, InetAddress[] to, IAsyncCallback cb) {
        final String id = message.getMessageId();
        this.addCallback(cb, id);
        for (int i = 0; i < to.length; ++i) {
            this.sendOneWay(message, to[i]);
        }
        return id;
    }

    /**
     * Stores a callback in the callback map under the given id.
     *
     * @param cb        callback to store
     * @param messageId key to store it under
     */
    public void addCallback(IAsyncCallback cb, String messageId) {
        final String key = messageId;
        callbackMap_.put(key, cb);
    }

    /**
     * Sends a message to one endpoint, registering the callback under the
     * message's id before sending.
     *
     * @param message message to send
     * @param to      destination endpoint
     * @param cb      callback registered under the message id
     * @return the message id the callback was registered under
     */
    public String sendRR(Message message, InetAddress to, IAsyncCallback cb) {
        final String id = message.getMessageId();
        this.addCallback(cb, id);
        this.sendOneWay(message, to);
        return id;
    }

    /**
     * Sends {@code messages[i]} to {@code to[i]} for every index, after stamping
     * every message with one freshly generated group id under which the callback
     * is registered.
     *
     * @param messages messages to send; each one has its message id overwritten
     * @param to       destinations, parallel to {@code messages}
     * @param cb       callback registered under the group id
     * @return the generated group id
     * @throws IllegalArgumentException if the two arrays differ in length
     */
    public String sendRR(Message[] messages, InetAddress[] to, IAsyncCallback cb) {
        final int count = messages.length;
        if (count != to.length) {
            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
        }
        final String groupId = GuidGenerator.guid();
        this.addCallback(cb, groupId);
        int index = 0;
        for (Message outgoing : messages) {
            outgoing.setMessageId(groupId);
            this.sendOneWay(outgoing, to[index]);
            ++index;
        }
        return groupId;
    }

    /**
     * Sends a message to one endpoint without registering a callback.
     *
     * <p>If the destination equals the message's own sender address, the message is
     * handed straight to {@link #receive(Message)} and never touches the network or
     * the sink. Otherwise it is passed through
     * {@code SinkManager.processClientMessageSink}; a {@code null} result means the
     * message is dropped, and any other result is the message that is actually
     * routed, serialized and written.
     *
     * @param message message to send
     * @param to      destination endpoint
     * @throws RuntimeException wrapping an {@link IOException} raised during serialization
     */
    public void sendOneWay(Message message, InetAddress to) {
        if (message.getFrom().equals(to)) {
            MessagingService.receive(message);
            return;
        }
        Message processedMessage = SinkManager.processClientMessageSink(message);
        if (processedMessage == null) {
            return;
        }
        // Use the sink's result from here on; the original code went back to `message`,
        // which silently discarded whatever the sink returned.
        OutboundTcpConnection connection = MessagingService.getConnection(to, processedMessage);
        byte[] data;
        try {
            DataOutputBuffer serialized = new DataOutputBuffer();
            Message.serializer().serialize(processedMessage, (DataOutputStream)serialized);
            data = serialized.getData();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        assert (data.length > 0);
        ByteBuffer framed = MessagingService.packIt(data, false);
        connection.write(framed);
    }

    /**
     * Sends a message to one endpoint and returns a result object registered in
     * the task-completion map under the message's id.
     *
     * @param message message to send
     * @param to      destination endpoint
     * @return the newly created async result
     */
    public IAsyncResult sendRR(Message message, InetAddress to) {
        final AsyncResult pending = new AsyncResult();
        final String id = message.getMessageId();
        taskCompletionMap_.put(id, pending);
        this.sendOneWay(message, to);
        return pending;
    }

    /**
     * Submits a {@code FileStreamTask} built from the arguments to the streaming executor.
     *
     * @param file          file to stream
     * @param startPosition start position handed to the task
     * @param endPosition   end position handed to the task
     * @param from          source endpoint handed to the task
     * @param to            destination endpoint handed to the task
     */
    public void stream(String file, long startPosition, long endPosition, InetAddress from, InetAddress to) {
        final Runnable task = new FileStreamTask(file, startPosition, endPosition, from, to);
        streamExecutor_.execute(task);
    }

    /**
     * Blocks until the deserialization executor and then the streaming executor
     * have terminated, polling each in five-second waits.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void waitFor() throws InterruptedException {
        MessagingService.blockUntilTerminated(messageDeserializerExecutor_);
        MessagingService.blockUntilTerminated(streamExecutor_);
    }

    /** Waits, in five-second slices, until {@code executor} reports that it has terminated. */
    private static void blockUntilTerminated(ExecutorService executor) throws InterruptedException {
        while (!executor.isTerminated()) {
            executor.awaitTermination(5L, TimeUnit.SECONDS);
        }
    }

    /**
     * Shuts the service down: closes the accept thread's server socket (if
     * {@code listen} was ever called), then stops both executors and both
     * expiring maps.
     *
     * @throws IOError if closing the server socket fails
     */
    public static void shutdown() {
        logger_.info((Object)"Shutting down MessageService...");
        // socketThread is only assigned by listen(); without this guard a shutdown
        // before listen() threw NullPointerException and left the executors and
        // expiring maps running.
        SocketThread acceptThread = MessagingService.instance.socketThread;
        if (acceptThread != null) {
            try {
                acceptThread.close();
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }
        messageDeserializerExecutor_.shutdownNow();
        streamExecutor_.shutdownNow();
        taskCompletionMap_.shutdown();
        callbackMap_.shutdown();
        logger_.info((Object)"Shutdown complete (no further commands will be processed)");
    }

    /**
     * Dispatches a message for delivery: the delivery task runs on the stage that
     * {@code StageManager} returns for the message type, or on the deserialization
     * executor when there is no such stage.
     *
     * @param message message to deliver
     */
    public static void receive(Message message) {
        final MessageDeliveryTask delivery = new MessageDeliveryTask(message);
        final ThreadPoolExecutor stage = StageManager.getStage(message.getMessageType());
        if (stage != null) {
            stage.execute(delivery);
            return;
        }
        // No dedicated stage for this message type: fall back to the default executor.
        if (logger_.isDebugEnabled()) {
            logger_.debug((Object)("Running " + message.getMessageType() + " on default stage"));
        }
        messageDeserializerExecutor_.execute(delivery);
    }

    /**
     * Looks up a callback in the callback map without removing it.
     *
     * @param key id the callback was registered under
     * @return whatever the callback map returns for {@code key}
     */
    public static IAsyncCallback getRegisteredCallback(String key) {
        IAsyncCallback callback = callbackMap_.get(key);
        return callback;
    }

    /**
     * Removes the callback stored under {@code key} from the callback map.
     *
     * @param key id the callback was registered under
     */
    public static void removeRegisteredCallback(String key) {
        final String id = key;
        callbackMap_.remove(id);
    }

    /**
     * Takes the async result stored under {@code key}. Despite the "get" name,
     * this removes the entry from the task-completion map.
     *
     * @param key message id the result was registered under
     * @return whatever the map's {@code remove} returns for {@code key}
     */
    public static IAsyncResult getAsyncResult(String key) {
        IAsyncResult taken = taskCompletionMap_.remove(key);
        return taken;
    }

    /**
     * Exposes the executor used for message deserialization.
     *
     * @return the shared deserialization executor
     */
    public static ExecutorService getDeserializationExecutor() {
        final ExecutorService executor = messageDeserializerExecutor_;
        return executor;
    }

    /**
     * Checks that a header starts with the protocol magic number.
     *
     * @param magic first int read from a header
     * @throws IOException if {@code magic} is not the protocol magic
     */
    public static void validateMagic(int magic) throws IOException {
        // -900387334 is the value of PROTOCOL_MAGIC.
        if (magic == -900387334) {
            return;
        }
        throw new IOException("invalid protocol header");
    }

    /**
     * Extracts an {@code n}-bit field from {@code x} whose most significant bit
     * is at position {@code p} (bit 0 being the least significant).
     *
     * @param x value to extract from
     * @param p position of the field's highest bit
     * @param n width of the field in bits
     * @return the field, right-aligned
     */
    public static int getBits(int x, int p, int n) {
        final int shift = p + 1 - n;      // move the field's lowest bit down to bit 0
        final int mask = ~(-1 << n);      // n low bits set
        return (x >>> shift) & mask;
    }

    /**
     * Frames a payload for the wire: magic number, header word, payload length,
     * then the payload itself.
     *
     * <p>The header word holds the serializer type's ordinal, bit value 4 when
     * {@code compress} is set, and the protocol version shifted left by 8 bits.
     *
     * @param bytes    payload
     * @param compress whether to set the compression flag in the header word
     * @return a buffer flipped and ready for reading, of size {@code 12 + bytes.length}
     */
    public static ByteBuffer packIt(byte[] bytes, boolean compress) {
        int flags = serializerType_.ordinal();
        if (compress) {
            flags |= 4;
        }
        flags |= version_ << 8;

        final int payloadLength = bytes.length;
        // Three ints (magic, header word, length) precede the payload.
        ByteBuffer framed = ByteBuffer.allocate(12 + payloadLength);
        framed.putInt(-900387334)   // value of PROTOCOL_MAGIC
              .putInt(flags)
              .putInt(payloadLength)
              .put(bytes);
        framed.flip();
        return framed;
    }

    /**
     * Builds the 8-byte header for a stream: magic number followed by the header word.
     *
     * <p>The header word holds the serializer type's ordinal, bit value 4 when
     * {@code compress} is set, bit value 8 (always set here), and the protocol
     * version shifted left by 8 bits.
     *
     * @param compress whether to set the compression flag in the header word
     * @return a buffer flipped and ready for reading
     */
    public static ByteBuffer constructStreamHeader(boolean compress) {
        int flags = serializerType_.ordinal();
        if (compress) {
            flags |= 4;
        }
        flags |= 8;
        flags |= version_ << 8;

        ByteBuffer streamHeader = ByteBuffer.allocate(8);
        streamHeader.putInt(-900387334)   // value of PROTOCOL_MAGIC
                    .putInt(flags);
        streamHeader.flip();
        return streamHeader;
    }

    // Static setup. The singleton is created last, so connectionManagers_ and
    // logger_ are already assigned when the constructor runs; the constructor
    // then assigns the remaining static fields.
    static {
        connectionManagers_ = new NonBlockingHashMap();
        logger_ = Logger.getLogger(MessagingService.class);
        instance = new MessagingService();
    }

    private class SocketThread
    extends Thread {
        private final ServerSocket server;

        SocketThread(ServerSocket server, String name) {
            super(name);
            this.server = server;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    Socket socket = this.server.accept();
                    new IncomingTcpConnection(socket).start();
                }
            }
            catch (AsynchronousCloseException e) {
                logger_.info((Object)"MessagingService shutting down server thread.");
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        void close() throws IOException {
            this.server.close();
        }
    }
}

