/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.promote;

import eu.dnetlib.dhp.actionmanager.promote.PromoteAction;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

public class PromoteActionPayloadFunctions {
    private PromoteActionPayloadFunctions() {
    }

    public static <G extends Oaf, A extends Oaf> Dataset<G> joinGraphTableWithActionPayloadAndMerge(Dataset<G> rowDS, Dataset<A> actionPayloadDS, FunctionalInterfaceSupport.SerializableSupplier<Function<G, String>> rowIdFn, FunctionalInterfaceSupport.SerializableSupplier<Function<A, String>> actionPayloadIdFn, FunctionalInterfaceSupport.SerializableSupplier<BiFunction<G, A, G>> mergeAndGetFn, PromoteAction.Strategy promoteActionStrategy, Class<G> rowClazz, Class<A> actionPayloadClazz) {
        if (Boolean.FALSE.equals(ModelSupport.isSubClass(rowClazz, actionPayloadClazz))) {
            throw new RuntimeException("action payload type must be the same or be a super type of table row type");
        }
        Dataset<Tuple2<String, G>> rowWithIdDS = PromoteActionPayloadFunctions.mapToTupleWithId(rowDS, rowIdFn, rowClazz);
        Dataset<Tuple2<String, A>> actionPayloadWithIdDS = PromoteActionPayloadFunctions.mapToTupleWithId(actionPayloadDS, actionPayloadIdFn, actionPayloadClazz);
        return rowWithIdDS.joinWith(actionPayloadWithIdDS, rowWithIdDS.col("_1").equalTo((Object)actionPayloadWithIdDS.col("_1")), PromoteAction.joinTypeForStrategy(promoteActionStrategy)).map((MapFunction & Serializable)value -> {
            Optional<Oaf> rowOpt = Optional.ofNullable((Tuple2)value._1()).map(Tuple2::_2);
            Optional<Oaf> actionPayloadOpt = Optional.ofNullable((Tuple2)value._2()).map(Tuple2::_2);
            return rowOpt.map(row -> actionPayloadOpt.map(actionPayload -> (Oaf)((BiFunction)mergeAndGetFn.get()).apply(row, actionPayload)).orElse((Oaf)row)).orElseGet(() -> actionPayloadOpt.filter(actionPayload -> actionPayload.getClass().equals(rowClazz)).map(rowClazz::cast).orElse(null));
        }, Encoders.kryo(rowClazz)).filter(Objects::nonNull);
    }

    private static <T extends Oaf> Dataset<Tuple2<String, T>> mapToTupleWithId(Dataset<T> ds, FunctionalInterfaceSupport.SerializableSupplier<Function<T, String>> idFn, Class<T> clazz) {
        return ds.map((MapFunction & Serializable)value -> new Tuple2((Object)((String)((Function)idFn.get()).apply(value)), value), Encoders.tuple((Encoder)Encoders.STRING(), (Encoder)Encoders.kryo(clazz)));
    }

    public static <G extends Oaf> Dataset<G> groupGraphTableByIdAndMerge(Dataset<G> rowDS, FunctionalInterfaceSupport.SerializableSupplier<Function<G, String>> rowIdFn, FunctionalInterfaceSupport.SerializableSupplier<BiFunction<G, G, G>> mergeAndGetFn, FunctionalInterfaceSupport.SerializableSupplier<G> zeroFn, FunctionalInterfaceSupport.SerializableSupplier<Function<G, Boolean>> isNotZeroFn, Class<G> rowClazz) {
        TypedColumn aggregator = new TableAggregator<G>(zeroFn, mergeAndGetFn, isNotZeroFn, rowClazz).toColumn();
        return rowDS.filter((FilterFunction & Serializable)o -> (Boolean)((Function)isNotZeroFn.get()).apply(o)).groupByKey((MapFunction & Serializable)x -> (String)((Function)rowIdFn.get()).apply(x), Encoders.STRING()).agg(aggregator).map(Tuple2::_2, Encoders.kryo(rowClazz));
    }

    public static class TableAggregator<G extends Oaf>
    extends Aggregator<G, G, G> {
        private final FunctionalInterfaceSupport.SerializableSupplier<G> zeroFn;
        private final FunctionalInterfaceSupport.SerializableSupplier<BiFunction<G, G, G>> mergeAndGetFn;
        private final FunctionalInterfaceSupport.SerializableSupplier<Function<G, Boolean>> isNotZeroFn;
        private final Class<G> rowClazz;

        public TableAggregator(FunctionalInterfaceSupport.SerializableSupplier<G> zeroFn, FunctionalInterfaceSupport.SerializableSupplier<BiFunction<G, G, G>> mergeAndGetFn, FunctionalInterfaceSupport.SerializableSupplier<Function<G, Boolean>> isNotZeroFn, Class<G> rowClazz) {
            this.zeroFn = zeroFn;
            this.mergeAndGetFn = mergeAndGetFn;
            this.isNotZeroFn = isNotZeroFn;
            this.rowClazz = rowClazz;
        }

        public G zero() {
            return (G)((Oaf)this.zeroFn.get());
        }

        public G reduce(G b, G a) {
            return this.zeroSafeMergeAndGet(b, a);
        }

        public G merge(G b1, G b2) {
            return this.zeroSafeMergeAndGet(b1, b2);
        }

        private G zeroSafeMergeAndGet(G left, G right) {
            Function isNotZero = (Function)this.isNotZeroFn.get();
            if (((Boolean)isNotZero.apply(left)).booleanValue() && ((Boolean)isNotZero.apply(right)).booleanValue()) {
                return (G)((Oaf)((BiFunction)this.mergeAndGetFn.get()).apply(left, right));
            }
            if (((Boolean)isNotZero.apply(left)).booleanValue() && !((Boolean)isNotZero.apply(right)).booleanValue()) {
                return left;
            }
            if (!((Boolean)isNotZero.apply(left)).booleanValue() && ((Boolean)isNotZero.apply(right)).booleanValue()) {
                return right;
            }
            throw new RuntimeException("internal aggregation error: left and right objects are zero");
        }

        public G finish(G reduction) {
            return reduction;
        }

        public Encoder<G> bufferEncoder() {
            return Encoders.kryo(this.rowClazz);
        }

        public Encoder<G> outputEncoder() {
            return Encoders.kryo(this.rowClazz);
        }
    }
}

