/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.bucket;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.kv.AbstractKeyValueRequest;
import com.couchbase.client.core.message.kv.BinaryRequest;
import com.couchbase.client.core.message.kv.GetRequest;
import com.couchbase.client.core.message.kv.GetResponse;
import com.couchbase.client.core.message.kv.ReplicaGetRequest;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.java.ReplicaMode;
import com.couchbase.client.java.error.CouchbaseOutOfMemoryException;
import com.couchbase.client.java.error.TemporaryFailureException;
import java.util.ArrayList;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;

@InterfaceStability.Uncommitted
@InterfaceAudience.Private
public class ReplicaReader {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(ReplicaReader.class);

    public static Observable<GetResponse> read(final ClusterFacade core, String id, ReplicaMode type, String bucket) {
        return ReplicaReader.assembleRequests(core, id, type, bucket).flatMap(new Func1<BinaryRequest, Observable<GetResponse>>(){

            @Override
            public Observable<GetResponse> call(BinaryRequest request) {
                return core.send(request).filter(GetResponseFilter.INSTANCE).onErrorResumeNext(GetResponseErrorHandler.INSTANCE);
            }
        });
    }

    private static Observable<BinaryRequest> assembleRequests(final ClusterFacade core, final String id, ReplicaMode type, final String bucket) {
        if (type != ReplicaMode.ALL) {
            return Observable.just(new ReplicaGetRequest(id, bucket, (short)type.ordinal()));
        }
        return Observable.defer(new Func0<Observable<GetClusterConfigResponse>>(){

            @Override
            public Observable<GetClusterConfigResponse> call() {
                return core.send(new GetClusterConfigRequest());
            }
        }).map(new Func1<GetClusterConfigResponse, Integer>(){

            @Override
            public Integer call(GetClusterConfigResponse response) {
                CouchbaseBucketConfig conf = (CouchbaseBucketConfig)response.config().bucketConfig(bucket);
                return conf.numberOfReplicas();
            }
        }).flatMap(new Func1<Integer, Observable<BinaryRequest>>(){

            @Override
            public Observable<BinaryRequest> call(Integer max) {
                ArrayList<AbstractKeyValueRequest> requests = new ArrayList<AbstractKeyValueRequest>();
                requests.add(new GetRequest(id, bucket));
                for (int i = 0; i < max; ++i) {
                    requests.add(new ReplicaGetRequest(id, bucket, (short)(i + 1)));
                }
                return Observable.from(requests);
            }
        });
    }

    private static class GetResponseErrorHandler
    implements Func1<Throwable, Observable<? extends GetResponse>> {
        public static GetResponseErrorHandler INSTANCE = new GetResponseErrorHandler();

        private GetResponseErrorHandler() {
        }

        @Override
        public Observable<? extends GetResponse> call(Throwable throwable) {
            LOGGER.info("Individual ReplicaGet failed, but ignoring. Reason: {}", (Object)throwable.toString());
            return Observable.empty();
        }
    }

    private static class GetResponseFilter
    implements Func1<GetResponse, Boolean> {
        public static GetResponseFilter INSTANCE = new GetResponseFilter();

        private GetResponseFilter() {
        }

        @Override
        public Boolean call(GetResponse response) {
            if (response.status().isSuccess()) {
                return true;
            }
            ByteBuf content = response.content();
            if (content != null && content.refCnt() > 0) {
                content.release();
            }
            switch (response.status()) {
                case NOT_EXISTS: {
                    return false;
                }
                case TEMPORARY_FAILURE: 
                case SERVER_BUSY: {
                    throw new TemporaryFailureException();
                }
                case OUT_OF_MEMORY: {
                    throw new CouchbaseOutOfMemoryException();
                }
            }
            throw new CouchbaseException(response.status().toString());
        }
    }
}

