package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigurationException;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.config.BucketStreamingRequest;
import com.couchbase.client.core.message.config.BucketStreamingResponse;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

/* loaded from: input_file:WEB-INF/lib/core-io-1.2.8.jar:com/couchbase/client/core/config/refresher/HttpRefresher.class */
/**
 * Refresher that keeps bucket configurations up to date by sending a
 * {@link BucketStreamingRequest} to the cluster and forwarding every configuration
 * received on the resulting stream to {@code pushConfig}.
 *
 * <p>The terse streaming path is tried first. If anything upstream of it fails, the verbose
 * streaming path is used as a fallback.</p>
 */
public class HttpRefresher extends AbstractRefresher {

    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(HttpRefresher.class);

    /** Path of the terse bucket config streaming endpoint (tried first). */
    private static final String TERSE_PATH = "/pools/default/bs/";

    /** Path of the verbose bucket config streaming endpoint (fallback). */
    private static final String VERBOSE_PATH = "/pools/default/bucketsStreaming/";

    /**
     * Creates a new refresher.
     *
     * @param clusterFacade the cluster reference handed to the parent refresher and used to
     *                      send the streaming requests.
     */
    public HttpRefresher(ClusterFacade clusterFacade) {
        super(clusterFacade);
    }

    /**
     * Registers the bucket with the parent refresher and opens a config stream for it.
     *
     * <p>The value emitted by the parent registration is not inspected; it only triggers the
     * terse streaming request. Any error raised up to that point (including a non-successful
     * terse response) switches over to the verbose streaming request.</p>
     *
     * @param str  the name of the bucket.
     * @param str2 the password of the bucket.
     * @return an observable emitting whether the streaming response status was successful.
     */
    @Override
    public Observable<Boolean> registerBucket(final String str, final String str2) {
        Observable<BucketStreamingResponse> stream = super.registerBucket(str, str2)
            .flatMap(new Func1<Boolean, Observable<BucketStreamingResponse>>() {
                @Override
                public Observable<BucketStreamingResponse> call(Boolean registered) {
                    return openStream(TERSE_PATH, str, str2, "Could not load terse config.");
                }
            })
            .onErrorResumeNext(new Func1<Throwable, Observable<BucketStreamingResponse>>() {
                @Override
                public Observable<BucketStreamingResponse> call(Throwable cause) {
                    return openStream(VERBOSE_PATH, str, str2, "Could not load verbose config.");
                }
            });

        // The same observable is subscribed internally (to push configs) and returned to the
        // caller (to report the outcome), exactly as before.
        repeatConfigUntilUnsubscribed(str, stream);

        return stream.map(new Func1<BucketStreamingResponse, Boolean>() {
            @Override
            public Boolean call(BucketStreamingResponse response) {
                return Boolean.valueOf(response.status().isSuccess());
            }
        });
    }

    /**
     * Sends a streaming request for the given path and fails the resulting observable if the
     * response status is not successful.
     *
     * @param path           the streaming endpoint path to request.
     * @param bucket         the name of the bucket.
     * @param password       the password of the bucket.
     * @param failureMessage the message of the {@link ConfigurationException} raised when the
     *                       response status is not successful.
     * @return the observable of streaming responses.
     */
    private Observable<BucketStreamingResponse> openStream(String path, String bucket, String password,
        final String failureMessage) {
        // The explicit type argument is required: without it the response type cannot be
        // inferred for the chained doOnNext call.
        return cluster()
            .<BucketStreamingResponse>send(new BucketStreamingRequest(path, bucket, password))
            .doOnNext(new Action1<BucketStreamingResponse>() {
                @Override
                public void call(BucketStreamingResponse response) {
                    if (!response.status().isSuccess()) {
                        throw new ConfigurationException(failureMessage);
                    }
                }
            });
    }

    /**
     * Subscribes to the config stream and pushes every received config. When the stream
     * completes it is resubscribed for as long as the bucket is still contained in the
     * registrations; a stream error is logged instead of being left unhandled.
     *
     * @param bucketName the name of the bucket the stream belongs to.
     * @param responses  the observable of streaming responses to consume.
     */
    private void repeatConfigUntilUnsubscribed(final String bucketName,
        Observable<BucketStreamingResponse> responses) {
        responses
            .flatMap(new Func1<BucketStreamingResponse, Observable<String>>() {
                @Override
                public Observable<String> call(final BucketStreamingResponse response) {
                    LOGGER.debug("Config stream started for {} on {}.", bucketName, response.host());
                    return response.configs()
                        .map(new Func1<String, String>() {
                            @Override
                            public String call(String rawConfig) {
                                // Substitute the $HOST placeholder with the host that served the config.
                                return rawConfig.replace("$HOST", response.host());
                            }
                        })
                        .doOnCompleted(new Action0() {
                            @Override
                            public void call() {
                                LOGGER.debug("Config stream ended for {} on {}.", bucketName, response.host());
                            }
                        });
                }
            })
            .repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Void> completions) {
                    return completions.flatMap(new Func1<Void, Observable<?>>() {
                        @Override
                        public Observable<?> call(Void ignored) {
                            if (registrations().containsKey(bucketName)) {
                                LOGGER.debug("Resubscribing config stream for bucket {}, still registered.",
                                    bucketName);
                                return Observable.just(true);
                            }
                            LOGGER.debug("Not resubscribing config stream for bucket {}, not registered.",
                                bucketName);
                            return Observable.empty();
                        }
                    });
                }
            })
            .subscribe(
                new Action1<String>() {
                    @Override
                    public void call(String config) {
                        pushConfig(config);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable cause) {
                        // Without an error handler a failed stream would surface as an
                        // OnErrorNotImplementedException on the emitting thread.
                        LOGGER.warn("Config stream for bucket " + bucketName + " terminated with an error.",
                            cause);
                    }
                }
            );
    }

    /**
     * This refresher performs no shutdown work.
     *
     * @return an observable emitting {@code true}; never {@code null}, so callers can safely
     *         subscribe to or compose the result.
     */
    @Override
    public Observable<Boolean> shutdown() {
        return Observable.just(true);
    }

    /** No-op in this implementation. */
    @Override
    public void markTainted(BucketConfig bucketConfig) {
    }

    /** No-op in this implementation. */
    @Override
    public void markUntainted(BucketConfig bucketConfig) {
    }

    /** No-op in this implementation. */
    @Override
    public void refresh(ClusterConfig clusterConfig) {
    }
}
