/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.service;

import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.endpoint.Endpoint;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.CouchbaseRequest;
import com.couchbase.client.core.message.internal.SignalFlush;
import com.couchbase.client.core.retry.RetryHelper;
import com.couchbase.client.core.service.AbstractDynamicService;
import com.couchbase.client.core.service.Service;
import com.couchbase.client.core.state.LifecycleState;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;

/**
 * Service flavour that creates a fresh {@link Endpoint} for every dispatched request.
 *
 * <p>For each request a new endpoint is created, registered with the endpoint states and
 * remembered in an internal list. The request is sent once that endpoint reports
 * {@link LifecycleState#CONNECTED}; the endpoint is deregistered and forgotten once it reports
 * {@link LifecycleState#DISCONNECTED}. After {@link #disconnect()} has been called, requests are
 * no longer sent but handed to {@link RetryHelper#retryOrCancel}.
 */
public abstract class AbstractOnDemandService extends AbstractDynamicService {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(Service.class);

    /** Endpoints created by {@link #dispatch} that have not yet reported DISCONNECTED. */
    private final List<Endpoint> onDemandEndpoints = new CopyOnWriteArrayList<Endpoint>();

    /** Set to {@code true} by {@link #disconnect()}; never reset by this class. */
    private volatile boolean disconnect = false;

    private final CoreContext ctx;
    private final String hostname;

    /**
     * Creates the service and forwards all arguments to the parent constructor.
     *
     * <p>NOTE(review): the literal {@code 0} passed to the parent is presumably the number of
     * endpoints to keep open up front - confirm against {@code AbstractDynamicService}.
     *
     * @param hostname the hostname of the node, also used when building log prefixes.
     * @param bucket the bucket name, passed through to the parent.
     * @param username the username, passed through to the parent.
     * @param password the password, passed through to the parent.
     * @param port the port, passed through to the parent.
     * @param ctx the core context providing the environment and the response ring buffer.
     * @param endpointFactory the factory handed to the parent for creating endpoints.
     */
    protected AbstractOnDemandService(String hostname, String bucket, String username, String password, int port, CoreContext ctx, Service.EndpointFactory endpointFactory) {
        super(hostname, bucket, username, password, port, ctx, 0, endpointFactory);
        this.ctx = ctx;
        this.hostname = hostname;
    }

    /**
     * Creates a new endpoint for the given request, starts connecting it and wires up the
     * callbacks that send the request on CONNECTED and clean up on DISCONNECTED.
     *
     * <p>If the service has already been asked to disconnect, no endpoint is created and the
     * request is handed to {@link RetryHelper#retryOrCancel} instead.
     *
     * @param request the request to dispatch.
     */
    @Override
    protected void dispatch(final CouchbaseRequest request) {
        if (this.disconnect) {
            this.bounceRequest(request);
            return;
        }

        final Endpoint onDemand = this.createEndpoint();
        this.endpointStates().register(onDemand, onDemand);
        this.onDemandEndpoints.add(onDemand);

        onDemand.connect().subscribe(new ConnectWatcher(request));
        whenState(onDemand, LifecycleState.CONNECTED, new ConnectedHandler(onDemand, request));
        whenState(onDemand, LifecycleState.DISCONNECTED, new DisconnectedHandler(onDemand));
    }

    /**
     * Flags the service as disconnecting and disconnects every endpoint currently tracked.
     *
     * <p>The returned observable, once the last endpoint disconnect has emitted (or immediately
     * with a default of {@link LifecycleState#IDLE} when there is none), terminates the endpoint
     * states and emits the current state of this service.
     *
     * @return an observable emitting the state of this service after the endpoints were
     *         instructed to disconnect.
     */
    @Override
    public Observable<LifecycleState> disconnect() {
        this.disconnect = true;
        LOGGER.debug(logIdent(this.hostname, this) + "Got instructed to disconnect.");

        final Observable<Endpoint> tracked = Observable.from(this.onDemandEndpoints);
        final Observable<LifecycleState> shutdowns = tracked.flatMap(new Func1<Endpoint, Observable<LifecycleState>>() {
            @Override
            public Observable<LifecycleState> call(Endpoint tracked) {
                LOGGER.debug(AbstractDynamicService.logIdent(AbstractOnDemandService.this.hostname, AbstractOnDemandService.this) + "Initializing disconnect on Endpoint.");
                return tracked.disconnect();
            }
        });
        final Observable<LifecycleState> lastShutdown = shutdowns.lastOrDefault(LifecycleState.IDLE);

        return lastShutdown.map(new Func1<LifecycleState, LifecycleState>() {
            @Override
            public LifecycleState call(LifecycleState ignoredEndpointState) {
                AbstractOnDemandService.this.endpointStates().terminate();
                // Double cast kept from the decompiled original; state() is declared in a parent class.
                return (LifecycleState)((Object)AbstractOnDemandService.this.state());
            }
        });
    }

    /** Hands the request to {@link RetryHelper#retryOrCancel} using this service's context. */
    private void bounceRequest(CouchbaseRequest request) {
        RetryHelper.retryOrCancel(this.ctx.environment(), request, this.ctx.responseRingBuffer());
    }

    /** Fails the pending request when connecting its endpoint errors or ends up DISCONNECTED. */
    private static final class ConnectWatcher extends Subscriber<LifecycleState> {
        private final CouchbaseRequest pending;

        ConnectWatcher(CouchbaseRequest pending) {
            this.pending = pending;
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable cause) {
            this.pending.observable().onError(cause);
        }

        @Override
        public void onNext(LifecycleState reached) {
            if (reached != LifecycleState.DISCONNECTED) {
                return;
            }
            this.pending.observable().onError(new CouchbaseException("Could not connect endpoint."));
        }
    }

    /** Logs a warning if disconnecting an endpoint fails; ignores everything else. */
    private static final class DisconnectFailureLogger extends Subscriber<LifecycleState> {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable cause) {
            LOGGER.warn("Error while disconnecting endpoint.", cause);
        }

        @Override
        public void onNext(LifecycleState ignoredState) {
        }
    }

    /**
     * Runs when the endpoint reaches CONNECTED: sends the request followed by a flush signal, or,
     * if the service was told to disconnect in the meantime, bounces the request and disconnects
     * the endpoint again.
     */
    private final class ConnectedHandler implements Action1<LifecycleState> {
        private final Endpoint target;
        private final CouchbaseRequest pending;

        ConnectedHandler(Endpoint target, CouchbaseRequest pending) {
            this.target = target;
            this.pending = pending;
        }

        @Override
        public void call(LifecycleState ignoredState) {
            if (!AbstractOnDemandService.this.disconnect) {
                this.target.send(this.pending);
                this.target.send(SignalFlush.INSTANCE);
                return;
            }

            AbstractOnDemandService.this.bounceRequest(this.pending);
            LOGGER.debug(AbstractDynamicService.logIdent(AbstractOnDemandService.this.hostname, AbstractOnDemandService.this) + "Initializing disconnect on Endpoint.");
            this.target.disconnect().subscribe(new DisconnectFailureLogger());
        }
    }

    /** Runs when the endpoint reaches DISCONNECTED: deregisters it and drops it from the list. */
    private final class DisconnectedHandler implements Action1<LifecycleState> {
        private final Endpoint target;

        DisconnectedHandler(Endpoint target) {
            this.target = target;
        }

        @Override
        public void call(LifecycleState ignoredState) {
            AbstractOnDemandService.this.endpointStates().deregister(this.target);
            AbstractOnDemandService.this.onDemandEndpoints.remove(this.target);
        }
    }
}

