/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.util;

import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.RandomAccessInputView;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.HeaderlessChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.io.BinaryRowChannelInputViewIterator;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.table.runtime.util.ResettableRowBuffer;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResettableExternalBuffer
implements ResettableRowBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(ResettableExternalBuffer.class);
    public static final int MIN_NUM_MEMORY = 327680;
    private static final int READ_BUFFER = 2;
    private final IOManager ioManager;
    private final LazyMemorySegmentPool pool;
    private final BinaryRowDataSerializer binaryRowSerializer;
    private final InMemoryBuffer inMemoryBuffer;
    private final int segmentSize;
    private long spillSize;
    private long rowLength;
    private boolean isRowAllInFixedPart;
    private final List<ChannelWithMeta> spilledChannelIDs;
    private final List<Integer> spilledChannelRowOffsets;
    private int numRows;
    private int iterOpenedCount;
    private int externalBufferVersion;
    private boolean addCompleted;

    public ResettableExternalBuffer(IOManager ioManager, LazyMemorySegmentPool pool, AbstractRowDataSerializer serializer, boolean isRowAllInFixedPart) {
        this.ioManager = ioManager;
        this.pool = pool;
        this.binaryRowSerializer = serializer instanceof BinaryRowDataSerializer ? (BinaryRowDataSerializer)serializer.duplicate() : new BinaryRowDataSerializer(serializer.getArity());
        this.segmentSize = pool.pageSize();
        this.spilledChannelIDs = new ArrayList<ChannelWithMeta>();
        this.spillSize = 0L;
        this.spilledChannelRowOffsets = new ArrayList<Integer>();
        this.numRows = 0;
        this.iterOpenedCount = 0;
        this.externalBufferVersion = 0;
        this.isRowAllInFixedPart = isRowAllInFixedPart;
        this.rowLength = isRowAllInFixedPart ? (long)this.binaryRowSerializer.getSerializedRowFixedPartLength() : -1L;
        this.addCompleted = false;
        this.inMemoryBuffer = new InMemoryBuffer(serializer);
    }

    @Override
    public void reset() {
        this.clearChannels();
        this.inMemoryBuffer.reset();
        this.numRows = 0;
        ++this.externalBufferVersion;
        this.addCompleted = false;
    }

    @Override
    public void add(RowData row) throws IOException {
        Preconditions.checkState(!this.addCompleted, "This buffer has add completed.");
        if (!this.inMemoryBuffer.write(row)) {
            if (this.inMemoryBuffer.getCurrentDataBufferOffset() == 0L) {
                this.throwTooBigException(row);
            }
            this.spill();
            if (!this.inMemoryBuffer.write(row)) {
                this.throwTooBigException(row);
            }
        }
        ++this.numRows;
    }

    @Override
    public void complete() {
        this.addCompleted = true;
    }

    @Override
    public BufferIterator newIterator() {
        return this.newIterator(0);
    }

    @Override
    public BufferIterator newIterator(int beginRow) {
        Preconditions.checkState(this.addCompleted, "This buffer has not add completed.");
        Preconditions.checkArgument(beginRow >= 0, "`beginRow` can't be negative!");
        ++this.iterOpenedCount;
        return new BufferIterator(beginRow);
    }

    @Override
    public void close() {
        this.clearChannels();
        this.inMemoryBuffer.close();
        this.pool.close();
    }

    private void throwTooBigException(RowData row) throws IOException {
        int rowSize = InstantiationUtil.serializeToByteArray(this.inMemoryBuffer.serializer, row).length;
        throw new IOException("Record is too big, it can't be added to a empty InMemoryBuffer! Record size: " + rowSize + ", Buffer: " + this.memorySize());
    }

    private void spill() throws IOException {
        FileIOChannel.ID channel = this.ioManager.createChannel();
        BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
        int numRecordBuffers = this.inMemoryBuffer.getNumRecordBuffers();
        ArrayList segments = this.inMemoryBuffer.getRecordBufferSegments();
        try {
            for (int i = 0; i < numRecordBuffers; ++i) {
                writer.writeBlock((MemorySegment)segments.get(i));
            }
            LOG.info("here spill the reset buffer data with {} bytes", (Object)writer.getSize());
            writer.close();
        }
        catch (IOException e) {
            writer.closeAndDelete();
            throw e;
        }
        this.spillSize += (long)(numRecordBuffers * this.segmentSize);
        this.spilledChannelIDs.add(new ChannelWithMeta(channel, this.inMemoryBuffer.getNumRecordBuffers(), this.inMemoryBuffer.getNumBytesInLastBuffer()));
        this.spilledChannelRowOffsets.add(this.numRows);
        this.inMemoryBuffer.reset();
    }

    public int size() {
        return this.numRows;
    }

    private int memorySize() {
        return this.pool.freePages() * this.segmentSize;
    }

    public long getUsedMemoryInBytes() {
        return this.memorySize() + this.iterOpenedCount * 2 * this.segmentSize;
    }

    public int getNumSpillFiles() {
        return this.spilledChannelIDs.size();
    }

    public long getSpillInBytes() {
        return this.spillSize;
    }

    private void clearChannels() {
        for (ChannelWithMeta meta : this.spilledChannelIDs) {
            File f = new File(meta.getChannel().getPath());
            if (!f.exists()) continue;
            f.delete();
        }
        this.spilledChannelIDs.clear();
        this.spillSize = 0L;
        this.spilledChannelRowOffsets.clear();
    }

    @VisibleForTesting
    List<ChannelWithMeta> getSpillChannels() {
        return this.spilledChannelIDs;
    }

    private class InMemoryBuffer
    implements Closeable {
        private final AbstractRowDataSerializer serializer;
        private final ArrayList<MemorySegment> recordBufferSegments;
        private final SimpleCollectingOutputView recordCollector;
        private long currentDataBufferOffset;
        private int numBytesInLastBuffer;
        private int recordCount;

        private InMemoryBuffer(AbstractRowDataSerializer serializer) {
            this.serializer = (AbstractRowDataSerializer)serializer.duplicate();
            this.recordBufferSegments = new ArrayList();
            this.recordCollector = new SimpleCollectingOutputView(this.recordBufferSegments, ResettableExternalBuffer.this.pool, ResettableExternalBuffer.this.segmentSize);
            this.recordCount = 0;
        }

        private void reset() {
            this.currentDataBufferOffset = 0L;
            this.recordCount = 0;
            this.returnToSegmentPool();
            this.recordCollector.reset();
        }

        @Override
        public void close() {
            this.returnToSegmentPool();
        }

        private void returnToSegmentPool() {
            ResettableExternalBuffer.this.pool.returnAll(this.recordBufferSegments);
            this.recordBufferSegments.clear();
        }

        public boolean write(RowData row) throws IOException {
            try {
                this.serializer.serializeToPages(row, this.recordCollector);
                this.currentDataBufferOffset = this.recordCollector.getCurrentOffset();
                this.numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
                ++this.recordCount;
                return true;
            }
            catch (EOFException e) {
                return false;
            }
        }

        private ArrayList<MemorySegment> getRecordBufferSegments() {
            return this.recordBufferSegments;
        }

        private long getCurrentDataBufferOffset() {
            return this.currentDataBufferOffset;
        }

        private int getNumRecordBuffers() {
            int result = (int)(this.currentDataBufferOffset / (long)ResettableExternalBuffer.this.segmentSize);
            long mod = this.currentDataBufferOffset % (long)ResettableExternalBuffer.this.segmentSize;
            if (mod != 0L) {
                ++result;
            }
            return result;
        }

        private int getNumBytesInLastBuffer() {
            return this.numBytesInLastBuffer;
        }

        private InMemoryBufferIterator newIterator(int beginRow, long offset) {
            Preconditions.checkArgument(offset >= 0L, "`offset` can't be negative!");
            RandomAccessInputView recordBuffer = new RandomAccessInputView(this.recordBufferSegments, ResettableExternalBuffer.this.segmentSize, this.numBytesInLastBuffer);
            return new InMemoryBufferIterator(this.recordCount, beginRow, offset, recordBuffer);
        }

        public class InMemoryBufferIterator
        implements MutableObjectIterator<BinaryRowData>,
        Closeable {
            private final int beginRow;
            private int nextRow;
            private RandomAccessInputView recordBuffer;
            private int expectedRecordCount;

            private InMemoryBufferIterator(int expectedRecordCount, int beginRow, long offset, RandomAccessInputView recordBuffer) {
                this.beginRow = beginRow;
                this.recordBuffer = recordBuffer;
                this.reset(expectedRecordCount, offset);
            }

            public void reset(int expectedRecordCount, long offset) {
                this.nextRow = this.beginRow;
                this.expectedRecordCount = expectedRecordCount;
                this.recordBuffer.setReadPosition(offset);
            }

            @Override
            public BinaryRowData next(BinaryRowData reuse) throws IOException {
                try {
                    if (this.expectedRecordCount != InMemoryBuffer.this.recordCount) {
                        throw new ConcurrentModificationException();
                    }
                    if (this.nextRow >= InMemoryBuffer.this.recordCount) {
                        return null;
                    }
                    ++this.nextRow;
                    return InMemoryBuffer.this.serializer.mapFromPages(reuse, this.recordBuffer);
                }
                catch (EOFException e) {
                    return null;
                }
            }

            @Override
            public BinaryRowData next() throws IOException {
                throw new RuntimeException("Not support!");
            }

            @Override
            public void close() {
            }
        }
    }

    public class BufferIterator
    implements ResettableRowBuffer.ResettableIterator {
        MutableObjectIterator<BinaryRowData> currentIterator;
        List<MemorySegment> freeMemory = null;
        BlockChannelReader<MemorySegment> fileReader;
        int currentChannelID = -1;
        BinaryRowData reuse = ResettableExternalBuffer.access$800(ResettableExternalBuffer.this).createInstance();
        BinaryRowData row;
        int beginRow;
        int nextRow;
        InMemoryBuffer.InMemoryBufferIterator reusableMemoryIterator;
        int versionSnapshot;
        boolean closed;

        private BufferIterator(int beginRow) {
            this.nextRow = this.beginRow = Math.min(beginRow, ResettableExternalBuffer.this.numRows);
            this.versionSnapshot = ResettableExternalBuffer.this.externalBufferVersion;
            this.closed = false;
        }

        private void checkValidity() {
            if (this.closed) {
                throw new RuntimeException("This iterator is closed!");
            }
            if (this.versionSnapshot != ResettableExternalBuffer.this.externalBufferVersion) {
                throw new RuntimeException("This iterator is no longer valid!");
            }
        }

        @Override
        public void reset() throws IOException {
            this.checkValidity();
            this.resetImpl();
        }

        private void resetImpl() throws IOException {
            this.closeCurrentFileReader();
            this.nextRow = this.beginRow;
            this.currentChannelID = -1;
            this.currentIterator = null;
            this.row = null;
            this.reuse.clear();
        }

        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            try {
                this.resetImpl();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (this.freeMemory != null) {
                this.freeMemory.clear();
            }
            if (this.reusableMemoryIterator != null) {
                this.reusableMemoryIterator.close();
            }
            this.closed = true;
            ResettableExternalBuffer.this.iterOpenedCount--;
        }

        @Override
        public boolean advanceNext() {
            this.checkValidity();
            try {
                this.updateIteratorIfNeeded();
                do {
                    if (this.currentIterator == null || (this.row = this.currentIterator.next(this.reuse)) == null) continue;
                    ++this.nextRow;
                    return true;
                } while (this.nextIterator());
                return false;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        private boolean nextIterator() throws IOException {
            if (this.currentChannelID == -1) {
                if (ResettableExternalBuffer.this.isRowAllInFixedPart) {
                    this.gotoAllInFixedPartRow(this.beginRow);
                } else {
                    this.gotoVariableLengthRow(this.beginRow);
                }
            } else {
                if (this.currentChannelID == Integer.MAX_VALUE) {
                    return false;
                }
                if (this.currentChannelID < ResettableExternalBuffer.this.spilledChannelIDs.size() - 1) {
                    this.nextSpilledIterator();
                } else {
                    this.newMemoryIterator();
                }
            }
            return true;
        }

        private boolean iteratorNeedsUpdate() {
            int size = ResettableExternalBuffer.this.spilledChannelRowOffsets.size();
            return size > 0 && this.currentChannelID == Integer.MAX_VALUE && this.nextRow <= (Integer)ResettableExternalBuffer.this.spilledChannelRowOffsets.get(size - 1);
        }

        private void updateIteratorIfNeeded() throws IOException {
            if (this.iteratorNeedsUpdate()) {
                this.reuse.clear();
                this.reusableMemoryIterator = null;
                if (ResettableExternalBuffer.this.isRowAllInFixedPart) {
                    this.gotoAllInFixedPartRow(this.nextRow);
                } else {
                    this.gotoVariableLengthRow(this.nextRow);
                }
            }
        }

        @Override
        public BinaryRowData getRow() {
            return this.row;
        }

        private void closeCurrentFileReader() throws IOException {
            if (this.fileReader != null) {
                this.fileReader.close();
                this.fileReader = null;
            }
        }

        private void gotoAllInFixedPartRow(int beginRow) throws IOException {
            int beginChannel = this.upperBound(beginRow, ResettableExternalBuffer.this.spilledChannelRowOffsets);
            int beginRowInChannel = this.getBeginIndexInChannel(beginRow, beginChannel);
            if (beginRow == ResettableExternalBuffer.this.numRows) {
                this.newMemoryIterator(beginRowInChannel, ResettableExternalBuffer.this.inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            long numRecordsInSegment = (long)ResettableExternalBuffer.this.segmentSize / ResettableExternalBuffer.this.rowLength;
            long offset = (long)beginRowInChannel / numRecordsInSegment * (long)ResettableExternalBuffer.this.segmentSize + (long)beginRowInChannel % numRecordsInSegment * ResettableExternalBuffer.this.rowLength;
            if (beginChannel < ResettableExternalBuffer.this.spilledChannelRowOffsets.size()) {
                this.newSpilledIterator(beginChannel, offset);
            } else {
                this.newMemoryIterator(beginRowInChannel, offset);
            }
        }

        private void gotoVariableLengthRow(int beginRow) throws IOException {
            int beginChannel = this.upperBound(beginRow, ResettableExternalBuffer.this.spilledChannelRowOffsets);
            int beginRowInChannel = this.getBeginIndexInChannel(beginRow, beginChannel);
            if (beginRow == ResettableExternalBuffer.this.numRows) {
                this.newMemoryIterator(beginRowInChannel, ResettableExternalBuffer.this.inMemoryBuffer.getCurrentDataBufferOffset());
                return;
            }
            if (beginChannel < ResettableExternalBuffer.this.spilledChannelRowOffsets.size()) {
                this.newSpilledIterator(beginChannel);
            } else {
                this.newMemoryIterator();
            }
            this.nextRow -= beginRowInChannel;
            for (int i = 0; i < beginRowInChannel; ++i) {
                this.advanceNext();
            }
        }

        private void nextSpilledIterator() throws IOException {
            this.newSpilledIterator(this.currentChannelID + 1);
        }

        private void newSpilledIterator(int channelID) throws IOException {
            this.newSpilledIterator(channelID, 0L);
        }

        private void newSpilledIterator(int channelID, long offset) throws IOException {
            ChannelWithMeta channel = (ChannelWithMeta)ResettableExternalBuffer.this.spilledChannelIDs.get(channelID);
            this.currentChannelID = channelID;
            this.closeCurrentFileReader();
            int segmentNum = (int)(offset / (long)ResettableExternalBuffer.this.segmentSize);
            long seekPosition = segmentNum * ResettableExternalBuffer.this.segmentSize;
            this.fileReader = ResettableExternalBuffer.this.ioManager.createBlockChannelReader(channel.getChannel());
            if (offset > 0L) {
                this.fileReader.seekToPosition(seekPosition);
            }
            HeaderlessChannelReaderInputView inView = new HeaderlessChannelReaderInputView(this.fileReader, this.getReadMemory(), channel.getBlockCount() - segmentNum, channel.getNumBytesInLastBlock(), false, offset - seekPosition);
            this.currentIterator = new BinaryRowChannelInputViewIterator(inView, ResettableExternalBuffer.this.binaryRowSerializer);
        }

        private void newMemoryIterator() throws IOException {
            this.newMemoryIterator(0, 0L);
        }

        private void newMemoryIterator(int beginRow, long offset) throws IOException {
            this.currentChannelID = Integer.MAX_VALUE;
            this.closeCurrentFileReader();
            if (this.reusableMemoryIterator == null) {
                this.reusableMemoryIterator = ResettableExternalBuffer.this.inMemoryBuffer.newIterator(beginRow, offset);
            } else {
                this.reusableMemoryIterator.reset(ResettableExternalBuffer.this.inMemoryBuffer.recordCount, offset);
            }
            this.currentIterator = this.reusableMemoryIterator;
        }

        private int getBeginIndexInChannel(int beginRow, int beginChannel) {
            if (beginChannel > 0) {
                return beginRow - (Integer)ResettableExternalBuffer.this.spilledChannelRowOffsets.get(beginChannel - 1);
            }
            return beginRow;
        }

        private List<MemorySegment> getReadMemory() {
            if (this.freeMemory == null) {
                this.freeMemory = new ArrayList<MemorySegment>();
                for (int i = 0; i < 2; ++i) {
                    this.freeMemory.add(MemorySegmentFactory.allocateUnpooledSegment(ResettableExternalBuffer.this.segmentSize));
                }
            }
            return this.freeMemory;
        }

        private int upperBound(int goal, List<Integer> list) {
            if (list.size() == 0) {
                return 0;
            }
            if (list.get(list.size() - 1) <= goal) {
                return list.size();
            }
            int head = 0;
            int tail = list.size() - 1;
            while (head < tail) {
                int mid = (head + tail) / 2;
                if (list.get(mid) <= goal) {
                    head = mid + 1;
                    continue;
                }
                tail = mid;
            }
            return head;
        }
    }
}

