package voldemort.store.readonly.mr.azkaban;

import azkaban.common.jobs.AbstractJob;
import azkaban.common.utils.Props;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.serialization.SerializerDefinition;
import voldemort.serialization.json.JsonTypeDefinition;
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.mr.azkaban.VoldemortStoreBuilderJob;
import voldemort.store.readonly.mr.utils.HadoopUtils;
import voldemort.store.readonly.mr.utils.JsonSchema;
import voldemort.store.readonly.mr.utils.VoldemortUtils;
import voldemort.store.readonly.swapper.AdminStoreSwapper;
import voldemort.utils.Pair;

/* loaded from: input_file:voldemort/store/readonly/mr/azkaban/VoldemortMultiStoreBuildAndPushJob.class */
/**
 * Azkaban job that builds several read-only Voldemort stores and pushes them to one or more
 * clusters.
 *
 * <p>{@link #run()} works in phases:
 * <ol>
 * <li>order the stores by the size of their input directories, largest first;</li>
 * <li>for every (store, cluster URL) combination verify the store definition, run the build job
 * and fetch the built data onto the cluster (optionally in parallel, see
 * {@code build.push.parallel});</li>
 * <li>if any combination failed, delete the data of the combinations that succeeded and
 * abort;</li>
 * <li>otherwise optionally delete the build output and swap every store on every node, rolling
 * back the swaps already performed if a swap fails.</li>
 * </ol>
 */
public class VoldemortMultiStoreBuildAndPushJob extends AbstractJob {
    /** Serializer XML emitted before the JSON schema text. */
    private static final String JSON_SERIALIZER_PREFIX = "\n\t\t<type>json</type>\n\t\t<schema-info version=\"0\">";

    /** Serializer XML emitted after the JSON schema text. */
    private static final String JSON_SERIALIZER_SUFFIX = "</schema-info>\n\t";

    /** Serializer XML fragment that switches on gzip compression. */
    private static final String GZIP_COMPRESSION_XML = "\t<compression><type>gzip</type></compression>\n\t";

    private final Logger log;
    private final Props props;
    /** Store names; re-ordered by input size (largest first) at the start of {@link #run()}. */
    private List<String> storeNames;
    private final List<String> clusterUrls;
    /** Sanitized input directory of every store, keyed by store name. */
    private final HashMap<String, Path> inputDirsPerStore;
    private final Path outputDir;
    /** Node whose store definitions are read in {@link #verifySchema}; from {@code check.node}. */
    private final int nodeId;

    /**
     * Reads the job configuration.
     *
     * @param str job id, also used as the logger name
     * @param props job properties; {@code build.input.path}, {@code push.store.name} and
     *        {@code push.cluster} are comma separated lists, the first two of equal length
     * @throws IOException if an input or output path cannot be sanitized
     * @throws RuntimeException if the number of stores differs from the number of input
     *         directories
     */
    public VoldemortMultiStoreBuildAndPushJob(String str, Props props) throws IOException {
        super(str);
        this.props = props;
        this.log = Logger.getLogger(str);
        this.nodeId = props.getInt("check.node", 0);

        List<String> inputDirs = VoldemortUtils.getCommaSeparatedStringValues(props.getString("build.input.path"), "input directory");
        this.storeNames = VoldemortUtils.getCommaSeparatedStringValues(props.getString("push.store.name"), "store name");
        if (this.storeNames.size() != inputDirs.size()) {
            throw new RuntimeException("Number of stores ( " + this.storeNames.size() + " ) is not equal to number of input directories ( " + inputDirs.size() + " )");
        }

        // The i-th input directory belongs to the i-th store name.
        this.inputDirsPerStore = Maps.newHashMap();
        for (int i = 0; i < inputDirs.size(); i++) {
            this.inputDirsPerStore.put(this.storeNames.get(i), getPath(inputDirs.get(i)));
        }

        this.outputDir = getPath(props.getString("build.output.dir", "/tmp/voldemort-build-and-push-temp-" + new Random().nextLong()));
        this.log.info("Storing output of all push jobs in " + this.outputDir);
        this.clusterUrls = VoldemortUtils.getCommaSeparatedStringValues(props.getString("push.cluster"), "cluster urls");
    }

    /** Turns a path string into a sanitized Hadoop {@link Path}. */
    private Path getPath(String str) throws IOException {
        return HadoopUtils.getSanitizedPath(new Path(str));
    }

    /** Builds the JSON serializer XML fragment around the given schema text. */
    private static String jsonSerializerXml(Object schemaInfo) {
        return JSON_SERIALIZER_PREFIX + schemaInfo + JSON_SERIALIZER_SUFFIX;
    }

    /**
     * Recursively sums the length of every file below {@code path}.
     *
     * @param fileSystem file system used to list {@code path}
     * @param path file or directory to measure
     * @return total length in bytes; 0 when the listing is {@code null}
     * @throws IOException if listing fails
     */
    public long sizeOfPath(FileSystem fileSystem, Path path) throws IOException {
        long totalBytes = 0;
        FileStatus[] children = fileSystem.listStatus(path);
        if (children != null) {
            for (FileStatus child : children) {
                totalBytes += child.isDir() ? sizeOfPath(fileSystem, child.getPath()) : child.getLen();
            }
        }
        return totalBytes;
    }

    /**
     * Builds and pushes every configured store to every configured cluster, then swaps them in.
     *
     * @throws VoldemortException if at least one build + push failed (after the data of the
     *         successful pushes has been deleted)
     * @throws Exception if a swap fails (after rolling back the swaps already performed) or any
     *         other step throws
     */
    public void run() throws Exception {
        final FileSystem fileSystem = this.outputDir.getFileSystem(new Configuration());

        this.storeNames = orderStoresByInputSize(fileSystem);
        this.log.info("Store names in the order of which we'll run build and push - " + this.storeNames);

        // A timestamp version when "push.version.timestamp" is present, otherwise "push.version";
        // -1 means "ask the cluster for max version + 1" (see buildAndPushStore).
        final long pushVersion = this.props.containsKey("push.version.timestamp")
                ? Long.parseLong(new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()))
                : this.props.getLong("push.version", -1L);

        // Written by the worker threads, hence concurrent.
        final ConcurrentHashMap<String, Cluster> clusterPerUrl = new ConcurrentHashMap<String, Cluster>();
        // (cluster url, store) -> directories returned by the fetch, indexed by node id below.
        Map<Pair<String, String>, List<String>> fetchedDirs = new HashMap<Pair<String, String>, List<String>>();
        Map<Pair<String, String>, Exception> failures = Maps.newHashMap();

        ExecutorService executor = Executors.newFixedThreadPool(this.props.getInt("build.push.parallel", 1));
        try {
            Map<Pair<String, String>, Future<List<String>>> tasks = Maps.newHashMap();
            for (final String storeName : this.storeNames) {
                for (final String clusterUrl : this.clusterUrls) {
                    tasks.put(Pair.create(clusterUrl, storeName), executor.submit(new Callable<List<String>>() {
                        @Override
                        public List<String> call() throws Exception {
                            return buildAndPushStore(storeName, clusterUrl, fileSystem, pushVersion, clusterPerUrl);
                        }
                    }));
                }
            }

            // Wait for everything; remember failures instead of aborting on the first one.
            for (String storeName : this.storeNames) {
                for (String clusterUrl : this.clusterUrls) {
                    Pair<String, String> key = Pair.create(clusterUrl, storeName);
                    try {
                        fetchedDirs.put(key, tasks.get(key).get());
                    } catch (Exception e) {
                        failures.put(key, e);
                    }
                }
            }
        } finally {
            executor.shutdownNow();
            executor.awaitTermination(10L, TimeUnit.SECONDS);
        }

        if (!failures.isEmpty()) {
            this.log.error("Got an exception during pushes. Deleting data already pushed on successful nodes");
            try {
                deleteSuccessfulPushes(fetchedDirs, clusterPerUrl);
            } finally {
                // Report the root causes even if the cleanup itself blows up.
                logPushFailures(failures);
            }
            throw new VoldemortException("Exception during build + push");
        }

        if (!this.props.getBoolean("build.output.keep", false)) {
            JobConf jobConf = new JobConf();
            if (this.props.containsKey("hadoop.job.ugi")) {
                jobConf.set("hadoop.job.ugi", this.props.getString("hadoop.job.ugi"));
            }
            this.log.info("Deleting output directory since we have finished the pushes " + this.outputDir);
            HadoopUtils.deletePathIfExists(jobConf, this.outputDir.toString());
            this.log.info("Successfully deleted output directory since we have finished the pushes" + this.outputDir);
        }

        // (cluster url, store) -> (node id, value returned by swapStore), needed for rollback.
        HashMultimap<Pair<String, String>, Pair<Integer, String>> previousDirs = HashMultimap.create();
        for (String clusterUrl : this.clusterUrls) {
            try {
                swapStoresOnCluster(clusterUrl, clusterPerUrl.get(clusterUrl), fetchedDirs, previousDirs);
            } catch (Exception e) {
                this.log.error("Got an exception during swaps. Rolling back data already pushed on successful nodes", e);
                rollbackSwaps(previousDirs, clusterPerUrl);
                throw e;
            }
        }
    }

    /**
     * Returns the store names sorted by input size, largest first.
     *
     * <p>Stores whose inputs have the same size are grouped under one key so that none of them
     * is lost, which a plain size-to-name map would do.
     */
    private List<String> orderStoresByInputSize(FileSystem fileSystem) throws IOException {
        TreeMap<Long, List<String>> storesBySize = Maps.newTreeMap();
        for (String storeName : this.storeNames) {
            Long size = Long.valueOf(sizeOfPath(fileSystem, this.inputDirsPerStore.get(storeName)));
            List<String> sameSize = storesBySize.get(size);
            if (sameSize == null) {
                sameSize = Lists.newArrayList();
                storesBySize.put(size, sameSize);
            }
            sameSize.add(storeName);
        }
        this.log.info("Store names along with their input file sizes - " + storesBySize);

        List<String> ordered = Lists.newArrayList();
        for (List<String> sameSize : storesBySize.values()) {
            ordered.addAll(sameSize);
        }
        Collections.reverse(ordered);
        return ordered;
    }

    /**
     * Verifies the schema, builds one store for one cluster and fetches it onto that cluster.
     * Runs on a worker thread of the pool created in {@link #run()}.
     *
     * @param requestedVersion version to push, or -1 to use the cluster's max version + 1
     * @param clusterPerUrl receives the cluster metadata obtained for {@code clusterUrl}
     * @return the list returned by {@code AdminStoreSwapper.invokeFetch}
     */
    private List<String> buildAndPushStore(String storeName,
                                           String clusterUrl,
                                           FileSystem fileSystem,
                                           long requestedVersion,
                                           ConcurrentHashMap<String, Cluster> clusterPerUrl) throws Exception {
        this.log.info("========= Working on build + push phase for store '" + storeName + "' and cluster '" + clusterUrl + "' ==========");
        AdminClient adminClient = null;
        ExecutorService swapperExecutor = null;
        try {
            adminClient = new AdminClient(clusterUrl, new AdminClientConfig());
            Path inputDir = this.inputDirsPerStore.get(storeName);

            Pair<StoreDefinition, Cluster> storeDefAndCluster = verifySchema(storeName, clusterUrl, inputDir, adminClient);
            Cluster cluster = storeDefAndCluster.getSecond();
            clusterPerUrl.put(clusterUrl, cluster);

            // One output directory per store and cluster host.
            Path storeOutputDir = new Path(this.outputDir + "/" + storeName, new URI(clusterUrl).getHost());
            this.log.info("Running build phase for store '" + storeName + "' and url '" + clusterUrl + "'. Reading from input directory '" + inputDir + "' and writing to " + storeOutputDir);
            runBuildStore(cluster, storeDefAndCluster.getFirst(), inputDir, storeOutputDir);
            this.log.info("Finished running build phase for store " + storeName + " and url '" + clusterUrl + "'. Written to directory " + storeOutputDir);

            long version = requestedVersion;
            if (version == -1) {
                this.log.info("Retrieving version number for store '" + storeName + "' and cluster '" + clusterUrl + "'");
                Map<String, Long> maxVersions = adminClient.readonlyOps.getROMaxVersion(Lists.newArrayList(storeName));
                if (maxVersions == null || !maxVersions.containsKey(storeName)) {
                    throw new RuntimeException("Could not retrieve version for store '" + storeName + "'");
                }
                version = maxVersions.get(storeName).longValue() + 1;
                this.log.info("Retrieved max version number for store '" + storeName + "' and cluster '" + clusterUrl + "' = " + version);
            }

            this.log.info("Running push for cluster url " + clusterUrl);
            swapperExecutor = Executors.newCachedThreadPool();
            AdminStoreSwapper swapper = new AdminStoreSwapper(cluster, swapperExecutor, adminClient, 1000 * this.props.getInt("timeout.seconds", 86400), true, true);

            String qualifiedOutputDir = storeOutputDir.makeQualified(fileSystem).toString();
            if (!fileSystem.exists(storeOutputDir)) {
                throw new RuntimeException("Output directory for store " + storeName + " and cluster '" + clusterUrl + "' - " + qualifiedOutputDir + " does not exist");
            }

            this.log.info("Pushing data to store '" + storeName + "' on cluster " + clusterUrl + " from path  " + qualifiedOutputDir + " with version " + version);
            List<String> nodeDirs = swapper.invokeFetch(storeName, qualifiedOutputDir, version);
            this.log.info("Successfully pushed data to store '" + storeName + "' on cluster " + clusterUrl + " from path  " + qualifiedOutputDir + " with version " + version);
            return nodeDirs;
        } finally {
            // Release the pool and the client on success AND on failure.
            try {
                if (swapperExecutor != null) {
                    swapperExecutor.shutdownNow();
                    swapperExecutor.awaitTermination(10L, TimeUnit.SECONDS);
                }
            } finally {
                if (adminClient != null) {
                    adminClient.stop();
                }
            }
        }
    }

    /**
     * Best-effort removal of the data fetched by the (cluster, store) combinations that
     * succeeded. A failure to delete on one node is logged and does not stop the cleanup.
     */
    private void deleteSuccessfulPushes(Map<Pair<String, String>, List<String>> fetchedDirs,
                                        Map<String, Cluster> clusterPerUrl) {
        for (String clusterUrl : this.clusterUrls) {
            Cluster cluster = clusterPerUrl.get(clusterUrl);
            if (cluster == null) {
                // No task for this URL got as far as recording its cluster, so none of them
                // can have a successful push to clean up.
                continue;
            }
            AdminClient adminClient = new AdminClient(cluster, new AdminClientConfig());
            try {
                for (String storeName : this.storeNames) {
                    List<String> nodeDirs = fetchedDirs.get(Pair.create(clusterUrl, storeName));
                    if (nodeDirs == null) {
                        continue;
                    }
                    this.log.info("Deleting data for successful pushes to " + clusterUrl + " and store " + storeName);
                    // The position in the list is used as the node id.
                    int node = 0;
                    for (String nodeDir : nodeDirs) {
                        try {
                            this.log.info("Deleting data ( " + nodeDir + " ) for successful pushes to '" + clusterUrl + "' and store '" + storeName + "' and node " + node);
                            adminClient.readonlyOps.failedFetchStore(node, storeName, nodeDir);
                            this.log.info("Successfully deleted data for successful pushes to '" + clusterUrl + "' and store '" + storeName + "' and node " + node);
                        } catch (Exception e) {
                            this.log.error("Failure while deleting data on node " + node + " for store '" + storeName + "' and url '" + clusterUrl + "'", e);
                        }
                        node++;
                    }
                }
            } finally {
                adminClient.stop();
            }
        }
    }

    /** Logs every build + push failure together with its exception. */
    private void logPushFailures(Map<Pair<String, String>, Exception> failures) {
        int errorNo = 1;
        for (Map.Entry<Pair<String, String>, Exception> failure : failures.entrySet()) {
            Pair<String, String> clusterAndStore = failure.getKey();
            this.log.error("Error no " + errorNo + "] Error pushing for cluster '" + clusterAndStore.getFirst() + "' and store '" + clusterAndStore.getSecond() + "' :", failure.getValue());
            errorNo++;
        }
    }

    /**
     * Swaps every store on every node of one cluster and records what {@code swapStore} returned
     * in {@code previousDirs} so the swap can be rolled back.
     */
    private void swapStoresOnCluster(String clusterUrl,
                                     Cluster cluster,
                                     Map<Pair<String, String>, List<String>> fetchedDirs,
                                     HashMultimap<Pair<String, String>, Pair<Integer, String>> previousDirs) {
        AdminClient adminClient = new AdminClient(cluster, new AdminClientConfig());
        try {
            this.log.info("Swapping all stores on cluster " + clusterUrl);
            for (Node node : cluster.getNodes()) {
                this.log.info("Swapping all stores on cluster " + clusterUrl + " and node " + node.getId());
                for (String storeName : this.storeNames) {
                    Pair<String, String> key = Pair.create(clusterUrl, storeName);
                    String nodeDir = fetchedDirs.get(key).get(node.getId());
                    this.log.info("Swapping '" + storeName + "' store on cluster " + clusterUrl + " and node " + node.getId() + " - " + nodeDir);
                    String previousDir = adminClient.readonlyOps.swapStore(node.getId(), storeName, nodeDir);
                    previousDirs.put(key, Pair.create(Integer.valueOf(node.getId()), previousDir));
                    this.log.info("Successfully swapped '" + storeName + "' store on cluster " + clusterUrl + " and node " + node.getId());
                }
            }
        } finally {
            adminClient.stop();
        }
    }

    /** Rolls back every swap recorded in {@code previousDirs}. */
    private void rollbackSwaps(HashMultimap<Pair<String, String>, Pair<Integer, String>> previousDirs,
                               Map<String, Cluster> clusterPerUrl) {
        for (Pair<String, String> clusterAndStore : previousDirs.keySet()) {
            Collection<Pair<Integer, String>> swappedNodes = previousDirs.get(clusterAndStore);
            String clusterUrl = clusterAndStore.getFirst();
            String storeName = clusterAndStore.getSecond();
            Cluster cluster = clusterPerUrl.get(clusterUrl);
            this.log.info("Rolling back for cluster " + clusterUrl + " and store  " + storeName);
            AdminClient adminClient = new AdminClient(cluster, new AdminClientConfig());
            try {
                for (Pair<Integer, String> nodeAndPreviousDir : swappedNodes) {
                    String previousDir = nodeAndPreviousDir.getSecond();
                    this.log.info("Rolling back for cluster " + clusterUrl + " and store " + storeName + " and node " + nodeAndPreviousDir.getFirst() + " to dir " + previousDir);
                    // Roll back the STORE (not the directory name) to the version encoded in
                    // the previous directory.
                    adminClient.readonlyOps.rollbackStore(nodeAndPreviousDir.getFirst().intValue(), storeName, ReadOnlyUtils.getVersionId(new File(previousDir)));
                    this.log.info("Successfully rolled back for cluster " + clusterUrl + " and store " + storeName + " and node " + nodeAndPreviousDir.getFirst() + " to dir " + previousDir);
                }
            } finally {
                adminClient.stop();
            }
        }
    }

    /**
     * Derives the store definition from the job properties and the schema found in
     * {@code path}, compares it with the definition of the same store on node
     * {@code check.node}, and adds the store to the cluster when it does not exist there yet.
     *
     * <p>Per-store properties ({@code <key>.<store name>}) take precedence over the global
     * ones. The global compression and forced-schema properties are only honoured when a single
     * store is being pushed.
     *
     * @param str store name
     * @param str2 cluster URL (used for logging only)
     * @param path input directory the JSON schema is read from
     * @param adminClient client connected to the cluster
     * @return the store definition together with {@code adminClient.getAdminClientCluster()}
     * @throws IOException if the schema cannot be read
     * @throws RuntimeException if the definitions cannot be reconciled, or if a new store lacks
     *         a description or owners
     */
    public Pair<StoreDefinition, Cluster> verifySchema(String str, String str2, Path path, AdminClient adminClient) throws IOException {
        final String storeName = str;
        final String clusterUrl = str2;
        JsonSchema schema = HadoopUtils.getSchemaFromPath(path);

        int replicationFactor = this.props.getInt("build.replication.factor." + storeName, this.props.getInt("build.replication.factor", 2));
        int requiredReads = this.props.getInt("build.required.reads." + storeName, this.props.getInt("build.required.reads", 1));
        int requiredWrites = this.props.getInt("build.required.writes." + storeName, this.props.getInt("build.required.writes", 1));
        int preferredReadsValue = this.props.getInt("build.preferred.reads." + storeName, this.props.getInt("build.preferred.reads", -1));
        int preferredWritesValue = this.props.getInt("build.preferred.writes." + storeName, this.props.getInt("build.preferred.writes", -1));
        // A negative value means "not configured".
        Integer preferredReads = preferredReadsValue < 0 ? null : Integer.valueOf(preferredReadsValue);
        Integer preferredWrites = preferredWritesValue < 0 ? null : Integer.valueOf(preferredWritesValue);
        String description = this.props.getString("push.store.description." + storeName, this.props.getString("push.store.description", ""));
        String owners = this.props.getString("push.store.owners." + storeName, this.props.getString("push.store.owners", ""));

        boolean singleStore = this.storeNames.size() == 1;
        String keySerializerXml = jsonSerializerXml(schema.getKeyType());
        String valueSerializerXml = jsonSerializerXml(schema.getValueType());

        String keyCompressionXml = "";
        if (this.props.containsKey("build.compress.key." + storeName) || (singleStore && this.props.containsKey("build.compress.key"))) {
            keyCompressionXml = GZIP_COMPRESSION_XML;
            keySerializerXml = keySerializerXml + keyCompressionXml;
        }
        String valueCompressionXml = "";
        if (this.props.containsKey("build.compress.value." + storeName) || (singleStore && this.props.containsKey("build.compress.value"))) {
            valueCompressionXml = GZIP_COMPRESSION_XML;
            valueSerializerXml = valueSerializerXml + valueCompressionXml;
        }

        // Forced schemas replace the derived serializer XML; the global ones are applied last.
        if (this.props.containsKey("build.force.schema.key." + storeName)) {
            keySerializerXml = this.props.get("build.force.schema.key." + storeName);
        }
        if (this.props.containsKey("build.force.schema.value." + storeName)) {
            valueSerializerXml = this.props.get("build.force.schema.value." + storeName);
        }
        if (this.props.containsKey("build.force.schema.key") && singleStore) {
            keySerializerXml = this.props.get("build.force.schema.key");
        }
        if (this.props.containsKey("build.force.schema.value") && singleStore) {
            valueSerializerXml = this.props.get("build.force.schema.value");
        }

        String storeDefXml = VoldemortUtils.getStoreDefXml(storeName, replicationFactor, requiredReads, requiredWrites, preferredReads, preferredWrites, keySerializerXml, valueSerializerXml, description, owners);
        this.log.info("Verifying store: \n" + storeDefXml);
        StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(storeDefXml);

        this.log.info("Getting store definition from: " + clusterUrl + " ( node id " + this.nodeId + " )");
        boolean foundStore = false;
        for (StoreDefinition remoteStoreDef : adminClient.metadataMgmtOps.getRemoteStoreDefList(this.nodeId).getValue()) {
            if (!remoteStoreDef.getName().equals(storeName)) {
                continue;
            }
            if (!remoteStoreDef.equals(newStoreDef)) {
                SerializerDefinition localKeySerializer = newStoreDef.getKeySerializer();
                SerializerDefinition localValueSerializer = newStoreDef.getValueSerializer();
                SerializerDefinition remoteKeySerializer = remoteStoreDef.getKeySerializer();
                SerializerDefinition remoteValueSerializer = remoteStoreDef.getValueSerializer();
                boolean remoteIsSingleVersionJson = remoteKeySerializer.getName().equals("json")
                        && remoteValueSerializer.getName().equals("json")
                        && remoteKeySerializer.getAllSchemaInfoVersions().size() == 1
                        && remoteValueSerializer.getAllSchemaInfoVersions().size() == 1;
                // NOTE(review): a mismatch that is not single-version JSON falls through and is
                // accepted silently - confirm this is intended.
                if (remoteIsSingleVersionJson) {
                    // The definitions may differ only in how the schema text is written, so
                    // compare the parsed schemas instead.
                    JsonTypeDefinition remoteKeyType = JsonTypeDefinition.fromJson(remoteKeySerializer.getCurrentSchemaInfo());
                    JsonTypeDefinition remoteValueType = JsonTypeDefinition.fromJson(remoteValueSerializer.getCurrentSchemaInfo());
                    JsonTypeDefinition localKeyType = JsonTypeDefinition.fromJson(localKeySerializer.getCurrentSchemaInfo());
                    JsonTypeDefinition localValueType = JsonTypeDefinition.fromJson(localValueSerializer.getCurrentSchemaInfo());
                    if (!remoteKeyType.equals(localKeyType) || !remoteValueType.equals(localValueType)) {
                        throw new RuntimeException("Your store definition does not match the store definition that is already in the cluster. Tried to resolve identical schemas between local and remote, but failed. Have: " + newStoreDef + "\nBut expected: " + remoteStoreDef);
                    }
                    // Schemas are equivalent: adopt the remote schema text and compare again.
                    keySerializerXml = jsonSerializerXml(remoteKeySerializer.getCurrentSchemaInfo()) + keyCompressionXml;
                    valueSerializerXml = jsonSerializerXml(remoteValueSerializer.getCurrentSchemaInfo()) + valueCompressionXml;
                    newStoreDef = VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(storeName, replicationFactor, requiredReads, requiredWrites, preferredReads, preferredWrites, keySerializerXml, valueSerializerXml, description, owners));
                    if (!remoteStoreDef.equals(newStoreDef)) {
                        throw new RuntimeException("Your store schema is identical, but the store definition does not match. Have: " + newStoreDef + "\nBut expected: " + remoteStoreDef);
                    }
                }
            }
            foundStore = true;
        }

        if (!foundStore) {
            if (description.length() == 0) {
                throw new RuntimeException("Description field missing in store definition. Please add \"push.store.description\" with a line describing your store");
            }
            if (owners.length() == 0) {
                throw new RuntimeException("Owner field missing in store definition. Please add \"push.store.owners\" with value being comma-separated list of LinkedIn email ids");
            }
            this.log.info("Could not find store " + storeName + " on Voldemort. Adding it to all nodes for cluster " + clusterUrl);
            adminClient.storeMgmtOps.addStore(newStoreDef);
        }

        StoreDefinition verifiedStoreDef = VoldemortUtils.getStoreDef(VoldemortUtils.getStoreDefXml(storeName, replicationFactor, requiredReads, requiredWrites, preferredReads, preferredWrites, keySerializerXml, valueSerializerXml, description, owners));
        return Pair.create(verifiedStoreDef, adminClient.getAdminClientCluster());
    }

    /**
     * Runs a {@link VoldemortStoreBuilderJob} for a single store.
     *
     * <p>Per-store properties ({@code <key>.<store name>}) take precedence over the global
     * ones.
     *
     * @param cluster cluster the store is built for
     * @param storeDefinition definition of the store to build
     * @param path input directory
     * @param path2 output directory of the build
     * @throws Exception if the build job fails
     */
    public void runBuildStore(Cluster cluster, StoreDefinition storeDefinition, Path path, Path path2) throws Exception {
        String storeName = storeDefinition.getName();
        int chunkSize = this.props.getInt("build.chunk.size." + storeName, this.props.getInt("build.chunk.size", 1073741824));
        Path tempDir = new Path(this.props.getString("build.temp.dir", "/tmp/voldemort-build-temp-" + new Random().nextLong()));
        String keySelection = this.props.getString("build.key.selection." + storeName, this.props.getString("build.key.selection", (String) null));
        // Falls back to the global VALUE selection (this used to read "build.key.selection").
        String valueSelection = this.props.getString("build.value.selection." + storeName, this.props.getString("build.value.selection", (String) null));
        boolean saveKeys = this.props.getBoolean("save.keys", true);
        boolean reducerPerBucket = this.props.getBoolean("reducer.per.bucket", false);
        int numChunks = this.props.getInt("num.chunks." + storeName, this.props.getInt("num.chunks", -1));

        VoldemortStoreBuilderJob.VoldemortStoreBuilderConf buildConf = new VoldemortStoreBuilderJob.VoldemortStoreBuilderConf(
                storeDefinition.getReplicationFactor(),
                chunkSize,
                tempDir,
                path2,
                path,
                cluster,
                Lists.newArrayList(new StoreDefinition[]{storeDefinition}),
                storeName,
                keySelection,
                valueSelection,
                null,
                null,
                CheckSum.fromString(this.props.getString("checksum.type", CheckSum.toString(CheckSum.CheckSumType.MD5))),
                saveKeys,
                reducerPerBucket,
                numChunks);
        new VoldemortStoreBuilderJob(getId() + "-build-store", this.props, buildConf).run();
    }
}
