package org.apache.flink.odps.sink.insert;

import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.SessionStatus;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.write.TableBatchWriteSession;
import com.aliyun.odps.table.write.WriterCommitMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.common.OdpsWriteCoordinator;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.event.SinkTaskEvent;
import org.apache.flink.odps.sink.table.TableUtils;
import org.apache.flink.odps.sink.utils.ActiveSessionInfo;
import org.apache.flink.odps.sink.utils.AsyncExecuteThread;
import org.apache.flink.odps.sink.utils.NonThrownExecutor;
import org.apache.flink.odps.sink.utils.WriterStatus;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/insert/FileCachedInsertCoordinator.class */
public class FileCachedInsertCoordinator extends OdpsWriteCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(FileCachedInsertCoordinator.class);
    protected boolean isGroupPartition;
    protected boolean isAsyncCommit;
    protected int executeThreadNum;
    protected transient Map<String, AsyncExecuteThread> executeThreadMap;
    protected transient AsyncExecuteThread[] executeThreads;
    protected transient ScheduledExecutorService bootstrapCheckExecutor;
    protected transient ScheduledExecutorService partitionCheckExecutor;
    protected transient Map<String, List<AsyncExecuteThread.BootstrapRequest>> pendingBootstrapRequest;
    protected transient Map<String, Map<String, ActiveSessionInfo>> currentActiveSessions;
    protected transient Map<Long, Boolean[]> taskStatus;
    protected transient Map<Long, Set<Integer>> taskCheckpointStatus;
    protected long latestCheckpoint;
    private final AtomicLong totalCommit;
    private final AtomicLong totalCommitTimes;
    private final AtomicLong totalRowCount;
    private transient Map<Long, Long> checkpointTimes;
    private transient Map<Long, AtomicLong> commitRowCount;

    public FileCachedInsertCoordinator(String str, Configuration configuration, OperatorCoordinator.Context context, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str2, DataSchema dataSchema, boolean z, boolean z2, boolean z3, OdpsWriteOptions odpsWriteOptions) {
        super(str, configuration, context, odpsConf, tableIdentifier, str2, dataSchema, z, odpsWriteOptions);
        this.totalCommit = new AtomicLong(0L);
        this.totalCommitTimes = new AtomicLong(0L);
        this.totalRowCount = new AtomicLong(0L);
        this.isGroupPartition = z2;
        this.isAsyncCommit = z3;
        this.executeThreadNum = configuration.getInteger(OdpsOptions.INSERT_COMMIT_THREAD_NUM);
        this.latestCheckpoint = -1L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void initEnv() {
        this.settings = TableUtils.getEnvironmentSettings(getOdps(), this.odpsConf.getTunnelEndpoint(), this.conf);
        this.executeThreads = new AsyncExecuteThread[this.executeThreadNum];
        this.executeThreadMap = new HashMap();
        for (int i = 0; i < this.executeThreadNum; i++) {
            AsyncExecuteThread asyncExecuteThread = new AsyncExecuteThread();
            this.executeThreads[i] = asyncExecuteThread;
            asyncExecuteThread.setName("OdpsBucket commit thread #" + (i + 1));
            asyncExecuteThread.setDaemon(true);
            asyncExecuteThread.setUncaughtExceptionHandler((thread, th) -> {
                LOG.error("IO Thread '" + thread.getName() + "' terminated due to an exception. Shutting down I/O Manager.", th);
            });
            asyncExecuteThread.start();
        }
        this.currentActiveSessions = new HashMap();
        this.pendingBootstrapRequest = new HashMap();
        if (!this.isGroupPartition) {
            this.bootstrapCheckExecutor = Executors.newSingleThreadScheduledExecutor();
            this.bootstrapCheckExecutor.scheduleAtFixedRate(() -> {
                this.executor.execute(() -> {
                    handleBootstrapCheck(1);
                }, "handle partition create event", new Object[0]);
            }, 5000L, 15000L, TimeUnit.MILLISECONDS);
            if (this.isPartitioned) {
                this.partitionCheckExecutor = Executors.newSingleThreadScheduledExecutor();
                this.partitionCheckExecutor.scheduleAtFixedRate(() -> {
                    this.executor.execute(this::handlePartitionCheck, "handle partition check event", new Object[0]);
                }, 0L, this.metaCacheExpireTime - 30, TimeUnit.SECONDS);
            }
        }
        this.taskStatus = new HashMap();
        this.taskCheckpointStatus = new HashMap();
        this.checkpointTimes = new HashMap();
        this.commitRowCount = new ConcurrentHashMap();
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator, org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleBootstrapEvent(final SinkTaskEvent sinkTaskEvent) {
        if (this.isGroupPartition) {
            throw new UnsupportedOperationException("Unsupported bootstrap event for non group partition");
        }
        final String partitionSpec = sinkTaskEvent.getPartitionSpec();
        AsyncExecuteThread.BootstrapRequest bootstrapRequest = new AsyncExecuteThread.BootstrapRequest() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator.1
            private String sessionId = null;
            private boolean isCreatePartition = false;

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public void execute() throws IOException {
                this.sessionId = FileCachedInsertCoordinator.this.initDynamicWriteSession(partitionSpec);
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public void requestDone(IOException iOException, String str) {
                if (iOException != null) {
                    if (iOException.getMessage().contains("Request has been cancelled")) {
                        return;
                    } else {
                        FileCachedInsertCoordinator.this.context.failJob(new FlinkOdpsException("Create session error", iOException));
                    }
                }
                if (!FileCachedInsertCoordinator.this.allSubtaskReady) {
                    FileCachedInsertCoordinator.LOG.info("Waiting for subtask ready");
                    synchronized (FileCachedInsertCoordinator.this.lock) {
                        while (!FileCachedInsertCoordinator.this.allSubtaskReady) {
                            try {
                                FileCachedInsertCoordinator.this.lock.wait();
                            } catch (InterruptedException e) {
                                throw new FlinkOdpsException("Waiting for subtask ready error", iOException);
                            }
                        }
                        Thread.sleep(1000L);
                    }
                }
                NonThrownExecutor nonThrownExecutor = FileCachedInsertCoordinator.this.executor;
                SinkTaskEvent sinkTaskEvent2 = sinkTaskEvent;
                String str2 = partitionSpec;
                nonThrownExecutor.execute(() -> {
                    FileCachedInsertCoordinator.this.sendAllTaskAckEvents(sinkTaskEvent2.getCheckpointID().longValue(), str2, str, false, false, true, false);
                }, "Create session response", new Object[0]);
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public boolean isReUpload() {
                return false;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public long getCheckpointId() {
                return sinkTaskEvent.getCheckpointID().longValue();
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
            public String getPartition() {
                return partitionSpec;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
            public String getSessionId() {
                return this.sessionId;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public boolean isCreatePartition() {
                return this.isCreatePartition;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public void createPartition(boolean z) {
                this.isCreatePartition = z;
            }
        };
        boolean z = false;
        if (this.isPartitioned && !OdpsUtils.partitionExist(getTableMetaProvider(), this.tableIdentifier, partitionSpec, false)) {
            bootstrapRequest.createPartition(true);
            z = true;
        }
        if (sinkTaskEvent.getCheckpointID().longValue() > this.latestCheckpoint) {
            z = true;
        }
        addBootstrapRequest(partitionSpec, bootstrapRequest, z);
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator, org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleCommitEvent(final SinkTaskEvent sinkTaskEvent) {
        if (!sinkTaskEvent.isWriteSuccess()) {
            LOG.error("Write task failed! Event: {}", sinkTaskEvent);
            throw new FlinkOdpsException("Write task failed!");
        }
        for (WriterStatus writerStatus : sinkTaskEvent.getWriterStatuses()) {
            final String partitionSpec = writerStatus.getPartitionSpec();
            final String sessionId = writerStatus.getSessionId();
            final long longValue = sinkTaskEvent.getCheckpointID().longValue();
            final ActiveSessionInfo addPendingCommitSession = addPendingCommitSession(partitionSpec, sessionId, sinkTaskEvent.getTaskID().intValue(), writerStatus.getWriterMessage(), writerStatus.getTotalRecords());
            boolean z = false;
            if (this.isGroupPartition) {
                z = true;
            } else {
                Set<Integer> taskIds = addPendingCommitSession.getTaskIds();
                if (taskIds.size() == this.parallelism) {
                    this.currentActiveSessions.get(partitionSpec).remove(sessionId);
                    z = true;
                } else {
                    LOG.debug("Wait commit: task Ids {}, session {}", taskIds, sessionId);
                }
            }
            if (z) {
                addExecuteRequest(partitionSpec, new AsyncExecuteThread.CommitRequest() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator.2
                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                    public void execute() throws IOException {
                        if (FileCachedInsertCoordinator.this.commitRowCount.containsKey(Long.valueOf(longValue))) {
                            ((AtomicLong) FileCachedInsertCoordinator.this.commitRowCount.get(Long.valueOf(longValue))).addAndGet(addPendingCommitSession.getTotalRowCount().longValue());
                        }
                        FileCachedInsertCoordinator.this.commitWriteSession(addPendingCommitSession.getCommitMessageList(), partitionSpec, sessionId, addPendingCommitSession.getTotalRowCount().longValue());
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
                    public String getSessionId() {
                        return sessionId;
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                    public void requestDone(IOException iOException) {
                        if (iOException == null) {
                            if (FileCachedInsertCoordinator.this.isGroupPartition) {
                                NonThrownExecutor nonThrownExecutor = FileCachedInsertCoordinator.this.executor;
                                SinkTaskEvent sinkTaskEvent2 = sinkTaskEvent;
                                String str = partitionSpec;
                                String str2 = sessionId;
                                nonThrownExecutor.execute(() -> {
                                    FileCachedInsertCoordinator.this.sendSingleTaskAckEvents(sinkTaskEvent2.getCheckpointID().longValue(), sinkTaskEvent2.getTaskID().intValue(), str, str2, true);
                                }, "Commit session response", new Object[0]);
                                return;
                            }
                            NonThrownExecutor nonThrownExecutor2 = FileCachedInsertCoordinator.this.executor;
                            SinkTaskEvent sinkTaskEvent3 = sinkTaskEvent;
                            String str3 = partitionSpec;
                            String str4 = sessionId;
                            nonThrownExecutor2.execute(() -> {
                                FileCachedInsertCoordinator.this.sendAllTaskAckEvents(sinkTaskEvent3.getCheckpointID().longValue(), str3, str4, true, false, false, false);
                            }, "Commit session response", new Object[0]);
                            return;
                        }
                        if (iOException.getMessage().contains("Request has been cancelled")) {
                            return;
                        }
                        FileCachedInsertCoordinator.LOG.error("Commit session error", iOException);
                        if (!FileCachedInsertCoordinator.this.isGroupPartition) {
                            NonThrownExecutor nonThrownExecutor3 = FileCachedInsertCoordinator.this.executor;
                            String str5 = partitionSpec;
                            SinkTaskEvent sinkTaskEvent4 = sinkTaskEvent;
                            nonThrownExecutor3.execute(() -> {
                                FileCachedInsertCoordinator.this.handleReUploadBootStrap(str5, sinkTaskEvent4.getCheckpointID().longValue());
                            }, "Re upload session boot strap", new Object[0]);
                            return;
                        }
                        NonThrownExecutor nonThrownExecutor4 = FileCachedInsertCoordinator.this.executor;
                        SinkTaskEvent sinkTaskEvent5 = sinkTaskEvent;
                        String str6 = partitionSpec;
                        String str7 = sessionId;
                        nonThrownExecutor4.execute(() -> {
                            FileCachedInsertCoordinator.this.sendSingleTaskAckEvents(sinkTaskEvent5.getCheckpointID().longValue(), sinkTaskEvent5.getTaskID().intValue(), str6, str7, false);
                        }, "Commit session response", new Object[0]);
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                    public long getCheckpointId() {
                        return longValue;
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
                    public String getPartition() {
                        return partitionSpec;
                    }
                });
            }
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleCheckpointSuccessEvent(final SinkTaskEvent sinkTaskEvent) throws IOException {
        if (sinkTaskEvent.isCheckpointSuccess()) {
            Set<Integer> computeIfAbsent = this.taskCheckpointStatus.computeIfAbsent(sinkTaskEvent.getCheckpointID(), l -> {
                return new HashSet();
            });
            computeIfAbsent.add(sinkTaskEvent.getTaskID());
            if (computeIfAbsent.size() != this.parallelism) {
                LOG.info("Wait checkpoint complete event : task Ids {}, checkpoint {}", computeIfAbsent, sinkTaskEvent.getCheckpointID());
                return;
            }
            for (AsyncExecuteThread asyncExecuteThread : this.executeThreads) {
                asyncExecuteThread.addRequest(new AsyncExecuteThread.SnapshotRequest() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator.3
                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SnapshotRequest
                    public boolean isStarted() {
                        return false;
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                    public void execute() throws IOException {
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                    public void requestDone(IOException iOException) {
                    }

                    @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                    public long getCheckpointId() {
                        return sinkTaskEvent.getCheckpointID().longValue();
                    }
                });
            }
            this.latestCheckpoint = sinkTaskEvent.getCheckpointID().longValue();
            this.executor.execute(() -> {
                sendAllTaskAckEvents(sinkTaskEvent.getCheckpointID().longValue(), null, null, false, false, false, true);
            }, "Checkpoint response", new Object[0]);
            LOG.info("Set latestCheckpoint {}", sinkTaskEvent.getCheckpointID());
        }
    }

    protected void handleReUploadBootStrap(final String str, final long j) {
        addBootstrapRequest(str, new AsyncExecuteThread.BootstrapRequest() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator.4
            private String sessionId = null;
            private boolean isCreatePartition = false;

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public void execute() throws IOException {
                this.sessionId = FileCachedInsertCoordinator.this.initDynamicWriteSession(str);
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public void requestDone(IOException iOException, String str2) {
                if (iOException != null) {
                    if (iOException.getMessage().contains("Request has been cancelled")) {
                        return;
                    } else {
                        FileCachedInsertCoordinator.this.context.failJob(new FlinkOdpsException("ReUpload create session error", iOException));
                    }
                }
                NonThrownExecutor nonThrownExecutor = FileCachedInsertCoordinator.this.executor;
                long j2 = j;
                String str3 = str;
                nonThrownExecutor.execute(() -> {
                    FileCachedInsertCoordinator.this.sendAllTaskAckEvents(j2, str3, str2, false, false, false, false);
                }, "Create session response", new Object[0]);
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public boolean isReUpload() {
                return true;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public long getCheckpointId() {
                return j;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
            public String getPartition() {
                return str;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
            public String getSessionId() {
                return this.sessionId;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public boolean isCreatePartition() {
                return this.isCreatePartition;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.BootstrapRequest
            public void createPartition(boolean z) {
                this.isCreatePartition = z;
            }
        }, this.isPartitioned && !OdpsUtils.partitionExist(getTableMetaProvider(), this.tableIdentifier, str, true));
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void notifyCheckpointComplete(long j) {
        super.notifyCheckpointComplete(j);
        this.taskStatus.remove(Long.valueOf(j));
        this.taskCheckpointStatus.remove(Long.valueOf(j));
        if (this.checkpointTimes.containsKey(Long.valueOf(j))) {
            this.totalCommit.incrementAndGet();
            this.totalCommitTimes.addAndGet(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.checkpointTimes.get(Long.valueOf(j)).longValue()));
            this.totalRowCount.addAndGet(this.commitRowCount.get(Long.valueOf(j)).get());
            LOG.info("Commit times: {}, time taken ms avg: {}, total row count: {}", new Object[]{Long.valueOf(this.totalCommit.get()), Double.valueOf(this.totalCommitTimes.get() / this.totalCommit.get()), Long.valueOf(this.totalRowCount.get())});
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void subtaskReset(int i, long j) {
        super.subtaskReset(i, j);
        Boolean[] computeIfAbsent = this.taskStatus.computeIfAbsent(Long.valueOf(j), l -> {
            return new Boolean[this.parallelism];
        });
        computeIfAbsent[i] = true;
        if (Arrays.stream(computeIfAbsent).allMatch(bool -> {
            return bool != null && bool.equals(true);
        })) {
            this.taskStatus.remove(Long.valueOf(j));
            clearState(j);
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void resetToCheckpoint(long j, byte[] bArr) {
        super.resetToCheckpoint(j, bArr);
        clearState(j);
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
        this.checkpointTimes.put(Long.valueOf(j), Long.valueOf(System.nanoTime()));
        this.commitRowCount.put(Long.valueOf(j), new AtomicLong(0L));
        if (this.isAsyncCommit) {
            this.executor.execute(() -> {
                try {
                    ArrayList arrayList = new ArrayList();
                    for (AsyncExecuteThread asyncExecuteThread : this.executeThreads) {
                        arrayList.add(CompletableFuture.supplyAsync(() -> {
                            final CountDownLatch countDownLatch = new CountDownLatch(1);
                            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                            asyncExecuteThread.addRequest(new AsyncExecuteThread.SnapshotRequest() { // from class: org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator.5
                                @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SnapshotRequest
                                public boolean isStarted() {
                                    return true;
                                }

                                @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                                public void execute() throws IOException {
                                }

                                @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                                public void requestDone(IOException iOException) {
                                    atomicBoolean.set(iOException == null);
                                    countDownLatch.countDown();
                                }

                                @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
                                public long getCheckpointId() {
                                    return j;
                                }
                            });
                            try {
                                countDownLatch.await();
                            } catch (InterruptedException e) {
                                LOG.error("Snapshot interrupted exception", e);
                            }
                            return Boolean.valueOf(atomicBoolean.get());
                        }));
                    }
                    CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r15, th) -> {
                        if (arrayList.stream().anyMatch(completableFuture2 -> {
                            return !((Boolean) completableFuture2.getNow(false)).booleanValue();
                        })) {
                            LOG.info("Trigger checkpoint {} failed", Long.valueOf(j));
                            completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Session %s for source %s", Long.valueOf(j), getClass().getSimpleName()), th));
                        } else {
                            LOG.info("Trigger checkpoint {} success", Long.valueOf(j));
                            completableFuture.complete(new byte[0]);
                        }
                    });
                } catch (Throwable th2) {
                    ExceptionUtils.rethrowIfFatalErrorOrOOM(th2);
                    completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Session %s for source %s", Long.valueOf(j), getClass().getSimpleName()), th2));
                }
            }, "taking checkpoint %d", Long.valueOf(j));
        } else {
            this.executor.execute(() -> {
                try {
                    completableFuture.complete(new byte[0]);
                } catch (Throwable th) {
                    ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
                    completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Session %s for source %s", Long.valueOf(j), getClass().getSimpleName()), th));
                }
            }, "taking checkpoint %d", Long.valueOf(j));
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void close() throws Exception {
        ArrayList arrayList = new ArrayList(this.executeThreadNum);
        for (AsyncExecuteThread asyncExecuteThread : this.executeThreads) {
            arrayList.add(() -> {
                try {
                    asyncExecuteThread.shutdown();
                } catch (Throwable th) {
                    throw new IOException("Error while shutting down IO Manager writer thread.", th);
                }
            });
        }
        IOUtils.closeAll(arrayList);
        if (this.partitionCheckExecutor != null) {
            this.partitionCheckExecutor.shutdown();
        }
        if (this.bootstrapCheckExecutor != null) {
            this.bootstrapCheckExecutor.shutdown();
        }
        super.close();
    }

    protected void handleBootstrapCheck(int i) {
        ArrayList arrayList = new ArrayList();
        this.pendingBootstrapRequest.forEach((str, list) -> {
            if (list.stream().anyMatch((v0) -> {
                return v0.isCreatePartition();
            })) {
                arrayList.add(str);
            }
        });
        if (arrayList.size() >= i) {
            OdpsUtils.createPartitions(getOdps(), this.tableIdentifier, (String[]) arrayList.toArray(new String[0]));
            arrayList.forEach(str2 -> {
                this.pendingBootstrapRequest.get(str2).forEach(bootstrapRequest -> {
                    bootstrapRequest.createPartition(false);
                });
            });
        }
        Iterator<Map.Entry<String, List<AsyncExecuteThread.BootstrapRequest>>> it = this.pendingBootstrapRequest.entrySet().iterator();
        while (it.hasNext()) {
            List<AsyncExecuteThread.BootstrapRequest> value = it.next().getValue();
            Iterator<AsyncExecuteThread.BootstrapRequest> it2 = value.iterator();
            while (it2.hasNext()) {
                AsyncExecuteThread.BootstrapRequest next = it2.next();
                if (next.getCheckpointId() <= this.latestCheckpoint && !next.isCreatePartition()) {
                    addExecuteRequest(next.getPartition(), next);
                    it2.remove();
                }
            }
            if (value.isEmpty()) {
                it.remove();
            }
        }
    }

    protected void handlePartitionCheck() {
        getTableMetaProvider().loadPartitions(this.tableIdentifier.getProject(), this.tableIdentifier.getSchema(), this.tableIdentifier.getTable());
    }

    protected void addBootstrapRequest(String str, AsyncExecuteThread.BootstrapRequest bootstrapRequest, boolean z) {
        if (!z) {
            addExecuteRequest(str, bootstrapRequest);
            return;
        }
        if (this.pendingBootstrapRequest.containsKey(str)) {
            this.pendingBootstrapRequest.get(str).add(bootstrapRequest);
        } else {
            LinkedList linkedList = new LinkedList();
            linkedList.add(bootstrapRequest);
            this.pendingBootstrapRequest.put(str, linkedList);
        }
        handleBootstrapCheck(64);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addExecuteRequest(String str, AsyncExecuteThread.ExecuteRequest executeRequest) {
        AsyncExecuteThread asyncExecuteThread;
        if (this.executeThreadMap.containsKey(str)) {
            asyncExecuteThread = this.executeThreadMap.get(str);
        } else {
            asyncExecuteThread = (AsyncExecuteThread) Arrays.stream(this.executeThreads).min(Comparator.comparingInt((v0) -> {
                return v0.queueSize();
            })).get();
            this.executeThreadMap.put(str, asyncExecuteThread);
        }
        asyncExecuteThread.addRequest(executeRequest);
    }

    protected void commitWriteSession(List<WriterCommitMessage> list, String str, String str2, long j) throws IOException {
        TableBatchWriteSession writeSession = OdpsUtils.getWriteSession(this.tableIdentifier, str, str2, this.settings);
        try {
            if (writeSession.getStatus().equals(SessionStatus.COMMITTED)) {
                LOG.info("Session has committed {}, partition {}", str2, str);
                return;
            }
            LOG.info("Start to commit session {}, partition {}, row count {}", new Object[]{writeSession.getId(), str, Long.valueOf(j)});
            long nanoTime = System.nanoTime();
            writeSession.commit((WriterCommitMessage[]) list.toArray(new WriterCommitMessage[0]));
            LOG.info("Commit session {}, time taken ms: {}", str2, Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime)));
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    protected String initDynamicWriteSession(String str) throws IOException {
        return OdpsUtils.getOrCreateWriteSession(this.tableIdentifier, str, null, this.settings).getId();
    }

    private ActiveSessionInfo addPendingCommitSession(String str, String str2, int i, WriterCommitMessage writerCommitMessage, long j) {
        ActiveSessionInfo computeIfAbsent = this.currentActiveSessions.computeIfAbsent(str, str3 -> {
            return new HashMap();
        }).computeIfAbsent(str2, str4 -> {
            return new ActiveSessionInfo();
        });
        computeIfAbsent.addCommitMessage(writerCommitMessage);
        computeIfAbsent.addCompleteTask(Integer.valueOf(i));
        computeIfAbsent.addRowCount(Long.valueOf(j));
        return computeIfAbsent;
    }

    private void clearState(long j) {
        if (this.started) {
            this.executor.execute(() -> {
                this.currentActiveSessions.clear();
                this.pendingBootstrapRequest.clear();
                this.taskStatus.clear();
                this.taskCheckpointStatus.clear();
                for (AsyncExecuteThread asyncExecuteThread : this.executeThreads) {
                    asyncExecuteThread.reset(j);
                }
                this.latestCheckpoint = j;
            }, "Reset to checkpoint " + j, new Object[0]);
        } else {
            this.latestCheckpoint = j;
        }
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected void initStaticWriteSession(int i) throws IOException {
        throw new UnsupportedOperationException("Unsupported init static write session");
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected void initDynamicWriteSession(int i, String str) throws IOException {
        throw new UnsupportedOperationException("Unsupported init dynamic write session");
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected boolean commitWriteSession() throws IOException {
        throw new UnsupportedOperationException("Unsupported commit write session");
    }

    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator, org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleEndInputEvent(SinkTaskEvent sinkTaskEvent) {
        throw new UnsupportedOperationException("Unsupported end input event");
    }
}
