package org.apache.flink.odps.sink.cache;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.odps.sink.cache.InProgressFileWriter;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/odps/sink/cache/PartitionInProgressFileWriter.class */
public class PartitionInProgressFileWriter implements InProgressFileWriter<RowData, String> {
    private final String bucketID;
    private final long creationTime;
    private long lastUpdateTime;
    private BufferPool bufferPool;
    private SubPartition subPartition;
    private TypeSerializer<RowData> inputSerializer;
    private BufferBuilder currentBufferBuilder;
    private boolean isFinished;
    private Path path;
    private final boolean flushAlways;
    private final String tmpDir;
    private Counter numBytesOut = new SimpleCounter();
    private Counter numBuffersOut = new SimpleCounter();
    private Counter numRowOut = new SimpleCounter();
    private DataOutputSerializer serializer = new DataOutputSerializer(2048);

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/PartitionInProgressFileWriter$PartitionCommittingFile.class */
    public static class PartitionCommittingFile extends PartitionPendingFile implements InProgressFileWriter.CommittingFile<RowData> {
        public PartitionCommittingFile(Path path, long j, long j2, SubPartition subPartition, TypeSerializer<RowData> typeSerializer, String str) {
            super(path, j, j2, subPartition, typeSerializer, str);
        }
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/PartitionInProgressFileWriter$PartitionCommittingFileSerializer.class */
    public static class PartitionCommittingFileSerializer implements SimpleVersionedSerializer<InProgressFileWriter.CommittingFile<RowData>> {
        private final TypeSerializer<RowData> inputSerializer;
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final int MAGIC_NUMBER = 746929298;
        private final String tmpDir;
        private final int readBufferSize;

        public PartitionCommittingFileSerializer(TypeSerializer<RowData> typeSerializer, String str, int i) {
            this.inputSerializer = typeSerializer;
            this.tmpDir = str;
            this.readBufferSize = i;
        }

        public int getVersion() {
            return 1;
        }

        public byte[] serialize(InProgressFileWriter.CommittingFile<RowData> committingFile) throws IOException {
            if (!(committingFile instanceof PartitionCommittingFile)) {
                throw new UnsupportedOperationException("Only PartitionCommittingFile is supported.");
            }
            PartitionCommittingFile partitionCommittingFile = (PartitionCommittingFile) committingFile;
            Path path = partitionCommittingFile.getPath();
            long size = partitionCommittingFile.getSize();
            long rowCount = partitionCommittingFile.getRowCount();
            byte[] bytes = path.toUri().toString().getBytes(CHARSET);
            byte[] bArr = new byte[24 + bytes.length];
            ByteBuffer order = ByteBuffer.wrap(bArr).order(ByteOrder.LITTLE_ENDIAN);
            order.putInt(MAGIC_NUMBER);
            order.putInt(bytes.length);
            order.put(bytes);
            order.putLong(size);
            order.putLong(rowCount);
            return bArr;
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public InProgressFileWriter.CommittingFile<RowData> m2793deserialize(int i, byte[] bArr) throws IOException {
            switch (i) {
                case 1:
                    return deserializeV1(bArr);
                default:
                    throw new IOException("Unrecognized version or corrupt state: " + i);
            }
        }

        private PartitionCommittingFile deserializeV1(byte[] bArr) throws IOException {
            ByteBuffer order = ByteBuffer.wrap(bArr).order(ByteOrder.LITTLE_ENDIAN);
            if (order.getInt() != MAGIC_NUMBER) {
                throw new IOException("Corrupt data: Unexpected magic number.");
            }
            byte[] bArr2 = new byte[order.getInt()];
            order.get(bArr2);
            Path path = new Path(new String(bArr2, CHARSET));
            return new PartitionCommittingFile(path, order.getLong(), order.getLong(), SubPartition.createForRead(new File(path.toUri()), this.readBufferSize, false), this.inputSerializer, this.tmpDir);
        }
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/PartitionInProgressFileWriter$PartitionFileReader.class */
    public static class PartitionFileReader implements InProgressFileWriter.FileReader<RowData> {
        private final PartitionRecordReader recordReader;
        private final long recordCount;
        private long readerIndex = 0;

        public PartitionFileReader(SubPartition subPartition, long j, TypeSerializer<RowData> typeSerializer, String str) throws IOException {
            this.recordReader = new PartitionRecordReader(subPartition, new String[]{str}, typeSerializer);
            this.recordCount = j;
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.FileReader
        public boolean hasNext() throws IOException {
            return this.readerIndex < this.recordCount;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.FileReader
        public RowData next() throws IOException {
            this.readerIndex++;
            return this.recordReader.next();
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.FileReader
        public void close() throws IOException {
            this.recordReader.clearBuffers();
            this.recordReader.releaseAllResources();
        }
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/PartitionInProgressFileWriter$PartitionPendingFile.class */
    public static class PartitionPendingFile implements InProgressFileWriter.PendingFile<RowData> {
        private final Path targetPath;
        private final long fileSize;
        private final long rowCount;
        private final SubPartition subPartition;
        private final TypeSerializer<RowData> inputSerializer;
        private final String tmpDir;

        public PartitionPendingFile(Path path, long j, long j2, SubPartition subPartition, TypeSerializer<RowData> typeSerializer, String str) {
            this.targetPath = path;
            this.fileSize = j;
            this.rowCount = j2;
            this.subPartition = subPartition;
            this.inputSerializer = typeSerializer;
            this.tmpDir = str;
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.PendingFile
        public Path getPath() {
            return this.targetPath;
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.PendingFile
        public long getSize() {
            return this.fileSize;
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.PendingFile
        public long getRowCount() {
            return this.rowCount;
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.PendingFile
        public InProgressFileWriter.CommittingFile<RowData> commit() {
            return new PartitionCommittingFile(this.targetPath, this.fileSize, this.rowCount, this.subPartition, this.inputSerializer, this.tmpDir);
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.PendingFile
        public void dispose() throws IOException {
            this.subPartition.release();
        }

        @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter.PendingFile
        public InProgressFileWriter.FileReader<RowData> getFileReader() throws IOException {
            return new PartitionFileReader(this.subPartition, getRowCount(), this.inputSerializer, this.tmpDir);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            PartitionPendingFile partitionPendingFile = (PartitionPendingFile) obj;
            return this.fileSize == partitionPendingFile.fileSize && this.rowCount == partitionPendingFile.rowCount && Objects.equals(this.targetPath, partitionPendingFile.targetPath);
        }

        public int hashCode() {
            return Objects.hash(this.targetPath, Long.valueOf(this.fileSize));
        }
    }

    public PartitionInProgressFileWriter(String str, long j, FileChannelManager fileChannelManager, BufferPool bufferPool, TypeSerializer<RowData> typeSerializer, boolean z, String str2, int i) throws IOException {
        this.bucketID = str;
        this.creationTime = j;
        this.lastUpdateTime = j;
        this.inputSerializer = typeSerializer;
        this.bufferPool = bufferPool;
        File pathFile = fileChannelManager.createChannel().getPathFile();
        this.path = new Path(pathFile.toURI());
        this.subPartition = SubPartition.createWithFileChannel(pathFile, i, false);
        this.isFinished = false;
        this.flushAlways = z;
        this.tmpDir = str2;
    }

    public void markWrite(long j) {
        this.lastUpdateTime = j;
    }

    @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter
    public void write(RowData rowData, long j) throws IOException {
        this.numRowOut.inc();
        emitRecord(serializeRecord(this.serializer, rowData));
        markWrite(j);
        if (this.flushAlways) {
            flush();
        }
    }

    public void flush() {
        finishUnicastBufferBuilder();
        this.subPartition.flush();
    }

    @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter
    public InProgressFileWriter.PendingFile<RowData> closeForUpload() throws IOException {
        checkInProduceState();
        finishUnicastBufferBuilder();
        this.subPartition.finish();
        this.isFinished = true;
        return new PartitionPendingFile(this.path, getSize(), this.numRowOut.getCount(), this.subPartition, this.inputSerializer, this.tmpDir);
    }

    @Override // org.apache.flink.odps.sink.cache.InProgressFileWriter
    public void dispose() throws IOException {
        if (this.currentBufferBuilder != null) {
            this.currentBufferBuilder.close();
        }
        this.subPartition.release();
    }

    /* renamed from: getBucketId, reason: merged with bridge method [inline-methods] */
    public String m2791getBucketId() {
        return this.bucketID;
    }

    public long getCreationTime() {
        return this.creationTime;
    }

    public long getLastUpdateTime() {
        return this.lastUpdateTime;
    }

    public long getSize() throws IOException {
        return this.numBytesOut.getCount();
    }

    private ByteBuffer serializeRecord(DataOutputSerializer dataOutputSerializer, RowData rowData) throws IOException {
        dataOutputSerializer.setPositionUnsafe(4);
        this.inputSerializer.serialize(rowData, dataOutputSerializer);
        dataOutputSerializer.writeIntUnsafe(dataOutputSerializer.length() - 4, 0);
        return dataOutputSerializer.wrapAsByteBuffer();
    }

    private void emitRecord(ByteBuffer byteBuffer) throws IOException {
        BufferBuilder bufferBuilder;
        BufferBuilder appendUnicastDataForNewRecord = appendUnicastDataForNewRecord(byteBuffer);
        while (true) {
            bufferBuilder = appendUnicastDataForNewRecord;
            if (!byteBuffer.hasRemaining()) {
                break;
            }
            finishUnicastBufferBuilder();
            appendUnicastDataForNewRecord = appendUnicastDataForRecordContinuation(byteBuffer);
        }
        if (bufferBuilder.isFull()) {
            finishUnicastBufferBuilder();
        }
    }

    private BufferBuilder appendUnicastDataForNewRecord(ByteBuffer byteBuffer) throws IOException {
        BufferBuilder bufferBuilder = this.currentBufferBuilder;
        if (bufferBuilder == null) {
            bufferBuilder = requestNewUnicastBufferBuilder();
            this.subPartition.add(bufferBuilder.createBufferConsumerFromBeginning(), 0);
        }
        bufferBuilder.appendAndCommit(byteBuffer);
        return bufferBuilder;
    }

    private void finishUnicastBufferBuilder() {
        BufferBuilder bufferBuilder = this.currentBufferBuilder;
        if (bufferBuilder != null) {
            this.numBytesOut.inc(bufferBuilder.finish());
            this.numBuffersOut.inc();
            this.currentBufferBuilder = null;
            bufferBuilder.close();
        }
    }

    private BufferBuilder appendUnicastDataForRecordContinuation(ByteBuffer byteBuffer) throws IOException {
        BufferBuilder requestNewUnicastBufferBuilder = requestNewUnicastBufferBuilder();
        this.subPartition.add(requestNewUnicastBufferBuilder.createBufferConsumerFromBeginning(), requestNewUnicastBufferBuilder.appendAndCommit(byteBuffer));
        return requestNewUnicastBufferBuilder;
    }

    private BufferBuilder requestNewUnicastBufferBuilder() throws IOException {
        checkInProduceState();
        BufferBuilder requestNewBufferBuilderFromPool = requestNewBufferBuilderFromPool();
        this.currentBufferBuilder = requestNewBufferBuilderFromPool;
        return requestNewBufferBuilderFromPool;
    }

    private void checkInProduceState() throws IllegalStateException {
        Preconditions.checkState(!this.isFinished, "Partition already finished.");
    }

    private BufferBuilder requestNewBufferBuilderFromPool() throws IOException {
        BufferBuilder requestBufferBuilder = this.bufferPool.requestBufferBuilder();
        if (requestBufferBuilder != null) {
            return requestBufferBuilder;
        }
        try {
            return this.bufferPool.requestBufferBuilderBlocking();
        } catch (InterruptedException e) {
            throw new IOException("Interrupted while waiting for buffer");
        }
    }
}
