package com.couchbase.client.core.endpoint.dcp;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.dcp.DCPMessage;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogRequest;
import com.couchbase.client.core.message.dcp.GetFailoverLogResponse;
import com.couchbase.client.core.message.dcp.StreamCloseRequest;
import com.couchbase.client.core.message.dcp.StreamCloseResponse;
import com.couchbase.client.core.message.dcp.StreamEndMessage;
import com.couchbase.client.core.message.dcp.StreamRequestRequest;
import com.couchbase.client.core.message.dcp.StreamRequestResponse;
import com.couchbase.client.core.message.kv.GetAllMutationTokensRequest;
import com.couchbase.client.core.message.kv.GetAllMutationTokensResponse;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.utils.UnicastAutoReleaseSubject;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.BinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.handler.codec.memcache.binary.DefaultBinaryMemcacheRequest;
import com.couchbase.client.deps.io.netty.util.Attribute;
import com.couchbase.client.deps.io.netty.util.AttributeKey;
import com.couchbase.client.deps.io.netty.util.internal.ConcurrentSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

@InterfaceAudience.Public
@Deprecated
@InterfaceStability.Experimental
/* loaded from: input_file:WEB-INF/lib/core-io-1.4.7.jar:com/couchbase/client/core/endpoint/dcp/DCPConnection.class */
public class DCPConnection {
    /** Logger shared by every instance of this class. */
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance((Class<?>) DCPConnection.class);
    /** Channel attribute key under which {@link #consumed(short, int)} keeps its running byte count. */
    private static final AttributeKey<Integer> CONSUMED_BYTES = AttributeKey.newInstance("CONSUMED_BYTES");
    /** Fixed overhead of 24 bytes; {@link #consumed(short, int)} adds this amount on top of every reported length. */
    private static final int MINIMUM_HEADER_SIZE = 24;
    /** Subject handed out by {@link #subject()}; supplied by the caller or built by the four-argument constructor. */
    private final SerializedSubject<DCPRequest, DCPRequest> subject;
    /** Stream identifiers currently tracked: added after a successful stream request, removed on successful close or {@link #streamClosed}. */
    private final Set<Short> streams;
    /** Facade through which all requests of this class are sent. */
    private final ClusterFacade core;
    /** Bucket name attached to every request issued by this connection. */
    private final String bucket;
    /** Password passed along with stream open/close requests. */
    private final String password;
    /** Environment consulted for the DCP buffer size and acknowledgment threshold. */
    private final CoreEnvironment env;
    /** Channel contexts registered through {@link #registerContext}, looked up by {@link #consumed(short, int)}. */
    private final ConcurrentMap<Short, ChannelHandlerContext> contexts;

    /**
     * Creates a connection that publishes through a default subject.
     *
     * <p>The subject is a {@link UnicastAutoReleaseSubject} configured with
     * {@code coreEnvironment.autoreleaseAfter()} (in milliseconds) and
     * {@code coreEnvironment.scheduler()}, tagged with the trace identifier
     * {@code "DCPConnection." + coreEnvironment.dcpConnectionName()} and then serialized.
     *
     * @param coreEnvironment environment providing the subject settings; must not be {@code null}.
     * @param clusterFacade facade used to send requests.
     * @param str name of the bucket.
     * @param str2 password sent with stream requests.
     */
    public DCPConnection(CoreEnvironment coreEnvironment, ClusterFacade clusterFacade, String str, String str2) {
        // The explicit <DCPRequest> pins the element type; nothing else in this call chain
        // would tell the compiler what the subject carries.
        this(coreEnvironment, clusterFacade, str, str2,
                UnicastAutoReleaseSubject.<DCPRequest>create(
                                coreEnvironment.autoreleaseAfter(), TimeUnit.MILLISECONDS, coreEnvironment.scheduler())
                        .withTraceIdentifier("DCPConnection." + coreEnvironment.dcpConnectionName())
                        .toSerialized());
    }

    /**
     * Creates a connection that publishes through the given subject.
     *
     * <p>The set of tracked streams and the map of registered channel contexts both start empty.
     *
     * @param coreEnvironment environment consulted later for buffer settings.
     * @param clusterFacade facade used to send requests.
     * @param str name of the bucket.
     * @param str2 password sent with stream requests.
     * @param serializedSubject subject returned by {@link #subject()}.
     */
    public DCPConnection(CoreEnvironment coreEnvironment, ClusterFacade clusterFacade, String str, String str2, SerializedSubject<DCPRequest, DCPRequest> serializedSubject) {
        this.env = coreEnvironment;
        this.core = clusterFacade;
        this.subject = serializedSubject;
        this.bucket = str;
        this.password = str2;
        // Explicit type arguments instead of raw types, so the assignments are checked by the compiler.
        this.streams = new ConcurrentSet<Short>();
        this.contexts = new ConcurrentHashMap<Short, ChannelHandlerContext>();
    }

    /**
     * Returns the bucket name this connection was constructed with.
     *
     * @return the bucket name.
     */
    public String bucket() {
        return bucket;
    }

    /**
     * Exposes the subject held by this connection.
     *
     * @return the serialized subject supplied to, or created by, the constructor.
     */
    public Subject<DCPRequest, DCPRequest> subject() {
        return subject;
    }

    /**
     * Opens a stream using default numeric arguments.
     *
     * <p>Delegates to the six-argument overload with {@code 0} for every numeric argument
     * except the fourth, which is {@code -1}.
     *
     * @param s identifier of the stream to open.
     * @return the observable produced by the six-argument overload.
     */
    public Observable<ResponseStatus> addStream(short s) {
        final long unset = 0L;
        // NOTE(review): presumably -1 means "no end sequence number" — confirm against StreamRequestRequest.
        final long openEnded = -1L;
        return addStream(s, unset, unset, openEnded, unset, unset);
    }

    /**
     * Requests a stream and starts tracking it once the request succeeds.
     *
     * <p>If the identifier is already tracked, {@link ResponseStatus#EXISTS} is emitted and nothing
     * is sent. Otherwise a {@link StreamRequestRequest} is sent on subscription. When the response
     * status is {@code RANGE_ERROR} or {@code ROLLBACK} the request is re-sent once, with the third
     * and fifth arguments replaced by {@code 0} (range error) or by the response's
     * {@code rollbackToSequenceNumber()} (rollback). Any other status is passed through unchanged.
     *
     * @param s identifier of the stream; also the key under which it is tracked.
     * @param j forwarded as the second argument of {@link StreamRequestRequest}.
     * @param j2 forwarded as the third argument; replaced on a retry.
     * @param j3 forwarded as the fourth argument.
     * @param j4 forwarded as the fifth argument; replaced on a retry.
     * @param j5 forwarded as the sixth argument.
     * @return an observable emitting the status of the final response, or {@code EXISTS}.
     */
    public Observable<ResponseStatus> addStream(final short s, final long j, final long j2, final long j3, final long j4, final long j5) {
        if (this.streams.contains(Short.valueOf(s))) {
            return Observable.just(ResponseStatus.EXISTS);
        }

        // Captured explicitly: inside the anonymous callbacks below a bare `this` would denote
        // the callback object itself, not this connection.
        final DCPConnection connection = this;

        return Observable.defer(new Func0<Observable<StreamRequestResponse>>() {
            @Override
            public Observable<StreamRequestResponse> call() {
                return connection.core.send(new StreamRequestRequest(
                        s, j, j2, j3, j4, j5, connection.bucket, connection.password, connection));
            }
        }).flatMap(new Func1<StreamRequestResponse, Observable<StreamRequestResponse>>() {
            @Override
            public Observable<StreamRequestResponse> call(StreamRequestResponse response) {
                final long restartFrom;
                switch (response.status()) {
                    case RANGE_ERROR:
                        restartFrom = 0;
                        break;
                    case ROLLBACK:
                        restartFrom = response.rollbackToSequenceNumber();
                        break;
                    default:
                        // Nothing to retry; hand the response on as-is.
                        return Observable.just(response);
                }
                return connection.core.send(new StreamRequestRequest(
                        s, j, restartFrom, j3, restartFrom, j5, connection.bucket, connection.password, connection));
            }
        }).map(new Func1<StreamRequestResponse, ResponseStatus>() {
            @Override
            public ResponseStatus call(StreamRequestResponse response) {
                if (response.status() == ResponseStatus.SUCCESS) {
                    connection.streams.add(Short.valueOf(s));
                }
                return response.status();
            }
        });
    }

    /**
     * Closes a tracked stream and stops tracking it once the close succeeds.
     *
     * <p>If the identifier is not tracked, {@link ResponseStatus#NOT_EXISTS} is emitted and nothing
     * is sent. Otherwise a {@link StreamCloseRequest} is sent on subscription.
     *
     * @param s identifier of the stream to close.
     * @return an observable emitting the status of the close response, or {@code NOT_EXISTS}.
     */
    public Observable<ResponseStatus> removeStream(final short s) {
        final Short streamKey = Short.valueOf(s);
        if (!streams.contains(streamKey)) {
            return Observable.just(ResponseStatus.NOT_EXISTS);
        }

        final Observable<StreamCloseResponse> closeResponse =
                Observable.defer(new Func0<Observable<StreamCloseResponse>>() {
                    @Override
                    public Observable<StreamCloseResponse> call() {
                        return core.send(new StreamCloseRequest(s, bucket, password));
                    }
                });

        return closeResponse.map(new Func1<StreamCloseResponse, ResponseStatus>() {
            @Override
            public ResponseStatus call(StreamCloseResponse response) {
                final ResponseStatus status = response.status();
                if (status == ResponseStatus.SUCCESS) {
                    streams.remove(streamKey);
                }
                return status;
            }
        });
    }

    /**
     * Collects one mutation token per vbucket across all nodes that offer the DCP service.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>fetch the cluster configuration and list the nodes of this connection's bucket;</li>
     *   <li>keep the nodes whose plain or SSL services contain {@link ServiceType#DCP};</li>
     *   <li>ask each of those nodes for all of its mutation tokens;</li>
     *   <li>merge the answers per vbucket id, keeping the token with the larger sequence number
     *       when two answers differ (the disagreement is logged at debug level);</li>
     *   <li>for every remaining token fetch the failover log and emit a new token built from the
     *       response's partition, the vbucket UUID of the first failover log entry, the merged
     *       sequence number and this connection's bucket.</li>
     * </ol>
     *
     * @return an observable emitting the resulting tokens.
     */
    public Observable<MutationToken> getCurrentState() {
        final Observable<GetClusterConfigResponse> clusterConfig = this.core.send(new GetClusterConfigRequest());

        final Observable<NodeInfo> dcpNodes = clusterConfig
                .flatMap(new Func1<GetClusterConfigResponse, Observable<NodeInfo>>() {
                    @Override
                    public Observable<NodeInfo> call(GetClusterConfigResponse response) {
                        CouchbaseBucketConfig bucketConfig =
                                (CouchbaseBucketConfig) response.config().bucketConfig(DCPConnection.this.bucket);
                        return Observable.from(bucketConfig.nodes());
                    }
                })
                .filter(new Func1<NodeInfo, Boolean>() {
                    @Override
                    public Boolean call(NodeInfo node) {
                        return Boolean.valueOf(node.services().containsKey(ServiceType.DCP)
                                || node.sslServices().containsKey(ServiceType.DCP));
                    }
                });

        final Observable<GetAllMutationTokensResponse> tokenResponses =
                dcpNodes.flatMap(new Func1<NodeInfo, Observable<GetAllMutationTokensResponse>>() {
                    @Override
                    public Observable<GetAllMutationTokensResponse> call(NodeInfo node) {
                        return DCPConnection.this.core.send(
                                new GetAllMutationTokensRequest(node.hostname(), DCPConnection.this.bucket));
                    }
                });

        final Observable<Map<Integer, MutationToken>> tokensByVbucket = tokenResponses.collect(
                new Func0<Map<Integer, MutationToken>>() {
                    @Override
                    public Map<Integer, MutationToken> call() {
                        return new HashMap<Integer, MutationToken>(1024);
                    }
                },
                new Action2<Map<Integer, MutationToken>, GetAllMutationTokensResponse>() {
                    @Override
                    public void call(Map<Integer, MutationToken> tokens, GetAllMutationTokensResponse response) {
                        for (MutationToken reported : response.mutationTokens()) {
                            final Integer vbucketKey = Integer.valueOf((int) reported.vbucketID());
                            final MutationToken known = tokens.get(vbucketKey);
                            MutationToken selected = reported;
                            if (known != null && known.sequenceNumber() != reported.sequenceNumber()) {
                                // Two nodes disagree: keep whichever token is further ahead.
                                if (reported.sequenceNumber() < known.sequenceNumber()) {
                                    selected = known;
                                }
                                DCPConnection.LOGGER.debug("nodes are not agree on sequence number for vbucket {}: old={}, new={}, selected={}", Long.valueOf(reported.vbucketID()), Long.valueOf(known.sequenceNumber()), Long.valueOf(reported.sequenceNumber()), Long.valueOf(selected.sequenceNumber()));
                            }
                            tokens.put(vbucketKey, selected);
                        }
                    }
                });

        return tokensByVbucket
                .flatMap(new Func1<Map<Integer, MutationToken>, Observable<MutationToken>>() {
                    @Override
                    public Observable<MutationToken> call(Map<Integer, MutationToken> tokens) {
                        return Observable.from(tokens.values());
                    }
                })
                .flatMap(new Func1<MutationToken, Observable<MutationToken>>() {
                    @Override
                    public Observable<MutationToken> call(final MutationToken merged) {
                        final Observable<GetFailoverLogResponse> failoverLog = DCPConnection.this.core.send(
                                new GetFailoverLogRequest((short) merged.vbucketID(), DCPConnection.this.bucket));
                        return failoverLog.map(new Func1<GetFailoverLogResponse, MutationToken>() {
                            @Override
                            public MutationToken call(GetFailoverLogResponse response) {
                                // The first failover log entry supplies the vbucket UUID.
                                return new MutationToken(
                                        response.partition(),
                                        response.failoverLog().get(0).vbucketUUID(),
                                        merged.sequenceNumber(),
                                        DCPConnection.this.bucket);
                            }
                        });
                    }
                });
    }

    /**
     * Reports a message as consumed, using its own partition and total body length.
     *
     * @param dCPMessage the message that has been processed; must not be {@code null}.
     */
    public void consumed(DCPMessage dCPMessage) {
        final short partition = dCPMessage.partition();
        final int bodyLength = dCPMessage.totalBodyLength();
        consumed(partition, bodyLength);
    }

    /**
     * Adds consumed bytes to the running total of a channel and acknowledges them when due.
     *
     * <p>Does nothing when the configured DCP buffer size is not positive or when no channel
     * context has been registered for {@code s}. Otherwise the running total stored under
     * {@code CONSUMED_BYTES} grows by {@code MINIMUM_HEADER_SIZE + i}. Once the total reaches
     * {@code dcpConnectionBufferSize() * dcpConnectionBufferAckThreshold()}, a buffer
     * acknowledgment carrying the total is written and flushed, and the total restarts at zero.
     *
     * <p>The read-modify-write of the counter runs while holding the context's monitor.
     *
     * <p>Decompiler note: access was widened from package-private to public.
     *
     * @param s key under which the channel context was registered.
     * @param i number of bytes consumed, excluding the fixed header overhead.
     */
    public void consumed(short s, int i) {
        if (this.env.dcpConnectionBufferSize() <= 0) {
            return;
        }
        final ChannelHandlerContext ctx = this.contexts.get(Short.valueOf(s));
        if (ctx == null) {
            return;
        }
        synchronized (ctx) {
            final Attribute<Integer> counter = ctx.attr(CONSUMED_BYTES);
            final Integer previous = counter.get();
            // An unset attribute means nothing has been counted on this channel yet.
            int consumedBytes = (previous == null ? 0 : previous.intValue()) + MINIMUM_HEADER_SIZE + i;
            if (consumedBytes >= this.env.dcpConnectionBufferSize() * this.env.dcpConnectionBufferAckThreshold()) {
                ctx.writeAndFlush(createBufferAcknowledgmentRequest(ctx, consumedBytes));
                consumedBytes = 0;
            }
            counter.set(Integer.valueOf(consumedBytes));
        }
    }

    /**
     * Stops tracking a stream that has ended.
     *
     * <p>Decompiler note: access was widened from package-private to public.
     *
     * @param s identifier of the stream that ended.
     * @param reason why the stream ended; not used by this method.
     */
    public void streamClosed(short s, StreamEndMessage.Reason reason) {
        final Short streamKey = Short.valueOf(s);
        streams.remove(streamKey);
    }

    /**
     * Looks up how many partitions this connection's bucket has.
     *
     * @return an observable emitting {@code numberOfPartitions()} of the bucket configuration
     *         found in the cluster configuration response.
     */
    private Observable<Integer> partitionSize() {
        final Observable<GetClusterConfigResponse> clusterConfig = core.send(new GetClusterConfigRequest());
        return clusterConfig.map(new Func1<GetClusterConfigResponse, Integer>() {
            @Override
            public Integer call(GetClusterConfigResponse response) {
                final CouchbaseBucketConfig bucketConfig =
                        (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
                return Integer.valueOf(bucketConfig.numberOfPartitions());
            }
        });
    }

    /**
     * Remembers the channel context to use for a given key, replacing any earlier registration.
     *
     * <p>Decompiler note: access was widened from package-private to public.
     *
     * @param s key under which {@link #consumed(short, int)} will look the context up.
     * @param channelHandlerContext the context to associate with the key.
     */
    public void registerContext(short s, ChannelHandlerContext channelHandlerContext) {
        final Short contextKey = Short.valueOf(s);
        contexts.put(contextKey, channelHandlerContext);
    }

    /**
     * Builds the request that acknowledges a number of consumed bytes.
     *
     * <p>The request has an empty key, opcode {@code 93} and a 4-byte extras section holding
     * {@code i}; extras length and total body length are both set to the size of those extras.
     *
     * @param channelHandlerContext context whose allocator provides the extras buffer.
     * @param i number of bytes to acknowledge.
     * @return the populated request.
     */
    private BinaryMemcacheRequest createBufferAcknowledgmentRequest(ChannelHandlerContext channelHandlerContext, int i) {
        final ByteBuf extras = channelHandlerContext.alloc().buffer(4);
        extras.writeInt(i);
        final int extrasSize = extras.readableBytes();

        final DefaultBinaryMemcacheRequest ack = new DefaultBinaryMemcacheRequest(new byte[0], extras);
        ack.setOpcode((byte) 93);
        ack.setExtrasLength((byte) extrasSize);
        ack.setTotalBodyLength(extrasSize);
        return ack;
    }
}
