package org.apache.flink.odps.sink.insert;

import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.write.TableBatchWriteSession;
import com.aliyun.odps.table.write.WriterCommitMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.common.OdpsWriteCoordinator;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.table.TableUtils;
import org.apache.flink.odps.sink.utils.NonThrownExecutor;
import org.apache.flink.odps.sink.utils.WriterStatus;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/insert/InsertCoordinator.class */
/**
 * Write coordinator for insert-style sinks.
 *
 * <p>It keeps one {@link TableBatchWriteSession} per partition spec in
 * {@link #tableWriteSessionMap}, hands the session id to writer subtasks through
 * {@code sendSingleTaskAckEvents}, and commits all sessions in parallel on
 * {@link #commitExecutor} once every slot of the inherited {@code eventBuffer} is filled.
 *
 * <p>Session initialisation runs on the inherited {@code executor}; commits run on the dedicated
 * {@link #commitExecutor}, whose exception hook fails the job through the coordinator context.
 */
public class InsertCoordinator extends OdpsWriteCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(InsertCoordinator.class);

    /** Upper bound on the number of partition sessions that may be open at the same time. */
    protected final int dynamicPartitionLimit;

    /**
     * When {@code true}, each partition must be reported by exactly one writer status and the
     * session to commit is looked up by the reported session id instead of
     * {@link #tableWriteSessionMap}.
     */
    protected final boolean isGroupPartition;

    /** Stored from the constructor; not read anywhere inside this class. */
    protected final boolean supportsGrouping;

    /** Open write sessions keyed by partition spec; created in {@link #initEnv()}. */
    protected transient Map<String, TableBatchWriteSession> tableWriteSessionMap;

    /** Executor that runs the per-partition commit tasks; created in {@link #initEnv()}. */
    protected transient NonThrownExecutor commitExecutor;

    /** Futures of the submitted commit tasks, kept so that {@link #clearState()} can cancel them. */
    protected transient List<Future<?>> commitTasks;

    /**
     * Creates the coordinator.
     *
     * <p>{@code str}, {@code str2} and {@code z} are forwarded unchanged to the superclass
     * constructor; their meaning is defined there and is not visible in this class.
     *
     * @param configuration source of {@link OdpsOptions#SINK_DYNAMIC_PARTITION_LIMIT}
     * @param z2 value for {@link #isGroupPartition}
     * @param z3 value for {@link #supportsGrouping}
     */
    public InsertCoordinator(String str, Configuration configuration, OperatorCoordinator.Context context, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str2, DataSchema dataSchema, boolean z, boolean z2, boolean z3, OdpsWriteOptions odpsWriteOptions) {
        super(str, configuration, context, odpsConf, tableIdentifier, str2, dataSchema, z, odpsWriteOptions);
        this.isGroupPartition = z2;
        this.supportsGrouping = z3;
        this.dynamicPartitionLimit = ((Integer) configuration.get(OdpsOptions.SINK_DYNAMIC_PARTITION_LIMIT)).intValue();
    }

    /**
     * Builds the environment settings, the (initially empty) session map and the commit executor.
     * Any failure reported by the commit executor fails the job via the coordinator context.
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void initEnv() {
        this.settings = TableUtils.getEnvironmentSettings(getOdps(), this.odpsConf.getTunnelEndpoint(), this.conf);
        this.tableWriteSessionMap = new ConcurrentHashMap<>();
        this.commitExecutor = NonThrownExecutor.builder(LOG)
                .exceptionHook((str, th) -> {
                    this.context.failJob(new FlinkOdpsException(str, th));
                })
                .threadNum(this.conf.getInteger(OdpsOptions.INSERT_COMMIT_THREAD_NUM))
                .waitForTasksFinish(true)
                .build();
        this.commitTasks = new ArrayList<>();
    }

    /** Closes the commit executor (if it was created) before delegating to the superclass. */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void close() throws Exception {
        if (this.commitExecutor != null) {
            this.commitExecutor.close();
        }
        super.close();
    }

    /** Drops all tracked sessions and pending commit tasks after the superclass handled the abort. */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void notifyCheckpointAborted(long j) {
        super.notifyCheckpointAborted(j);
        clearState();
    }

    /** Drops all tracked sessions and pending commit tasks after the superclass handled the reset. */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void resetToCheckpoint(long j, byte[] bArr) {
        super.resetToCheckpoint(j, bArr);
        clearState();
    }

    /**
     * Asynchronously makes sure a write session exists for the static partition and acknowledges
     * it to the requesting subtask.
     *
     * @param i forwarded to {@code sendSingleTaskAckEvents}; presumably the subtask index
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected void initStaticWriteSession(int i) throws IOException {
        this.executor.execute(() -> {
            TableBatchWriteSession session = this.tableWriteSessionMap.get(this.staticPartition);
            if (session == null) {
                session = OdpsUtils.getOrCreateWriteSession(this.tableIdentifier, this.staticPartition, null, this.settings);
                this.tableWriteSessionMap.put(this.staticPartition, session);
            }
            sendSingleTaskAckEvents(-1L, i, this.staticPartition, session.getId(), false);
        }, "initialize static partition session", new Object[0]);
    }

    /**
     * Asynchronously makes sure a write session exists for the given dynamic partition and
     * acknowledges it to the requesting subtask. A new session is refused once
     * {@link #dynamicPartitionLimit} sessions are already tracked; the partition itself is created
     * first when needed.
     *
     * @param i forwarded to {@code sendSingleTaskAckEvents}; presumably the subtask index
     * @param str partition spec to open a session for
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected void initDynamicWriteSession(int i, String str) throws IOException {
        this.executor.execute(() -> {
            TableBatchWriteSession session = this.tableWriteSessionMap.get(str);
            if (session == null) {
                if (this.tableWriteSessionMap.size() >= this.dynamicPartitionLimit) {
                    throw new FlinkOdpsException("Dynamic partition limit: " + this.dynamicPartitionLimit);
                }
                OdpsUtils.createPartitionIfNeeded(getTableMetaProvider(), this.tableIdentifier, str);
                session = OdpsUtils.getOrCreateWriteSession(this.tableIdentifier, str, null, this.settings);
                this.tableWriteSessionMap.put(str, session);
            }
            sendSingleTaskAckEvents(-1L, i, str, session.getId(), false);
        }, "initialize dynamic partition session", new Object[0]);
    }

    /**
     * Commits the write sessions of every partition reported in {@code eventBuffer}.
     *
     * <p>Steps: (1) return {@code false} if any event slot is still empty; (2) group the writer
     * statuses by partition, collecting commit messages, session ids and row counts;
     * (3) validate partitions and checkpoint ids; (4) submit one commit task per partition to
     * {@link #commitExecutor} and block until all of them have finished.
     *
     * @return {@code true} when all partitions were committed, {@code false} when not every event
     *     has arrived yet
     * @throws IOException if at least one commit did not succeed or the wait was interrupted
     * @throws FlinkOdpsException if a partition is reported twice in group-partition mode or the
     *     writers of one partition report different session ids
     * @throws IllegalArgumentException if the reported partitions differ from the tracked sessions
     *     (non-group mode) or the events carry more than one checkpoint id
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected boolean commitWriteSession() throws IOException {
        if (!Arrays.stream(this.eventBuffer).allMatch(Objects::nonNull)) {
            return false;
        }

        // The first two maps are read concurrently by the commit tasks.
        Map<String, List<WriterCommitMessage>> messagesByPartition = new ConcurrentHashMap<>();
        Map<String, String> sessionIdByPartition = new ConcurrentHashMap<>();
        Map<String, Long> rowCountByPartition = new HashMap<>();

        List<WriterStatus> writerStatuses = Arrays.stream(this.eventBuffer)
                .flatMap(event -> event.getWriterStatuses().stream())
                .collect(Collectors.toList());
        for (WriterStatus writerStatus : writerStatuses) {
            String partitionSpec = writerStatus.getPartitionSpec();

            List<WriterCommitMessage> messages = messagesByPartition.get(partitionSpec);
            if (messages == null) {
                messages = new ArrayList<>();
                messagesByPartition.put(partitionSpec, messages);
            } else if (this.isGroupPartition) {
                throw new FlinkOdpsException("Duplicated partition " + partitionSpec + " for group partition mode! ");
            }

            String knownSessionId = sessionIdByPartition.get(partitionSpec);
            if (knownSessionId == null) {
                sessionIdByPartition.put(partitionSpec, writerStatus.getSessionId());
            } else if (!knownSessionId.equals(writerStatus.getSessionId())) {
                throw new FlinkOdpsException("Partition " + partitionSpec + " has different writer session id: " + writerStatus.getSessionId() + ", " + knownSessionId);
            }

            messages.add(writerStatus.getWriterMessage());
            rowCountByPartition.put(partitionSpec, rowCountByPartition.getOrDefault(partitionSpec, 0L) + writerStatus.getTotalRecords());
        }

        Set<String> partitions = messagesByPartition.keySet();
        if (!this.isGroupPartition && !this.tableWriteSessionMap.keySet().equals(partitions)) {
            throw new IllegalArgumentException("Expected partition " + this.tableWriteSessionMap.keySet() + ", actual partition " + partitions);
        }

        Set<Long> checkpointIds = Arrays.stream(this.eventBuffer)
                .map(event -> event.getCheckpointID())
                .collect(Collectors.toSet());
        if (checkpointIds.size() != 1) {
            throw new IllegalArgumentException("Expected 1 checkpointId, actual checkpointIds " + checkpointIds);
        }

        CountDownLatch pendingCommits = new CountDownLatch(partitions.size());
        Map<String, Boolean> commitSucceeded = new ConcurrentHashMap<>();
        long commitStartNanos = System.nanoTime();
        for (String partition : partitions) {
            try {
                commitSucceeded.put(partition, false);
                this.commitTasks.add(this.commitExecutor.submit(() -> {
                    // The finally block covers session resolution as well: a failure there must
                    // still release the latch, otherwise the await() below would block forever.
                    try {
                        TableBatchWriteSession session;
                        if (this.isGroupPartition) {
                            session = OdpsUtils.getOrCreateWriteSession(this.tableIdentifier, partition, sessionIdByPartition.get(partition), this.settings);
                        } else {
                            session = this.tableWriteSessionMap.get(partition);
                            if (!session.getId().equals(sessionIdByPartition.get(partition))) {
                                throw new FlinkOdpsException("Partition " + partition + " has different session id: " + session.getId() + ", " + sessionIdByPartition.get(partition));
                            }
                        }
                        try {
                            LOG.info("Start to commit session {}, partition {}, row count {}", session.getId(), partition, rowCountByPartition.get(partition));
                            long sessionStartNanos = System.nanoTime();
                            session.commit(messagesByPartition.get(partition).toArray(new WriterCommitMessage[0]));
                            LOG.info("Commit session {}, time taken ms: {}", session.getId(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sessionStartNanos));
                            commitSucceeded.put(partition, true);
                        } catch (Exception e) {
                            commitSucceeded.put(partition, false);
                            throw new IOException(e);
                        }
                    } finally {
                        pendingCommits.countDown();
                    }
                }, "Commit write session for partition " + partition, new Object[0]));
            } catch (Throwable th) {
                // Submission itself failed: cancel what was already submitted before propagating.
                clearState();
                throw th;
            }
        }

        try {
            pendingCommits.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
        if (!commitSucceeded.values().stream().allMatch(Boolean::booleanValue)) {
            throw new IOException("Commit error");
        }
        LOG.info("Checkpoint commit table partition size {}, time taken ms: {}", partitions.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - commitStartNanos));
        clearState();
        return true;
    }

    /**
     * Forgets all tracked sessions and cancels (with interruption) every commit task that has not
     * finished. Does nothing while the inherited {@code started} flag is unset.
     */
    private void clearState() {
        if (!this.started) {
            return;
        }
        this.tableWriteSessionMap.clear();
        for (Future<?> task : this.commitTasks) {
            if (!task.isDone()) {
                task.cancel(true);
            }
        }
        this.commitTasks.clear();
    }
}
