package org.apache.flink.odps.sink.upsert;

import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.SessionStatus;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.write.WriterCommitMessage;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator;
import org.apache.flink.odps.sink.table.TableUpsertSessionImpl;
import org.apache.flink.odps.sink.utils.AsyncExecuteThread;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/upsert/FileCachedUpsertCoordinator.class */
/**
 * Coordinator variant of {@link FileCachedInsertCoordinator} that works with upsert sessions.
 *
 * <p>On top of the parent's behavior it counts successful upsert commits per partition and,
 * when major compaction is enabled, queues a compaction request for a partition once its
 * commit count exceeds the configured threshold.
 */
public class FileCachedUpsertCoordinator extends FileCachedInsertCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(FileCachedUpsertCoordinator.class);

    /** Number of successful commits per partition since the last compaction was scheduled. */
    protected transient Map<String, Long> upsertCommitTimes;

    /** Whether a major compaction is scheduled after enough commits on a partition. */
    protected boolean enableMajorCompact;

    /** A compaction is scheduled once a partition's commit count is strictly greater than this value. */
    protected int compactThreshold = 1;

    /**
     * Creates the coordinator and reads the upsert-specific options (commit thread number,
     * major-compact switch and minimum commit count) from the configuration held by the parent.
     * All arguments are forwarded unchanged to the parent constructor.
     */
    public FileCachedUpsertCoordinator(String str, Configuration configuration, OperatorCoordinator.Context context, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str2, DataSchema dataSchema, boolean z, boolean z2, boolean z3, OdpsWriteOptions odpsWriteOptions) {
        super(str, configuration, context, odpsConf, tableIdentifier, str2, dataSchema, z, z2, z3, odpsWriteOptions);
        this.executeThreadNum = this.conf.getInteger(OdpsOptions.UPSERT_COMMIT_THREAD_NUM);
        this.enableMajorCompact = this.conf.getBoolean(OdpsOptions.UPSERT_MAJOR_COMPACT_ENABLE);
        this.compactThreshold = this.conf.getInteger(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS);
    }

    /** Runs the parent initialization and then starts with an empty per-partition commit counter. */
    @Override // org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator, org.apache.flink.odps.sink.common.AbstractWriteOperatorCoordinator
    public void initEnv() {
        super.initEnv();
        this.upsertCommitTimes = new HashMap<>();
    }

    /**
     * Obtains (or creates) an upsert session for the given partition and returns its id.
     * The session object itself is always closed before this method returns or throws.
     *
     * @param str partition the session is requested for
     * @return id of the upsert session
     * @throws IOException if obtaining or closing the session fails
     */
    @Override // org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator
    protected String initDynamicWriteSession(String str) throws IOException {
        TableUpsertSessionImpl session = null;
        try {
            session = OdpsUtils.getOrCreateUpsertSession(this.tableIdentifier, str, null, this.settings);
            return session.getId();
        } finally {
            if (session != null) {
                session.close();
            }
        }
    }

    /**
     * Commits the upsert session identified by {@code str2} for partition {@code str}.
     *
     * <p>If the session already reports {@link SessionStatus#COMMITTED} nothing is committed.
     * After a successful commit the partition's commit counter is incremented; when major
     * compaction is enabled and the counter exceeds {@link #compactThreshold}, the counter is
     * cleared and a compaction request is handed to the executor.
     *
     * @param list commit messages (not read by this implementation)
     * @param str partition being committed
     * @param str2 id of the session to commit
     * @param j row count, used for logging only
     * @throws IOException if loading or committing the session fails (the cause is preserved),
     *     or if closing the session fails
     */
    @Override // org.apache.flink.odps.sink.insert.FileCachedInsertCoordinator
    protected void commitWriteSession(List<WriterCommitMessage> list, String str, String str2, long j) throws IOException {
        TableUpsertSessionImpl session = null;
        try {
            session = OdpsUtils.getUpsertSession(this.tableIdentifier, str, str2, this.settings);
            if (session.getStatus().equals(SessionStatus.COMMITTED)) {
                LOG.info("Session has committed {}, partition {}", str2, str);
                return;
            }
            LOG.info("Start to commit session {}, partition {}, row count {}", str2, str, j);
            long startNanos = System.nanoTime();
            session.commit();
            LOG.info("Commit session success, session id {}, time taken ms: {}", str2, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));

            long commits = this.upsertCommitTimes.merge(str, 1L, Long::sum);
            if (this.enableMajorCompact && commits > this.compactThreshold) {
                // Restart counting for this partition before the compaction is queued.
                this.upsertCommitTimes.remove(str);
                this.executor.execute(() -> {
                    handleCompactEvent(str);
                }, "handle partition check event", new Object[0]);
            }
        } catch (Exception e) {
            throw new IOException("Commit session " + str2 + " failed!", e);
        } finally {
            // Close on every path, including failures of getStatus()/commit(), so the
            // session is not leaked when the commit goes wrong.
            if (session != null) {
                session.close();
            }
        }
    }

    /**
     * Queues a compaction request for the given partition via {@code addExecuteRequest}.
     * The request calls {@code OdpsUtils.majorCompact}; a failure is only logged in
     * {@code requestDone} and not rethrown from there.
     *
     * @param str partition to compact
     */
    private void handleCompactEvent(final String str) {
        addExecuteRequest(str, new AsyncExecuteThread.CompactRequest() { // from class: org.apache.flink.odps.sink.upsert.FileCachedUpsertCoordinator.1
            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public void execute() throws IOException {
                try {
                    long startNanos = System.nanoTime();
                    OdpsUtils.majorCompact(FileCachedUpsertCoordinator.this.odps, FileCachedUpsertCoordinator.this.tableIdentifier, str);
                    FileCachedUpsertCoordinator.LOG.info("Checkpoint compact table partition {}, time taken ms: {}", str, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public void requestDone(IOException iOException) {
                if (iOException != null) {
                    FileCachedUpsertCoordinator.LOG.error("Checkpoint compact table partition {} failed", str, iOException);
                }
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.CompactRequest, org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public AsyncExecuteThread.ExecuteRequest.ExecuteType getExecuteType() {
                return AsyncExecuteThread.ExecuteRequest.ExecuteType.COMPACT;
            }

            /** A compaction request is not bound to a checkpoint, so -1 is reported. */
            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.ExecuteRequest
            public long getCheckpointId() {
                return -1L;
            }

            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
            public String getPartition() {
                return str;
            }

            /** A compaction request carries no session, so an empty id is reported. */
            @Override // org.apache.flink.odps.sink.utils.AsyncExecuteThread.SessionRequest
            public String getSessionId() {
                return "";
            }
        });
    }
}
