package org.apache.flink.odps.sink.utils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread.class */
public class AsyncExecuteThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecuteThread.class);
    protected final LinkedBlockingQueue<ExecuteRequest> compactQueue = new LinkedBlockingQueue<>();
    private volatile boolean alive = true;
    protected final Map<String, String> currentActiveSessions = new HashMap();
    protected final Map<String, SortedMap<Long, Set<String>>> successCommitRequest = new HashMap();
    protected final SortedMap<Long, Map<String, String>> successBootstrapRequest = new TreeMap();
    protected final SortedMap<Long, Map<String, String>> failedCommitRequest = new TreeMap();
    protected final PriorityBlockingQueue<ExecuteRequest> executeQueue = new PriorityBlockingQueue<>(64, (executeRequest, executeRequest2) -> {
        ExecuteRequest.ExecuteType executeType = executeRequest.getExecuteType();
        ExecuteRequest.ExecuteType executeType2 = executeRequest2.getExecuteType();
        long checkpointId = executeRequest.getCheckpointId();
        long checkpointId2 = executeRequest2.getCheckpointId();
        if (checkpointId != checkpointId2) {
            return checkpointId > checkpointId2 ? 1 : -1;
        }
        if (executeType.equals(ExecuteRequest.ExecuteType.RESET)) {
            return -1;
        }
        if (executeType2.equals(ExecuteRequest.ExecuteType.RESET)) {
            return 1;
        }
        if (executeType.equals(ExecuteRequest.ExecuteType.SNAPSHOT)) {
            return -1;
        }
        if (executeType2.equals(ExecuteRequest.ExecuteType.SNAPSHOT)) {
            return 1;
        }
        if (executeType.equals(ExecuteRequest.ExecuteType.COMMIT)) {
            return (executeType2.equals(ExecuteRequest.ExecuteType.BOOTSTRAP) && ((BootstrapRequest) executeRequest2).isReUpload()) ? 1 : -1;
        }
        if (executeType2.equals(ExecuteRequest.ExecuteType.COMMIT)) {
            return (executeType.equals(ExecuteRequest.ExecuteType.BOOTSTRAP) && ((BootstrapRequest) executeRequest).isReUpload()) ? -1 : 1;
        }
        return 0;
    });

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$BootstrapRequest.class */
    public interface BootstrapRequest extends SessionRequest {
        void requestDone(IOException iOException, String str);

        @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
        default void requestDone(IOException iOException) {
            requestDone(iOException, null);
        }

        @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
        default ExecuteRequest.ExecuteType getExecuteType() {
            return ExecuteRequest.ExecuteType.BOOTSTRAP;
        }

        boolean isReUpload();

        boolean isCreatePartition();

        void createPartition(boolean z);
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$CommitRequest.class */
    public interface CommitRequest extends SessionRequest {
        @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
        default ExecuteRequest.ExecuteType getExecuteType() {
            return ExecuteRequest.ExecuteType.COMMIT;
        }
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$CompactRequest.class */
    public interface CompactRequest extends SessionRequest {
        @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
        default ExecuteRequest.ExecuteType getExecuteType() {
            return ExecuteRequest.ExecuteType.COMPACT;
        }
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$ExecuteRequest.class */
    public interface ExecuteRequest {

        /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$ExecuteRequest$ExecuteType.class */
        public enum ExecuteType {
            BOOTSTRAP,
            COMMIT,
            COMPACT,
            SNAPSHOT,
            RESET
        }

        void execute() throws IOException;

        void requestDone(IOException iOException);

        ExecuteType getExecuteType();

        long getCheckpointId();
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$ResetRequest.class */
    public interface ResetRequest extends ExecuteRequest {
        @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
        default ExecuteRequest.ExecuteType getExecuteType() {
            return ExecuteRequest.ExecuteType.RESET;
        }
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$SessionRequest.class */
    public interface SessionRequest extends ExecuteRequest {
        String getPartition();

        String getSessionId();
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/utils/AsyncExecuteThread$SnapshotRequest.class */
    public interface SnapshotRequest extends ExecuteRequest {
        @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
        default ExecuteRequest.ExecuteType getExecuteType() {
            return ExecuteRequest.ExecuteType.SNAPSHOT;
        }

        boolean isStarted();
    }

    public int queueSize() {
        return this.executeQueue.size() + this.compactQueue.size();
    }

    public void addRequest(ExecuteRequest executeRequest) {
        if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.RESET)) {
            IOException iOException = new IOException("Request has been cancelled.");
            LOG.info("Clear execute queue for reset");
            clearQueue(this.executeQueue, iOException);
            clearQueue(this.compactQueue, iOException);
        }
        if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.BOOTSTRAP) || executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.COMMIT) || executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.SNAPSHOT) || executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.RESET)) {
            this.executeQueue.add(executeRequest);
        } else if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.COMPACT)) {
            this.compactQueue.add(executeRequest);
        }
    }

    private void clearQueue(Queue<ExecuteRequest> queue, IOException iOException) {
        while (!queue.isEmpty()) {
            ExecuteRequest poll = queue.poll();
            if (poll != null) {
                try {
                    poll.requestDone(iOException);
                } catch (Throwable th) {
                    LOG.error("The handler of the request complete callback threw an exception" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                }
            }
        }
    }

    private void clearQueue(Queue<ExecuteRequest> queue, IOException iOException, long j) {
        LinkedList linkedList = new LinkedList();
        while (!queue.isEmpty()) {
            ExecuteRequest poll = queue.poll();
            if (poll != null) {
                if (poll.getCheckpointId() > j) {
                    try {
                        poll.requestDone(iOException);
                    } catch (Throwable th) {
                        LOG.error("The handler of the request complete callback threw an exception" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                    }
                } else if (poll.getCheckpointId() < j) {
                    LOG.error("The request checkpoint {} less than {}", Long.valueOf(poll.getCheckpointId()), Long.valueOf(j));
                } else {
                    linkedList.add(poll);
                }
            }
        }
        queue.addAll(linkedList);
    }

    public void reset(final long j) {
        addRequest(new ResetRequest() { // from class: org.apache.flink.odps.sink.utils.AsyncExecuteThread.1
            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public void execute() throws IOException {
                AsyncExecuteThread.this.currentActiveSessions.clear();
                Set<Map.Entry<Long, Map<String, String>>> entrySet = AsyncExecuteThread.this.successBootstrapRequest.entrySet();
                long j2 = j;
                entrySet.removeIf(entry -> {
                    return ((Long) entry.getKey()).longValue() > j2;
                });
                Set<Map.Entry<Long, Map<String, String>>> entrySet2 = AsyncExecuteThread.this.failedCommitRequest.entrySet();
                long j3 = j;
                entrySet2.removeIf(entry2 -> {
                    return ((Long) entry2.getKey()).longValue() > j3;
                });
                Iterator<Map.Entry<String, SortedMap<Long, Set<String>>>> it = AsyncExecuteThread.this.successCommitRequest.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<String, SortedMap<Long, Set<String>>> next = it.next();
                    Set<Map.Entry<Long, Set<String>>> entrySet3 = next.getValue().entrySet();
                    long j4 = j;
                    entrySet3.removeIf(entry3 -> {
                        return ((Long) entry3.getKey()).longValue() > j4;
                    });
                    if (next.getValue().isEmpty()) {
                        it.remove();
                    }
                }
                if ((!AsyncExecuteThread.this.failedCommitRequest.isEmpty() && AsyncExecuteThread.this.failedCommitRequest.lastKey().longValue() <= j) || (!AsyncExecuteThread.this.successBootstrapRequest.isEmpty() && AsyncExecuteThread.this.successBootstrapRequest.lastKey().longValue() <= j)) {
                    AsyncExecuteThread.LOG.info("Reset to checkpoint {} has uncommitted session", Long.valueOf(j));
                } else if (AsyncExecuteThread.this.successCommitRequest.isEmpty()) {
                    AsyncExecuteThread.LOG.info("Reset to checkpoint {}, all state is empty", Long.valueOf(j));
                } else {
                    AsyncExecuteThread.LOG.info("Reset to checkpoint {}, all session committed", Long.valueOf(j));
                }
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public void requestDone(IOException iOException) {
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public long getCheckpointId() {
                return j;
            }
        });
    }

    public void shutdown() {
        synchronized (this) {
            if (this.alive) {
                this.alive = false;
                interrupt();
            }
            try {
                join(1000L);
            } catch (InterruptedException e) {
            }
            this.currentActiveSessions.clear();
            this.successBootstrapRequest.clear();
            this.successCommitRequest.clear();
            this.failedCommitRequest.clear();
            IOException iOException = new IOException("Execute thread has been closed.");
            clearQueue(this.executeQueue, iOException);
            clearQueue(this.compactQueue, iOException);
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.alive) {
            ExecuteRequest executeRequest = null;
            while (this.alive && executeRequest == null) {
                try {
                    executeRequest = this.executeQueue.poll(5000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (!this.alive) {
                        return;
                    } else {
                        LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
                    }
                }
                if (executeRequest == null) {
                    if (!this.executeQueue.isEmpty()) {
                        executeRequest = this.executeQueue.poll();
                    } else if (!this.compactQueue.isEmpty()) {
                        executeRequest = this.compactQueue.poll();
                    }
                }
            }
            if (executeRequest != null) {
                IOException iOException = null;
                long checkpointId = executeRequest.getCheckpointId();
                if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.RESET)) {
                    try {
                        executeRequest.execute();
                    } catch (IOException e2) {
                        iOException = e2;
                    } catch (Throwable th) {
                        iOException = new IOException("Reset failed: " + th.getMessage(), th);
                        LOG.error("Execute thread encountered an error" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                    }
                    executeRequest.requestDone(iOException);
                } else if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.SNAPSHOT)) {
                    SnapshotRequest snapshotRequest = (SnapshotRequest) executeRequest;
                    if (snapshotRequest.isStarted()) {
                        if (!this.failedCommitRequest.isEmpty() && this.failedCommitRequest.lastKey().longValue() < checkpointId) {
                            addRequest(executeRequest);
                        } else if (!this.successBootstrapRequest.isEmpty() && this.successBootstrapRequest.lastKey().longValue() < checkpointId) {
                            addRequest(executeRequest);
                        }
                    } else if (this.currentActiveSessions.isEmpty()) {
                        LOG.info("Current active session has been removed for checkpoint {}", Long.valueOf(checkpointId));
                    } else {
                        this.successBootstrapRequest.computeIfAbsent(Long.valueOf(checkpointId), l -> {
                            return new HashMap();
                        }).putAll(this.currentActiveSessions);
                        LOG.info("Put all committing sessions for checkpoint {}", Long.valueOf(checkpointId));
                        this.currentActiveSessions.clear();
                    }
                    snapshotRequest.requestDone(null);
                } else if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.BOOTSTRAP)) {
                    BootstrapRequest bootstrapRequest = (BootstrapRequest) executeRequest;
                    String partition = bootstrapRequest.getPartition();
                    boolean z = true;
                    String str = null;
                    if (!bootstrapRequest.isReUpload() && this.currentActiveSessions.containsKey(partition)) {
                        str = this.currentActiveSessions.get(partition);
                        z = false;
                    }
                    if (z) {
                        try {
                            bootstrapRequest.execute();
                            str = bootstrapRequest.getSessionId();
                        } catch (IOException e3) {
                            iOException = e3;
                        } catch (Throwable th2) {
                            iOException = new IOException("Create failed: " + th2.getMessage(), th2);
                            LOG.error("Execute thread encountered an error" + (th2.getMessage() == null ? "." : ": " + th2.getMessage()), th2);
                        }
                        if (iOException == null) {
                            if (bootstrapRequest.isReUpload()) {
                                LOG.info("Reupload session {}, checkpoint {}, partition {}", new Object[]{str, Long.valueOf(checkpointId), partition});
                            } else {
                                this.currentActiveSessions.put(partition, str);
                            }
                        }
                        try {
                            bootstrapRequest.requestDone(iOException, str);
                        } catch (Throwable th3) {
                            LOG.error("The handler of the request-complete-callback threw an exception" + (th3.getMessage() == null ? "." : ": " + th3.getMessage()), th3);
                        }
                    } else {
                        LOG.debug("Session has created {}, partition {}", str, partition);
                    }
                } else if (executeRequest.getExecuteType().equals(ExecuteRequest.ExecuteType.COMMIT)) {
                    CommitRequest commitRequest = (CommitRequest) executeRequest;
                    String partition2 = commitRequest.getPartition();
                    if (this.failedCommitRequest.isEmpty() || this.failedCommitRequest.lastKey().longValue() >= checkpointId || !this.failedCommitRequest.get(this.failedCommitRequest.lastKey()).containsKey(partition2)) {
                        String sessionId = commitRequest.getSessionId();
                        boolean z2 = false;
                        if (this.successCommitRequest.containsKey(partition2)) {
                            SortedMap<Long, Set<String>> sortedMap = this.successCommitRequest.get(partition2);
                            if (sortedMap.containsKey(Long.valueOf(checkpointId)) && !sortedMap.get(Long.valueOf(checkpointId)).isEmpty()) {
                                z2 = true;
                            }
                        }
                        if (z2) {
                            LOG.info("Session has committed {}, partition {}, checkpoint {}", new Object[]{sessionId, partition2, Long.valueOf(checkpointId)});
                        } else {
                            try {
                                commitRequest.execute();
                            } catch (IOException e4) {
                                if (ExceptionUtils.findThrowable(e4, InterruptedException.class).isPresent()) {
                                    LOG.error("Commit request has been interrupted");
                                    iOException = new IOException("Request has been cancelled.");
                                } else {
                                    iOException = e4;
                                }
                            } catch (Throwable th4) {
                                iOException = new IOException("Commit failed: " + th4.getMessage(), th4);
                                LOG.error("Commit thread encountered an error" + (th4.getMessage() == null ? "." : ": " + th4.getMessage()), th4);
                            }
                        }
                        if (this.currentActiveSessions.containsKey(partition2) && this.currentActiveSessions.get(partition2).equals(sessionId)) {
                            this.currentActiveSessions.remove(partition2);
                        } else if (this.successBootstrapRequest.containsKey(Long.valueOf(checkpointId))) {
                            Map<String, String> map = this.successBootstrapRequest.get(Long.valueOf(checkpointId));
                            if (map.containsKey(partition2) && map.get(partition2).equals(sessionId)) {
                                map.remove(partition2);
                            }
                            if (map.isEmpty()) {
                                LOG.info("Session has been removed for checkpoint {}", Long.valueOf(checkpointId));
                                this.successBootstrapRequest.entrySet().removeIf(entry -> {
                                    return ((Long) entry.getKey()).longValue() <= checkpointId;
                                });
                            }
                        }
                        if (iOException != null) {
                            this.failedCommitRequest.computeIfAbsent(Long.valueOf(checkpointId), l2 -> {
                                return new HashMap();
                            }).put(partition2, sessionId);
                            LOG.info("Add failed commit request for partition {}, checkpoint {}", partition2, Long.valueOf(checkpointId));
                        } else {
                            if (this.failedCommitRequest.containsKey(Long.valueOf(checkpointId))) {
                                Map<String, String> map2 = this.failedCommitRequest.get(Long.valueOf(checkpointId));
                                if (map2.containsKey(partition2)) {
                                    LOG.info("Failed commit request for partition {} has been removed, checkpoint {}", partition2, Long.valueOf(checkpointId));
                                    map2.remove(partition2);
                                }
                                if (map2.isEmpty()) {
                                    this.failedCommitRequest.entrySet().removeIf(entry2 -> {
                                        return ((Long) entry2.getKey()).longValue() <= checkpointId;
                                    });
                                }
                            }
                            this.successCommitRequest.computeIfAbsent(partition2, str2 -> {
                                return new TreeMap();
                            }).computeIfAbsent(Long.valueOf(checkpointId), l3 -> {
                                return new TreeSet();
                            }).add(sessionId);
                        }
                        try {
                            executeRequest.requestDone(iOException);
                        } catch (Throwable th5) {
                            LOG.error("The handler of the request-complete-callback threw an exception" + (th5.getMessage() == null ? "." : ": " + th5.getMessage()), th5);
                        }
                    } else {
                        addRequest(commitRequest);
                    }
                } else {
                    try {
                        executeRequest.execute();
                    } catch (IOException e5) {
                        iOException = e5;
                    } catch (Throwable th6) {
                        iOException = new IOException("Compact failed: " + th6.getMessage(), th6);
                        LOG.error("Execute thread encountered an error" + (th6.getMessage() == null ? "." : ": " + th6.getMessage()), th6);
                    }
                    try {
                        executeRequest.requestDone(iOException);
                    } catch (Throwable th7) {
                        LOG.error("The handler of the request-complete-callback threw an exception" + (th7.getMessage() == null ? "." : ": " + th7.getMessage()), th7);
                    }
                }
            }
        }
    }
}
