/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.store.connector.lookup.PrimaryKeyLookupTable;
import org.apache.flink.table.store.connector.lookup.RocksDBSetState;
import org.apache.flink.table.store.connector.lookup.RocksDBStateFactory;
import org.apache.flink.table.store.utils.KeyProjectedRowData;
import org.apache.flink.table.store.utils.TypeUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public class SecondaryIndexLookupTable
extends PrimaryKeyLookupTable {
    private final RocksDBSetState indexState;
    private final KeyProjectedRowData secKeyRow;

    public SecondaryIndexLookupTable(RocksDBStateFactory stateFactory, RowType rowType, List<String> primaryKey, List<String> secKey, Predicate<RowData> recordFilter, long lruCacheSize) throws IOException {
        super(stateFactory, rowType, primaryKey, recordFilter, lruCacheSize / 2L);
        List fieldNames = rowType.getFieldNames();
        int[] secKeyMapping = secKey.stream().mapToInt(fieldNames::indexOf).toArray();
        this.secKeyRow = new KeyProjectedRowData(secKeyMapping);
        this.indexState = stateFactory.setState("sec-index", (TypeSerializer<RowData>)InternalSerializers.create((RowType)TypeUtils.project(rowType, secKeyMapping)), (TypeSerializer<RowData>)InternalSerializers.create((RowType)TypeUtils.project(rowType, this.primaryKeyMapping)), lruCacheSize / 2L);
    }

    @Override
    public List<RowData> get(RowData key) throws IOException {
        List<RowData> pks = this.indexState.get(key);
        ArrayList<RowData> values = new ArrayList<RowData>(pks.size());
        for (RowData pk : pks) {
            RowData value = this.tableState.get(pk);
            if (value == null) continue;
            values.add(value);
        }
        return values;
    }

    @Override
    public void refresh(Iterator<RowData> incremental) throws IOException {
        while (incremental.hasNext()) {
            RowData row = incremental.next();
            this.primaryKey.replaceRow(row);
            if (row.getRowKind() == RowKind.INSERT || row.getRowKind() == RowKind.UPDATE_AFTER) {
                RowData previous = this.tableState.get(this.primaryKey);
                if (previous != null) {
                    this.indexState.retract(this.secKeyRow.replaceRow(previous), this.primaryKey);
                }
                if (this.recordFilter.test(row)) {
                    this.tableState.put(this.primaryKey, row);
                    this.indexState.add(this.secKeyRow.replaceRow(row), this.primaryKey);
                    continue;
                }
                this.tableState.delete(this.primaryKey);
                continue;
            }
            this.tableState.delete(this.primaryKey);
            this.indexState.retract(this.secKeyRow.replaceRow(row), this.primaryKey);
        }
    }
}

