/*
 * Decompiled with CFR 0.152.
 */
package org.gcube.data.streams.publishers;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.gcube.data.streams.Stream;
import org.gcube.data.streams.Utils;
import org.gcube.data.streams.exceptions.StreamContingencyException;
import org.gcube.data.streams.exceptions.StreamPublishException;
import org.gcube.data.streams.generators.Generator;
import org.gcube.data.streams.handlers.FaultHandler;
import org.gcube.data.streams.handlers.FaultResponse;
import org.gcube.data.streams.handlers.StopUnrecoverableHandler;
import org.gcube.data.streams.publishers.RecordFactory;
import org.gcube.data.streams.publishers.RsStringRecordFactory;
import org.gcube.data.streams.publishers.RsTransport;
import org.gcube.data.streams.publishers.StreamPublisher;
import org.gcube.data.streams.publishers.ThreadProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamPublisher} that publishes the elements of a {@link Stream} as records of a gRS2 resultset.
 * <p>
 * {@link #publish()} creates a {@link RecordWriter}, returns its locator, and starts a thread (obtained from
 * the configured {@link ThreadProvider}) that converts stream elements into records with the configured
 * {@link RecordFactory} and writes them to the resultset.
 * <p>
 * Failures raised while reading the stream or while generating records are passed to the configured
 * {@link FaultHandler}, which decides whether publication continues or stops.
 *
 * @param <E> the type of the stream elements
 */
public class RsPublisher<E>
implements StreamPublisher {
    private static final Logger log = LoggerFactory.getLogger(RsPublisher.class);
    private final Stream<E> stream;
    private final RecordFactory<E> factory;
    // resolved lazily: publish() falls back to RsTransport.TCP if no transport was set
    private RsTransport transport;
    private int bufferSize = RecordWriter.DefaultBufferCapacity;
    private long timeout = RecordWriter.DefaultInactivityTimeout;
    private TimeUnit timeoutUnit = RecordWriter.DefaultInactivityTimeUnit;
    private boolean onDemand = true;
    private ThreadProvider provider = new ThreadProvider(){

        @Override
        public Thread newThread(Runnable task) {
            return new Thread(task);
        }
    };
    private FaultHandler handler = new StopUnrecoverableHandler();

    /**
     * Creates a publisher whose records are built by an {@link RsStringRecordFactory} around a serialiser.
     *
     * @param stream the stream to publish
     * @param serialiser the generator that turns stream elements into strings
     * @throws IllegalArgumentException if the stream is {@code null} or the resulting factory has no definitions
     */
    public RsPublisher(Stream<E> stream, Generator<E, String> serialiser) {
        this(stream, new RsStringRecordFactory<E>(serialiser));
    }

    /**
     * Creates a publisher for a stream and a record factory.
     *
     * @param stream the stream to publish
     * @param factory the factory that turns stream elements into records
     * @throws IllegalArgumentException if the stream, the factory, or the factory's definitions are {@code null}
     */
    public RsPublisher(Stream<E> stream, RecordFactory<E> factory) {
        if (stream == null || factory == null || factory.definitions() == null) {
            throw new IllegalArgumentException("invalid or null inputs");
        }
        this.stream = stream;
        this.factory = factory;
    }

    /**
     * Sets the buffer size passed to the {@link RecordWriter} (default: {@link RecordWriter#DefaultBufferCapacity}).
     *
     * @param size the buffer size
     * @throws IllegalArgumentException if the size is not strictly positive
     */
    public void setBufferSize(int size) throws IllegalArgumentException {
        if (size <= 0) {
            throw new IllegalArgumentException("invalid empty buffer");
        }
        this.bufferSize = size;
    }

    /**
     * Sets the timeout used both as the writer's inactivity timeout and as the time to wait when writing a record.
     *
     * @param timeout the timeout
     * @param timeoutUnit the unit of the timeout
     * @throws IllegalArgumentException if the timeout is not strictly positive or the unit is {@code null}
     */
    public void setTimeout(long timeout, TimeUnit timeoutUnit) throws IllegalArgumentException {
        if (timeout <= 0L || timeoutUnit == null) {
            throw new IllegalArgumentException("invalid  timeout or time unit");
        }
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    /**
     * Sets the resultset transport. If never set, {@link #publish()} uses {@link RsTransport#TCP}.
     *
     * @param transport the transport
     * @throws IllegalArgumentException if the transport is {@code null}
     */
    public void setTransport(RsTransport transport) {
        if (transport == null) {
            throw new IllegalArgumentException("invalid null transport");
        }
        this.transport = transport;
    }

    /**
     * Sets what happens when the resultset is found not to be open while publishing (default: {@code true}).
     *
     * @param onDemand {@code true} to stop iterating the stream as soon as the resultset is no longer open,
     *        {@code false} to keep iterating the stream without writing
     */
    public void setOnDemand(boolean onDemand) {
        this.onDemand = onDemand;
    }

    /**
     * Sets the provider of the thread in which the stream is published.
     *
     * @param provider the provider
     * @throws IllegalArgumentException if the provider is {@code null}
     */
    public void setThreadProvider(ThreadProvider provider) {
        if (provider == null) {
            throw new IllegalArgumentException("invalid null provider");
        }
        this.provider = provider;
    }

    /**
     * Sets the handler for failures raised while reading the stream or generating records
     * (default: a {@link StopUnrecoverableHandler}).
     *
     * @param handler the handler
     * @throws IllegalArgumentException if the handler is {@code null}
     */
    public void setFaultHandler(FaultHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("invalid null handler");
        }
        this.handler = handler;
    }

    /**
     * Creates the resultset and starts feeding it with the stream elements in a separate thread.
     *
     * @return the locator of the resultset
     * @throws StreamPublishException if the resultset writer cannot be created or its locator cannot be obtained
     */
    @Override
    public URI publish() throws StreamPublishException {
        Utils.initialiseRS();
        if (this.transport == null) {
            this.transport = RsTransport.TCP;
        }
        RecordWriter<Record> writer;
        URI locator;
        try {
            writer = new RecordWriter<Record>(this.transport.proxy(), this.factory.definitions(), this.bufferSize, RecordWriter.DefaultConcurrentPartialCapacity, RecordWriter.DefaultMirrorBufferFactor, this.timeout, this.timeoutUnit);
            locator = writer.getLocator();
        }
        catch (GRS2WriterException e) {
            throw new StreamPublishException("cannot publish stream as resultset", e);
        }
        Runnable feeder = this.newFeeder(writer, locator);
        this.provider.newThread(feeder).start();
        return locator;
    }

    /**
     * Returns the task that feeds the resultset and then releases the writer and the stream.
     * Cleanup runs in {@code finally} blocks so that an unexpected failure while feeding
     * does not leave the writer or the stream open.
     */
    private Runnable newFeeder(final RecordWriter<Record> writer, final URI locator) {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    RsPublisher.this.feed(writer, locator);
                }
                finally {
                    try {
                        if (writer.getStatus() != IBuffer.Status.Dispose) {
                            RsPublisher.this.closeWriter(writer, locator);
                        }
                    }
                    finally {
                        RsPublisher.this.stream.close();
                    }
                }
            }
        };
    }

    /**
     * Iterates the stream, turning each element into a record and writing it to the resultset
     * until the stream ends, the fault handler asks to stop, or (when on-demand) the resultset is no longer open.
     */
    private void feed(RecordWriter<Record> writer, URI locator) {
        while (this.stream.hasNext()) {
            E element;
            try {
                element = this.stream.next();
            }
            catch (RuntimeException e) {
                if (this.resumeAfter(e, "contingency failure in input stream, continue publication until failure can be added to resultset (gRS2 requirement)", "unrecoverable failure in input stream, silently stopping publication")) {
                    continue;
                }
                break;
            }
            if (writer.getStatus() != IBuffer.Status.Open) {
                // nobody to write for: either give up or keep consuming the stream without writing
                if (this.onDemand) {
                    break;
                }
                continue;
            }
            Record record;
            try {
                record = this.factory.newRecord(element);
            }
            catch (RuntimeException e) {
                // no record exists at this point, so the messages cannot (and must not try to) report a record ID
                if (this.resumeAfter(e, "contingency failure in record generation, continue publication until failure can be added to resultset (gRS2 requirement)", "unrecoverable failure in record generation, stopping publication")) {
                    continue;
                }
                break;
            }
            try {
                if (!writer.put(record, this.timeout, this.timeoutUnit)) {
                    log.trace("client is not consuming resulset, stop publishing");
                    this.closeWriter(writer, locator);
                }
            }
            catch (GRS2WriterException e) {
                // NOTE(review): the loop is not exited here; the next iteration re-checks the writer status.
                // Confirm whether an immediate stop was intended.
                log.error("unrecoverable error writing record in resultset, stop publishing", (Throwable)e);
            }
        }
    }

    /**
     * Passes a failure to the fault handler and reports whether publication should go on.
     *
     * @param failure the failure to handle
     * @param contingencyMessage logged if the handler throws a {@link StreamContingencyException}
     * @param outageMessage logged if the handler throws any other {@link RuntimeException}
     * @return {@code true} if the handler answers {@link FaultResponse#CONTINUE} or throws a contingency,
     *         {@code false} otherwise
     */
    private boolean resumeAfter(RuntimeException failure, String contingencyMessage, String outageMessage) {
        try {
            return this.handler.handle(failure) == FaultResponse.CONTINUE;
        }
        catch (StreamContingencyException contingency) {
            log.error(contingencyMessage, (Throwable)contingency);
            return true;
        }
        catch (RuntimeException outage) {
            log.error(outageMessage, (Throwable)outage);
            return false;
        }
    }

    /**
     * Closes the writer, logging rather than propagating a {@link GRS2WriterException}.
     */
    private void closeWriter(RecordWriter<Record> writer, URI locator) {
        try {
            writer.close();
        }
        catch (GRS2WriterException e) {
            log.error("error closing resultset at " + locator, (Throwable)e);
        }
    }
}

