package voldemort.store.readonly.disk;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.cluster.Cluster;
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.disk.KeyValueWriter;
import voldemort.utils.ByteUtils;
import voldemort.xml.ClusterMapper;
import voldemort.xml.StoreDefinitionsMapper;

/* loaded from: input_file:voldemort/store/readonly/disk/HadoopStoreWriterPerBucket.class */
public class HadoopStoreWriterPerBucket implements KeyValueWriter<BytesWritable, BytesWritable> {
    private static final Logger logger = Logger.getLogger(HadoopStoreWriterPerBucket.class);
    private int[] position;
    private Path[] taskIndexFileName;
    private Path[] taskValueFileName;
    private JobConf conf;
    private CheckSum.CheckSumType checkSumType;
    private CheckSum[] checkSumDigestIndex;
    private CheckSum[] checkSumDigestValue;
    private String outputDir;
    private FileSystem fs;
    private int numChunks;
    private Cluster cluster;
    private StoreDefinition storeDef;
    private boolean saveKeys;
    private boolean reducerPerBucket;
    private DataOutputStream[] indexFileStream = null;
    private DataOutputStream[] valueFileStream = null;
    private String taskId = null;
    private int nodeId = -1;
    private int partitionId = -1;
    private int replicaType = -1;

    @Override // voldemort.store.readonly.disk.KeyValueWriter
    public void conf(JobConf jobConf) {
        try {
            this.cluster = new ClusterMapper().readCluster(new StringReader(jobConf.get("cluster.xml")));
            List readStoreList = new StoreDefinitionsMapper().readStoreList(new StringReader(jobConf.get("stores.xml")));
            if (readStoreList.size() != 1) {
                throw new IllegalStateException("Expected to find only a single store, but found multiple!");
            }
            this.storeDef = (StoreDefinition) readStoreList.get(0);
            this.numChunks = jobConf.getInt("num.chunks", -1);
            if (this.numChunks < 1) {
                throw new VoldemortException("num.chunks not specified in the job conf.");
            }
            this.saveKeys = jobConf.getBoolean("save.keys", false);
            this.reducerPerBucket = jobConf.getBoolean("reducer.per.bucket", false);
            this.conf = jobConf;
            this.outputDir = jobConf.get("final.output.dir");
            this.taskId = jobConf.get("mapred.task.id");
            this.checkSumType = CheckSum.fromString(jobConf.get("checksum.type"));
            this.checkSumDigestIndex = new CheckSum[getNumChunks()];
            this.checkSumDigestValue = new CheckSum[getNumChunks()];
            this.position = new int[getNumChunks()];
            this.taskIndexFileName = new Path[getNumChunks()];
            this.taskValueFileName = new Path[getNumChunks()];
            this.indexFileStream = new DataOutputStream[getNumChunks()];
            this.valueFileStream = new DataOutputStream[getNumChunks()];
            for (int i = 0; i < getNumChunks(); i++) {
                this.checkSumDigestIndex[i] = CheckSum.getInstance(this.checkSumType);
                this.checkSumDigestValue[i] = CheckSum.getInstance(this.checkSumType);
                this.position[i] = 0;
                this.taskIndexFileName[i] = new Path(FileOutputFormat.getOutputPath(jobConf), getStoreName() + "." + Integer.toString(i) + "_" + this.taskId + ".index");
                this.taskValueFileName[i] = new Path(FileOutputFormat.getOutputPath(jobConf), getStoreName() + "." + Integer.toString(i) + "_" + this.taskId + ".data");
                if (this.fs == null) {
                    this.fs = this.taskIndexFileName[i].getFileSystem(jobConf);
                }
                this.indexFileStream[i] = this.fs.create(this.taskIndexFileName[i]);
                this.fs.setPermission(this.taskIndexFileName[i], new FsPermission((short) 493));
                logger.info("Setting permission to 755 for " + this.taskIndexFileName[i]);
                this.valueFileStream[i] = this.fs.create(this.taskValueFileName[i]);
                this.fs.setPermission(this.taskValueFileName[i], new FsPermission((short) 493));
                logger.info("Setting permission to 755 for " + this.taskValueFileName[i]);
                logger.info("Opening " + this.taskIndexFileName[i] + " and " + this.taskValueFileName[i] + " for writing.");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override // voldemort.store.readonly.disk.KeyValueWriter
    public void write(BytesWritable bytesWritable, Iterator<BytesWritable> it, Reporter reporter) throws IOException {
        int chunk = ReadOnlyUtils.chunk(bytesWritable.get(), getNumChunks());
        this.indexFileStream[chunk].write(bytesWritable.get(), 0, bytesWritable.getSize());
        this.indexFileStream[chunk].writeInt(this.position[chunk]);
        if (this.checkSumDigestIndex[chunk] != null) {
            this.checkSumDigestIndex[chunk].update(bytesWritable.get(), 0, bytesWritable.getSize());
            this.checkSumDigestIndex[chunk].update(this.position[chunk]);
        }
        short s = 0;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
        while (it.hasNext()) {
            BytesWritable next = it.next();
            byte[] bArr = next.get();
            if (this.nodeId == -1) {
                this.nodeId = ByteUtils.readInt(bArr, 0);
            }
            int i = 0 + 4;
            if (this.partitionId == -1) {
                this.partitionId = ByteUtils.readInt(bArr, i);
            }
            int i2 = i + 4;
            if (getSaveKeys()) {
                if (this.replicaType == -1) {
                    this.replicaType = (int) ByteUtils.readBytes(bArr, i2, 1);
                }
                i2++;
            }
            int size = next.getSize() - i2;
            if (getSaveKeys()) {
                dataOutputStream.write(bArr, i2, size);
            } else {
                dataOutputStream.writeInt(size);
                dataOutputStream.write(bArr, i2, size);
            }
            s = (short) (s + 1);
            if (!getSaveKeys() && s > 1) {
                throw new VoldemortException("Duplicate keys detected for md5 sum " + ByteUtils.toHexString(ByteUtils.copy(bytesWritable.get(), 0, bytesWritable.getSize())));
            }
        }
        if (s < 0) {
            throw new VoldemortException("Found too many collisions: chunk " + chunk + " has exceeded 32767 collisions.");
        }
        if (s > 1) {
            reporter.incrCounter(KeyValueWriter.CollisionCounter.NUM_COLLISIONS, 1L);
            long counter = reporter.getCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS).getCounter();
            if (s > counter) {
                reporter.incrCounter(KeyValueWriter.CollisionCounter.MAX_COLLISIONS, s - counter);
            }
        }
        dataOutputStream.flush();
        byte[] byteArray = byteArrayOutputStream.toByteArray();
        if (getSaveKeys()) {
            this.valueFileStream[chunk].writeShort(s);
            int[] iArr = this.position;
            iArr[chunk] = iArr[chunk] + 2;
            if (this.checkSumDigestValue[chunk] != null) {
                this.checkSumDigestValue[chunk].update(s);
            }
        }
        this.valueFileStream[chunk].write(byteArray);
        int[] iArr2 = this.position;
        iArr2[chunk] = iArr2[chunk] + byteArray.length;
        if (this.checkSumDigestValue[chunk] != null) {
            this.checkSumDigestValue[chunk].update(byteArray);
        }
        if (this.position[chunk] < 0) {
            throw new VoldemortException("Chunk overflow exception: chunk " + chunk + " has exceeded 2147483647 bytes.");
        }
    }

    @Override // voldemort.store.readonly.disk.KeyValueWriter
    public void close() throws IOException {
        for (int i = 0; i < getNumChunks(); i++) {
            this.indexFileStream[i].close();
            this.valueFileStream[i].close();
        }
        if (this.nodeId == -1 || this.partitionId == -1) {
            return;
        }
        if (getSaveKeys() && this.replicaType == -1) {
            throw new RuntimeException("Could not read the replica type correctly for node " + this.nodeId + " ( partition - " + this.partitionId + " )");
        }
        String str = getSaveKeys() ? new String(Integer.toString(this.partitionId) + "_" + Integer.toString(this.replicaType) + "_") : new String(Integer.toString(this.partitionId) + "_");
        Path path = new Path(this.outputDir, "node-" + this.nodeId);
        FileSystem fileSystem = path.getFileSystem(this.conf);
        fileSystem.mkdirs(path);
        fileSystem.setPermission(path, new FsPermission((short) 493));
        logger.info("Setting permission to 755 for " + path);
        for (int i2 = 0; i2 < getNumChunks(); i2++) {
            String str2 = str + Integer.toString(i2);
            if (this.checkSumType != CheckSum.CheckSumType.NONE) {
                if (this.checkSumDigestIndex[i2] == null || this.checkSumDigestValue[i2] == null) {
                    throw new RuntimeException("Failed to open checksum digest for node " + this.nodeId + " ( partition - " + this.partitionId + ", chunk - " + i2 + " )");
                }
                Path path2 = new Path(path, str2 + ".index.checksum");
                Path path3 = new Path(path, str2 + ".data.checksum");
                if (fileSystem.exists(path2)) {
                    fileSystem.delete(path2);
                }
                FSDataOutputStream create = fileSystem.create(path2);
                fileSystem.setPermission(path2, new FsPermission((short) 493));
                create.write(this.checkSumDigestIndex[i2].getCheckSum());
                create.close();
                if (fileSystem.exists(path3)) {
                    fileSystem.delete(path3);
                }
                FSDataOutputStream create2 = fileSystem.create(path3);
                fileSystem.setPermission(path3, new FsPermission((short) 493));
                create2.write(this.checkSumDigestValue[i2].getCheckSum());
                create2.close();
            }
            Path path4 = new Path(path, str2 + ".index");
            Path path5 = new Path(path, str2 + ".data");
            logger.info("Moving " + this.taskIndexFileName[i2] + " to " + path4);
            if (fileSystem.exists(path4)) {
                fileSystem.delete(path4);
            }
            this.fs.rename(this.taskIndexFileName[i2], path4);
            logger.info("Moving " + this.taskValueFileName[i2] + " to " + path5);
            if (fileSystem.exists(path5)) {
                fileSystem.delete(path5);
            }
            this.fs.rename(this.taskValueFileName[i2], path5);
        }
    }

    public Cluster getCluster() {
        checkNotNull(this.cluster);
        return this.cluster;
    }

    public boolean getSaveKeys() {
        return this.saveKeys;
    }

    public boolean getReducerPerBucket() {
        return this.reducerPerBucket;
    }

    public StoreDefinition getStoreDef() {
        checkNotNull(this.storeDef);
        return this.storeDef;
    }

    public String getStoreName() {
        checkNotNull(this.storeDef);
        return this.storeDef.getName();
    }

    private final void checkNotNull(Object obj) {
        if (obj == null) {
            throw new VoldemortException("Not configured yet!");
        }
    }

    public int getNumChunks() {
        return this.numChunks;
    }
}
