package org.apache.flink.odps.sink.cache;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.flink.odps.sink.cache.InProgressFileWriter;
import org.apache.flink.odps.sink.cache.WriterThread;
import org.apache.flink.odps.sink.writer.CommitInfo;
import org.apache.flink.odps.sink.writer.DataWriter;
import org.apache.flink.odps.sink.writer.WriterInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/cache/Bucket.class */
public class Bucket<IN, BucketID> {
    private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
    // Identifier of this bucket; never null (checked in the constructor).
    private final BucketID bucketId;
    // Index of the owning subtask; in this class it is only used in log messages.
    private final int subtaskIndex;
    // Factory used to open in-progress part files and data writers for this bucket.
    private final BucketWriter<IN, BucketID> bucketWriter;
    // Decides when the in-progress part is rolled (on event, on processing time, on checkpoint).
    private final RollingPolicy<IN, BucketID> rollingPolicy;
    // Part file currently receiving elements; null when no part is open.
    private InProgressFileWriter<IN, BucketID> inProgressPart;
    // Checkpoint id -> committing files captured by snapshot(); entries are removed by disposeCommittingFile().
    private final Map<Long, List<InProgressFileWriter.CommittingFile<IN>>> committingFilePerCheckpoint;
    // Checkpoint id -> CommitInfo returned by DataWriter.finish(); a ConcurrentHashMap written from the write requests.
    private final Map<Long, CommitInfo> committingInfoPerCheckpoint;
    // Checkpoint id -> whether that checkpoint has been marked committed via setCommitted().
    private final Map<Long, Boolean> commitStatus;
    // Files whose write request succeeded since the last snapshot(); replaced by a fresh list on snapshot().
    private List<InProgressFileWriter.CommittingFile<IN>> committingFileForCurrentCheckpoint;
    // Closed part files handed to the writer thread; cleared once a commit request succeeds.
    private final List<InProgressFileWriter.PendingFile<IN>> pendingFileForCurrentCheckpoint;
    // Data writer opened lazily by WritePendingFileRequest.write(); set back to null after finish().
    private DataWriter<IN> dataWriterForCurrentCheckpoint;
    // Receives the write / re-upload requests created by this bucket; injected via setWriterThread().
    private WriterThread<BucketID> writerThread;
    // Writer info supplied by bootstrap(); cleared after the commit request has finished the data writer.
    private WriterInfo writerInfoForCurrentCheckpoint;
    // True between bootstrap() and the next successful commit; WritePendingFileRequest.isReady() returns this flag.
    private volatile boolean bootstrap;

    // Optional listener notified through onPendingFileCommitting(); may be null.
    @Nullable
    private final FileLifeCycleListener<BucketID> fileListener;

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/Bucket$ReUploadCommittingFileRequest.class */
    /**
     * Write request that replays a list of committing files through a freshly opened
     * {@link DataWriter} and records the resulting {@link CommitInfo} for one checkpoint.
     *
     * <p>Unlike {@link WritePendingFileRequest} this request owns its data writer and never
     * touches {@code dataWriterForCurrentCheckpoint}.
     */
    class ReUploadCommittingFileRequest implements WriterThread.WriteRequest<BucketID> {
        private final List<InProgressFileWriter.CommittingFile<IN>> committingFileList;
        private final long checkpointId;
        private final WriterThread.RequestDoneCallback callback;
        private final WriterInfo writerInfo;
        /** Opened in {@link #write()}; stays null if the request never started. */
        private DataWriter<IN> dataWriter;

        /**
         * @param committingFileList files whose content is uploaded again
         * @param writerInfo writer info used to open the data writer
         * @param checkpointId checkpoint the re-upload belongs to
         * @param callback notified from {@link #requestDone(IOException)}
         */
        public ReUploadCommittingFileRequest(List<InProgressFileWriter.CommittingFile<IN>> committingFileList, WriterInfo writerInfo, long checkpointId, WriterThread.RequestDoneCallback callback) {
            this.committingFileList = committingFileList;
            this.writerInfo = writerInfo;
            this.checkpointId = checkpointId;
            this.callback = callback;
        }

        /**
         * Opens a data writer, copies every element of every committing file into it (flushing
         * after each file), finishes the writer and stores the commit info for the checkpoint.
         *
         * @throws IOException if reading a file or writing through the data writer fails
         */
        @Override
        public void write() throws IOException {
            this.dataWriter = Bucket.this.bucketWriter.openDataWriter(Bucket.this.bucketId, this.writerInfo);
            for (InProgressFileWriter.CommittingFile<IN> committingFile : this.committingFileList) {
                InProgressFileWriter.FileReader<IN> fileReader = committingFile.getFileReader();
                try {
                    while (fileReader.hasNext()) {
                        this.dataWriter.addElement(fileReader.next());
                    }
                } finally {
                    // Close on the failure path too; previously the reader leaked when
                    // hasNext()/next()/addElement() threw.
                    fileReader.close();
                }
                this.dataWriter.flush();
            }
            CommitInfo commitInfo = this.dataWriter.finish(this.checkpointId);
            if (Bucket.this.fileListener != null) {
                Bucket.this.fileListener.onPendingFileCommitting(Bucket.this.bucketId, commitInfo);
            }
            Bucket.this.committingInfoPerCheckpoint.put(this.checkpointId, commitInfo);
        }

        /**
         * Reports the outcome to the callback. On failure the data writer (if it was opened) is
         * aborted before the callback is told about the error.
         *
         * @param iOException the failure, or null when the request succeeded
         */
        @Override
        public void requestDone(IOException iOException) {
            if (iOException == null) {
                this.callback.requestSuccessful();
                return;
            }
            if (this.dataWriter != null) {
                this.dataWriter.abort();
            }
            this.callback.requestFailed(iOException);
        }

        /** A re-upload carries its own writer info, so it is always ready. */
        @Override
        public boolean isReady() {
            return true;
        }

        @Override
        public BucketID getBucketId() {
            return Bucket.this.bucketId;
        }

        /** Re-upload requests always report part number 1. */
        @Override
        public int partNumber() {
            return 1;
        }

        /** Resets the data writer if one has been opened; otherwise does nothing. */
        @Override
        public void reset() {
            if (this.dataWriter != null) {
                this.dataWriter.reset();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/odps/sink/cache/Bucket$WritePendingFileRequest.class */
    /**
     * Write request that streams one pending file (if any) into the bucket's shared data writer
     * and, when {@code commit} is set, finishes that writer for the given checkpoint.
     */
    public class WritePendingFileRequest implements WriterThread.WriteRequest<BucketID> {
        /** File to upload; null for a pure commit request. */
        private final InProgressFileWriter.PendingFile<IN> pendingFile;
        private final boolean commit;
        private final long checkpointId;
        private final WriterThread.RequestDoneCallback callback;
        private final int partNumber;

        /**
         * @param pendingFile file whose elements are written, or null when only committing
         * @param commit whether the data writer is finished after this request
         * @param checkpointId checkpoint passed to {@code DataWriter.finish} when committing
         * @param partNumber value reported by {@link #partNumber()}
         * @param callback notified from {@link #requestDone(IOException)}
         */
        public WritePendingFileRequest(InProgressFileWriter.PendingFile<IN> pendingFile, boolean commit, long checkpointId, int partNumber, WriterThread.RequestDoneCallback callback) {
            this.pendingFile = pendingFile;
            this.commit = commit;
            this.checkpointId = checkpointId;
            this.callback = callback;
            this.partNumber = partNumber;
        }

        /**
         * Copies the pending file into the current data writer, flushes it, and on commit
         * finishes the writer, clears the per-checkpoint writer state and records the commit info.
         *
         * @throws IOException if reading the file or writing through the data writer fails
         */
        @Override
        public void write() throws IOException {
            if (this.pendingFile != null) {
                DataWriter<IN> writer = openDataWriterIfAbsent();
                InProgressFileWriter.FileReader<IN> fileReader = this.pendingFile.getFileReader();
                try {
                    while (fileReader.hasNext()) {
                        writer.addElement(fileReader.next());
                    }
                } finally {
                    // Close exactly once, after the whole file has been consumed or on failure.
                    // The previous form closed the reader inside the loop (after the first
                    // element) and never closed it for an empty file.
                    fileReader.close();
                }
            }
            if (Bucket.this.dataWriterForCurrentCheckpoint != null) {
                Bucket.this.dataWriterForCurrentCheckpoint.flush();
            }
            if (this.commit) {
                // A commit with no prior data still needs a writer to produce the CommitInfo.
                CommitInfo commitInfo = openDataWriterIfAbsent().finish(this.checkpointId);
                Bucket.this.dataWriterForCurrentCheckpoint = null;
                Bucket.this.writerInfoForCurrentCheckpoint = null;
                Bucket.this.bootstrap = false;
                if (Bucket.this.fileListener != null) {
                    Bucket.this.fileListener.onPendingFileCommitting(Bucket.this.bucketId, commitInfo);
                }
                Bucket.this.committingInfoPerCheckpoint.put(this.checkpointId, commitInfo);
            }
        }

        /** Returns the bucket's current data writer, opening it first when none exists yet. */
        private DataWriter<IN> openDataWriterIfAbsent() throws IOException {
            if (Bucket.this.dataWriterForCurrentCheckpoint == null) {
                Bucket.this.dataWriterForCurrentCheckpoint = Bucket.this.bucketWriter.openDataWriter(Bucket.this.bucketId, Bucket.this.writerInfoForCurrentCheckpoint);
            }
            return Bucket.this.dataWriterForCurrentCheckpoint;
        }

        /**
         * Forwards the outcome to the callback.
         *
         * @param iOException the failure, or null when the request succeeded
         */
        @Override
        public void requestDone(IOException iOException) {
            if (iOException != null) {
                this.callback.requestFailed(iOException);
            } else {
                this.callback.requestSuccessful();
            }
        }

        /** Ready only while the bucket is bootstrapped, i.e. writer info has been supplied. */
        @Override
        public boolean isReady() {
            return Bucket.this.bootstrap;
        }

        @Override
        public BucketID getBucketId() {
            return Bucket.this.bucketId;
        }

        @Override
        public int partNumber() {
            return this.partNumber;
        }

        /** Resets the bucket's current data writer if one is open; otherwise does nothing. */
        @Override
        public void reset() {
            if (Bucket.this.dataWriterForCurrentCheckpoint != null) {
                Bucket.this.dataWriterForCurrentCheckpoint.reset();
            }
        }
    }

    /**
     * Creates an empty bucket: no open part, no pending or committing files and no
     * per-checkpoint bookkeeping.
     *
     * @param subtaskIndex index of the owning subtask (used in log messages)
     * @param bucketId identifier of the bucket; must not be null
     * @param bucketWriter factory for part files and data writers; must not be null
     * @param rollingPolicy policy deciding when the in-progress part is rolled; must not be null
     * @param fileListener optional listener notified when files start committing
     */
    private Bucket(int subtaskIndex, BucketID bucketId, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, @Nullable FileLifeCycleListener<BucketID> fileListener) {
        this.subtaskIndex = subtaskIndex;
        // checkNotNull is generic, so no (raw) casts are needed on its result.
        this.bucketId = Preconditions.checkNotNull(bucketId);
        this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
        this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
        this.fileListener = fileListener;
        this.pendingFileForCurrentCheckpoint = new ArrayList<>();
        this.committingFileForCurrentCheckpoint = new ArrayList<>();
        this.committingFilePerCheckpoint = new TreeMap<>();
        // Concurrent map: the write requests put commit infos into it.
        this.committingInfoPerCheckpoint = new ConcurrentHashMap<>();
        this.commitStatus = new TreeMap<>();
    }

    /**
     * Creates a bucket from restored state: builds an empty bucket for the state's bucket id
     * and then loads the state's committing files and commit infos.
     *
     * @param subtaskIndex index of the owning subtask
     * @param restoredState state to restore from
     * @param writer factory for part files and data writers
     * @param policy rolling policy for the in-progress part
     * @param listener optional listener notified when files start committing
     */
    private Bucket(int subtaskIndex, BucketState<IN, BucketID> restoredState, BucketWriter<IN, BucketID> writer, RollingPolicy<IN, BucketID> policy, @Nullable FileLifeCycleListener<BucketID> listener) {
        this(subtaskIndex, restoredState.getBucketId(), writer, policy, listener);
        this.commitRecoveredPendingFiles(restoredState);
    }

    /**
     * Merges the committing files and commit infos of a restored state into this bucket.
     *
     * <p>When the bucket holds any commit info afterwards, exactly one commit info and exactly
     * one committing-file entry are expected; each known checkpoint is marked as not yet
     * committed and the listener (if present) is notified for every commit info.
     *
     * @param bucketState restored state to load
     * @throws IllegalArgumentException if the number of commit infos or committing-file
     *     entries is not 1 while commit infos are present
     */
    public void commitRecoveredPendingFiles(BucketState<IN, BucketID> bucketState) {
        this.committingInfoPerCheckpoint.putAll(bucketState.getCommittingInfoPerCheckpoint());
        this.committingFilePerCheckpoint.putAll(bucketState.getCommittingFilePerCheckpoint());
        if (!this.committingInfoPerCheckpoint.isEmpty()) {
            final int infoCount = this.committingInfoPerCheckpoint.size();
            Preconditions.checkArgument(infoCount == 1, "Commit info size: " + infoCount);
            final int fileCount = this.committingFilePerCheckpoint.size();
            Preconditions.checkArgument(fileCount == 1, "Commit file size: " + fileCount);

            // Restored checkpoints start out as "not committed".
            for (Long checkpointId : this.committingInfoPerCheckpoint.keySet()) {
                this.commitStatus.put(checkpointId, Boolean.FALSE);
            }
            if (this.fileListener != null) {
                for (CommitInfo restoredInfo : this.committingInfoPerCheckpoint.values()) {
                    this.fileListener.onPendingFileCommitting(this.bucketId, restoredInfo);
                }
            }
        }
    }

    /**
     * Supplies the writer info for the current checkpoint and marks the bucket as bootstrapped,
     * which makes {@code WritePendingFileRequest.isReady()} return true.
     *
     * @param writerInfo writer info used when the data writer for the current checkpoint is opened
     */
    public void bootstrap(WriterInfo writerInfo) {
        // Assign the writer info before the volatile flag so that a reader that observes
        // bootstrap == true also observes the writer info.
        // NOTE(review): presumably the flag is read on the writer thread - confirm.
        this.writerInfoForCurrentCheckpoint = writerInfo;
        this.bootstrap = true;
    }

    /**
     * Files the committing files collected so far under the given checkpoint id and starts a
     * fresh, empty list for the next checkpoint.
     *
     * @param j id of the checkpoint being snapshotted
     */
    public void snapshot(long j) {
        committingFilePerCheckpoint.put(j, committingFileForCurrentCheckpoint);
        committingFileForCurrentCheckpoint = new ArrayList<>();
    }

    /**
     * Sets the writer thread that receives the write and re-upload requests created by this
     * bucket. The request-submitting methods dereference it without a null check, so it has to
     * be set before they are used.
     *
     * @param writerThread writer thread to submit requests to
     */
    public void setWriterThread(WriterThread<BucketID> writerThread) {
        this.writerThread = writerThread;
    }

    /**
     * @return the identifier of this bucket (never null)
     */
    public BucketID getBucketId() {
        return bucketId;
    }

    /**
     * Builds a state object for this bucket from copies of the per-checkpoint committing files
     * and commit infos. Only the maps are copied; the file lists and commit infos inside them
     * are shared with this bucket.
     *
     * @return state holding the bucket id and the two copied maps
     */
    public BucketState<IN, BucketID> getBucketState() {
        // Parameterized copies instead of raw TreeMaps, so the compiler checks key/value types.
        final TreeMap<Long, CommitInfo> commitInfoCopy = new TreeMap<>(this.committingInfoPerCheckpoint);
        final TreeMap<Long, List<InProgressFileWriter.CommittingFile<IN>>> committingFileCopy = new TreeMap<>(this.committingFilePerCheckpoint);
        return new BucketState<>(this.bucketId, committingFileCopy, commitInfoCopy);
    }

    /**
     * @return true if the bucket has an open part, pending files, committing files for the
     *     current checkpoint, or any recorded commit info; false if all of these are empty
     */
    public boolean isActive() {
        return this.inProgressPart != null
                || !this.pendingFileForCurrentCheckpoint.isEmpty()
                || !this.committingFileForCurrentCheckpoint.isEmpty()
                || !this.committingInfoPerCheckpoint.isEmpty();
    }

    /**
     * @return true if the bucket has an open part, pending files, or committing files for the
     *     current checkpoint; recorded commit infos are not considered
     */
    public boolean hasData() {
        return this.inProgressPart != null
                || !this.pendingFileForCurrentCheckpoint.isEmpty()
                || !this.committingFileForCurrentCheckpoint.isEmpty();
    }

    /**
     * Disposes the open part (if any) and every pending and committing file of the current
     * checkpoint. Files already filed under a checkpoint by {@code snapshot} are not touched.
     *
     * @throws IOException if disposing any file fails
     */
    public void dispose() throws IOException {
        final InProgressFileWriter<IN, BucketID> openPart = this.inProgressPart;
        if (openPart != null) {
            openPart.dispose();
            this.inProgressPart = null;
        }
        disposeCurrentPendingFile();
        disposeCurrentCommittingFile();
    }

    /**
     * Appends an element to the in-progress part, first rolling to a new part when none is
     * open or the rolling policy asks for a roll on this element.
     *
     * @param in element to write
     * @param j timestamp handed to the new part file and to the part's write call
     *     (presumably the current processing time - confirm against callers)
     * @throws IOException if rolling or writing fails
     */
    public void write(IN in, long j) throws IOException {
        final boolean needsNewPart =
                this.inProgressPart == null || this.rollingPolicy.shouldRollOnEvent(this.inProgressPart, in);
        if (needsNewPart) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.", this.subtaskIndex, this.bucketId, in);
            }
            this.inProgressPart = closeAndCreatePartFile(j);
        }
        this.inProgressPart.write(in, j);
    }

    /**
     * Closes the in-progress part (without committing) when the rolling policy says it should
     * roll at the given processing time. Does nothing when no part is open.
     *
     * @param j current time, as passed to the rolling policy
     * @throws IOException if the rolling policy or closing the part fails
     */
    public void onProcessingTime(long j) throws IOException {
        final InProgressFileWriter<IN, BucketID> openPart = this.inProgressPart;
        if (openPart != null && this.rollingPolicy.shouldRollOnProcessingTime(openPart, j)) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy (in-progress file created @ {}, last updated @ {} and current time is {}), (file size is {}).", this.subtaskIndex, this.bucketId, openPart.getCreationTime(), openPart.getLastUpdateTime(), j, openPart.getSize());
            }
            // Plain roll: no commit, hence no checkpoint id and no callback.
            closePartFile(false, -1L, null);
        }
    }

    /**
     * Registers the checkpoint as not yet committed and submits a commit request for it.
     *
     * <p>If a part is open and the rolling policy asks to roll on checkpoint, that part is
     * closed and uploaded as part of the commit request; otherwise a commit request without a
     * file is submitted.
     *
     * @param j checkpoint id
     * @param requestDoneCallback notified when the commit request finishes
     * @throws IOException if closing the part or submitting the request fails
     */
    public void onReceptionOfCheckpoint(long j, WriterThread.RequestDoneCallback requestDoneCallback) throws IOException {
        this.commitStatus.put(j, Boolean.FALSE);
        final boolean rollOpenPart =
                this.inProgressPart != null && this.rollingPolicy.shouldRollOnCheckpoint(this.inProgressPart);
        if (rollOpenPart) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", this.subtaskIndex, this.bucketId);
            }
            closePartFile(true, j, requestDoneCallback);
        } else {
            // Commit-only request; its part number is one past the pending files submitted so far.
            final int commitPartNumber = this.pendingFileForCurrentCheckpoint.size() + 1;
            submitCommitPendingFile(null, true, j, requestDoneCallback, commitPartNumber);
        }
    }

    /**
     * Closes the current part (submitting it for upload without a commit) and opens a new one.
     *
     * @param creationTime timestamp handed to the bucket writer for the new part
     * @return the newly opened in-progress part; the caller is responsible for storing it
     * @throws IOException if closing the old part or opening the new one fails
     */
    private InProgressFileWriter<IN, BucketID> closeAndCreatePartFile(long creationTime) throws IOException {
        closePartFile(false, -1L, null);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} opening new part filefor bucket id={}.", this.subtaskIndex, this.bucketId);
        }
        final InProgressFileWriter<IN, BucketID> freshPart = this.bucketWriter.openNewInProgressFile(this.bucketId, creationTime);
        return freshPart;
    }

    /**
     * Closes the open part, if any, and submits a write request for it. A request is also
     * submitted when no file was produced but a commit is requested.
     *
     * @param commit whether the submitted request should commit the checkpoint
     * @param checkpointId checkpoint id for the commit (callers pass -1 when not committing)
     * @param doneCallback notified when the request finishes; may be null
     * @return the pending file produced by closing the part, or null if no part was open
     * @throws IOException if closing the part or submitting the request fails
     */
    private InProgressFileWriter.PendingFile<IN> closePartFile(boolean commit, long checkpointId, WriterThread.RequestDoneCallback doneCallback) throws IOException {
        InProgressFileWriter.PendingFile<IN> closedFile = null;
        final InProgressFileWriter<IN, BucketID> openPart = this.inProgressPart;
        if (openPart != null) {
            closedFile = openPart.closeForUpload();
            this.pendingFileForCurrentCheckpoint.add(closedFile);
            this.inProgressPart = null;
        }
        final boolean nothingToSubmit = closedFile == null && !commit;
        if (!nothingToSubmit) {
            // The part number equals the count of pending files including the one just added.
            final int partNumber = this.pendingFileForCurrentCheckpoint.size();
            submitCommitPendingFile(closedFile, commit, checkpointId, doneCallback, partNumber);
        }
        return closedFile;
    }

    /**
     * Wraps the given file/commit into a {@link WritePendingFileRequest} and hands it to the
     * writer thread.
     *
     * <p>On success the pending file (if any) is turned into a committing file. For a commit
     * request the number of pending and committing files must then match; if not, both lists
     * are cleared and the request is reported as failed. On failure the current data writer is
     * aborted if one exists.
     *
     * @param pendingFile file to upload, or null for a commit-only request
     * @param commit whether the request commits the checkpoint
     * @param checkpointId checkpoint id passed to the request
     * @param downstream caller's callback; may be null
     * @param partNumber part number reported by the request
     * @throws IOException if the writer thread rejects the request
     */
    private void submitCommitPendingFile(final InProgressFileWriter.PendingFile<IN> pendingFile, final boolean commit, long checkpointId, final WriterThread.RequestDoneCallback downstream, int partNumber) throws IOException {
        final WriterThread.RequestDoneCallback bookkeeping = new WriterThread.RequestDoneCallback() {
            @Override
            public void requestSuccessful() {
                if (pendingFile != null) {
                    Bucket.this.committingFileForCurrentCheckpoint.add(pendingFile.commit());
                }
                if (commit) {
                    final int pendingCount = Bucket.this.pendingFileForCurrentCheckpoint.size();
                    final int committingCount = Bucket.this.committingFileForCurrentCheckpoint.size();
                    if (pendingCount != committingCount) {
                        // Build the message before clearing so that it reports the real sizes.
                        final IOException mismatch = new IOException("Not all pending file committed! Pending file " + pendingCount + ", committing file " + committingCount);
                        Bucket.this.pendingFileForCurrentCheckpoint.clear();
                        Bucket.this.committingFileForCurrentCheckpoint.clear();
                        requestFailed(mismatch);
                        return;
                    }
                    Bucket.this.pendingFileForCurrentCheckpoint.clear();
                }
                if (downstream != null) {
                    downstream.requestSuccessful();
                }
            }

            @Override
            public void requestFailed(IOException iOException) {
                final DataWriter<IN> activeWriter = Bucket.this.dataWriterForCurrentCheckpoint;
                if (activeWriter != null) {
                    Bucket.LOG.info("Abort data writer for bucket {}", Bucket.this.bucketId);
                    activeWriter.abort();
                }
                if (downstream != null) {
                    downstream.requestFailed(iOException);
                }
            }
        };
        this.writerThread.addRequest(new WritePendingFileRequest(pendingFile, commit, checkpointId, partNumber, bookkeeping));
    }

    /**
     * Disposes every pending file of the current checkpoint and empties the list. If a dispose
     * call throws, the list is left as it was.
     *
     * @throws IOException if disposing a file fails
     */
    public void disposeCurrentPendingFile() throws IOException {
        for (InProgressFileWriter.PendingFile<IN> pending : this.pendingFileForCurrentCheckpoint) {
            pending.dispose();
        }
        this.pendingFileForCurrentCheckpoint.clear();
    }

    /**
     * Disposes every committing file of the current checkpoint and empties the list. If a
     * dispose call throws, the list is left as it was.
     *
     * @throws IOException if disposing a file fails
     */
    public void disposeCurrentCommittingFile() throws IOException {
        for (InProgressFileWriter.CommittingFile<IN> committing : this.committingFileForCurrentCheckpoint) {
            committing.dispose();
        }
        this.committingFileForCurrentCheckpoint.clear();
    }

    /**
     * Drops the commit info, and disposes the committing files, of every checkpoint up to the
     * given one.
     *
     * @param j checkpoint id used as the upper bound
     * @param z if true the bound is exclusive (ids &lt; j), otherwise inclusive (ids &lt;= j)
     * @throws IOException if disposing a file fails; the affected checkpoint's entries stay in
     *     place in that case
     */
    public void disposeCommittingFile(long j, boolean z) throws IOException {
        final Iterator<Map.Entry<Long, CommitInfo>> entries = this.committingInfoPerCheckpoint.entrySet().iterator();
        while (entries.hasNext()) {
            final Long checkpointId = entries.next().getKey();
            final boolean inRange = z ? checkpointId < j : checkpointId <= j;
            if (!inRange) {
                continue;
            }
            if (this.committingFilePerCheckpoint.containsKey(checkpointId)) {
                for (InProgressFileWriter.CommittingFile<IN> committing : this.committingFilePerCheckpoint.get(checkpointId)) {
                    committing.dispose();
                }
                this.committingFilePerCheckpoint.remove(checkpointId);
            }
            // Remove through the iterator so the map is not modified behind its back.
            entries.remove();
        }
    }

    /**
     * Marks every tracked checkpoint with an id up to and including the given one as committed.
     *
     * @param j highest checkpoint id to mark as committed
     */
    public void setCommitted(long j) {
        for (Map.Entry<Long, Boolean> status : this.commitStatus.entrySet()) {
            if (status.getKey() <= j) {
                // Updating the value in place does not structurally modify the map.
                status.setValue(Boolean.TRUE);
            }
        }
    }

    /**
     * @param j checkpoint id
     * @return true if the checkpoint is marked committed or is not tracked at all
     */
    public boolean isCommitted(long j) {
        // An untracked checkpoint counts as committed.
        return this.commitStatus.getOrDefault(Long.valueOf(j), Boolean.TRUE);
    }

    /**
     * @param j checkpoint id used as an exclusive upper bound
     * @return true if every tracked checkpoint with an id strictly below {@code j} is marked
     *     committed (also true when there is no such checkpoint)
     */
    public boolean isCommittedLessThanCheckpoint(long j) {
        for (Map.Entry<Long, Boolean> status : this.commitStatus.entrySet()) {
            final boolean earlier = status.getKey() < j;
            if (earlier && !status.getValue()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Submits a request that uploads the committing files of a checkpoint again.
     *
     * <p>The files filed under the checkpoint by {@code snapshot} are used when present;
     * otherwise the committing files of the current checkpoint are used.
     *
     * @param j checkpoint id to re-upload
     * @param writerInfo writer info for the data writer opened by the request
     * @param requestDoneCallback notified when the request finishes; may be null
     * @return false if no commit info is recorded for the checkpoint (nothing is submitted),
     *     true once the request has been handed to the writer thread
     * @throws IOException if the writer thread rejects the request
     */
    public boolean reUploadCommittingFile(long j, WriterInfo writerInfo, final WriterThread.RequestDoneCallback requestDoneCallback) throws IOException {
        final Long checkpointKey = j;
        if (!this.committingInfoPerCheckpoint.containsKey(checkpointKey)) {
            LOG.error("Cannot find committing info for checkpoint {} ", checkpointKey);
            return false;
        }

        final List<InProgressFileWriter.CommittingFile<IN>> filesToUpload;
        if (this.committingFilePerCheckpoint.containsKey(checkpointKey)) {
            filesToUpload = this.committingFilePerCheckpoint.get(checkpointKey);
            LOG.info("Find reUpload committing file, size {}", filesToUpload.size());
        } else {
            filesToUpload = this.committingFileForCurrentCheckpoint;
            LOG.info("ReUpload current committing file, size {}", filesToUpload.size());
        }

        // Null-tolerant forwarder so that the request can always invoke a callback.
        final WriterThread.RequestDoneCallback forwarder = new WriterThread.RequestDoneCallback() {
            @Override
            public void requestSuccessful() {
                if (requestDoneCallback != null) {
                    requestDoneCallback.requestSuccessful();
                }
            }

            @Override
            public void requestFailed(IOException iOException) {
                if (requestDoneCallback != null) {
                    requestDoneCallback.requestFailed(iOException);
                }
            }
        };
        this.writerThread.addRequest(new ReUploadCommittingFileRequest(filesToUpload, writerInfo, j, forwarder));
        return true;
    }

    /**
     * Creates a new, empty bucket.
     *
     * @param i index of the owning subtask
     * @param bucketid identifier of the bucket; must not be null
     * @param bucketWriter factory for part files and data writers; must not be null
     * @param rollingPolicy policy deciding when the in-progress part is rolled; must not be null
     * @param fileLifeCycleListener optional listener notified when files start committing
     * @return the new bucket
     */
    public static <IN, BucketID> Bucket<IN, BucketID> getNew(int i, BucketID bucketid, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, @Nullable FileLifeCycleListener<BucketID> fileLifeCycleListener) {
        final Bucket<IN, BucketID> created = new Bucket<IN, BucketID>(i, bucketid, bucketWriter, rollingPolicy, fileLifeCycleListener);
        return created;
    }

    /**
     * Creates a bucket from previously saved state, loading the state's committing files and
     * commit infos.
     *
     * @param i index of the owning subtask
     * @param bucketState state to restore from
     * @param bucketWriter factory for part files and data writers; must not be null
     * @param rollingPolicy policy deciding when the in-progress part is rolled; must not be null
     * @param fileLifeCycleListener optional listener notified when files start committing
     * @return the restored bucket
     */
    public static <IN, BucketID> Bucket<IN, BucketID> restore(int i, BucketState<IN, BucketID> bucketState, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, @Nullable FileLifeCycleListener<BucketID> fileLifeCycleListener) {
        // Explicit type arguments pick the BucketState constructor without raw-type casts:
        // a BucketState<IN, BucketID> is not assignable to the BucketID type variable.
        return new Bucket<IN, BucketID>(i, bucketState, bucketWriter, rollingPolicy, fileLifeCycleListener);
    }
}
