package org.apache.flink.odps.sink.table;

import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.commons.util.RetryExceedLimitException;
import com.aliyun.odps.commons.util.RetryStrategy;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.table.DataFormat;
import com.aliyun.odps.table.SessionStatus;
import com.aliyun.odps.table.SessionType;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.DynamicPartitionOptions;
import com.aliyun.odps.table.configuration.RestOptions;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.utils.Preconditions;
import com.aliyun.odps.table.write.TableWriteCapabilities;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelConstants;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.impl.ConfigurationImpl;
import com.aliyun.odps.tunnel.impl.UpsertSessionImpl;
import java.io.IOException;
import java.util.Random;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/table/TableUpsertSessionImpl.class */
public class TableUpsertSessionImpl extends TableUpsertSessionBase {
    private static final Logger LOG = LoggerFactory.getLogger(TableUpsertSessionImpl.class);
    protected transient TableTunnel.UpsertSession session;
    protected transient RetryStrategy commitRetry;
    protected boolean isCommitted;
    protected boolean isClosed;

    public TableUpsertSessionImpl(TableIdentifier tableIdentifier, PartitionSpec partitionSpec, boolean z, DynamicPartitionOptions dynamicPartitionOptions, ArrowOptions arrowOptions, TableWriteCapabilities tableWriteCapabilities, EnvironmentSettings environmentSettings) throws IOException {
        super(tableIdentifier, partitionSpec, z, dynamicPartitionOptions, arrowOptions, tableWriteCapabilities, environmentSettings);
        this.isCommitted = false;
        this.isClosed = false;
        this.commitRetry = new RetryStrategy(3, new Random().nextInt(10) + 10, RetryStrategy.BackoffStrategy.LINEAR_BACKOFF);
    }

    public TableUpsertSessionImpl(TableIdentifier tableIdentifier, PartitionSpec partitionSpec, String str, EnvironmentSettings environmentSettings) throws IOException {
        super(tableIdentifier, partitionSpec, str, environmentSettings);
        this.isCommitted = false;
        this.isClosed = false;
        this.commitRetry = new RetryStrategy(3, new Random().nextInt(10) + 10, RetryStrategy.BackoffStrategy.LINEAR_BACKOFF);
    }

    @Override // org.apache.flink.odps.sink.table.TableUpsertSessionBase
    protected void initSession() throws IOException {
        try {
            TableTunnel tableTunnel = TableUtils.getTableTunnel(this.settings);
            this.session = ((UpsertSessionImpl.Builder) tableTunnel.buildUpsertSession(this.identifier.getProject(), this.identifier.getTable())).setConfig((ConfigurationImpl) tableTunnel.getConfig()).setSchemaName(this.identifier.getSchema()).setPartitionSpec(this.targetPartitionSpec).setSlotNum(this.settings.getSlotNum().orElse(8L).longValue()).setCommitTimeout(this.settings.getRestOptions().orElse(RestOptions.newBuilder().build()).getAsyncTimeoutInSeconds().orElse(600).intValue() * 1000).setNetworkThreadNum(1).build();
            this.sessionId = this.session.getId();
            this.sessionStatus = convertSessionStatus(this.session.getStatus());
        } catch (TunnelException e) {
            throw new IOException(e);
        }
    }

    @Override // org.apache.flink.odps.sink.table.TableUpsertSessionBase
    protected void reloadSession() throws IOException {
        try {
            TableTunnel tableTunnel = TableUtils.getTableTunnel(this.settings);
            this.session = ((UpsertSessionImpl.Builder) tableTunnel.buildUpsertSession(this.identifier.getProject(), this.identifier.getTable())).setConfig((ConfigurationImpl) tableTunnel.getConfig()).setSchemaName(this.identifier.getSchema()).setPartitionSpec(this.targetPartitionSpec).setUpsertId(getId()).setCommitTimeout(this.settings.getRestOptions().orElse(RestOptions.newBuilder().build()).getAsyncTimeoutInSeconds().orElse(600).intValue() * 1000).setNetworkThreadNum(1).build();
            this.sessionStatus = convertSessionStatus(this.session.getStatus());
        } catch (TunnelException e) {
            throw new IOException(e);
        }
    }

    public UpsertWriter<ArrayRecord> createUpsertWriter(TableUpsertWriterOptions tableUpsertWriterOptions) throws IOException {
        Preconditions.checkNotNull(this.session, "Upsert session", "required");
        return new TableUpsertWriterImpl(this.session, tableUpsertWriterOptions);
    }

    public void commit() throws IOException {
        Preconditions.checkNotNull(this.session, "Upsert session", "required");
        if (this.isCommitted) {
            return;
        }
        this.commitRetry.reset();
        while (true) {
            try {
                this.session.commit(false);
                this.sessionStatus = convertSessionStatus(this.session.getStatus());
                this.isCommitted = true;
                return;
            } catch (Exception e) {
                LOG.error(String.format("Commit error, session id = %s, retry times = %d", this.session.getId(), Integer.valueOf(this.commitRetry.getAttempts())), e);
                try {
                    if (ExceptionUtils.findThrowable(e, InterruptedException.class).isPresent()) {
                        throw e;
                    }
                    this.sessionStatus = convertSessionStatus(this.session.getStatus());
                    if (this.sessionStatus.equals(SessionStatus.CRITICAL) || this.sessionStatus.equals(SessionStatus.EXPIRED) || this.sessionStatus.equals(SessionStatus.UNKNOWN)) {
                        throw new TunnelException(String.format("Commit error, session id = %s, session status = %s", this.session.getId(), Integer.valueOf(this.commitRetry.getAttempts())), this.sessionStatus.toString());
                    }
                    this.commitRetry.onFailure(e);
                } catch (RetryExceedLimitException | TunnelException | InterruptedException e2) {
                    throw new IOException(e2);
                }
            }
        }
        throw new TunnelException(String.format("Commit error, session id = %s, session status = %s", this.session.getId(), Integer.valueOf(this.commitRetry.getAttempts())), this.sessionStatus.toString());
    }

    public void close() {
        if (this.isClosed) {
            return;
        }
        this.session.close();
        this.isClosed = true;
    }

    @Override // com.aliyun.odps.table.write.TableWriteSession
    public boolean supportsDataFormat(DataFormat dataFormat) {
        return dataFormat.getType().equals(DataFormat.Type.RECORD);
    }

    @Override // com.aliyun.odps.table.Session
    public SessionType getType() {
        return SessionType.UPSERT;
    }

    private SessionStatus convertSessionStatus(String str) {
        return str.equalsIgnoreCase(TunnelConstants.SESSION_STATUS_NORMAL) ? SessionStatus.NORMAL : str.equalsIgnoreCase(TunnelConstants.SESSION_STATUS_COMMITTING) ? SessionStatus.COMMITTING : str.equalsIgnoreCase(TunnelConstants.SESSION_STATUS_COMMITTED) ? SessionStatus.COMMITTED : str.equalsIgnoreCase(TunnelConstants.SESSION_STATUS_EXPIRED) ? SessionStatus.EXPIRED : str.equalsIgnoreCase(TunnelConstants.SESSION_STATUS_CRITICAL) ? SessionStatus.CRITICAL : SessionStatus.UNKNOWN;
    }
}
