package org.apache.flink.odps.sink.cache;

import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.odps.sink.cache.WriterThread;
import org.apache.flink.odps.sink.writer.WriterInfo;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/cache/Buckets.class */
public class Buckets<IN, BucketID> implements ProcessingTimeCallback {
    private static final Logger LOG = LoggerFactory.getLogger(Buckets.class);
    // Index of the owning subtask; used in log messages and handed to every Bucket that is created or restored.
    private final int subtaskIndex;
    // Buckets currently tracked by this instance, kept in a TreeMap ordered by the comparator given to the constructor.
    private final Map<BucketID, Bucket<IN, BucketID>> activeBuckets;
    // Passed to Bucket.getNew / Bucket.restore; also supplies the serializers used for bucket state.
    private final BucketWriter<IN, BucketID> bucketWriter;
    // Passed unchanged to Bucket.getNew / Bucket.restore.
    private final RollingPolicy<IN, BucketID> rollingPolicy;
    // Maps each incoming element to its bucket id (see onElement).
    private final BucketAssigner<IN, BucketID> bucketAssigner;
    // Single reusable context object that is updated per element and handed to the bucket assigner.
    private final BucketerContext bucketerContext = new BucketerContext();

    // Optional listener; every call site checks for null before notifying it.
    @Nullable
    private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;

    // Optional listener forwarded to Bucket.getNew / Bucket.restore (may still be null at that point).
    @Nullable
    private FileLifeCycleListener<BucketID> fileLifeCycleListener;
    // Timer service used to (re-)register this object as a processing-time callback.
    private final ProcessingTimeService procTimeService;
    // Delay added to the current processing time when scheduling the next timer; read from OdpsOptions.SINK_BUCKET_CHECK_INTERVAL.
    private final long bucketCheckInterval;
    // Writer threads created and started in the constructor; a bucket is given the thread at index (activeBuckets.size() % threadNum).
    private final WriterThread<BucketID>[] writerThreads;
    // Length of writerThreads; read from OdpsOptions.SINK_THREAD_NUM.
    private final int threadNum;
    // Serializer used to write bucket state into, and read it back from, the checkpoint ListState.
    private final BucketStateSerializer<IN, BucketID> bucketStateSerializer;
    // Bucket id -> WriterInfo. The value is null for ids that were registered by getOrCreateBucketForBucketId
    // before bootstrapForBucket supplied a WriterInfo. Entries are removed by flush/addEmptyBuckets.
    private final Map<BucketID, WriterInfo> bucketsWriterInfo;

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/Buckets$BucketerContext.class */
    private static final class BucketerContext implements BucketAssigner.Context {

        @Nullable
        private Long elementTimestamp;
        private long currentWatermark;
        private long currentProcessingTime;

        private BucketerContext() {
            this.elementTimestamp = null;
            this.currentWatermark = Long.MIN_VALUE;
            this.currentProcessingTime = Long.MIN_VALUE;
        }

        void update(@Nullable Long l, long j, long j2) {
            this.elementTimestamp = l;
            this.currentWatermark = j;
            this.currentProcessingTime = j2;
        }

        public long currentProcessingTime() {
            return this.currentProcessingTime;
        }

        public long currentWatermark() {
            return this.currentWatermark;
        }

        @Nullable
        public Long timestamp() {
            return this.elementTimestamp;
        }
    }

    /**
     * Creates the bucket manager, starts the writer threads and schedules the first
     * processing-time timer.
     *
     * <p>The timer is registered as the very last step so that {@code this} is never handed to
     * the timer service before all fields have been initialised.
     *
     * @param configuration source of the bucket check interval, the writer thread count and the
     *     maximum number of writer retries
     * @param bucketWriter writer handed to every bucket; must not be {@code null}
     * @param bucketAssigner assigns elements to bucket ids; must not be {@code null}
     * @param rollingPolicy rolling policy handed to every bucket; must not be {@code null}
     * @param i index of the owning subtask
     * @param processingTimeService service used to register the periodic timer
     * @param comparator ordering for the active-bucket map; also handed to each writer thread
     * @throws IllegalArgumentException if the configured writer thread count is not positive
     */
    public Buckets(Configuration configuration, BucketWriter<IN, BucketID> bucketWriter, BucketAssigner<IN, BucketID> bucketAssigner, RollingPolicy<IN, BucketID> rollingPolicy, int i, ProcessingTimeService processingTimeService, Comparator<BucketID> comparator) {
        this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
        this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
        this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
        this.subtaskIndex = i;
        this.activeBuckets = new TreeMap<>(comparator);
        this.procTimeService = processingTimeService;
        this.bucketCheckInterval = ((Long) configuration.get(OdpsOptions.SINK_BUCKET_CHECK_INTERVAL)).longValue();
        this.bucketsWriterInfo = new HashMap<>();
        this.bucketStateSerializer = new BucketStateSerializer<>(bucketAssigner.getSerializer(), bucketWriter.getCommitInfoSerializer(), bucketWriter.getCommittingFileSerializer());

        this.threadNum = configuration.getInteger(OdpsOptions.SINK_THREAD_NUM);
        // Buckets are mapped to threads with "size % threadNum"; fail fast instead of hitting an
        // ArithmeticException / NegativeArraySizeException later on.
        Preconditions.checkArgument(this.threadNum > 0, "Writer thread number must be positive, but was " + this.threadNum);

        final int maxRetries = configuration.getInteger(OdpsOptions.SINK_FILE_WRITER_MAX_RETRIES);
        // Generic arrays cannot be created directly; the raw array only ever holds WriterThread<BucketID>.
        @SuppressWarnings("unchecked")
        final WriterThread<BucketID>[] threads = new WriterThread[this.threadNum];
        this.writerThreads = threads;
        for (int index = 0; index < this.threadNum; index++) {
            final WriterThread<BucketID> writerThread = new WriterThread<>(comparator, maxRetries);
            this.writerThreads[index] = writerThread;
            writerThread.setName("OdpsBucket writer thread #" + (index + 1));
            writerThread.setDaemon(true);
            writerThread.setUncaughtExceptionHandler((thread, th) ->
                    LOG.error("IO Thread '" + thread.getName() + "' terminated due to an exception. Shutting down I/O Manager.", th));
            writerThread.start();
        }

        // Registered last: the callback receives 'this', which must be fully constructed by then.
        processingTimeService.registerTimer(processingTimeService.getCurrentProcessingTime() + this.bucketCheckInterval, this);
    }

    /**
     * Installs the listener that is notified about bucket life-cycle events.
     *
     * @param bucketLifeCycleListener listener to install; must not be {@code null}
     * @throws NullPointerException if the listener is {@code null}
     */
    public void setBucketLifeCycleListener(BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener) {
        Preconditions.checkNotNull(bucketLifeCycleListener);
        this.bucketLifeCycleListener = bucketLifeCycleListener;
    }

    /**
     * Installs the file life-cycle listener that is handed to buckets created or restored afterwards.
     *
     * @param fileLifeCycleListener listener to install; must not be {@code null}
     * @throws NullPointerException if the listener is {@code null}
     */
    public void setFileLifeCycleListener(FileLifeCycleListener<BucketID> fileLifeCycleListener) {
        Preconditions.checkNotNull(fileLifeCycleListener);
        this.fileLifeCycleListener = fileLifeCycleListener;
    }

    /**
     * Restores every bucket state found in the given list state.
     *
     * @param listState checkpointed state; each entry is one versioned, serialized bucket state
     * @throws Exception if reading the state, deserializing an entry or restoring a bucket fails
     */
    public void initializeState(ListState<byte[]> listState) throws Exception {
        for (final byte[] serializedState : listState.get()) {
            final BucketState<IN, BucketID> restoredState =
                    SimpleVersionedSerialization.readVersionAndDeSerialize(this.bucketStateSerializer, serializedState);
            handleRestoredBucketState(restoredState);
        }
    }

    /**
     * Routes one element to its bucket, creating the bucket if necessary, and writes it.
     *
     * @param in the element to write
     * @param j value stored as the context's processing time and passed to the bucket's write call
     * @param l element timestamp stored in the assigner context, may be {@code null}
     * @param j2 value stored as the context's watermark
     * @return the bucket the element was written to
     * @throws Exception if bucket lookup/creation or the write fails
     */
    public Bucket<IN, BucketID> onElement(IN in, long j, @Nullable Long l, long j2) throws Exception {
        // Context argument order is (timestamp, watermark, processingTime).
        this.bucketerContext.update(l, j2, j);
        final BucketID targetBucketId = this.bucketAssigner.getBucketId(in, this.bucketerContext);
        final Bucket<IN, BucketID> targetBucket = getOrCreateBucketForBucketId(targetBucketId);
        targetBucket.write(in, j);
        return targetBucket;
    }

    /**
     * Snapshots all active buckets into the given list state.
     *
     * @param j checkpoint id
     * @param listState state the serialized bucket states are written to
     * @throws IOException wrapping any exception raised while snapshotting
     */
    public void snapshotState(long j, ListState<byte[]> listState) throws IOException {
        try {
            snapshotActiveBuckets(j, listState);
            LOG.info("Subtask {} checkpoint with id={} finished.", this.subtaskIndex, j);
        } catch (Exception snapshotFailure) {
            throw new IOException(snapshotFailure);
        }
    }

    /**
     * Triggers the checkpoint write for every active bucket that has data, or that has an entry in
     * {@code bucketsWriterInfo}, and blocks until every triggered write has reported back.
     *
     * <p>While waiting, the optional bucket life-cycle listener's {@code bucketsWait()} is polled;
     * as long as it returns {@code true} the latch is not awaited in that iteration
     * (NOTE(review): presumably the listener is processing outstanding work — confirm).
     * Entries of {@code bucketsWriterInfo} whose bucket took part in this flush are removed.
     *
     * @param j checkpoint id
     * @throws IOException if waiting fails or is interrupted (the thread's interrupt flag is
     *     restored in that case), or if at least one bucket reported a failed write
     */
    public void flush(final long j) throws IOException {
        LOG.info("Subtask {} checkpointing for checkpoint with id={}.", this.subtaskIndex, j);
        final CountDownLatch pendingBuckets = new CountDownLatch(this.activeBuckets.size());
        // bucket id -> TRUE once its write succeeded; FALSE while pending or after a failure.
        final ConcurrentHashMap<BucketID, Boolean> flushResults = new ConcurrentHashMap<>();
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            final BucketID bucketId = bucket.getBucketId();
            final boolean needsFlush = bucket.hasData() || this.bucketsWriterInfo.containsKey(bucketId);
            if (!needsFlush) {
                // Nothing to write for this bucket: release its latch slot immediately.
                pendingBuckets.countDown();
                continue;
            }
            if (this.bucketLifeCycleListener != null) {
                this.bucketLifeCycleListener.bucketCommitting(bucket);
            }
            flushResults.put(bucketId, Boolean.FALSE);
            bucket.onReceptionOfCheckpoint(j, new WriterThread.RequestDoneCallback() {
                @Override
                public void requestSuccessful() {
                    flushResults.put(bucketId, Boolean.TRUE);
                    pendingBuckets.countDown();
                }

                @Override
                public void requestFailed(IOException iOException) {
                    // A failed write fails the whole flush below, so report it as an error.
                    LOG.error("Subtask {} checkpointing for checkpoint with id={} failed.", Buckets.this.subtaskIndex, j, iOException);
                    flushResults.put(bucketId, Boolean.FALSE);
                    pendingBuckets.countDown();
                }
            });
            LOG.info("Subtask {} checkpoint for {} with bucket id={} snapshot.", this.subtaskIndex, j, bucketId);
        }
        while (true) {
            try {
                try {
                    final boolean listenerBusy = this.bucketLifeCycleListener != null && this.bucketLifeCycleListener.bucketsWait();
                    if (!listenerBusy && pendingBuckets.await(500L, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller instead of silently consuming it.
                    Thread.currentThread().interrupt();
                    this.bucketsWriterInfo.clear();
                    throw new IOException(interrupted);
                } catch (Exception waitFailure) {
                    this.bucketsWriterInfo.clear();
                    throw new IOException(waitFailure);
                }
            } finally {
                // Runs after every wait round: drop writer info for buckets handled by this flush.
                this.bucketsWriterInfo.keySet().removeIf(flushResults::containsKey);
            }
        }
        if (flushResults.containsValue(Boolean.FALSE)) {
            throw new IOException("Write error");
        }
    }

    /**
     * Triggers the checkpoint write for every bucket id that still has an entry in
     * {@code bucketsWriterInfo}, creating and bootstrapping buckets that are not active yet, and
     * blocks until all of them have reported back. {@code bucketsWriterInfo} is emptied afterwards.
     *
     * <p>While waiting, the optional bucket life-cycle listener's {@code bucketsWait()} is polled;
     * as long as it returns {@code true} the latch is not awaited in that iteration.
     *
     * @param j checkpoint id
     * @throws IOException if waiting fails or is interrupted (the thread's interrupt flag is
     *     restored in that case), or if at least one bucket reported a failed write
     */
    public void addEmptyBuckets(final long j) throws IOException {
        // First make sure every registered bucket id has an active, bootstrapped bucket.
        for (final Map.Entry<BucketID, WriterInfo> registered : this.bucketsWriterInfo.entrySet()) {
            if (!this.activeBuckets.containsKey(registered.getKey())) {
                createBucketForBucketId(registered.getKey()).bootstrap(registered.getValue());
            }
        }
        final CountDownLatch pendingBuckets = new CountDownLatch(this.bucketsWriterInfo.size());
        // bucket id -> TRUE once its write succeeded; FALSE while pending or after a failure.
        final ConcurrentHashMap<BucketID, Boolean> writeResults = new ConcurrentHashMap<>();
        for (final BucketID bucketid : this.bucketsWriterInfo.keySet()) {
            writeResults.put(bucketid, Boolean.FALSE);
            this.activeBuckets.get(bucketid).onReceptionOfCheckpoint(j, new WriterThread.RequestDoneCallback() {
                @Override
                public void requestSuccessful() {
                    writeResults.put(bucketid, Boolean.TRUE);
                    pendingBuckets.countDown();
                }

                @Override
                public void requestFailed(IOException iOException) {
                    // A failed write fails the whole call below, so report it as an error.
                    LOG.error("Subtask {} checkpointing for checkpoint with id={} failed.", Buckets.this.subtaskIndex, j, iOException);
                    writeResults.put(bucketid, Boolean.FALSE);
                    pendingBuckets.countDown();
                }
            });
            LOG.info("Subtask {} checkpoint for {} with bucket id={} snapshot.", this.subtaskIndex, j, bucketid);
        }
        while (true) {
            try {
                try {
                    final boolean listenerBusy = this.bucketLifeCycleListener != null && this.bucketLifeCycleListener.bucketsWait();
                    if (!listenerBusy && pendingBuckets.await(500L, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller instead of silently consuming it.
                    Thread.currentThread().interrupt();
                    throw new IOException(interrupted);
                } catch (Exception waitFailure) {
                    throw new IOException(waitFailure);
                }
            } finally {
                // Runs after every wait round; all registered ids have been handed to a bucket by now.
                this.bucketsWriterInfo.clear();
            }
        }
        if (writeResults.containsValue(Boolean.FALSE)) {
            throw new IOException("Write empty bucket error");
        }
    }

    /**
     * Timer callback: forwards the timer timestamp to every active bucket and schedules the next
     * timer {@code bucketCheckInterval} after the processing time read at the start of this call.
     *
     * @param j timestamp of the timer that fired
     * @throws Exception if a bucket fails while handling the timer
     */
    @Override
    public void onProcessingTime(long j) throws Exception {
        final long now = this.procTimeService.getCurrentProcessingTime();
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            bucket.onProcessingTime(j);
        }
        final long nextTimer = now + this.bucketCheckInterval;
        this.procTimeService.registerTimer(nextTimer, this);
    }

    /**
     * Bootstraps the bucket with the given id, creating it first if it is not active, and records
     * the writer info for that id.
     *
     * @param bucketid id of the bucket to bootstrap
     * @param writerInfo writer info passed to the bucket and stored in {@code bucketsWriterInfo}
     */
    public void bootstrapForBucket(BucketID bucketid, WriterInfo writerInfo) {
        final Bucket<IN, BucketID> existing = this.activeBuckets.get(bucketid);
        final Bucket<IN, BucketID> target = existing != null ? existing : createBucketForBucketId(bucketid);
        target.bootstrap(writerInfo);
        this.bucketsWriterInfo.put(bucketid, writerInfo);
    }

    /**
     * Marks the given checkpoint as committed on the bucket with the given id; does nothing when
     * no such bucket is active.
     *
     * @param bucketid id of the bucket
     * @param j checkpoint id
     * @throws Exception if the bucket fails to record the commit
     */
    public void setCommittedForBucket(BucketID bucketid, long j) throws Exception {
        final Bucket<IN, BucketID> target = this.activeBuckets.get(bucketid);
        if (target == null) {
            return;
        }
        target.setCommitted(j);
    }

    /**
     * Checks whether every active bucket reports the given checkpoint as committed.
     *
     * @param j checkpoint id
     * @return {@code true} if all active buckets are committed for {@code j} (also when there are
     *     no active buckets), {@code false} as soon as one is not
     */
    public boolean isCommittedForCheckpoint(long j) {
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            if (!bucket.isCommitted(j)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Checks whether every active bucket answers {@code true} to
     * {@code isCommittedLessThanCheckpoint(j)}.
     *
     * @param j checkpoint id
     * @return {@code true} if all active buckets agree (also when there are no active buckets),
     *     {@code false} as soon as one does not
     */
    public boolean isCommittedForLessThanCheckpoint(long j) {
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            if (!bucket.isCommittedLessThanCheckpoint(j)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Asks every active bucket to dispose its current committing file.
     *
     * @throws IOException if a bucket fails to dispose its file
     */
    public void disposeCommittingFileForCurrentCheckpoint() throws IOException {
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            bucket.disposeCurrentCommittingFile();
        }
    }

    /**
     * Calls {@code disposeCommittingFile(j, false)} on every active bucket.
     *
     * @param j checkpoint id
     * @throws IOException if a bucket fails to dispose its file
     */
    public void disposeCommittingFileForCheckpoint(long j) throws IOException {
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            bucket.disposeCommittingFile(j, false);
        }
    }

    /**
     * Calls {@code disposeCommittingFile(j, true)} on every active bucket.
     *
     * @param j checkpoint id
     * @throws IOException if a bucket fails to dispose its files
     */
    public void disposeCommittingFileForLessThanCheckpoint(long j) throws IOException {
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            bucket.disposeCommittingFile(j, true);
        }
    }

    /**
     * Delegates a re-upload of the committing file to the bucket with the given id.
     *
     * @param bucketid id of the bucket
     * @param j checkpoint id
     * @param writerInfo writer info forwarded to the bucket
     * @param requestDoneCallback callback forwarded to the bucket
     * @return the bucket's result, or {@code false} (after logging an error) when the bucket is
     *     not active
     * @throws Exception if the bucket fails to start the re-upload
     */
    public boolean reUploadCommittingFileForBucket(BucketID bucketid, long j, WriterInfo writerInfo, WriterThread.RequestDoneCallback requestDoneCallback) throws Exception {
        final Bucket<IN, BucketID> target = this.activeBuckets.get(bucketid);
        if (target == null) {
            LOG.error("Cannot find buckets {} ", bucketid);
            return false;
        }
        return target.reUploadCommittingFile(j, writerInfo, requestDoneCallback);
    }

    /**
     * Disposes all active buckets and then shuts down every writer thread.
     *
     * <p>All resources are closed through {@link IOUtils#closeAll(Iterable)} so that a failure
     * while disposing one bucket no longer prevents the remaining buckets from being disposed or
     * the writer threads from being shut down. The active-bucket map is cleared in any case.
     *
     * @throws Exception the failure(s) reported by {@code IOUtils.closeAll}; a failing writer
     *     thread shutdown is wrapped in an {@link IOException}
     */
    public void close() throws Exception {
        // Typed list: a lambda needs a functional-interface target type, which a raw list lacks.
        final ArrayList<AutoCloseable> closeables = new ArrayList<>(this.activeBuckets.size() + this.threadNum);
        // Buckets first, writer threads afterwards: same order as before.
        for (final Bucket<IN, BucketID> bucket : this.activeBuckets.values()) {
            closeables.add(() -> bucket.dispose());
        }
        for (final WriterThread<BucketID> writerThread : this.writerThreads) {
            closeables.add(() -> {
                try {
                    writerThread.shutdown();
                } catch (Throwable th) {
                    throw new IOException("Error while shutting down IO Manager writer thread.", th);
                }
            });
        }
        try {
            IOUtils.closeAll(closeables);
        } finally {
            this.activeBuckets.clear();
        }
    }

    /**
     * Clears the list state, snapshots every active bucket and stores the serialized state of the
     * buckets that are still active; buckets that report themselves inactive are removed from the
     * active-bucket map.
     *
     * @param j checkpoint id
     * @param listState state that receives one serialized entry per still-active bucket
     * @throws Exception if snapshotting, serializing or updating the state fails
     */
    private void snapshotActiveBuckets(long j, ListState<byte[]> listState) throws Exception {
        listState.clear();
        // An explicit iterator is required because inactive buckets are removed while iterating.
        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> entries = this.activeBuckets.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<BucketID, Bucket<IN, BucketID>> entry = entries.next();
            final Bucket<IN, BucketID> bucket = entry.getValue();
            bucket.snapshot(j);
            if (!bucket.isActive()) {
                LOG.info("Inactive bucket {}, checkpoint {}", entry.getKey(), j);
                entries.remove();
                continue;
            }
            final BucketState<IN, BucketID> state = bucket.getBucketState();
            final byte[] serialized = SimpleVersionedSerialization.writeVersionAndSerialize(this.bucketStateSerializer, state);
            listState.add(serialized);
            LOG.info("Subtask {} checkpointing: {}", this.subtaskIndex, state);
        }
    }

    /**
     * Rebuilds a bucket from its restored state and registers it as active.
     *
     * @param bucketState the restored bucket state
     * @throws Exception if the bucket cannot be restored or registered
     */
    private void handleRestoredBucketState(BucketState<IN, BucketID> bucketState) throws Exception {
        final BucketID restoredId = bucketState.getBucketId();
        if (LOG.isInfoEnabled()) {
            LOG.info("Subtask {} restoring: {} bucketId: {}", this.subtaskIndex, bucketState, restoredId);
        }
        final Bucket<IN, BucketID> restoredBucket =
                Bucket.restore(this.subtaskIndex, bucketState, this.bucketWriter, this.rollingPolicy, this.fileLifeCycleListener);
        updateActiveBucketId(restoredId, restoredBucket);
    }

    /**
     * Assigns a writer thread to the given bucket and registers it under the given id.
     *
     * @param bucketid id to register the bucket under
     * @param bucket bucket to register
     * @throws UnsupportedOperationException if a bucket is already registered under that id
     *     (merging buckets is not supported)
     * @throws IOException declared for callers; not thrown by the statements visible here
     */
    private void updateActiveBucketId(BucketID bucketid, Bucket<IN, BucketID> bucket) throws IOException {
        // The thread is picked from the number of buckets registered so far.
        final int threadIndex = this.activeBuckets.size() % this.threadNum;
        bucket.setWriterThread(this.writerThreads[threadIndex]);
        final Bucket<IN, BucketID> alreadyRegistered = this.activeBuckets.get(bucketid);
        if (alreadyRegistered != null) {
            throw new UnsupportedOperationException("Need merge bucket");
        }
        this.activeBuckets.put(bucketid, bucket);
    }

    /**
     * Returns the active bucket for the given id, creating it when necessary.
     *
     * <p>A newly created bucket is announced via {@code bucketCreated} and bootstrapped right away
     * if a non-null writer info is already recorded for the id. If the id has no entry in
     * {@code bucketsWriterInfo} at all, a {@code null} placeholder is recorded and
     * {@code bucketBootstrap} is announced.
     *
     * @param bucketid id of the requested bucket
     * @return the existing or newly created bucket
     * @throws IOException declared for callers; propagated from the calls made here, if any throw it
     */
    private Bucket<IN, BucketID> getOrCreateBucketForBucketId(BucketID bucketid) throws IOException {
        Bucket<IN, BucketID> bucket = this.activeBuckets.get(bucketid);
        if (bucket == null) {
            bucket = createBucketForBucketId(bucketid);
            notifyBucketCreate(bucket);
            final WriterInfo recordedWriterInfo = this.bucketsWriterInfo.get(bucketid);
            if (recordedWriterInfo != null) {
                bucket.bootstrap(recordedWriterInfo);
            }
        }
        if (this.bucketsWriterInfo.containsKey(bucketid)) {
            return bucket;
        }
        // Remember that a bootstrap was requested for this id; the writer info is not known yet.
        this.bucketsWriterInfo.put(bucketid, null);
        notifyBucketBootstrap(bucket);
        return bucket;
    }

    /**
     * Creates a new bucket for the given id, assigns it a writer thread and registers it as active.
     *
     * @param bucketid id of the bucket to create
     * @return the newly created bucket
     */
    private Bucket<IN, BucketID> createBucketForBucketId(BucketID bucketid) {
        // The thread is picked from the number of buckets registered before this one is added.
        final int threadIndex = this.activeBuckets.size() % this.threadNum;
        final Bucket<IN, BucketID> created =
                Bucket.getNew(this.subtaskIndex, bucketid, this.bucketWriter, this.rollingPolicy, this.fileLifeCycleListener);
        created.setWriterThread(this.writerThreads[threadIndex]);
        this.activeBuckets.put(bucketid, created);
        return created;
    }

    /**
     * Tells the bucket life-cycle listener, if one is set, that a bucket was created.
     *
     * @param bucket the created bucket
     */
    private void notifyBucketCreate(Bucket<IN, BucketID> bucket) {
        final BucketLifeCycleListener<IN, BucketID> listener = this.bucketLifeCycleListener;
        if (listener == null) {
            return;
        }
        listener.bucketCreated(bucket);
    }

    /**
     * Tells the bucket life-cycle listener, if one is set, that a bucket needs bootstrapping.
     *
     * @param bucket the bucket concerned
     */
    private void notifyBucketBootstrap(Bucket<IN, BucketID> bucket) {
        final BucketLifeCycleListener<IN, BucketID> listener = this.bucketLifeCycleListener;
        if (listener == null) {
            return;
        }
        listener.bucketBootstrap(bucket);
    }
}
