package org.apache.flink.odps.sink.cache;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.aspectj.lang.JoinPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/cache/SubPartition.class */
public class SubPartition {
    private static final Logger LOG;

    @Nullable
    private BufferConsumer currentBuffer;
    private final FileChannelBoundedData data;
    private final boolean useDirectFileTransfer;
    private int numDataBuffersWritten;
    private int numBuffersAndEventsWritten;
    private boolean isFinished;
    private boolean isReleased;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Object lock = new Object();

    @GuardedBy(JoinPoint.SYNCHRONIZATION_LOCK)
    private final Set<ResultSubpartitionView> readers = new HashSet();

    public SubPartition(FileChannelBoundedData fileChannelBoundedData, boolean z) {
        this.data = fileChannelBoundedData;
        this.useDirectFileTransfer = z;
    }

    public SubPartition(FileChannelBoundedData fileChannelBoundedData, boolean z, boolean z2) {
        this.data = fileChannelBoundedData;
        this.useDirectFileTransfer = z;
        this.isFinished = z2;
    }

    public boolean isFinished() {
        return this.isFinished;
    }

    public boolean isReleased() {
        return this.isReleased;
    }

    public boolean add(BufferConsumer bufferConsumer, int i) throws IOException {
        if (isFinished()) {
            bufferConsumer.close();
            return false;
        }
        flushCurrentBuffer();
        this.currentBuffer = bufferConsumer;
        return true;
    }

    public void flush() {
        try {
            flushCurrentBuffer();
        } catch (IOException e) {
            throw new FlinkRuntimeException(e.getMessage(), e);
        }
    }

    private void flushCurrentBuffer() throws IOException {
        if (this.currentBuffer != null) {
            writeAndCloseBufferConsumer(this.currentBuffer);
            this.currentBuffer = null;
        }
    }

    private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
        try {
            Buffer build = bufferConsumer.build();
            try {
                this.data.writeBuffer(build);
                this.numBuffersAndEventsWritten++;
                if (build.isBuffer()) {
                    this.numDataBuffersWritten++;
                }
                build.recycleBuffer();
            } catch (Throwable th) {
                build.recycleBuffer();
                throw th;
            }
        } finally {
            bufferConsumer.close();
        }
    }

    public void finish() throws IOException {
        Preconditions.checkState(!this.isReleased, "data partition already released");
        Preconditions.checkState(!this.isFinished, "data partition already finished");
        this.isFinished = true;
        flushCurrentBuffer();
        writeAndCloseBufferConsumer(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE, false));
        this.data.finishWrite();
    }

    public void release() throws IOException {
        synchronized (this.lock) {
            if (this.isReleased) {
                return;
            }
            this.isReleased = true;
            this.isFinished = true;
            if (this.currentBuffer != null) {
                this.currentBuffer.close();
                this.currentBuffer = null;
            }
            checkReaderReferencesAndDispose();
        }
    }

    public ResultSubpartitionView createReadView(BufferAvailabilityListener bufferAvailabilityListener) throws IOException {
        PartitionBufferReader partitionBufferReader;
        synchronized (this.lock) {
            Preconditions.checkState(!this.isReleased, "data partition already released");
            Preconditions.checkState(this.isFinished, "writing of blocking partition not yet finished");
            if (!Files.isReadable(this.data.getFilePath())) {
                throw new FlinkRuntimeException("Partition not found.");
            }
            partitionBufferReader = new PartitionBufferReader(this, this.data, this.numDataBuffersWritten, bufferAvailabilityListener);
            this.readers.add(partitionBufferReader);
        }
        return partitionBufferReader;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void releaseReaderReference(ResultSubpartitionView resultSubpartitionView) throws IOException {
        synchronized (this.lock) {
            if (this.readers.remove(resultSubpartitionView) && this.isReleased) {
                checkReaderReferencesAndDispose();
            }
        }
    }

    @GuardedBy(JoinPoint.SYNCHRONIZATION_LOCK)
    private void checkReaderReferencesAndDispose() throws IOException {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        if (this.readers.isEmpty()) {
            this.data.close();
        }
    }

    @VisibleForTesting
    public BufferConsumer getCurrentBuffer() {
        return this.currentBuffer;
    }

    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return 0;
    }

    protected long getTotalNumberOfBuffers() {
        return this.numBuffersAndEventsWritten;
    }

    protected long getTotalNumberOfBytes() {
        return this.data.getSize();
    }

    int getBuffersInBacklog() {
        return this.numDataBuffersWritten;
    }

    public static SubPartition createWithFileChannel(File file, int i, boolean z) throws IOException {
        return new SubPartition(FileChannelBoundedData.create(file.toPath(), i), !z);
    }

    public static SubPartition createForRead(File file, int i, boolean z) {
        return new SubPartition(FileChannelBoundedData.createForRead(file.toPath(), i), !z, true);
    }

    static {
        $assertionsDisabled = !SubPartition.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(SubPartition.class);
    }
}
