package org.apache.flink.odps.sink.cache;

import java.io.IOException;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/cache/WriterThread.class */
public class WriterThread<BucketID> extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(WriterThread.class);
    protected final PriorityBlockingQueue<WriteRequest<BucketID>> requestQueue;
    private volatile boolean alive = true;
    private final int writeRetryTimes;

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/WriterThread$RequestDoneCallback.class */
    public interface RequestDoneCallback {
        void requestSuccessful();

        void requestFailed(IOException iOException);
    }

    /* loaded from: input_file:org/apache/flink/odps/sink/cache/WriterThread$WriteRequest.class */
    public interface WriteRequest<BucketID> {
        void write() throws IOException;

        void requestDone(IOException iOException);

        boolean isReady();

        BucketID getBucketId();

        int partNumber();

        void reset();
    }

    public WriterThread(Comparator<BucketID> comparator, int i) {
        this.requestQueue = new PriorityBlockingQueue<>(64, (writeRequest, writeRequest2) -> {
            if (writeRequest.getBucketId().equals(writeRequest2.getBucketId())) {
                return writeRequest.partNumber() - writeRequest2.partNumber();
            }
            boolean isReady = writeRequest.isReady();
            boolean isReady2 = writeRequest2.isReady();
            return (!(isReady && isReady2) && (isReady || isReady2)) ? isReady ? -1 : 1 : comparator.compare(writeRequest.getBucketId(), writeRequest2.getBucketId());
        });
        this.writeRetryTimes = i;
    }

    public void addRequest(WriteRequest<BucketID> writeRequest) {
        this.requestQueue.add(writeRequest);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void shutdown() {
        synchronized (this) {
            if (this.alive) {
                this.alive = false;
                interrupt();
            }
            try {
                join(1000L);
            } catch (InterruptedException e) {
            }
            IOException iOException = new IOException("WriterThread has been closed.");
            while (!this.requestQueue.isEmpty()) {
                WriteRequest<BucketID> poll = this.requestQueue.poll();
                if (poll != null) {
                    try {
                        poll.requestDone(iOException);
                    } catch (Throwable th) {
                        LOG.error("The handler of the request complete callback threw an exception" + (th.getMessage() == null ? "." : ": " + th.getMessage()), th);
                    }
                }
            }
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.alive) {
            WriteRequest<BucketID> writeRequest = null;
            while (this.alive && writeRequest == null) {
                try {
                    writeRequest = this.requestQueue.take();
                } catch (InterruptedException e) {
                    if (!this.alive) {
                        return;
                    } else {
                        LOG.warn(Thread.currentThread() + " was interrupted without shutdown.");
                    }
                }
            }
            if (writeRequest.isReady()) {
                int i = 0;
                long j = 1000;
                while (true) {
                    long j2 = j;
                    IOException iOException = null;
                    try {
                        writeRequest.write();
                    } catch (IOException e2) {
                        iOException = e2;
                    } catch (Throwable th) {
                        iOException = new IOException("The buffer could not be written: " + th.getMessage(), th);
                    }
                    if (iOException != null && iOException.getMessage() != null && !iOException.getMessage().contains("WriterThread has been closed") && !iOException.getMessage().contains("writer has been finished or aborted")) {
                        i++;
                        LOG.error("I/O writing thread encountered an error" + (iOException.getMessage() == null ? "." : ": " + iOException.getMessage()) + ", retry times {}", Integer.valueOf(i));
                        writeRequest.reset();
                        if (i <= this.writeRetryTimes) {
                            try {
                                Thread.sleep(j2);
                                j = j2 * 2;
                            } catch (InterruptedException e3) {
                            }
                        }
                    }
                    try {
                        writeRequest.requestDone(iOException);
                        break;
                    } catch (Throwable th2) {
                        LOG.error("The handler of the request-complete-callback threw an exception" + (th2.getMessage() == null ? "." : ": " + th2.getMessage()), th2);
                    }
                }
            } else {
                addRequest(writeRequest);
            }
        }
    }
}
