/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.java.query.core;

import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.annotations.InterfaceAudience;
import com.couchbase.client.core.annotations.InterfaceStability;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.cluster.GetClusterConfigRequest;
import com.couchbase.client.core.message.cluster.GetClusterConfigResponse;
import com.couchbase.client.core.message.query.GenericQueryRequest;
import com.couchbase.client.core.message.query.GenericQueryResponse;
import com.couchbase.client.core.utils.Buffers;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.java.CouchbaseAsyncBucket;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.QueryExecutionException;
import com.couchbase.client.java.error.TranscodingException;
import com.couchbase.client.java.query.AsyncN1qlQueryResult;
import com.couchbase.client.java.query.AsyncN1qlQueryRow;
import com.couchbase.client.java.query.DefaultAsyncN1qlQueryResult;
import com.couchbase.client.java.query.DefaultAsyncN1qlQueryRow;
import com.couchbase.client.java.query.N1qlMetrics;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.ParameterizedN1qlQuery;
import com.couchbase.client.java.query.PrepareStatement;
import com.couchbase.client.java.query.PreparedN1qlQuery;
import com.couchbase.client.java.query.PreparedPayload;
import com.couchbase.client.java.query.SimpleN1qlQuery;
import com.couchbase.client.java.query.Statement;
import com.couchbase.client.java.transcoder.TranscoderUtils;
import com.couchbase.client.java.util.LRUCache;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import rx.Observable;
import rx.exceptions.CompositeException;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;

@InterfaceStability.Experimental
@InterfaceAudience.Private
public class N1qlQueryExecutor {
    private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(N1qlQueryExecutor.class);
    private static final int QUERY_CACHE_SIZE = 5000;
    private static final String ERROR_FIELD_CODE = "code";
    private static final String ERROR_FIELD_MSG = "msg";
    protected static final String ERROR_5000_SPECIFIC_MESSAGE = "queryport.indexNotFound";
    public static final String ENCODED_PLAN_ENABLED_PROPERTY = "com.couchbase.query.encodedPlanEnabled";
    private final ClusterFacade core;
    private final String bucket;
    private final String password;
    private final Map<String, PreparedPayload> queryCache;
    private final boolean encodedPlanEnabled;
    private static final Func1<AsyncN1qlQueryResult, Observable<AsyncN1qlQueryResult>> QUERY_RESULT_PEEK_FOR_RETRY = new Func1<AsyncN1qlQueryResult, Observable<AsyncN1qlQueryResult>>(){

        @Override
        public Observable<AsyncN1qlQueryResult> call(final AsyncN1qlQueryResult aqr) {
            final Observable<JsonObject> cachedErrors = aqr.errors().cache();
            return cachedErrors.filter(new Func1<JsonObject, Boolean>(){

                @Override
                public Boolean call(JsonObject e) {
                    return N1qlQueryExecutor.shouldRetry(e);
                }
            }).lastOrDefault(null).flatMap(new Func1<JsonObject, Observable<AsyncN1qlQueryResult>>(){

                @Override
                public Observable<AsyncN1qlQueryResult> call(JsonObject errorJson) {
                    if (errorJson == null) {
                        DefaultAsyncN1qlQueryResult copyResult = new DefaultAsyncN1qlQueryResult(aqr.rows(), aqr.signature(), aqr.info(), cachedErrors, aqr.status(), aqr.parseSuccess(), aqr.requestId(), aqr.clientContextId());
                        return Observable.just(copyResult);
                    }
                    return Observable.error(new QueryExecutionException("Error with prepared query", errorJson));
                }
            });
        }
    };

    public N1qlQueryExecutor(ClusterFacade core, String bucket, String password) {
        this(core, bucket, password, new LRUCache<String, PreparedPayload>(5000), true);
    }

    public N1qlQueryExecutor(ClusterFacade core, String bucket, String password, boolean encodedPlanEnabled) {
        this(core, bucket, password, new LRUCache<String, PreparedPayload>(5000), encodedPlanEnabled);
    }

    protected N1qlQueryExecutor(ClusterFacade core, String bucket, String password, LRUCache<String, PreparedPayload> lruCache, boolean encodedPlanEnabled) {
        this.core = core;
        this.bucket = bucket;
        this.password = password;
        this.encodedPlanEnabled = encodedPlanEnabled;
        this.queryCache = Collections.synchronizedMap(lruCache);
    }

    public Observable<AsyncN1qlQueryResult> execute(N1qlQuery query) {
        if (query.params().isAdhoc()) {
            return this.executeQuery(query);
        }
        return this.dispatchPrepared(query);
    }

    protected Observable<AsyncN1qlQueryResult> executeQuery(final N1qlQuery query) {
        return Observable.defer(new Func0<Observable<GenericQueryResponse>>(){

            @Override
            public Observable<GenericQueryResponse> call() {
                return N1qlQueryExecutor.this.core.send(N1qlQueryExecutor.this.createN1qlRequest(query, N1qlQueryExecutor.this.bucket, N1qlQueryExecutor.this.password, null));
            }
        }).flatMap(new Func1<GenericQueryResponse, Observable<AsyncN1qlQueryResult>>(){

            @Override
            public Observable<AsyncN1qlQueryResult> call(GenericQueryResponse response) {
                Observable<AsyncN1qlQueryRow> rows = response.rows().map(new Func1<ByteBuf, AsyncN1qlQueryRow>(){

                    @Override
                    public AsyncN1qlQueryRow call(ByteBuf byteBuf) {
                        try {
                            TranscoderUtils.ByteBufToArray rawData = TranscoderUtils.byteBufToByteArray(byteBuf);
                            byte[] copy = Arrays.copyOfRange(rawData.byteArray, rawData.offset, rawData.offset + rawData.length);
                            DefaultAsyncN1qlQueryRow defaultAsyncN1qlQueryRow = new DefaultAsyncN1qlQueryRow(copy);
                            return defaultAsyncN1qlQueryRow;
                        }
                        catch (Exception e) {
                            throw new TranscodingException("Could not decode N1QL Query Row.", e);
                        }
                        finally {
                            byteBuf.release();
                        }
                    }
                });
                Observable<Object> signature = response.signature().map(new Func1<ByteBuf, Object>(){

                    @Override
                    public Object call(ByteBuf byteBuf) {
                        try {
                            Object object = CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.byteBufJsonValueToObject(byteBuf);
                            return object;
                        }
                        catch (Exception e) {
                            throw new TranscodingException("Could not decode N1QL Query Signature", e);
                        }
                        finally {
                            byteBuf.release();
                        }
                    }
                });
                Observable<N1qlMetrics> info = response.info().map(new Func1<ByteBuf, JsonObject>(){

                    @Override
                    public JsonObject call(ByteBuf byteBuf) {
                        try {
                            JsonObject jsonObject = CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.byteBufToJsonObject(byteBuf);
                            return jsonObject;
                        }
                        catch (Exception e) {
                            throw new TranscodingException("Could not decode N1QL Query Info.", e);
                        }
                        finally {
                            byteBuf.release();
                        }
                    }
                }).map(new Func1<JsonObject, N1qlMetrics>(){

                    @Override
                    public N1qlMetrics call(JsonObject jsonObject) {
                        return new N1qlMetrics(jsonObject);
                    }
                });
                Observable<String> finalStatus = response.queryStatus();
                Observable<JsonObject> errors = response.errors().map(new Func1<ByteBuf, JsonObject>(){

                    @Override
                    public JsonObject call(ByteBuf byteBuf) {
                        try {
                            JsonObject jsonObject = CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.byteBufToJsonObject(byteBuf);
                            return jsonObject;
                        }
                        catch (Exception e) {
                            throw new TranscodingException("Could not decode View Info.", e);
                        }
                        finally {
                            byteBuf.release();
                        }
                    }
                });
                boolean parseSuccess = response.status().isSuccess();
                String contextId = response.clientRequestId() == null ? "" : response.clientRequestId();
                String requestId = response.requestId();
                DefaultAsyncN1qlQueryResult r = new DefaultAsyncN1qlQueryResult(rows, signature, info, errors, finalStatus, parseSuccess, requestId, contextId);
                return Observable.just(r);
            }
        });
    }

    private static boolean shouldRetry(JsonObject errorJson) {
        if (errorJson == null) {
            return false;
        }
        Integer code = errorJson.getInt(ERROR_FIELD_CODE);
        String msg = errorJson.getString(ERROR_FIELD_MSG);
        if (code == null || msg == null) {
            return false;
        }
        return code == 4050 || code == 4070 || code == 5000 && msg.contains(ERROR_5000_SPECIFIC_MESSAGE);
    }

    protected Observable<AsyncN1qlQueryResult> dispatchPrepared(final N1qlQuery query) {
        PreparedPayload payload = this.queryCache.get(query.statement().toString());
        Func1<Throwable, Observable<AsyncN1qlQueryResult>> retryFunction = new Func1<Throwable, Observable<AsyncN1qlQueryResult>>(){

            @Override
            public Observable<AsyncN1qlQueryResult> call(Throwable throwable) {
                return N1qlQueryExecutor.this.retryPrepareAndExecuteOnce(throwable, query);
            }
        };
        if (payload != null) {
            return this.executePrepared(query, payload).flatMap(QUERY_RESULT_PEEK_FOR_RETRY).onErrorResumeNext(retryFunction);
        }
        return this.prepareAndExecute(query).flatMap(QUERY_RESULT_PEEK_FOR_RETRY).onErrorResumeNext(retryFunction);
    }

    protected Observable<AsyncN1qlQueryResult> retryPrepareAndExecuteOnce(Throwable error, N1qlQuery query) {
        if (error instanceof QueryExecutionException && N1qlQueryExecutor.shouldRetry(((QueryExecutionException)error).getN1qlError())) {
            this.queryCache.remove(query.statement().toString());
            return this.prepareAndExecute(query);
        }
        return Observable.error(error);
    }

    protected Observable<AsyncN1qlQueryResult> prepareAndExecute(final N1qlQuery query) {
        return this.prepare(query.statement()).flatMap(new Func1<PreparedPayload, Observable<AsyncN1qlQueryResult>>(){

            @Override
            public Observable<AsyncN1qlQueryResult> call(PreparedPayload payload) {
                N1qlQueryExecutor.this.queryCache.put(query.statement().toString(), payload);
                return N1qlQueryExecutor.this.executePrepared(query, payload);
            }
        });
    }

    protected Observable<AsyncN1qlQueryResult> executePrepared(N1qlQuery query, PreparedPayload payload) {
        ParameterizedN1qlQuery pq;
        PreparedN1qlQuery preparedQuery = query instanceof ParameterizedN1qlQuery ? ((pq = (ParameterizedN1qlQuery)query).isPositional() ? new PreparedN1qlQuery(payload, (JsonArray)pq.statementParameters(), query.params()) : new PreparedN1qlQuery(payload, (JsonObject)pq.statementParameters(), query.params())) : new PreparedN1qlQuery(payload, query.params());
        preparedQuery.setEncodedPlanEnabled(this.isEncodedPlanEnabled());
        return this.executeQuery(preparedQuery);
    }

    protected Observable<PreparedPayload> prepare(Statement statement) {
        final PrepareStatement prepared = statement instanceof PrepareStatement ? (PrepareStatement)statement : PrepareStatement.prepare(statement);
        final SimpleN1qlQuery query = N1qlQuery.simple(prepared);
        Observable<GenericQueryResponse> source = this.isEncodedPlanEnabled() ? Observable.defer(new Func0<Observable<GenericQueryResponse>>(){

            @Override
            public Observable<GenericQueryResponse> call() {
                return N1qlQueryExecutor.this.core.send(N1qlQueryExecutor.this.createN1qlRequest(query, N1qlQueryExecutor.this.bucket, N1qlQueryExecutor.this.password, null));
            }
        }) : Observable.defer(new Func0<Observable<GetClusterConfigResponse>>(){

            @Override
            public Observable<GetClusterConfigResponse> call() {
                return N1qlQueryExecutor.this.core.send(new GetClusterConfigRequest());
            }
        }).flatMap(new Func1<GetClusterConfigResponse, Observable<NodeInfo>>(){

            @Override
            public Observable<NodeInfo> call(GetClusterConfigResponse getClusterConfigResponse) {
                return Observable.from(getClusterConfigResponse.config().bucketConfig(N1qlQueryExecutor.this.bucket).nodes());
            }
        }).flatMap(new Func1<NodeInfo, Observable<GenericQueryResponse>>(){

            @Override
            public Observable<GenericQueryResponse> call(NodeInfo nodeInfo) {
                GenericQueryRequest req = N1qlQueryExecutor.this.createN1qlRequest(query, N1qlQueryExecutor.this.bucket, N1qlQueryExecutor.this.password, nodeInfo.hostname());
                return N1qlQueryExecutor.this.core.send(req);
            }
        });
        return source.flatMap(new Func1<GenericQueryResponse, Observable<PreparedPayload>>(){

            @Override
            public Observable<PreparedPayload> call(GenericQueryResponse r) {
                if (r.status().isSuccess()) {
                    r.info().subscribe(Buffers.BYTE_BUF_RELEASER);
                    r.signature().subscribe(Buffers.BYTE_BUF_RELEASER);
                    r.errors().subscribe(Buffers.BYTE_BUF_RELEASER);
                    return r.rows().map(new Func1<ByteBuf, PreparedPayload>(){

                        @Override
                        public PreparedPayload call(ByteBuf byteBuf) {
                            try {
                                JsonObject value = CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.byteBufToJsonObject(byteBuf);
                                PreparedPayload preparedPayload = N1qlQueryExecutor.this.extractPreparedPayloadFromResponse(prepared, value);
                                return preparedPayload;
                            }
                            catch (Exception e) {
                                throw new TranscodingException("Could not decode N1QL Query Plan.", e);
                            }
                            finally {
                                byteBuf.release();
                            }
                        }
                    });
                }
                r.info().subscribe(Buffers.BYTE_BUF_RELEASER);
                r.signature().subscribe(Buffers.BYTE_BUF_RELEASER);
                r.rows().subscribe(Buffers.BYTE_BUF_RELEASER);
                return r.errors().map(new Func1<ByteBuf, Exception>(){

                    @Override
                    public Exception call(ByteBuf byteBuf) {
                        try {
                            JsonObject value = CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER.byteBufToJsonObject(byteBuf);
                            CouchbaseException couchbaseException = new CouchbaseException("N1qlQuery Error - " + value.toString());
                            return couchbaseException;
                        }
                        catch (Exception e) {
                            throw new TranscodingException("Could not decode N1QL Query Plan.", e);
                        }
                        finally {
                            byteBuf.release();
                        }
                    }
                }).reduce(new ArrayList(), new Func2<ArrayList<Throwable>, Exception, ArrayList<Throwable>>(){

                    @Override
                    public ArrayList<Throwable> call(ArrayList<Throwable> throwables, Exception error) {
                        throwables.add(error);
                        return throwables;
                    }
                }).flatMap(new Func1<ArrayList<Throwable>, Observable<PreparedPayload>>(){

                    @Override
                    public Observable<PreparedPayload> call(ArrayList<Throwable> errors) {
                        if (errors.size() == 1) {
                            return Observable.error(new CouchbaseException("Error while preparing plan", errors.get(0)));
                        }
                        return Observable.error(new CompositeException("Multiple errors while preparing plan", errors));
                    }
                });
            }
        }).last();
    }

    private GenericQueryRequest createN1qlRequest(N1qlQuery query, String bucket, String password, InetAddress targetNode) {
        String rawQuery = query.n1ql().toString();
        rawQuery = rawQuery.replaceAll("#CURRENT_BUCKET#", "`" + bucket + "`");
        if (targetNode != null) {
            return GenericQueryRequest.jsonQuery(rawQuery, bucket, password, targetNode);
        }
        return GenericQueryRequest.jsonQuery(rawQuery, bucket, password);
    }

    protected PreparedPayload extractPreparedPayloadFromResponse(PrepareStatement prepared, JsonObject response) {
        return new PreparedPayload(prepared.originalStatement(), response.getString("name"), response.getString("encoded_plan"));
    }

    public int invalidateQueryCache() {
        int oldSize = this.queryCache.size();
        this.queryCache.clear();
        return oldSize;
    }

    public boolean isEncodedPlanEnabled() {
        return this.encodedPlanEnabled;
    }
}

