/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemoteBlockReader2;
import org.apache.hadoop.hdfs.SocketCache;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.security.token.Token;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestConnCache {
    static final Log LOG = LogFactory.getLog(TestConnCache.class);
    static final int BLOCK_SIZE = 4096;
    static final int FILE_SIZE = 12288;
    static Configuration conf = null;
    static MiniDFSCluster cluster = null;
    static FileSystem fs = null;
    static final Path testFile = new Path("/testConnCache.dat");
    static byte[] authenticData = null;
    static BlockReaderTestUtil util = null;

    @BeforeClass
    public static void setupCluster() throws Exception {
        boolean REPLICATION_FACTOR = true;
        util = new BlockReaderTestUtil(1);
        cluster = util.getCluster();
        conf = util.getConf();
        fs = cluster.getFileSystem();
        authenticData = util.writeFile(testFile, 12);
    }

    private void pread(DFSInputStream in, long pos, byte[] buffer, int offset, int length) throws IOException {
        Assert.assertTrue((String)"Test buffer too small", (buffer.length >= offset + length ? 1 : 0) != 0);
        if (pos >= 0L) {
            in.seek(pos);
        }
        LOG.info((Object)("Reading from file of size " + in.getFileLength() + " at offset " + in.getPos()));
        while (length > 0) {
            int cnt = in.read(buffer, offset, length);
            Assert.assertTrue((String)"Error in read", (cnt > 0 ? 1 : 0) != 0);
            offset += cnt;
            length -= cnt;
        }
        for (int i = 0; i < length; ++i) {
            byte actual = buffer[i];
            byte expect = authenticData[(int)pos + i];
            Assert.assertEquals((String)("Read data mismatch at file offset " + (pos + (long)i) + ". Expects " + expect + "; got " + actual), (long)actual, (long)expect);
        }
    }

    @Test
    public void testSocketCache() throws IOException {
        int CACHE_SIZE = 4;
        SocketCache cache = new SocketCache(4);
        InetSocketAddress nnAddr = new InetSocketAddress("localhost", cluster.getNameNodePort());
        DFSClient client = new DFSClient(nnAddr, conf);
        LocatedBlock block = (LocatedBlock)client.getNamenode().getBlockLocations(testFile.toString(), 0L, 12288L).getLocatedBlocks().get(0);
        DataNode dn = util.getDataNode(block);
        InetSocketAddress dnAddr = dn.getXferAddress();
        Socket[] dnSockets = new Socket[4];
        for (int i = 0; i < dnSockets.length; ++i) {
            dnSockets[i] = client.socketFactory.createSocket(dnAddr.getAddress(), dnAddr.getPort());
        }
        Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort());
        cache.put(nnSock, null);
        Assert.assertSame((String)"Read the write", (Object)nnSock, (Object)cache.get((SocketAddress)nnAddr).sock);
        cache.put(nnSock, null);
        for (Socket dnSock : dnSockets) {
            cache.put(dnSock, null);
        }
        Assert.assertEquals((String)"NN socket evicted", null, (Object)cache.get((SocketAddress)nnAddr));
        Assert.assertTrue((String)"Evicted socket closed", (boolean)nnSock.isClosed());
        for (Socket dnSock : dnSockets) {
            Assert.assertEquals((String)"Retrieve cached sockets", (Object)dnSock, (Object)cache.get((SocketAddress)dnAddr).sock);
            dnSock.close();
        }
        Assert.assertEquals((String)"Cache is empty", (long)0L, (long)cache.size());
    }

    @Test
    public void testReadFromOneDN() throws IOException {
        LOG.info((Object)"Starting testReadFromOneDN()");
        DFSClient client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
        DFSInputStream in = (DFSInputStream)Mockito.spy((Object)client.open(testFile.toString()));
        LOG.info((Object)("opened " + testFile.toString()));
        byte[] dataBuf = new byte[4096];
        MockGetBlockReader answer = new MockGetBlockReader();
        ((DFSInputStream)Mockito.doAnswer((Answer)answer).when((Object)in)).getBlockReader((InetSocketAddress)Matchers.anyObject(), (DatanodeInfo)Matchers.anyObject(), Matchers.anyString(), (ExtendedBlock)Matchers.anyObject(), (Token)Matchers.anyObject(), Matchers.anyLong(), Matchers.anyLong(), Matchers.anyInt(), Matchers.anyBoolean(), Matchers.anyString());
        this.pread(in, 0L, dataBuf, 0, dataBuf.length);
        this.pread(in, 12288 - dataBuf.length, dataBuf, 0, dataBuf.length);
        this.pread(in, 1024L, dataBuf, 0, dataBuf.length);
        this.pread(in, -1L, dataBuf, 0, dataBuf.length);
        this.pread(in, 64L, dataBuf, 0, dataBuf.length / 2);
        in.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDisableCache() throws IOException {
        LOG.info((Object)"Starting testDisableCache()");
        DFSTestUtil.readFile(fs, testFile);
        Assert.assertEquals((long)1L, (long)((DistributedFileSystem)TestConnCache.fs).dfs.socketCache.size());
        Configuration confWithoutCache = new Configuration(fs.getConf());
        confWithoutCache.setInt("dfs.client.socketcache.capacity", 0);
        FileSystem fsWithoutCache = FileSystem.newInstance((Configuration)confWithoutCache);
        try {
            DFSTestUtil.readFile(fsWithoutCache, testFile);
            Assert.assertEquals((long)0L, (long)((DistributedFileSystem)fsWithoutCache).dfs.socketCache.size());
        }
        finally {
            fsWithoutCache.close();
        }
    }

    @AfterClass
    public static void teardownCluster() throws Exception {
        util.shutdown();
    }

    private class MockGetBlockReader
    implements Answer<RemoteBlockReader2> {
        public RemoteBlockReader2 reader = null;
        private Socket sock = null;

        private MockGetBlockReader() {
        }

        public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable {
            RemoteBlockReader2 prevReader = this.reader;
            this.reader = (RemoteBlockReader2)invocation.callRealMethod();
            if (this.sock == null) {
                this.sock = this.reader.dnSock;
            } else if (prevReader != null) {
                Assert.assertSame((String)"DFSInputStream should use the same socket", (Object)this.sock, (Object)this.reader.dnSock);
            }
            return this.reader;
        }
    }
}

