/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLogExecutorService;
import org.apache.cassandra.db.commitlog.CommitLogHeader;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.io.DeletionService;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;

public class CommitLog {
    /**
     * Length threshold that {@link LogRecordAdder} compares the current segment's length against
     * to decide when to start a new segment. Default is 0x8000000 (128 * 1024 * 1024).
     */
    private static volatile int SEGMENT_SIZE = 0x8000000;
    private static final Logger logger = Logger.getLogger(CommitLog.class);
    /** Segments held by this log; new segments are appended at the tail and the tail is the current segment. */
    private final Deque<CommitLogSegment> segments = new ArrayDeque<CommitLogSegment>();
    /** Executor to which the tasks created by this class (adds, syncs, discards, segment rolls) are submitted. */
    private final ExecutorService executor = new CommitLogExecutorService();

    /**
     * Returns the shared {@code CommitLog}, i.e. the static instance held by {@link CLHandle}.
     *
     * @return the shared commit log instance
     */
    public static CommitLog instance() {
        final CommitLog shared = CLHandle.instance;
        return shared;
    }

    /**
     * Replaces the threshold that {@link LogRecordAdder} compares the current segment's length against.
     *
     * @param size the new threshold; stored as-is, without validation
     */
    public static void setSegmentSize(int size) {
        CommitLog.SEGMENT_SIZE = size;
    }

    /**
     * Reports how many segments this commit log currently holds.
     *
     * @return the size of the segment deque
     */
    public int getSegmentCount() {
        final int count = segments.size();
        return count;
    }

    /**
     * Creates the commit log with a single initial segment. When the configured sync mode is
     * {@code periodic}, also starts the "PERIODIC-COMMIT-LOG-SYNCER" thread, which repeatedly
     * submits a sync task to the executor, waits for it to finish, and then sleeps for the
     * configured sync period.
     */
    private CommitLog() {
        segments.add(new CommitLogSegment(Table.TableMetadata.getColumnFamilyCount()));
        if (DatabaseDescriptor.getCommitLogSync() != DatabaseDescriptor.CommitLogSync.periodic) {
            return;
        }

        // The sync is executed through the executor rather than directly on the syncer thread.
        final WrappedRunnable syncTask = new WrappedRunnable(){

            @Override
            public void runMayThrow() throws IOException {
                CommitLog.this.sync();
            }
        };
        Runnable periodicLoop = new Runnable(){

            @Override
            public void run() {
                try {
                    for (;;) {
                        Future<?> pending = CommitLog.this.executor.submit(syncTask);
                        pending.get();
                        Thread.sleep(DatabaseDescriptor.getCommitLogSyncPeriod());
                    }
                }
                catch (InterruptedException interrupted) {
                    throw new AssertionError(interrupted);
                }
                catch (ExecutionException failed) {
                    throw new RuntimeException(failed);
                }
            }
        };
        Thread syncerThread = new Thread(periodicLoop, "PERIODIC-COMMIT-LOG-SYNCER");
        syncerThread.start();
    }

    /**
     * Replays every commit log file found in the configured log directory and then deletes
     * the replayed files. Files whose names match the pattern {@code \..*} are ignored, and the
     * remaining files are sorted with {@code FileUtils.FileComparator} before being replayed.
     * Returns without doing anything when the directory contains no such files.
     *
     * @throws IOException if the log directory cannot be listed, or if replaying or deleting fails
     */
    public static void recover() throws IOException {
        String directory = DatabaseDescriptor.getLogFileLocation();
        File[] files = new File(directory).listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return !name.matches("\\..*");
            }
        });
        // File.listFiles signals "not a directory" or an I/O error by returning null, not an
        // empty array; report that explicitly instead of failing with a NullPointerException.
        if (files == null) {
            throw new IOException("Unable to list commit log directory " + directory);
        }
        if (files.length == 0) {
            return;
        }
        Arrays.sort(files, new FileUtils.FileComparator());
        logger.info("Replaying " + StringUtils.join(files, ", "));
        CommitLog.recover(files);
        FileUtils.delete(files);
        logger.info("Log replay complete");
    }

    /*
     * WARNING - void declaration
     */
    public static void recover(File[] clogs) throws IOException {
        void var5_8;
        HashSet<Table> tablesRecovered = new HashSet<Table>();
        final AtomicInteger counter = new AtomicInteger(0);
        File[] arr$ = clogs;
        int len$ = arr$.length;
        boolean bl = false;
        while (var5_8 < len$) {
            File file = arr$[var5_8];
            int bufferSize = (int)Math.min(file.length(), 0x2000000L);
            BufferedRandomAccessFile reader = new BufferedRandomAccessFile(file.getAbsolutePath(), "r", bufferSize);
            final CommitLogHeader clHeader = CommitLogHeader.readCommitLogHeader(reader);
            int lowPos = CommitLogHeader.getLowestPosition(clHeader);
            if (lowPos == 0) break;
            reader.seek(lowPos);
            if (logger.isDebugEnabled()) {
                logger.debug((Object)("Replaying " + file + " starting at " + lowPos));
            }
            while (!reader.isEOF()) {
                long claimedCRC32;
                byte[] bytes;
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)("Reading mutation at " + reader.getFilePointer()));
                }
                try {
                    bytes = new byte[(int)reader.readLong()];
                    reader.readFully(bytes);
                    claimedCRC32 = reader.readLong();
                }
                catch (EOFException e) {
                    break;
                }
                ByteArrayInputStream bufIn = new ByteArrayInputStream(bytes);
                CRC32 checksum = new CRC32();
                checksum.update(bytes, 0, bytes.length);
                if (claimedCRC32 != checksum.getValue()) continue;
                final RowMutation rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn));
                if (logger.isDebugEnabled()) {
                    logger.debug((Object)String.format("replaying mutation for %s.%s: %s", rm.getTable(), rm.key(), "{" + StringUtils.join(rm.getColumnFamilies(), (String)", ") + "}"));
                }
                final Table table = Table.open(rm.getTable());
                tablesRecovered.add(table);
                final ArrayList<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>(rm.getColumnFamilies());
                final long entryLocation = reader.getFilePointer();
                WrappedRunnable runnable = new WrappedRunnable(){

                    @Override
                    public void runMayThrow() throws IOException {
                        for (ColumnFamily columnFamily : columnFamilies) {
                            int id = table.getColumnFamilyId(columnFamily.name());
                            if (clHeader.isDirty(id) && entryLocation >= (long)clHeader.getPosition(id)) continue;
                            rm.removeColumnFamily(columnFamily);
                        }
                        if (!rm.isEmpty()) {
                            Table.open(rm.getTable()).apply(rm, null, false);
                        }
                        counter.decrementAndGet();
                    }
                };
                counter.incrementAndGet();
                StageManager.getStage("ROW-MUTATION-STAGE").submit(runnable);
            }
            reader.close();
            ++var5_8;
        }
        while (counter.get() > 0) {
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new AssertionError((Object)e);
            }
        }
        ArrayList futures = new ArrayList();
        for (Table table : tablesRecovered) {
            futures.addAll(table.flush());
        }
        for (Future future : futures) {
            try {
                future.get();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Returns the segment at the tail of the segment deque, which is the one being written to.
     *
     * @return the last segment in the deque
     */
    private CommitLogSegment currentSegment() {
        final CommitLogSegment tail = segments.getLast();
        return tail;
    }

    /**
     * Obtains the context of the current segment by running the lookup on the executor and
     * blocking until it completes.
     *
     * @return the context returned by the current segment
     * @throws IOException declared by the signature
     * @throws RuntimeException wrapping the cause if the wait is interrupted or the task fails
     */
    public CommitLogSegment.CommitLogContext getContext() throws IOException {
        Future<CommitLogSegment.CommitLogContext> pending = executor.submit(new Callable<CommitLogSegment.CommitLogContext>(){

            @Override
            public CommitLogSegment.CommitLogContext call() throws Exception {
                CommitLogSegment active = CommitLog.this.currentSegment();
                return active.getContext();
            }
        });
        try {
            return pending.get();
        }
        catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
        catch (ExecutionException failed) {
            throw new RuntimeException(failed);
        }
    }

    /**
     * Queues a write of the given mutation on the executor without waiting for it.
     *
     * @param rowMutation the mutation to write
     * @param serializedRow the serialized form handed to the segment together with the mutation
     * @return a future for the context returned by the segment write
     * @throws IOException declared by the signature
     */
    public Future<CommitLogSegment.CommitLogContext> add(RowMutation rowMutation, Object serializedRow) throws IOException {
        return executor.submit(new LogRecordAdder(rowMutation, serializedRow));
    }

    /**
     * Resolves the column family id for {@code cf} in {@code tableName} and runs
     * {@code discardCompletedSegmentsInternal} for it on the executor, blocking until it completes.
     *
     * @param tableName name of the table that owns the column family
     * @param cf name of the column family
     * @param context the commit log context passed through to the internal discard
     * @throws IOException declared by the signature
     * @throws RuntimeException wrapping the cause if the wait is interrupted or the task fails
     */
    public void discardCompletedSegments(final String tableName, final String cf, final CommitLogSegment.CommitLogContext context) throws IOException {
        Callable<Object> discardTask = new Callable<Object>(){

            @Override
            public Object call() throws IOException {
                int cfId = Table.open(tableName).getColumnFamilyId(cf);
                CommitLog.this.discardCompletedSegmentsInternal(context, cfId);
                return null;
            }
        };
        Future<Object> pending = executor.submit(discardTask);
        try {
            pending.get();
        }
        catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
        catch (ExecutionException failed) {
            throw new RuntimeException(failed);
        }
    }

    /**
     * Walks the segments from the head of the deque up to the segment named by {@code context}.
     *
     * <p>For each segment before the context's segment, the column family's flag is turned off in
     * the header; the segment is then closed, scheduled for deletion and removed from the deque if
     * its header reports it safe to delete, otherwise its header is rewritten. On reaching the
     * context's segment, the column family is turned on at {@code context.position}, the header
     * is rewritten, and the walk stops.
     *
     * @param context identifies the segment and position to mark
     * @param id the column family id
     * @throws IOException if a segment operation fails
     */
    private void discardCompletedSegmentsInternal(CommitLogSegment.CommitLogContext context, int id) throws IOException {
        if (logger.isDebugEnabled()) {
            logger.debug("discard completed log segments for " + context + ", column family " + id + ". CFIDs are " + Table.TableMetadata.getColumnFamilyIDString());
        }
        assert (context.position >= (long)context.getSegment().getHeader().getPosition(id)) : "discard at " + context + " is not after last flush at " + context.getSegment().getHeader().getPosition(id);
        for (Iterator<CommitLogSegment> cursor = segments.iterator(); cursor.hasNext(); ) {
            CommitLogSegment candidate = cursor.next();
            CommitLogHeader header = candidate.getHeader();
            if (candidate.equals(context.getSegment())) {
                // Reached the context's own segment: record the position and stop.
                if (logger.isDebugEnabled()) {
                    logger.debug("Marking replay position " + context.position + " on commit log " + candidate);
                }
                header.turnOn(id, context.position);
                candidate.writeHeader();
                return;
            }
            header.turnOff(id);
            if (!header.isSafeToDelete()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Not safe to delete commit log " + candidate + "; dirty is " + header.dirtyString());
                }
                candidate.writeHeader();
            } else {
                logger.info("Discarding obsolete commit log:" + candidate);
                candidate.close();
                DeletionService.submitDelete(candidate.getPath());
                cursor.remove();
            }
        }
    }

    /**
     * Syncs the current (tail) segment.
     *
     * @throws IOException if the segment's sync fails
     */
    void sync() throws IOException {
        CommitLogSegment active = currentSegment();
        active.sync();
    }

    /**
     * Syncs the current segment and appends a new one, created with the current segment's
     * column family count. The work runs on the executor and this call blocks until it completes.
     *
     * @throws RuntimeException wrapping the cause if the wait is interrupted or the task fails
     */
    public void forceNewSegment() {
        Callable<Object> rollTask = new Callable<Object>(){

            @Override
            public Object call() throws Exception {
                CommitLog.this.sync();
                CommitLogSegment previous = CommitLog.this.currentSegment();
                CommitLog.this.segments.add(new CommitLogSegment(previous.getHeader().getColumnFamilyCount()));
                return null;
            }
        };
        Future<Object> pending = executor.submit(rollTask);
        try {
            pending.get();
        }
        catch (InterruptedException interrupted) {
            throw new RuntimeException(interrupted);
        }
        catch (ExecutionException failed) {
            throw new RuntimeException(failed);
        }
    }

    /**
     * Task that writes one mutation to the current segment and, if that segment's length has
     * reached {@code SEGMENT_SIZE}, syncs it and appends a new segment.
     */
    class LogRecordAdder
    implements Callable<CommitLogSegment.CommitLogContext> {
        final RowMutation rowMutation;
        final Object serializedRow;

        /**
         * @param rm the mutation to write
         * @param serializedRow the serialized form handed to the segment together with the mutation
         */
        LogRecordAdder(RowMutation rm, Object serializedRow) {
            this.serializedRow = serializedRow;
            this.rowMutation = rm;
        }

        /**
         * Performs the write and, when needed, the segment roll.
         *
         * @return the context returned by the segment write
         */
        @Override
        public CommitLogSegment.CommitLogContext call() throws Exception {
            CommitLogSegment active = CommitLog.this.currentSegment();
            CommitLogSegment.CommitLogContext written = active.write(rowMutation, serializedRow);
            boolean segmentFull = active.length() >= (long)SEGMENT_SIZE;
            if (!segmentFull) {
                return written;
            }
            // The segment just written to has reached the threshold: sync it, then roll over.
            CommitLog.this.sync();
            CommitLogSegment previous = CommitLog.this.currentSegment();
            CommitLog.this.segments.add(new CommitLogSegment(previous.getHeader().getColumnFamilyCount()));
            return written;
        }
    }

    /**
     * Holder for the shared {@code CommitLog}; the instance is constructed when this nested
     * class is initialized. Not instantiable.
     */
    private static class CLHandle {
        private CLHandle() {
        }

        public static final CommitLog instance = new CommitLog();
    }
}

