package org.apache.flink.odps.sink.utils;

import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/utils/ExecuteThread.class */
/**
 * Worker thread that executes {@link ExecuteRequest}s one at a time.
 *
 * <p>Requests arrive either through the {@code requestQueue} handed to the constructor or through
 * {@link #addRequest(ExecuteRequest)}. They are sorted by {@link ExecuteRequest.ExecuteType} into
 * {@link #commitQueue} and {@link #compactQueue}; a pending commit request is always taken before
 * a pending compact request. After a request has been executed its
 * {@link ExecuteRequest#requestDone(IOException)} callback is invoked with {@code null} on success
 * or with the failure otherwise.
 *
 * <p>{@link #cancel()} fails everything currently waiting in the two internal queues and
 * interrupts the thread without stopping it; {@link #shutdown()} stops the loop and fails
 * whatever is still waiting in the two internal queues.
 */
public class ExecuteThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(ExecuteThread.class);

    /** How long one poll of {@link #requestQueue} may block before the loop re-checks its state. */
    private static final long POLL_TIMEOUT_MILLIS = 1000L;

    /** How long {@link #shutdown()} waits for the worker to terminate. */
    private static final long JOIN_TIMEOUT_MILLIS = 1000L;

    /** Log prefix used when a callback fails while a request is rejected or a queue is drained. */
    private static final String DRAIN_CALLBACK_FAILED =
            "The handler of the request complete callback threw an exception";

    /** Log prefix used when a callback fails after a request has been executed. */
    private static final String EXECUTE_CALLBACK_FAILED =
            "The handler of the request-complete-callback threw an exception";

    /** Incoming requests, supplied by the creator of this thread and polled by {@link #run()}. */
    protected final LinkedBlockingQueue<ExecuteRequest> requestQueue;

    /** Pending {@link ExecuteRequest.ExecuteType#COMMIT} requests; served first. */
    protected final LinkedBlockingQueue<ExecuteRequest> commitQueue = new LinkedBlockingQueue<>();

    /** Pending {@link ExecuteRequest.ExecuteType#COMPACT} requests; served when no commit is pending. */
    protected final LinkedBlockingQueue<ExecuteRequest> compactQueue = new LinkedBlockingQueue<>();

    /** Cleared by {@link #shutdown()} to make {@link #run()} leave its loop. */
    private volatile boolean alive = true;

    /** A unit of work processed by an {@link ExecuteThread}. */
    public interface ExecuteRequest {

        /** Category of a request; selects the internal queue it is placed in. */
        public enum ExecuteType {
            COMMIT,
            COMPACT
        }

        /**
         * Performs the work of this request.
         *
         * @throws IOException if the work fails; the exception is handed to {@link #requestDone}
         */
        void execute() throws IOException;

        /**
         * Completion callback.
         *
         * @param iOException {@code null} if {@link #execute()} completed normally, otherwise the
         *     failure. It is also non-null when the request was dropped without being executed
         *     (cancelled, shut down, or rejected because of its type).
         */
        void requestDone(IOException iOException);

        /** @return the type that decides which queue this request is placed in */
        ExecuteType getExecuteType();
    }

    /**
     * Creates the worker; the thread still has to be started by the caller.
     *
     * @param linkedBlockingQueue queue the worker polls for incoming requests
     */
    public ExecuteThread(LinkedBlockingQueue<ExecuteRequest> linkedBlockingQueue) {
        this.requestQueue = linkedBlockingQueue;
    }

    /**
     * Places a request into the internal queue matching its type.
     *
     * <p>A request whose type is {@code null} (or otherwise not handled) is not queued. Instead it
     * is completed immediately with an {@link IOException}, so that it is neither silently lost
     * nor able to kill the worker thread with a {@link NullPointerException}.
     *
     * @param executeRequest the request to enqueue; must not be {@code null}
     */
    public void addRequest(ExecuteRequest executeRequest) {
        ExecuteRequest.ExecuteType type = executeRequest.getExecuteType();
        if (type == ExecuteRequest.ExecuteType.COMMIT) {
            this.commitQueue.add(executeRequest);
        } else if (type == ExecuteRequest.ExecuteType.COMPACT) {
            this.compactQueue.add(executeRequest);
        } else {
            notifyDone(
                    executeRequest,
                    new IOException("Unsupported execute type: " + type),
                    DRAIN_CALLBACK_FAILED);
        }
    }

    /**
     * Invokes {@code requestDone} on a request and logs, rather than propagates, anything the
     * callback throws.
     *
     * @param request the request to complete
     * @param error the failure to report, or {@code null} for success
     * @param failureMessage log message prefix used if the callback itself throws
     */
    private static void notifyDone(ExecuteRequest request, IOException error, String failureMessage) {
        try {
            request.requestDone(error);
        } catch (Throwable th) {
            LOG.error(failureMessage + messageSuffix(th), th);
        }
    }

    /** Returns {@code ": <message>"} for a throwable that has a message, otherwise {@code "."}. */
    private static String messageSuffix(Throwable th) {
        String message = th.getMessage();
        return message == null ? "." : ": " + message;
    }

    /**
     * Removes every request from the given queue and completes each one with the given error.
     *
     * @param linkedBlockingQueue the queue to drain
     * @param iOException the error handed to every drained request
     */
    private void clearQueue(LinkedBlockingQueue<ExecuteRequest> linkedBlockingQueue, IOException iOException) {
        ExecuteRequest pending;
        // poll() returns null once the queue is empty, so no separate isEmpty() check is needed.
        while ((pending = linkedBlockingQueue.poll()) != null) {
            notifyDone(pending, iOException, DRAIN_CALLBACK_FAILED);
        }
    }

    /**
     * Fails all requests waiting in the commit and compact queues and interrupts the worker.
     * The worker keeps running afterwards; requests still in {@link #requestQueue} are untouched.
     */
    public void cancel() {
        synchronized (this) {
            IOException cancelled = new IOException("Request has been cancelled.");
            clearQueue(this.commitQueue, cancelled);
            clearQueue(this.compactQueue, cancelled);
            interrupt();
        }
    }

    /**
     * Stops the worker loop, waits up to one second for the thread to finish, and then fails all
     * requests still waiting in the commit and compact queues.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the remaining
     * requests are still failed, and the caller's interrupt status is restored.
     */
    public void shutdown() {
        synchronized (this) {
            if (this.alive) {
                this.alive = false;
                interrupt();
            }
            try {
                join(JOIN_TIMEOUT_MILLIS);
            } catch (InterruptedException e) {
                // Do not lose the interrupt: the caller may rely on it after shutdown() returns.
                Thread.currentThread().interrupt();
            }
            IOException closed = new IOException("Execute thread has been closed.");
            clearQueue(this.commitQueue, closed);
            clearQueue(this.compactQueue, closed);
        }
    }

    /** Main loop: fetches and processes one request at a time until {@link #shutdown()} is called. */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.alive) {
            ExecuteRequest request = nextRequest();
            if (request != null) {
                process(request);
            }
        }
    }

    /**
     * Waits for the next request to execute, preferring commit requests over compact requests.
     *
     * <p>NOTE(review): a queued compact request is only picked up after the poll of
     * {@link #requestQueue} returns, so while that queue is idle each compaction waits up to the
     * poll timeout. Confirm whether this throttling is intended.
     *
     * @return the next request, or {@code null} if the thread was shut down while waiting
     */
    private ExecuteRequest nextRequest() {
        ExecuteRequest next = null;
        while (this.alive && next == null) {
            next = this.commitQueue.poll();
            if (next != null) {
                break;
            }
            try {
                ExecuteRequest incoming = this.requestQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                if (incoming != null) {
                    addRequest(incoming);
                }
            } catch (InterruptedException e) {
                if (!this.alive) {
                    // Interrupted by shutdown(): stop without executing anything else.
                    return null;
                }
                LOG.warn("{} was interrupted without shutdown.", Thread.currentThread());
            }
            next = this.commitQueue.poll();
            if (next == null) {
                next = this.compactQueue.poll();
            }
        }
        return next;
    }

    /**
     * Executes a request and reports the outcome through its completion callback.
     * Non-I/O failures are logged and wrapped in an {@link IOException}.
     *
     * @param request the request to execute
     */
    private void process(ExecuteRequest request) {
        IOException failure = null;
        try {
            request.execute();
        } catch (IOException e) {
            failure = e;
        } catch (Throwable th) {
            failure = new IOException("Execute failed : " + th.getMessage(), th);
            LOG.error("I/O writing thread encountered an error" + messageSuffix(th), th);
        }
        notifyDone(request, failure, EXECUTE_CALLBACK_FAILED);
    }
}
