package org.apache.flink.odps.sink.common;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.event.SinkTaskEvent;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsMetaDataProvider;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/common/OdpsWriteCoordinator.class */
public abstract class OdpsWriteCoordinator extends AbstractWriteOperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(OdpsWriteCoordinator.class);
    protected final OdpsConf odpsConf;
    protected final TableIdentifier tableIdentifier;
    protected final boolean isDynamicPartition;
    protected final OdpsWriteOptions writeOptions;
    protected final DataSchema dataSchema;
    protected final boolean isPartitioned;
    protected String staticPartition;
    protected transient Odps odps;
    protected transient EnvironmentSettings settings;
    protected transient OdpsMetaDataProvider tableMetaProvider;
    protected final int metaCacheSize;
    protected final int metaCacheExpireTime;

    public OdpsWriteCoordinator(String str, Configuration configuration, OperatorCoordinator.Context context, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str2, DataSchema dataSchema, boolean z, OdpsWriteOptions odpsWriteOptions) {
        super(str, configuration, context);
        if (odpsConf == null) {
            this.odpsConf = OdpsUtils.getOdpsConf();
        } else {
            this.odpsConf = odpsConf;
        }
        Preconditions.checkNotNull(this.odpsConf, "odps conf cannot be null");
        this.tableIdentifier = tableIdentifier;
        this.metaCacheExpireTime = ((Integer) configuration.get(OdpsOptions.SINK_META_EXPIRE_TIME)).intValue();
        this.metaCacheSize = ((Integer) configuration.get(OdpsOptions.SINK_META_CACHE_SIZE)).intValue();
        this.dataSchema = dataSchema;
        this.isDynamicPartition = z;
        this.writeOptions = odpsWriteOptions == null ? OdpsWriteOptions.builder().build() : odpsWriteOptions;
        this.isPartitioned = !dataSchema.getPartitionKeys().isEmpty();
        this.tableMetaProvider = getTableMetaProvider();
        if (!z) {
            try {
                checkPartition(str2);
            } catch (IOException e) {
                throw new FlinkOdpsException(e);
            }
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleBootstrapEvent(SinkTaskEvent sinkTaskEvent) {
        if (!this.isDynamicPartition) {
            try {
                initStaticWriteSession(sinkTaskEvent.getTaskID().intValue());
            } catch (Exception e) {
                throw new FlinkOdpsException(e);
            }
        } else {
            if (StringUtils.isNullOrWhitespaceOnly(sinkTaskEvent.getPartitionSpec())) {
                throw new FlinkOdpsException("Invalid partition spec for dynamic partition!");
            }
            try {
                initDynamicWriteSession(sinkTaskEvent.getTaskID().intValue(), sinkTaskEvent.getPartitionSpec());
            } catch (Exception e2) {
                throw new FlinkOdpsException(e2);
            }
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleCommitEvent(SinkTaskEvent sinkTaskEvent) {
        this.eventBuffer[sinkTaskEvent.getTaskID().intValue()] = sinkTaskEvent;
        if (Arrays.stream(this.eventBuffer).allMatch(sinkTaskEvent2 -> {
            return sinkTaskEvent2 != null && sinkTaskEvent2.isWriteSuccess();
        })) {
            try {
                commitWriteSession();
                this.executor.execute(() -> {
                    sendAllTaskAckEvents(sinkTaskEvent.getCheckpointID().longValue(), "", "", true, false);
                }, "Commit session response", new Object[0]);
                this.eventBuffer = new SinkTaskEvent[this.parallelism];
            } catch (Exception e) {
                throw new FlinkOdpsException(e);
            }
        }
    }

    @Override // org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    protected void handleEndInputEvent(SinkTaskEvent sinkTaskEvent) {
        this.eventBuffer[sinkTaskEvent.getTaskID().intValue()] = sinkTaskEvent;
        if (Arrays.stream(this.eventBuffer).allMatch(sinkTaskEvent2 -> {
            return sinkTaskEvent2 != null && sinkTaskEvent2.isEndInput();
        })) {
            try {
                commitWriteSession();
                this.executor.execute(() -> {
                    sendAllTaskAckEvents(Long.MAX_VALUE, "", "", true, true);
                }, "End input session response", new Object[0]);
                this.eventBuffer = new SinkTaskEvent[this.parallelism];
            } catch (Exception e) {
                throw new FlinkOdpsException(e);
            }
        }
    }

    protected abstract void initStaticWriteSession(int i) throws IOException;

    protected abstract void initDynamicWriteSession(int i, String str) throws IOException;

    protected abstract boolean commitWriteSession() throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public Odps getOdps() {
        if (this.odps == null) {
            this.odps = OdpsUtils.getOdps(this.odpsConf);
        }
        return this.odps;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OdpsMetaDataProvider getTableMetaProvider() {
        if (this.tableMetaProvider == null) {
            this.tableMetaProvider = new OdpsMetaDataProvider(getOdps(), this.metaCacheSize, this.metaCacheExpireTime);
        }
        return this.tableMetaProvider;
    }

    private void checkPartition(String str) throws IOException {
        if (!this.isPartitioned) {
            if (!StringUtils.isNullOrWhitespaceOnly(str)) {
                throw new IOException("The partition spec should be null or whitespace with non partition odps table: " + this.tableIdentifier.toString());
            }
            this.staticPartition = "";
        } else {
            if (StringUtils.isNullOrWhitespaceOnly(str)) {
                LOG.error("The partition cannot be null or whitespace with partition table: " + this.tableIdentifier.toString());
                throw new IOException("Check partition failed.");
            }
            this.staticPartition = new PartitionSpec(str).toString();
            OdpsUtils.createPartitionIfNeeded(getTableMetaProvider(), this.tableIdentifier, this.staticPartition);
        }
    }
}
