/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.lookup;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava30.com.google.common.primitives.Ints;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.RocksDBOptions;
import org.apache.flink.table.store.connector.lookup.LookupTable;
import org.apache.flink.table.store.connector.lookup.RocksDBStateFactory;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.file.predicate.PredicateFilter;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.source.TableStreamingReader;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStoreLookupFunction
extends TableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
    private final FileStoreTable table;
    private final List<String> projectFields;
    private final List<String> joinKeys;
    @Nullable
    private final Predicate predicate;
    private transient Duration refreshInterval;
    private transient File path;
    private transient RocksDBStateFactory stateFactory;
    private transient LookupTable lookupTable;
    private transient long nextLoadTime;
    private transient TableStreamingReader streamingReader;

    public FileStoreLookupFunction(FileStoreTable table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
        TableSchema schema = table.schema();
        Preconditions.checkArgument((boolean)schema.partitionKeys().isEmpty(), (Object)"Currently only support non-partitioned table.");
        Preconditions.checkArgument((schema.primaryKeys().size() > 0 ? 1 : 0) != 0, (Object)"Currently only support primary key table.");
        ContinuousDataFileSnapshotEnumerator.validate(table.schema());
        this.table = table;
        this.joinKeys = Arrays.stream(joinKeyIndex).mapToObj(i -> schema.fieldNames().get(projection[i])).collect(Collectors.toList());
        this.projectFields = Arrays.stream(projection).mapToObj(i -> schema.fieldNames().get(i)).collect(Collectors.toList());
        for (String field : schema.primaryKeys()) {
            if (this.projectFields.contains(field)) continue;
            this.projectFields.add(field);
        }
        this.predicate = predicate;
    }

    public void open(FunctionContext context) throws Exception {
        super.open(context);
        String tmpDirectory = FileStoreLookupFunction.getTmpDirectory(context);
        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
        Configuration options = Configuration.fromMap(this.table.schema().options());
        this.refreshInterval = (Duration)options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
        this.stateFactory = new RocksDBStateFactory(this.path.toString(), options);
        List fieldNames = this.table.schema().logicalRowType().getFieldNames();
        int[] projection = this.projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
        RowType rowType = TypeUtils.project(this.table.schema().logicalRowType(), projection);
        PredicateFilter recordFilter = this.createRecordFilter(projection);
        this.lookupTable = LookupTable.create(this.stateFactory, rowType, this.table.schema().primaryKeys(), this.joinKeys, recordFilter, options.getLong(RocksDBOptions.LOOKUP_CACHE_ROWS));
        this.nextLoadTime = -1L;
        this.streamingReader = new TableStreamingReader(this.table, projection, this.predicate);
        this.refresh();
    }

    private PredicateFilter createRecordFilter(int[] projection) {
        Predicate adjustedPredicate = null;
        if (this.predicate != null) {
            adjustedPredicate = PredicateBuilder.transformFieldMapping(this.predicate, IntStream.range(0, this.table.schema().fields().size()).map(i -> Ints.indexOf((int[])projection, (int)i)).toArray()).orElse(null);
        }
        return new PredicateFilter(TypeUtils.project(this.table.schema().logicalRowType(), projection), adjustedPredicate);
    }

    public void eval(Object ... values) throws Exception {
        this.checkRefresh();
        List<RowData> results = this.lookupTable.get((RowData)GenericRowData.of((Object[])values));
        for (RowData matchedRow : results) {
            this.collect(matchedRow);
        }
    }

    private void checkRefresh() throws Exception {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0L) {
            LOG.info("Lookup table has refreshed after {} minute(s), refreshing", (Object)this.refreshInterval.toMinutes());
        }
        this.refresh();
        this.nextLoadTime = System.currentTimeMillis() + this.refreshInterval.toMillis();
    }

    private void refresh() throws Exception {
        Iterator<RowData> batch;
        while ((batch = this.streamingReader.nextBatch()) != null) {
            this.lookupTable.refresh(batch);
        }
        return;
    }

    public void close() throws IOException {
        if (this.stateFactory != null) {
            this.stateFactory.close();
            this.stateFactory = null;
        }
        if (this.path != null) {
            FileUtils.deleteDirectoryQuietly((File)this.path);
        }
    }

    private static String getTmpDirectory(FunctionContext context) {
        try {
            Field field = context.getClass().getDeclaredField("context");
            field.setAccessible(true);
            StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext)field.get(context);
            String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }
}

