package org.apache.flink.odps.sink.common;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.adapter.OperatorCoordinatorAdapter;
import org.apache.flink.odps.sink.event.SinkTaskEvent;
import org.apache.flink.odps.sink.event.TaskAckEvent;
import org.apache.flink.odps.sink.utils.NonThrownExecutor;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/common/AbstractWriteOperatorCoordinator.class */
public abstract class AbstractWriteOperatorCoordinator implements OperatorCoordinatorAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteOperatorCoordinator.class);
    protected final String operatorName;
    protected final Configuration conf;
    protected final OperatorCoordinator.Context context;
    protected transient OperatorCoordinator.SubtaskGateway[] gateways;
    protected transient SinkTaskEvent[] eventBuffer;
    protected final int parallelism;
    protected NonThrownExecutor executor;
    protected boolean started = false;
    protected volatile boolean allSubtaskReady;
    protected Object lock;

    public AbstractWriteOperatorCoordinator(String str, Configuration configuration, OperatorCoordinator.Context context) {
        this.operatorName = str;
        this.conf = configuration;
        this.context = context;
        this.parallelism = context.currentParallelism();
    }

    public void start() throws Exception {
        LOG.info("Starting odps write operator {}.", this.operatorName);
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        this.eventBuffer = new SinkTaskEvent[this.parallelism];
        this.gateways = new OperatorCoordinator.SubtaskGateway[this.parallelism];
        this.executor = NonThrownExecutor.builder(LOG).exceptionHook((str, th) -> {
            this.context.failJob(new FlinkOdpsException(str, th));
        }).threadNum(1).waitForTasksFinish(true).build();
        initEnv();
        this.started = true;
        this.allSubtaskReady = false;
        this.lock = new Object();
    }

    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.close();
        }
        this.eventBuffer = null;
        LOG.info("Odps write coordinator {} closed.", this.operatorName);
    }

    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) throws Exception {
        this.executor.execute(() -> {
            try {
                completableFuture.complete(new byte[0]);
            } catch (Throwable th) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
                completableFuture.completeExceptionally(new CompletionException(String.format("Failed to checkpoint Session %s for source %s", Long.valueOf(j), getClass().getSimpleName()), th));
            }
        }, "taking checkpoint %d", Long.valueOf(j));
    }

    public void notifyCheckpointComplete(long j) {
        LOG.info("Notify the checkpoint {} complete", Long.valueOf(j));
    }

    public void resetToCheckpoint(long j, byte[] bArr) {
        LOG.info("Reset to the checkpoint {}", Long.valueOf(j));
    }

    public void notifyCheckpointAborted(long j) {
        LOG.info("Notify the checkpoint {} aborted", Long.valueOf(j));
    }

    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
        Preconditions.checkState(operatorEvent instanceof SinkTaskEvent, "The coordinator can only handle SinkTaskEvent");
        SinkTaskEvent sinkTaskEvent = (SinkTaskEvent) operatorEvent;
        if (sinkTaskEvent.isEndInput()) {
            this.executor.execute(() -> {
                handleEndInputEvent(sinkTaskEvent);
            }, "handle end input event for task [" + i + "]", new Object[0]);
            return;
        }
        if (sinkTaskEvent.isBootstrap()) {
            this.executor.execute(() -> {
                handleBootstrapEvent(sinkTaskEvent);
            }, "handle bootstrap event for task [" + i + "]", new Object[0]);
        } else if (sinkTaskEvent.isCheckpointSuccess()) {
            this.executor.execute(() -> {
                handleCheckpointSuccessEvent(sinkTaskEvent);
            }, "handle checkpoint event for task [" + i + "]", new Object[0]);
        } else {
            this.executor.execute(() -> {
                handleCommitEvent(sinkTaskEvent);
            }, "handle commit event for task [" + i + "]", new Object[0]);
        }
    }

    public void subtaskFailed(int i, @Nullable Throwable th) {
        this.eventBuffer[i] = null;
        LOG.warn("Reset the event for task [" + i + "]", th);
    }

    public void subtaskReset(int i, long j) {
        LOG.warn("Subtask reset for task [" + i + "], checkpoint [" + j + "]");
    }

    public void subtaskReady(int i, OperatorCoordinator.SubtaskGateway subtaskGateway) {
        this.gateways[i] = subtaskGateway;
        if (this.allSubtaskReady || !Arrays.stream(this.gateways).allMatch((v0) -> {
            return Objects.nonNull(v0);
        })) {
            return;
        }
        this.allSubtaskReady = true;
        synchronized (this.lock) {
            this.lock.notifyAll();
        }
    }

    protected abstract void initEnv();

    protected abstract void handleBootstrapEvent(SinkTaskEvent sinkTaskEvent) throws IOException;

    protected abstract void handleCommitEvent(SinkTaskEvent sinkTaskEvent) throws IOException;

    protected abstract void handleEndInputEvent(SinkTaskEvent sinkTaskEvent) throws IOException;

    protected void handleCheckpointSuccessEvent(SinkTaskEvent sinkTaskEvent) throws IOException {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAllTaskAckEvents(long j, String str, String str2, boolean z, boolean z2) {
        TaskAckEvent build = TaskAckEvent.builder().checkpointId(j).committed(z).endInput(z2).partition(str).sessionId(str2).build();
        CompletableFuture.allOf((CompletableFuture[]) Arrays.stream(this.gateways).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(subtaskGateway -> {
            return subtaskGateway.sendEvent(build);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r6, th) -> {
            if (!sendToFinishedTasks(th)) {
                throw new FlinkOdpsException("Error while waiting for the commit ack events to finish sending", th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAllTaskAckEvents(long j, String str, String str2, boolean z, boolean z2, boolean z3, boolean z4) {
        TaskAckEvent build = TaskAckEvent.builder().checkpointId(j).bootstrap(z3).committed(z).endInput(z2).partition(str).checkpoint(z4).sessionId(str2).build();
        CompletableFuture.allOf((CompletableFuture[]) Arrays.stream(this.gateways).filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(subtaskGateway -> {
            return subtaskGateway.sendEvent(build);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r6, th) -> {
            if (!sendToFinishedTasks(th)) {
                throw new FlinkOdpsException("Error while waiting for the commit ack events to finish sending", th);
            }
        });
    }

    protected void sendTasksAckEvents(long j, Set<Integer> set, String str, String str2, boolean z) {
        TaskAckEvent build = TaskAckEvent.builder().checkpointId(j).committed(z).partition(str).sessionId(str2).build();
        ArrayList arrayList = new ArrayList(set.size());
        for (Integer num : set) {
            if (this.gateways[num.intValue()] == null) {
                throw new FlinkOdpsException("Gateway is null: " + num);
            }
            arrayList.add(this.gateways[num.intValue()]);
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.stream().map(subtaskGateway -> {
            return subtaskGateway.sendEvent(build);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r6, th) -> {
            if (!sendToFinishedTasks(th)) {
                throw new FlinkOdpsException("Error while waiting for the commit ack events to finish sending", th);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendSingleTaskAckEvents(long j, int i, String str, String str2, boolean z) {
        TaskAckEvent build = TaskAckEvent.builder().checkpointId(j).committed(z).partition(str).sessionId(str2).build();
        if (this.gateways[i] == null) {
            throw new FlinkOdpsException("Gateway is null: " + i);
        }
        this.gateways[i].sendEvent(build).whenComplete((acknowledge, th) -> {
            if (!sendToFinishedTasks(th)) {
                throw new FlinkOdpsException("Error while waiting for the commit ack events to finish sending", th);
            }
        });
    }

    protected void sendSingleTaskAckEvents(long j, int i, String str, String str2, boolean z, boolean z2) {
        TaskAckEvent build = TaskAckEvent.builder().checkpointId(j).committed(z).bootstrap(z2).partition(str).sessionId(str2).build();
        if (this.gateways[i] == null) {
            throw new FlinkOdpsException("Gateway is null: " + i);
        }
        this.gateways[i].sendEvent(build).whenComplete((acknowledge, th) -> {
            if (!sendToFinishedTasks(th)) {
                throw new FlinkOdpsException("Error while waiting for the commit ack events to finish sending", th);
            }
        });
    }

    private boolean sendToFinishedTasks(Throwable th) {
        return (th.getCause() instanceof TaskNotRunningException) || th.getCause().getMessage().contains("running");
    }

    private boolean allEventsReceived() {
        return Arrays.stream(this.eventBuffer).allMatch(sinkTaskEvent -> {
            return sinkTaskEvent != null;
        });
    }
}
