/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.QuorumEngine;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.log4j.Logger;

public class BookieHandle
extends Thread {
    static Logger LOG = Logger.getLogger(BookieClient.class);
    volatile boolean stop = false;
    boolean noreception = false;
    private BookieClient client;
    InetSocketAddress addr;
    static int recvTimeout = 2000;
    private ArrayBlockingQueue<ToSend> incomingQueue;
    private int refCount = 0;
    HashSet<LedgerHandle> ledgers;
    Mac mac = null;

    BookieHandle(InetSocketAddress addr, boolean enabled) throws IOException {
        this.stop = !enabled;
        this.noreception = !enabled;
        this.client = !this.stop ? new BookieClient(addr, recvTimeout) : null;
        this.addr = addr;
        this.incomingQueue = new ArrayBlockingQueue(2000);
        this.ledgers = new HashSet();
    }

    void restart() throws IOException {
        this.client = new BookieClient(this.addr, recvTimeout);
    }

    public synchronized void sendAdd(LedgerHandle lh, QuorumEngine.SubOp.SubAddOp r, long entry) throws IOException, BKException {
        block4: {
            try {
                if (!this.noreception) {
                    ToSend ts = new ToSend(lh, r, entry);
                    if (!this.incomingQueue.offer(ts, 1000L, TimeUnit.MILLISECONDS)) {
                        throw BKException.create(-8);
                    }
                    break block4;
                }
                throw BKException.create(-8);
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting for room in the incoming queue");
            }
        }
    }

    private synchronized void sendStop() {
        try {
            this.noreception = true;
            LOG.debug((Object)"Sending stop signal");
            this.incomingQueue.put(new ToSend(null, new QuorumEngine.SubOp.SubStopOp(new QuorumEngine.Operation.StopOp()), -1L));
            LOG.debug((Object)"Sent stop signal");
        }
        catch (InterruptedException e) {
            LOG.fatal((Object)"Interrupted while sending stop signal to bookie handle");
        }
    }

    Mac getMac(byte[] macKey, String alg) throws NoSuchAlgorithmException, InvalidKeyException {
        if (this.mac == null) {
            this.mac = Mac.getInstance(alg);
            this.mac.init(new SecretKeySpec(macKey, "HmacSHA1"));
        }
        return this.mac;
    }

    public synchronized void sendRead(LedgerHandle lh, QuorumEngine.SubOp.SubReadOp r, long entry) throws IOException, BKException {
        block4: {
            try {
                if (!this.noreception) {
                    ToSend ts = new ToSend(lh, r, entry);
                    if (!this.incomingQueue.offer(ts, 1000L, TimeUnit.MILLISECONDS)) {
                        throw BKException.create(-8);
                    }
                    break block4;
                }
                throw BKException.create(-8);
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting for room in the incoming queue");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            while (!this.stop) {
                ToSend ts = this.incomingQueue.poll(1000L, TimeUnit.MILLISECONDS);
                if (ts != null) {
                    LedgerHandle self = ts.lh;
                    switch (ts.type) {
                        case 2: {
                            LOG.info((Object)("Stopping BookieHandle: " + this.addr));
                            this.client.errorOut();
                            this.cleanQueue();
                            LOG.debug((Object)"Stopped");
                            break;
                        }
                        case 1: {
                            ByteBuffer extendedData;
                            QuorumEngine.SubOp.SubAddOp aOp = (QuorumEngine.SubOp.SubAddOp)ts.ctx;
                            QuorumEngine.Operation.AddOp op = (QuorumEngine.Operation.AddOp)aOp.op;
                            long confirmed = self.getAddConfirmed();
                            if (self.getQMode() == LedgerHandle.QMode.VERIFIABLE) {
                                extendedData = ByteBuffer.allocate(op.data.length + 28 + 16);
                                extendedData.putLong(self.getId());
                                extendedData.putLong(ts.entry);
                                extendedData.putLong(confirmed);
                                extendedData.put(op.data);
                                extendedData.rewind();
                                byte[] toProcess = new byte[op.data.length + 24];
                                extendedData.get(toProcess, 0, op.data.length + 24);
                                extendedData.position(extendedData.capacity() - 20);
                                if (this.mac == null) {
                                    this.getMac(self.getMacKey(), "HmacSHA1");
                                }
                                extendedData.put(this.mac.doFinal(toProcess));
                                extendedData.position(16);
                            } else {
                                extendedData = ByteBuffer.allocate(op.data.length + 8);
                                extendedData.putLong(confirmed);
                                extendedData.put(op.data);
                                extendedData.flip();
                            }
                            this.client.addEntry(self.getId(), self.getLedgerKey(), ts.entry, extendedData, aOp.wcb, ts.ctx);
                            break;
                        }
                        case 0: {
                            if (this.client != null) {
                                this.client.readEntry(self.getId(), ts.entry, ((QuorumEngine.SubOp.SubReadOp)ts.ctx).rcb, ts.ctx);
                                break;
                            }
                            ((QuorumEngine.SubOp.SubReadOp)ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
                        }
                    }
                    continue;
                }
                LOG.debug((Object)("Empty queue: " + this.addr));
            }
        }
        catch (Exception e) {
            LOG.error((Object)"Handling exception before halting BookieHandle", (Throwable)e);
            for (LedgerHandle lh : this.ledgers) {
                lh.removeBookie(this);
            }
            BookieHandle bookieHandle = this;
            synchronized (bookieHandle) {
                this.noreception = true;
            }
            this.client.halt();
            this.client.errorOut();
            this.cleanQueue();
        }
        LOG.info((Object)("Exiting bookie handle thread: " + this.addr));
    }

    int incRefCount(LedgerHandle lh) {
        this.ledgers.add(lh);
        return ++this.refCount;
    }

    synchronized int halt(LedgerHandle lh) {
        LOG.info((Object)"Calling halt");
        this.ledgers.remove(lh);
        int currentCount = --this.refCount;
        if (currentCount <= 0) {
            this.shutdown();
        }
        if (currentCount < 0) {
            LOG.warn((Object)("Miscalculated the number of reference counts: " + this.addr));
        }
        return currentCount;
    }

    public synchronized int halt() {
        if (!this.stop) {
            LOG.info((Object)"Calling halt");
            for (LedgerHandle lh : this.ledgers) {
                lh.removeBookie(this);
            }
            this.refCount = 0;
            this.shutdown();
        }
        return this.refCount;
    }

    public void shutdown() {
        if (!this.stop) {
            LOG.info((Object)"Calling shutdown");
            LOG.debug((Object)"Halting client");
            this.client.halt();
            LOG.debug((Object)"Cleaning queue");
            this.sendStop();
            LOG.debug((Object)"Finished shutdown");
        }
    }

    private void cleanQueue() {
        this.stop = true;
        ToSend ts = this.incomingQueue.poll();
        while (ts != null) {
            switch (ts.type) {
                case 1: {
                    QuorumEngine.SubOp.SubAddOp aOp = (QuorumEngine.SubOp.SubAddOp)ts.ctx;
                    aOp.wcb.writeComplete(-1, ts.lh.getId(), ts.entry, ts.ctx);
                    break;
                }
                case 0: {
                    ((QuorumEngine.SubOp.SubReadOp)ts.ctx).rcb.readEntryComplete(-1, ts.lh.getId(), ts.entry, null, ts.ctx);
                }
            }
            ts = this.incomingQueue.poll();
        }
    }

    boolean isEnabled() {
        return !this.stop;
    }

    private static class ToSend {
        LedgerHandle lh;
        long entry = -1L;
        Object ctx;
        int type;

        ToSend(LedgerHandle lh, QuorumEngine.SubOp sop, long entry) {
            this.lh = lh;
            this.type = sop.op.type;
            this.entry = entry;
            this.ctx = sop;
        }
    }
}

