package org.apache.flink.odps.sink.upsert;

import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.common.OdpsWriteCoordinator;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.table.TableUpsertSessionImpl;
import org.apache.flink.odps.sink.table.TableUtils;
import org.apache.flink.odps.sink.utils.ExecuteThread;
import org.apache.flink.odps.sink.utils.WriterStatus;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/upsert/UpsertCoordinator.class */
public class UpsertCoordinator extends OdpsWriteCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(UpsertCoordinator.class);
    /**
     * Value of {@code OdpsOptions.SINK_DYNAMIC_PARTITION_LIMIT}; {@code initDynamicWriteSession}
     * refuses to open a session for a new partition once this many sessions are tracked.
     */
    protected final int dynamicPartitionLimit;
    /** Value of {@code OdpsOptions.UPSERT_MAJOR_COMPACT_ENABLE}; gates the compaction step after a commit. */
    protected boolean enableMajorCompact;
    /**
     * When true, a partition may appear in only one writer status per commit and the commit
     * reopens the session using the id reported by the writer.
     */
    protected final boolean isGroupPartition;
    /** Constructor flag; stored but not read anywhere in this class. */
    protected final boolean supportsGrouping;
    /** Partition spec -> upsert session id handed out to writers; cleared after every commit round. */
    protected transient Map<String, String> tableUpsertSessionIdMap;
    /** Partition spec -> successful commits counted since that partition's last successful compaction. */
    protected transient Map<String, Long> upsertCommitTimes;
    /** Partition spec -> thread that holds a not-yet-finished compaction request for that partition. */
    protected transient Map<String, ExecuteThread> compactThreadMap;
    /** Worker threads created in {@code initEnv}; all of them are constructed over {@link #requestQueue}. */
    protected transient ExecuteThread[] executeThreads;
    /** Shared queue that commit requests are added to when the partition has no compaction in flight. */
    protected transient LinkedBlockingQueue<ExecuteThread.ExecuteRequest> requestQueue;
    /**
     * Minimum commit count before a partition is compacted; starts at 1 and is overwritten in
     * {@code initEnv} from {@code OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS}.
     */
    protected transient int compactThreshold;
    /** Number of successful session commits (used for the average in the commit log line). */
    private final AtomicLong totalCommit;
    /** Accumulated duration of successful commits, in milliseconds. */
    private final AtomicLong totalCommitTimes;
    /** Accumulated row count of successfully committed partitions. */
    private final AtomicLong totalRowCount;
    /** Number of successful major compactions. */
    private final AtomicLong totalCompact;
    /** Accumulated duration of successful compactions, in milliseconds. */
    private final AtomicLong totalCompactTimes;

    public UpsertCoordinator(String str, Configuration configuration, OperatorCoordinator.Context context, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str2, DataSchema dataSchema, boolean z, boolean z2, boolean z3, OdpsWriteOptions odpsWriteOptions) {
        super(str, configuration, context, odpsConf, tableIdentifier, str2, dataSchema, z, odpsWriteOptions);
        this.compactThreshold = 1;
        this.totalCommit = new AtomicLong(0L);
        this.totalCommitTimes = new AtomicLong(0L);
        this.totalRowCount = new AtomicLong(0L);
        this.totalCompact = new AtomicLong(0L);
        this.totalCompactTimes = new AtomicLong(0L);
        this.isGroupPartition = z2;
        this.supportsGrouping = z3;
        this.dynamicPartitionLimit = ((Integer) configuration.get(OdpsOptions.SINK_DYNAMIC_PARTITION_LIMIT)).intValue();
    }

    /**
     * Builds the runtime state of the coordinator: environment settings, the bookkeeping maps,
     * the shared request queue and the configured number of daemon commit threads, which are
     * started immediately.
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void initEnv() {
        this.settings = TableUtils.getEnvironmentSettings(getOdps(), this.odpsConf.getTunnelEndpoint(), this.conf);
        this.tableUpsertSessionIdMap = new ConcurrentHashMap<>();
        this.upsertCommitTimes = new ConcurrentHashMap<>();
        final int threadCount = this.conf.getInteger(OdpsOptions.UPSERT_COMMIT_THREAD_NUM);
        this.enableMajorCompact = this.conf.getBoolean(OdpsOptions.UPSERT_MAJOR_COMPACT_ENABLE);
        this.compactThreshold = this.conf.getInteger(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS);
        this.requestQueue = new LinkedBlockingQueue<>();
        this.executeThreads = new ExecuteThread[threadCount];
        this.compactThreadMap = new ConcurrentHashMap<>();

        int threadIndex = 0;
        while (threadIndex < threadCount) {
            // Every worker is created over the same shared request queue.
            ExecuteThread worker = new ExecuteThread(this.requestQueue);
            this.executeThreads[threadIndex] = worker;
            threadIndex++;
            // Thread names are 1-based.
            worker.setName("OdpsBucket commit thread #" + threadIndex);
            worker.setDaemon(true);
            worker.setUncaughtExceptionHandler((deadThread, cause) ->
                    LOG.error("IO Thread '" + deadThread.getName() + "' terminated due to an exception. Shutting down I/O Manager.", cause));
            worker.start();
        }
    }

    /**
     * Shuts down every commit thread and then closes the parent coordinator.
     *
     * <p>The parent is closed even when shutting down a thread fails, and the method is safe to
     * call when {@code initEnv()} never ran (no threads were created).
     *
     * @throws Exception if shutting down a commit thread or closing the parent fails
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void close() throws Exception {
        try {
            final ExecuteThread[] threads = this.executeThreads;
            if (threads != null) {
                // Typed as AutoCloseable so the lambdas have a functional target type and
                // IOUtils.closeAll can attempt every shutdown before reporting a failure.
                List<AutoCloseable> shutdownActions = new ArrayList<>(threads.length);
                for (ExecuteThread thread : threads) {
                    if (thread == null) {
                        // initEnv() may have failed before filling every slot.
                        continue;
                    }
                    shutdownActions.add(() -> {
                        try {
                            thread.shutdown();
                        } catch (Throwable th) {
                            throw new IOException("Error while shutting down commit thread.", th);
                        }
                    });
                }
                IOUtils.closeAll(shutdownActions);
            }
        } finally {
            // Always release the parent's resources, even if a thread failed to shut down.
            super.close();
        }
    }

    /**
     * Forwards the abort notification to the parent and then drops all pending commit work via
     * {@code clearState()}.
     *
     * @param j value passed through to {@code super.notifyCheckpointAborted}
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void notifyCheckpointAborted(long j) {
        super.notifyCheckpointAborted(j);
        clearState();
    }

    /**
     * Lets the parent restore itself from the given checkpoint and then drops all pending commit
     * work via {@code clearState()}.
     *
     * @param j value passed through to {@code super.resetToCheckpoint}
     * @param bArr value passed through to {@code super.resetToCheckpoint}
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void resetToCheckpoint(long j, byte[] bArr) {
        super.resetToCheckpoint(j, bArr);
        clearState();
    }

    /**
     * Asynchronously resolves the upsert session id for the static partition and acknowledges it
     * to the requesting task.
     *
     * <p>An id already recorded in {@code tableUpsertSessionIdMap} is reused; otherwise a session
     * is opened, its id recorded, and the session handle closed again.
     *
     * @param i forwarded to {@code sendSingleTaskAckEvents} to address the requesting task
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected void initStaticWriteSession(int i) throws IOException {
        this.executor.execute(() -> {
            // Single lookup: a separate containsKey()/get() pair could observe the map being
            // cleared in between and hand out a null session id.
            String sessionId = this.tableUpsertSessionIdMap.get(this.staticPartition);
            if (sessionId == null) {
                TableUpsertSessionImpl session = OdpsUtils.getOrCreateUpsertSession(this.tableIdentifier, this.staticPartition, null, this.settings);
                try {
                    sessionId = session.getId();
                } finally {
                    // Only the id is needed here; close exactly once on every path.
                    session.close();
                }
                this.tableUpsertSessionIdMap.put(this.staticPartition, sessionId);
            }
            sendSingleTaskAckEvents(-1L, i, this.staticPartition, sessionId, false);
        }, "initialize static partition session", new Object[0]);
    }

    /**
     * Asynchronously resolves the upsert session id for a dynamic partition and acknowledges it
     * to the requesting task.
     *
     * <p>An id already recorded in {@code tableUpsertSessionIdMap} is reused. Otherwise the
     * partition is created if needed, a session is opened, its id recorded, and the session
     * handle closed again. The asynchronous action fails with a {@link FlinkOdpsException} when
     * the number of tracked partitions has reached {@code dynamicPartitionLimit}.
     *
     * @param i forwarded to {@code sendSingleTaskAckEvents} to address the requesting task
     * @param str partition spec to open a session for
     * @throws IOException declared by the overridden method
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected void initDynamicWriteSession(int i, String str) throws IOException {
        this.executor.execute(() -> {
            // Single lookup: a separate containsKey()/get() pair could observe the map being
            // cleared in between and hand out a null session id.
            String sessionId = this.tableUpsertSessionIdMap.get(str);
            if (sessionId == null) {
                if (this.tableUpsertSessionIdMap.size() >= this.dynamicPartitionLimit) {
                    throw new FlinkOdpsException("Dynamic partition limit: " + this.dynamicPartitionLimit);
                }
                OdpsUtils.createPartitionIfNeeded(getTableMetaProvider(), this.tableIdentifier, str);
                TableUpsertSessionImpl session = OdpsUtils.getOrCreateUpsertSession(this.tableIdentifier, str, null, this.settings);
                try {
                    sessionId = session.getId();
                } finally {
                    // Only the id is needed here; close exactly once on every path.
                    session.close();
                }
                this.tableUpsertSessionIdMap.put(str, sessionId);
            }
            sendSingleTaskAckEvents(-1L, i, str, sessionId, false);
        }, "initialize dynamic partition session", new Object[0]);
    }

    /**
     * Commits the upsert session of every partition reported in the buffered task events.
     *
     * <p>Steps: (1) bail out unless every slot of {@code eventBuffer} is filled; (2) aggregate row
     * counts and session ids per partition and validate them; (3) submit one commit request per
     * partition and wait for all of them; (4) clear pending requests and session ids; (5) if major
     * compaction is enabled, trigger it (waiting for completion only when every event signals end
     * of input).
     *
     * @return {@code false} when at least one task has not reported yet, {@code true} after a
     *     successful commit
     * @throws IOException if a partition commit failed, the wait was interrupted, or compaction
     *     failed
     * @throws FlinkOdpsException if a partition is reported twice in group-partition mode or with
     *     conflicting session ids
     * @throws IllegalArgumentException if the reported partitions do not match the tracked ones
     *     (non-group mode) or the events carry more than one checkpoint id
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteCoordinator
    protected boolean commitWriteSession() throws IOException {
        if (!Arrays.stream(this.eventBuffer).allMatch(Objects::nonNull)) {
            return false;
        }

        // Session ids are read from the commit threads, row counts only after safe publication
        // through the request queue.
        final Map<String, String> reportedSessionIds = new ConcurrentHashMap<>();
        final Map<String, Long> rowCounts = new HashMap<>();
        List<WriterStatus> writerStatuses = Arrays.stream(this.eventBuffer)
                .flatMap(event -> event.getWriterStatuses().stream())
                .collect(Collectors.toList());
        for (WriterStatus writerStatus : writerStatuses) {
            String partitionSpec = writerStatus.getPartitionSpec();
            Long rowsSoFar = rowCounts.get(partitionSpec);
            if (rowsSoFar == null) {
                rowCounts.put(partitionSpec, Long.valueOf(writerStatus.getTotalRecords()));
            } else {
                if (this.isGroupPartition) {
                    throw new FlinkOdpsException("Duplicated partition " + partitionSpec + " for group partition mode! ");
                }
                rowCounts.put(partitionSpec, Long.valueOf(rowsSoFar.longValue() + writerStatus.getTotalRecords()));
            }
            String knownSessionId = reportedSessionIds.putIfAbsent(partitionSpec, writerStatus.getSessionId());
            if (knownSessionId != null && !knownSessionId.equals(writerStatus.getSessionId())) {
                throw new FlinkOdpsException("Partition " + partitionSpec + " has different session id: " + writerStatus.getSessionId() + ", " + knownSessionId);
            }
        }

        Set<String> partitions = rowCounts.keySet();
        if (!this.isGroupPartition && !this.tableUpsertSessionIdMap.keySet().equals(partitions)) {
            throw new IllegalArgumentException("Expected partition " + this.tableUpsertSessionIdMap.keySet() + ", actual partition " + partitions);
        }
        // Collected as Object so the check does not depend on the declared checkpoint id type.
        Set<Object> checkpointIds = Arrays.stream(this.eventBuffer)
                .<Object>map(event -> event.getCheckpointID())
                .collect(Collectors.toSet());
        if (checkpointIds.size() != 1) {
            throw new IllegalArgumentException("Expected 1 checkpointId, actual checkpointIds " + checkpointIds);
        }

        final CountDownLatch pendingCommits = new CountDownLatch(partitions.size());
        final long startNanos = System.nanoTime();
        final Map<String, Boolean> commitSucceeded = new ConcurrentHashMap<>();
        for (final String partition : partitions) {
            try {
                commitSucceeded.put(partition, false);
                ExecuteThread.ExecuteRequest commitRequest =
                        newCommitRequest(partition, reportedSessionIds, rowCounts, commitSucceeded, pendingCommits);
                // A partition with an unfinished compaction gets its commit queued on that same
                // thread instead of the shared queue. Single get(): the compaction may finish and
                // remove its entry between a containsKey() and a get().
                ExecuteThread compactingThread = this.compactThreadMap.get(partition);
                if (compactingThread != null) {
                    compactingThread.addRequest(commitRequest);
                } else {
                    this.requestQueue.add(commitRequest);
                }
            } catch (Throwable th) {
                clearRequest();
                clearSessionInfo();
                throw th;
            }
        }

        try {
            pendingCommits.await();
            if (!commitSucceeded.values().stream().allMatch(Boolean::booleanValue)) {
                throw new IOException("Commit error");
            }
            LOG.info("Checkpoint commit table partition size {}, time taken ms: {}", partitions.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            // Nothing may have been committed yet (e.g. an empty partition set), so guard the
            // average against division by zero.
            long commits = this.totalCommit.get();
            double averageCommitMillis = commits == 0 ? 0.0d : (double) this.totalCommitTimes.get() / commits;
            LOG.info("Commit times: {}, time taken ms avg: {}, total row count: {}", commits, averageCommitMillis, this.totalRowCount.get());
            clearRequest();
            clearSessionInfo();
            if (this.enableMajorCompact) {
                boolean allInputsEnded = Arrays.stream(this.eventBuffer)
                        .allMatch(event -> event != null && event.isEndInput());
                // Compaction runs in the background unless this is the final commit.
                doCompactPartitions(!allInputsEnded);
            }
            return true;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Builds the request that commits the upsert session of one partition.
     *
     * @param partition partition spec to commit
     * @param reportedSessionIds session id reported by the writers, per partition
     * @param rowCounts total rows reported by the writers, per partition (logging and statistics)
     * @param commitSucceeded per-partition outcome flags, updated by the request
     * @param pendingCommits latch counted down exactly once when the request is done or cancelled
     */
    private ExecuteThread.ExecuteRequest newCommitRequest(
            final String partition,
            final Map<String, String> reportedSessionIds,
            final Map<String, Long> rowCounts,
            final Map<String, Boolean> commitSucceeded,
            final CountDownLatch pendingCommits) {
        return new ExecuteThread.ExecuteRequest() {
            @Override // org.apache.flink.odps.sink.utils.ExecuteThread.ExecuteRequest
            public void execute() throws IOException {
                final long commitStartNanos = System.nanoTime();
                TableUpsertSessionImpl session = null;
                Exception failure = null;
                try {
                    String reportedSessionId = reportedSessionIds.get(partition);
                    if (UpsertCoordinator.this.isGroupPartition) {
                        session = OdpsUtils.getOrCreateUpsertSession(UpsertCoordinator.this.tableIdentifier, partition, reportedSessionId, UpsertCoordinator.this.settings);
                    } else {
                        session = OdpsUtils.getOrCreateUpsertSession(UpsertCoordinator.this.tableIdentifier, partition, UpsertCoordinator.this.tableUpsertSessionIdMap.get(partition), UpsertCoordinator.this.settings);
                        if (!session.getId().equals(reportedSessionId)) {
                            throw new FlinkOdpsException("Partition " + partition + " has different session id: " + session.getId() + ", " + reportedSessionId);
                        }
                    }
                    UpsertCoordinator.LOG.info("Start to commit session {}, partition {}, row count {}", session.getId(), partition, rowCounts.get(partition));
                    session.commit();
                    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - commitStartNanos);
                    UpsertCoordinator.LOG.info("Commit session {}, time taken ms: {}", session.getId(), elapsedMillis);
                    commitSucceeded.put(partition, true);
                    UpsertCoordinator.this.totalCommit.incrementAndGet();
                    UpsertCoordinator.this.totalCommitTimes.addAndGet(elapsedMillis);
                    UpsertCoordinator.this.totalRowCount.addAndGet(rowCounts.get(partition).longValue());
                } catch (Exception e) {
                    failure = e;
                } finally {
                    // Close on success AND failure so a failed commit does not leak the session.
                    if (session != null) {
                        try {
                            session.close();
                        } catch (Exception closeError) {
                            if (failure == null) {
                                failure = closeError;
                            } else {
                                failure.addSuppressed(closeError);
                            }
                        }
                    }
                }
                if (failure != null) {
                    commitSucceeded.put(partition, false);
                    throw new IOException(failure);
                }
            }

            @Override // org.apache.flink.odps.sink.utils.ExecuteThread.ExecuteRequest
            public void requestDone(IOException iOException) {
                try {
                    if (iOException == null) {
                        // Atomic increment; must happen before the latch releases the
                        // coordinator, which reads this counter to pick compaction candidates.
                        UpsertCoordinator.this.upsertCommitTimes.merge(partition, 1L, Long::sum);
                        return;
                    }
                    UpsertCoordinator.LOG.error("Commit session error", iOException);
                    String message = iOException.getMessage();
                    if (message != null && message.contains("Request has been cancelled.")) {
                        // Cancellation is initiated by the coordinator itself; not a job failure.
                        return;
                    }
                    UpsertCoordinator.this.context.failJob(new FlinkOdpsException(iOException));
                } finally {
                    // Always release the waiting coordinator, even if the bookkeeping above threw.
                    pendingCommits.countDown();
                }
            }

            @Override // org.apache.flink.odps.sink.utils.ExecuteThread.ExecuteRequest
            public ExecuteThread.ExecuteRequest.ExecuteType getExecuteType() {
                return ExecuteThread.ExecuteRequest.ExecuteType.COMMIT;
            }
        };
    }

    /**
     * Queues a major compaction for every partition whose commit count has reached
     * {@code compactThreshold} and that has no compaction in flight.
     *
     * <p>A compaction failure is logged and leaves the partition's commit count untouched; a
     * success removes the count.
     *
     * @param z when {@code true} the requests are only queued and the method returns immediately;
     *     when {@code false} the method blocks until every queued compaction has finished
     * @throws IOException declared for overriding implementations; not thrown by this body
     */
    protected void doCompactPartitions(final boolean z) throws IOException {
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, Long> entry : this.upsertCommitTimes.entrySet()) {
            if (entry.getValue().longValue() >= this.compactThreshold && !this.compactThreadMap.containsKey(entry.getKey())) {
                candidates.add(entry.getKey());
            }
        }
        if (candidates.isEmpty()) {
            return;
        }

        final CountDownLatch pendingCompactions = new CountDownLatch(candidates.size());
        final long startNanos = System.nanoTime();
        for (final String partition : candidates) {
            // Spread partitions over the worker threads based on the number of compactions
            // currently registered.
            ExecuteThread worker = this.compactThreadMap.computeIfAbsent(partition, key ->
                    this.executeThreads[this.compactThreadMap.size() % this.executeThreads.length]);
            worker.addRequest(new ExecuteThread.ExecuteRequest() {
                @Override // org.apache.flink.odps.sink.utils.ExecuteThread.ExecuteRequest
                public void execute() throws IOException {
                    try {
                        long compactStartNanos = System.nanoTime();
                        OdpsUtils.majorCompact(UpsertCoordinator.this.odps, UpsertCoordinator.this.tableIdentifier, partition);
                        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - compactStartNanos);
                        UpsertCoordinator.LOG.info("Checkpoint compact table partition {}, time taken ms: {}", partition, elapsedMillis);
                        UpsertCoordinator.this.totalCompact.incrementAndGet();
                        UpsertCoordinator.this.totalCompactTimes.addAndGet(elapsedMillis);
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                }

                @Override // org.apache.flink.odps.sink.utils.ExecuteThread.ExecuteRequest
                public void requestDone(IOException iOException) {
                    try {
                        UpsertCoordinator.this.compactThreadMap.remove(partition);
                        if (iOException != null) {
                            UpsertCoordinator.LOG.error("Checkpoint compact table partition {} failed", partition, iOException);
                        } else {
                            UpsertCoordinator.this.upsertCommitTimes.remove(partition);
                        }
                    } finally {
                        // Release the waiter last so it observes the finished bookkeeping, and
                        // always, so it cannot hang if the bookkeeping throws.
                        if (!z) {
                            pendingCompactions.countDown();
                        }
                    }
                }

                @Override // org.apache.flink.odps.sink.utils.ExecuteThread.ExecuteRequest
                public ExecuteThread.ExecuteRequest.ExecuteType getExecuteType() {
                    return ExecuteThread.ExecuteRequest.ExecuteType.COMPACT;
                }
            });
        }
        if (z) {
            return;
        }

        try {
            pendingCompactions.await();
            LOG.info("Checkpoint compact table partition size {}, time taken ms: {}", candidates.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            // Every compaction may have failed, leaving the success counter at zero.
            long compactions = this.totalCompact.get();
            double averageCompactMillis = compactions == 0 ? 0.0d : (double) this.totalCompactTimes.get() / compactions;
            LOG.info("Compact times: {}, time taken ms avg: {}", compactions, averageCompactMillis);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; the compactions keep running on their threads.
            Thread.currentThread().interrupt();
            LOG.error("Compact interrupted exception", e);
        }
    }

    /** Forgets every recorded partition-to-session-id mapping. */
    private void clearSessionInfo() {
        this.tableUpsertSessionIdMap.clear();
    }

    /**
     * Drains the shared request queue, completing every removed request with a
     * "Request has been cancelled." error. Exceptions thrown by a completion callback are logged
     * and do not stop the draining.
     */
    private void clearRequest() {
        while (!this.requestQueue.isEmpty()) {
            final IOException cancelled = new IOException("Request has been cancelled.");
            final ExecuteThread.ExecuteRequest pending = this.requestQueue.poll();
            if (pending == null) {
                // Another thread took the request between the emptiness check and the poll.
                continue;
            }
            try {
                pending.requestDone(cancelled);
            } catch (Throwable th) {
                String detail = th.getMessage();
                String suffix = detail == null ? "." : ": " + detail;
                LOG.error("The handler of the request complete callback threw an exception" + suffix, th);
            }
        }
    }

    /**
     * Once the coordinator has been started: cancels queued requests, forgets the session ids and
     * calls {@code cancel()} on every commit thread. Does nothing before that.
     */
    private void clearState() {
        if (!this.started) {
            return;
        }
        clearRequest();
        clearSessionInfo();
        Arrays.stream(this.executeThreads).forEach(ExecuteThread::cancel);
    }
}
