package org.apache.flink.odps.sink.insert;

import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.cache.Bucket;
import org.apache.flink.odps.sink.cache.BucketLifeCycleListener;
import org.apache.flink.odps.sink.cache.Buckets;
import org.apache.flink.odps.sink.cache.PartitionBucketAssigner;
import org.apache.flink.odps.sink.cache.PartitionBucketRollingPolicy;
import org.apache.flink.odps.sink.cache.PartitionBucketWriter;
import org.apache.flink.odps.sink.cache.PartitionBucketsBuilder;
import org.apache.flink.odps.sink.cache.WriterThread;
import org.apache.flink.odps.sink.common.OdpsWriteFunction;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.common.WriteOperationType;
import org.apache.flink.odps.sink.event.SinkTaskEvent;
import org.apache.flink.odps.sink.event.TaskAckEvent;
import org.apache.flink.odps.sink.partition.PartitionAssigner;
import org.apache.flink.odps.sink.partition.PartitionComputer;
import org.apache.flink.odps.sink.utils.MemoryUtils;
import org.apache.flink.odps.sink.utils.NonThrownExecutor;
import org.apache.flink.odps.sink.utils.WriterStatus;
import org.apache.flink.odps.sink.writer.PartitionCommitInfo;
import org.apache.flink.odps.sink.writer.PartitionWriterInfo;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/insert/FileCachedInsertFunction.class */
public class FileCachedInsertFunction extends OdpsWriteFunction<RowData> {
    protected transient Buckets<RowData, String> buckets;
    protected transient NonThrownExecutor partitionCreateExecutor;
    protected transient TypeSerializer<RowData> inputSerializer;
    protected transient BufferPool bufferPool;
    protected final boolean isAsyncCommit;
    protected final boolean isGroupPartition;
    protected transient long lastCommittingCheckpointId;
    protected transient CheckpointStorageWorkerView checkpointStorage;
    protected transient Map<Long, StreamStateHandle> stateHandleMap;
    protected transient boolean useMemoryBackend;
    private ListState<byte[]> bucketStates;
    private ListState<Long> lastCommittingCheckpointState;
    private ListState<StreamStateHandle> commitStates;
    private int bufferSize;
    private byte[] stateTmpBuffer;
    private static final Logger LOG = LoggerFactory.getLogger(FileCachedInsertFunction.class);
    private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC = new ListStateDescriptor<>("bucket-states", BytePrimitiveArraySerializer.INSTANCE);
    private static final ListStateDescriptor<Long> LAST_COMMITTING_CKP_DESC = new ListStateDescriptor<>("last-ckp-counter", LongSerializer.INSTANCE);
    private static final ListStateDescriptor<StreamStateHandle> COMMIT_STATE_DESC = new ListStateDescriptor<>("commit-states", new JavaSerializer());

    public FileCachedInsertFunction(Configuration configuration, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str, DataSchema dataSchema, boolean z, boolean z2, boolean z3, OdpsWriteOptions odpsWriteOptions, PartitionAssigner<RowData> partitionAssigner, WriteOperationType writeOperationType, RowType rowType) {
        super(configuration, odpsConf, tableIdentifier, str, dataSchema, z, odpsWriteOptions, partitionAssigner, writeOperationType, rowType);
        this.bufferSize = 4096;
        this.stateTmpBuffer = new byte[this.bufferSize];
        this.isAsyncCommit = z3;
        this.isGroupPartition = z2;
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteFunction
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        CheckpointStorage checkpointStorage;
        super.initializeState(functionInitializationContext);
        this.checkpointStorage = this.container.getCheckpointStorage();
        try {
            Field declaredField = StreamTask.class.getDeclaredField("checkpointStorage");
            declaredField.setAccessible(true);
            checkpointStorage = (CheckpointStorage) declaredField.get(this.container);
        } catch (Throwable th) {
            checkpointStorage = null;
        }
        this.useMemoryBackend = isUseMemoryBackend(checkpointStorage) || ((checkpointStorage instanceof RocksDBStateBackend) && isUseMemoryBackend((CheckpointStorage) ((RocksDBStateBackend) checkpointStorage).getCheckpointBackend()));
        LOG.info("Initialize FileCachedFunction, context is restore: {}, backend is memory: {}", Boolean.valueOf(functionInitializationContext.isRestored()), Boolean.valueOf(this.useMemoryBackend));
        this.stateHandleMap = new HashMap();
        this.inputSerializer = new RowDataSerializer(this.rowType);
        this.bufferPool = MemoryUtils.getGlobalBufferPool(this.config).createBufferPool(1, Integer.MAX_VALUE);
        this.partitionCreateExecutor = NonThrownExecutor.builder(LOG).threadNum(1).waitForTasksFinish(false).build();
        PartitionBucketsBuilder partitionBucketsBuilder = new PartitionBucketsBuilder(this.config);
        partitionBucketsBuilder.withRollingPolicy(PartitionBucketRollingPolicy.builder().withMaxPartSize((MemorySize) this.config.get(OdpsOptions.SINK_FILE_ROLLING_MAX_SIZE)).build());
        partitionBucketsBuilder.withProcessingTimeService(this.processingTimeService);
        partitionBucketsBuilder.withBucketAssigner(new PartitionBucketAssigner(PartitionComputer.instance(this.dataSchema, this.partition, (String) this.config.get(OdpsOptions.PARTITION_DEFAULT_VALUE))));
        partitionBucketsBuilder.withBucketWriter(new PartitionBucketWriter(this.config, this.odpsConf, this.tableIdentifier, this.dataSchema, this.writeOperationType, this.taskID, this.taskParallel, this.bufferPool, this.inputSerializer));
        partitionBucketsBuilder.withBucketComparator(Comparator.naturalOrder());
        this.lastCommittingCheckpointState = functionInitializationContext.getOperatorStateStore().getUnionListState(LAST_COMMITTING_CKP_DESC);
        this.lastCommittingCheckpointId = -1L;
        if (functionInitializationContext.isRestored()) {
            Iterator it = ((Iterable) this.lastCommittingCheckpointState.get()).iterator();
            while (it.hasNext()) {
                this.lastCommittingCheckpointId = Math.max(this.lastCommittingCheckpointId, ((Long) it.next()).longValue());
            }
            LOG.info("Initialize lastCommittingCheckpointId {}", Long.valueOf(this.lastCommittingCheckpointId));
        }
        this.commitStates = functionInitializationContext.getOperatorStateStore().getListState(COMMIT_STATE_DESC);
        this.buckets = partitionBucketsBuilder.createBuckets(this.taskID);
        this.bucketStates = functionInitializationContext.getOperatorStateStore().getListState(BUCKET_STATE_DESC);
        this.buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertFunction.1
            private Set<String> pendingPartitions = new HashSet();
            private final AtomicInteger requestsNotReturned = new AtomicInteger(0);

            @Override // org.apache.flink.odps.sink.cache.BucketLifeCycleListener
            public void bucketCreated(Bucket<RowData, String> bucket) {
                if (FileCachedInsertFunction.this.isGroupPartition) {
                    this.pendingPartitions.add(bucket.getBucketId());
                    if (this.pendingPartitions.size() >= 64) {
                        String[] strArr = (String[]) this.pendingPartitions.toArray(new String[0]);
                        this.requestsNotReturned.incrementAndGet();
                        FileCachedInsertFunction.this.partitionCreateExecutor.execute(() -> {
                            OdpsUtils.createPartitions(FileCachedInsertFunction.this.getOdps(), FileCachedInsertFunction.this.tableIdentifier, strArr);
                            this.requestsNotReturned.decrementAndGet();
                        }, "initialize partition", new Object[0]);
                        this.pendingPartitions = new HashSet();
                    }
                }
            }

            @Override // org.apache.flink.odps.sink.cache.BucketLifeCycleListener
            public void bucketBootstrap(Bucket<RowData, String> bucket) {
                if (FileCachedInsertFunction.this.isGroupPartition) {
                    FileCachedInsertFunction.this.buckets.bootstrapForBucket(bucket.getBucketId(), null);
                } else {
                    FileCachedInsertFunction.this.sendBootstrapEvent(bucket.getBucketId());
                }
            }

            @Override // org.apache.flink.odps.sink.cache.BucketLifeCycleListener
            public void bucketCommitting(Bucket<RowData, String> bucket) {
                if (!this.pendingPartitions.isEmpty()) {
                    String[] strArr = (String[]) this.pendingPartitions.toArray(new String[0]);
                    this.requestsNotReturned.incrementAndGet();
                    FileCachedInsertFunction.this.partitionCreateExecutor.execute(() -> {
                        OdpsUtils.createPartitions(FileCachedInsertFunction.this.getOdps(), FileCachedInsertFunction.this.tableIdentifier, strArr);
                        this.requestsNotReturned.decrementAndGet();
                    }, "committing partition", new Object[0]);
                    this.pendingPartitions = new HashSet();
                }
                while (this.requestsNotReturned.get() > 0) {
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e) {
                        throw new FlinkOdpsException(e);
                    }
                }
            }

            @Override // org.apache.flink.odps.sink.cache.BucketLifeCycleListener
            public void bucketInactive(Bucket<RowData, String> bucket) {
            }

            @Override // org.apache.flink.odps.sink.cache.BucketLifeCycleListener
            public boolean bucketsWait() throws Exception {
                return FileCachedInsertFunction.this.executor.tryYield();
            }
        });
        this.buckets.setFileLifeCycleListener((str, commitInfo) -> {
            Preconditions.checkArgument(commitInfo instanceof PartitionCommitInfo, "The write function can only handle PartitionCommitInfo");
            PartitionCommitInfo partitionCommitInfo = (PartitionCommitInfo) commitInfo;
            ArrayList arrayList = new ArrayList();
            WriterStatus writerStatus = new WriterStatus();
            writerStatus.setSessionId(partitionCommitInfo.getSessionId());
            writerStatus.setPartitionSpec(partitionCommitInfo.getPartitionSpec());
            writerStatus.setTotalRecords(partitionCommitInfo.getTotalRecords());
            writerStatus.setWriterMessage(partitionCommitInfo.getWriterMessage());
            arrayList.add(writerStatus);
            sendCommitEvent(arrayList, partitionCommitInfo.getCheckpointId(), true);
        });
        if (functionInitializationContext.isRestored()) {
            for (StreamStateHandle streamStateHandle : (Iterable) this.commitStates.get()) {
                try {
                    FSDataInputStream openInputStream = streamStateHandle.openInputStream();
                    Throwable th2 = null;
                    try {
                        try {
                            this.stateHandleMap.put(Long.valueOf(this.lastCommittingCheckpointId), streamStateHandle);
                            if (openInputStream != null) {
                                if (0 != 0) {
                                    try {
                                        openInputStream.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    openInputStream.close();
                                }
                            }
                        } catch (Throwable th4) {
                            th2 = th4;
                            throw th4;
                            break;
                        }
                    } catch (Throwable th5) {
                        if (openInputStream != null) {
                            if (th2 != null) {
                                try {
                                    openInputStream.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                openInputStream.close();
                            }
                        }
                        throw th5;
                        break;
                    }
                } catch (Exception e) {
                    LOG.error("Could not find checkpoint state file, ignore bucket states", e);
                }
            }
            if (this.stateHandleMap.containsKey(Long.valueOf(this.lastCommittingCheckpointId))) {
                this.buckets.initializeState(this.bucketStates);
            }
        }
    }

    public void close() throws Exception {
        LOG.info("Close fileCachedFunction");
        super.close();
        if (this.buckets != null) {
            this.buckets.close();
        }
        if (this.bufferPool != null) {
            this.bufferPool.lazyDestroy();
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:10:0x002c, code lost:
    
        org.apache.flink.odps.sink.insert.FileCachedInsertFunction.LOG.error("Snapshot state error: ", r6);
     */
    /* JADX WARN: Code restructure failed: missing block: B:11:0x0040, code lost:
    
        throw new java.io.IOException(r6);
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x0041, code lost:
    
        r5.buckets.disposeCommittingFileForCheckpoint(Long.MAX_VALUE);
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x004b, code lost:
    
        org.apache.flink.odps.sink.insert.FileCachedInsertFunction.LOG.info("All partition for checkpoint {} has committed ", Long.MAX_VALUE);
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x005c, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:2:0x000f, code lost:
    
        if (r5.isAsyncCommit != false) goto L4;
     */
    /* JADX WARN: Code restructure failed: missing block: B:4:0x001c, code lost:
    
        if (r5.buckets.isCommittedForCheckpoint(Long.MAX_VALUE) != false) goto L15;
     */
    /* JADX WARN: Code restructure failed: missing block: B:6:0x001f, code lost:
    
        r5.executor.yield();
     */
    /* JADX WARN: Code restructure failed: missing block: B:9:0x002b, code lost:
    
        r6 = move-exception;
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteFunction
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void endInput() throws java.lang.Exception {
        /*
            r5 = this;
            r0 = r5
            super.endInput()
            r0 = r5
            r1 = 9223372036854775807(0x7fffffffffffffff, double:NaN)
            r0.snapshotState(r1)
            r0 = r5
            boolean r0 = r0.isAsyncCommit
            if (r0 == 0) goto L4b
        L12:
            r0 = r5
            org.apache.flink.odps.sink.cache.Buckets<org.apache.flink.table.data.RowData, java.lang.String> r0 = r0.buckets
            r1 = 9223372036854775807(0x7fffffffffffffff, double:NaN)
            boolean r0 = r0.isCommittedForCheckpoint(r1)
            if (r0 != 0) goto L41
            r0 = r5
            org.apache.flink.streaming.api.operators.MailboxExecutor r0 = r0.executor     // Catch: java.lang.InterruptedException -> L2b
            r0.yield()     // Catch: java.lang.InterruptedException -> L2b
            goto L12
        L2b:
            r6 = move-exception
            org.slf4j.Logger r0 = org.apache.flink.odps.sink.insert.FileCachedInsertFunction.LOG
            java.lang.String r1 = "Snapshot state error: "
            r2 = r6
            r0.error(r1, r2)
            java.io.IOException r0 = new java.io.IOException
            r1 = r0
            r2 = r6
            r1.<init>(r2)
            throw r0
        L41:
            r0 = r5
            org.apache.flink.odps.sink.cache.Buckets<org.apache.flink.table.data.RowData, java.lang.String> r0 = r0.buckets
            r1 = 9223372036854775807(0x7fffffffffffffff, double:NaN)
            r0.disposeCommittingFileForCheckpoint(r1)
        L4b:
            org.slf4j.Logger r0 = org.apache.flink.odps.sink.insert.FileCachedInsertFunction.LOG
            java.lang.String r1 = "All partition for checkpoint {} has committed "
            r2 = 9223372036854775807(0x7fffffffffffffff, double:NaN)
            java.lang.Long r2 = java.lang.Long.valueOf(r2)
            r0.info(r1, r2)
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.odps.sink.insert.FileCachedInsertFunction.endInput():void");
    }

    public void processElement(RowData rowData, ProcessFunction<RowData, Object>.Context context, Collector<Object> collector) throws Exception {
        this.buckets.onElement(rowData, this.processingTimeService.getCurrentProcessingTime(), null, Long.MAX_VALUE);
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteFunction
    protected void snapshotState(long j) throws IOException {
        StreamStateHandle writeStateFile;
        this.buckets.flush(j);
        try {
            if (!this.isAsyncCommit) {
                sendCheckpointSuccessEvent(j);
                while (this.lastCommittingCheckpointId != j) {
                    try {
                        this.executor.yield();
                    } catch (InterruptedException e) {
                        LOG.error("Snapshot state error: ", e);
                        throw new IOException(e);
                    }
                }
                this.buckets.addEmptyBuckets(j);
                while (!this.buckets.isCommittedForCheckpoint(j)) {
                    try {
                        this.executor.yield();
                    } catch (InterruptedException e2) {
                        LOG.error("Snapshot state error: ", e2);
                        throw new IOException(e2);
                    }
                }
                this.buckets.disposeCommittingFileForCurrentCheckpoint();
                LOG.info("All partition for checkpoint {} has committed ", Long.valueOf(j));
                if (this.isAsyncCommit && !this.useMemoryBackend && (writeStateFile = writeStateFile()) != null) {
                    this.commitStates.clear();
                    this.commitStates.add(writeStateFile);
                    this.stateHandleMap.put(Long.valueOf(j), writeStateFile);
                }
                this.lastCommittingCheckpointState.clear();
                this.lastCommittingCheckpointState.add(Long.valueOf(this.lastCommittingCheckpointId));
                return;
            }
            if (this.isAsyncCommit) {
                this.commitStates.clear();
                this.commitStates.add(writeStateFile);
                this.stateHandleMap.put(Long.valueOf(j), writeStateFile);
            }
            this.lastCommittingCheckpointState.clear();
            this.lastCommittingCheckpointState.add(Long.valueOf(this.lastCommittingCheckpointId));
            return;
        } catch (Exception e3) {
            throw new IOException(e3);
        }
        while (!this.buckets.isCommittedForLessThanCheckpoint(j)) {
            try {
                this.executor.yield();
            } catch (InterruptedException e4) {
                LOG.error("Snapshot state error: ", e4);
                throw new IOException(e4);
            }
        }
        this.buckets.disposeCommittingFileForLessThanCheckpoint(j);
        sendCheckpointSuccessEvent(j);
        while (this.lastCommittingCheckpointId != j) {
            try {
                this.executor.yield();
            } catch (InterruptedException e5) {
                LOG.error("Snapshot state error: ", e5);
                throw new IOException(e5);
            }
        }
        this.buckets.addEmptyBuckets(j);
        this.buckets.snapshotState(j, this.bucketStates);
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteFunction
    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        Preconditions.checkArgument(operatorEvent instanceof TaskAckEvent, "The write function can only handle TaskAckEvent");
        final TaskAckEvent taskAckEvent = (TaskAckEvent) operatorEvent;
        LOG.info("Handle operator event for task[{}], {}", Integer.valueOf(this.taskID), taskAckEvent);
        if (taskAckEvent.isBootstrap()) {
            String sessionId = taskAckEvent.getSessionId();
            String partition = taskAckEvent.getPartition();
            if (StringUtils.isNullOrWhitespaceOnly(sessionId)) {
                LOG.error("Create session failed! " + partition);
                return;
            }
            PartitionWriterInfo partitionWriterInfo = new PartitionWriterInfo(taskAckEvent.getCheckpointId(), taskAckEvent.getSessionId(), taskAckEvent.getPartition());
            if (taskAckEvent.getCheckpointId() == this.lastCommittingCheckpointId) {
                this.buckets.bootstrapForBucket(taskAckEvent.getPartition(), partitionWriterInfo);
                return;
            } else {
                LOG.error("Ignore bootstrap event for task[{}], {}", Integer.valueOf(this.taskID), taskAckEvent);
                return;
            }
        }
        if (taskAckEvent.isCheckpoint()) {
            this.lastCommittingCheckpointId = taskAckEvent.getCheckpointId();
            return;
        }
        if (taskAckEvent.isEndInput()) {
            return;
        }
        if (!taskAckEvent.isCommitted()) {
            boolean z = false;
            try {
                PartitionWriterInfo partitionWriterInfo2 = null;
                if (!this.isGroupPartition) {
                    Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(taskAckEvent.getSessionId()), "Write session id is null");
                    partitionWriterInfo2 = new PartitionWriterInfo(taskAckEvent.getCheckpointId(), taskAckEvent.getSessionId(), taskAckEvent.getPartition());
                }
                z = this.buckets.reUploadCommittingFileForBucket(taskAckEvent.getPartition(), taskAckEvent.getCheckpointId(), partitionWriterInfo2, new WriterThread.RequestDoneCallback() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertFunction.2
                    @Override // org.apache.flink.odps.sink.cache.WriterThread.RequestDoneCallback
                    public void requestSuccessful() {
                        FileCachedInsertFunction.LOG.error("Reupload committing file success");
                    }

                    @Override // org.apache.flink.odps.sink.cache.WriterThread.RequestDoneCallback
                    public void requestFailed(IOException iOException) {
                        FileCachedInsertFunction.LOG.error("Reupload committing file failed", iOException);
                        FileCachedInsertFunction.this.sendCommitEvent(new ArrayList(), taskAckEvent.getCheckpointId(), false);
                    }
                });
            } catch (Exception e) {
                LOG.error("Reupload committing file failed", e);
            }
            if (z) {
                return;
            }
            sendCommitEvent(new ArrayList(), taskAckEvent.getCheckpointId(), false);
            return;
        }
        try {
            long checkpointId = taskAckEvent.getCheckpointId();
            this.buckets.setCommittedForBucket(taskAckEvent.getPartition(), checkpointId);
            if (this.isAsyncCommit && this.stateHandleMap.containsKey(Long.valueOf(checkpointId)) && this.buckets.isCommittedForCheckpoint(checkpointId)) {
                Iterator<Map.Entry<Long, StreamStateHandle>> it = this.stateHandleMap.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, StreamStateHandle> next = it.next();
                    if (next.getKey().longValue() <= checkpointId) {
                        next.getValue().discardState();
                        it.remove();
                    }
                }
                LOG.info("Discard state for checkpoint {}", Long.valueOf(checkpointId));
            }
        } catch (Exception e2) {
            LOG.error("Dispose Committing file failed", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendBootstrapEvent(String str) {
        LOG.info("Send bootstrap event to coordinator, task[{}], partition[{}], checkpoint[{}]", new Object[]{Integer.valueOf(this.taskID), str, Long.valueOf(this.lastCommittingCheckpointId)});
        this.eventGateway.sendEventToCoordinator(SinkTaskEvent.builder().taskID(this.taskID).checkpointID(this.lastCommittingCheckpointId).bootstrap(true).endInput(false).requiredPartition(str).writeStatus(new ArrayList()).build());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendCommitEvent(List<WriterStatus> list, long j, boolean z) {
        LOG.info("Send commit event to coordinator, task[{}], checkpoint[{}].", Integer.valueOf(this.taskID), Long.valueOf(j));
        this.eventGateway.sendEventToCoordinator(SinkTaskEvent.builder().taskID(this.taskID).checkpointID(j).bootstrap(false).endInput(false).writeSuccess(z).writeStatus(list).build());
    }

    private void sendCheckpointSuccessEvent(long j) {
        LOG.info("Send checkpoint success event to coordinator, task[{}], checkpoint[{}].", Integer.valueOf(this.taskID), Long.valueOf(j));
        this.eventGateway.sendEventToCoordinator(SinkTaskEvent.builder().taskID(this.taskID).checkpointID(j).bootstrap(false).endInput(false).writeSuccess(false).checkpointSuccess(true).writeStatus(new ArrayList()).build());
    }

    /* JADX WARN: Code restructure failed: missing block: B:13:0x0059, code lost:
    
        org.apache.flink.odps.sink.insert.FileCachedInsertFunction.LOG.error("Write state file failed, please increase sink.file-cached.file-threshold, value {}", java.lang.Long.valueOf(r0));
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.apache.flink.runtime.state.StreamStateHandle writeStateFile() throws java.lang.Exception {
        /*
            r5 = this;
            r0 = 0
            r6 = r0
        L2:
            r0 = r5
            org.apache.flink.runtime.state.CheckpointStorageWorkerView r0 = r0.checkpointStorage
            org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream r0 = r0.createTaskOwnedStateStream()
            r7 = r0
            r0 = r7
            r1 = r5
            byte[] r1 = r1.stateTmpBuffer
            r0.write(r1)
            r0 = r7
            org.apache.flink.runtime.state.StreamStateHandle r0 = r0.closeAndGetHandle()
            r6 = r0
            r0 = r6
            boolean r0 = r0 instanceof org.apache.flink.runtime.state.filesystem.FileStateHandle
            if (r0 == 0) goto L23
            goto L78
        L23:
            r0 = r6
            if (r0 == 0) goto L2d
            r0 = r6
            r0.discardState()
        L2d:
            r0 = r5
            r1 = r5
            int r1 = r1.bufferSize
            r2 = 2
            int r1 = r1 * r2
            r0.bufferSize = r1
            r0 = r5
            org.apache.flink.configuration.Configuration r0 = r0.config
            org.apache.flink.configuration.ConfigOption<org.apache.flink.configuration.MemorySize> r1 = org.apache.flink.odps.table.OdpsOptions.STATE_FILE_THRESHOLD
            java.lang.Object r0 = r0.get(r1)
            org.apache.flink.configuration.MemorySize r0 = (org.apache.flink.configuration.MemorySize) r0
            long r0 = r0.getBytes()
            r8 = r0
            r0 = r5
            int r0 = r0.bufferSize
            if (r0 <= 0) goto L59
            r0 = r5
            int r0 = r0.bufferSize
            long r0 = (long) r0
            r1 = r8
            int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
            if (r0 <= 0) goto L6b
        L59:
            org.slf4j.Logger r0 = org.apache.flink.odps.sink.insert.FileCachedInsertFunction.LOG
            java.lang.String r1 = "Write state file failed, please increase sink.file-cached.file-threshold, value {}"
            r2 = r8
            java.lang.Long r2 = java.lang.Long.valueOf(r2)
            r0.error(r1, r2)
            goto L78
        L6b:
            r0 = r5
            r1 = r5
            int r1 = r1.bufferSize
            byte[] r1 = new byte[r1]
            r0.stateTmpBuffer = r1
            goto L2
        L78:
            r0 = r6
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.odps.sink.insert.FileCachedInsertFunction.writeStateFile():org.apache.flink.runtime.state.StreamStateHandle");
    }

    private boolean isUseMemoryBackend(CheckpointStorage checkpointStorage) {
        return (checkpointStorage instanceof MemoryStateBackend) || (checkpointStorage instanceof JobManagerCheckpointStorage);
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (ProcessFunction<RowData, Object>.Context) context, (Collector<Object>) collector);
    }
}
