package org.apache.flink.odps.sink.writer;

import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.metrics.MetricNames;
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.odps.sink.table.TableUpsertSessionImpl;
import org.apache.flink.odps.sink.table.TableUpsertWriterOptions;
import org.apache.flink.odps.sink.table.UpsertWriter;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/odps/sink/writer/OdpsUpsertWriter.class */
public class OdpsUpsertWriter extends OdpsTableWriter {
    private static final Logger LOG = LoggerFactory.getLogger(OdpsUpsertWriter.class);
    private TableUpsertSessionImpl upsertSession;
    private UpsertWriter<ArrayRecord> upsertWriter;
    private long totalRecord;
    private TableUpsertWriterOptions writerOptions;
    private boolean finished;
    private boolean started;
    private final TableIdentifier identifier;
    private final String sessionId;

    public OdpsUpsertWriter(Configuration configuration, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str, DataSchema dataSchema, String str2) throws IOException {
        super(configuration, odpsConf, tableIdentifier, str, dataSchema);
        this.writerOptions = TableUpsertWriterOptions.builder().withMaxRetries(configuration.getInteger(OdpsOptions.UPSERT_WRITER_FLUSH_MAX_RETRIES)).withMaxBufferSize(((MemorySize) configuration.get(OdpsOptions.UPSERT_WRITER_MAX_BUFFER_SIZE)).getBytes()).withSlotBufferSize(((MemorySize) configuration.get(OdpsOptions.UPSERT_WRITER_BUCKET_BUFFER_SIZE)).getBytes()).build();
        this.identifier = tableIdentifier;
        this.sessionId = str2;
        this.finished = false;
        this.totalRecord = 0L;
    }

    private void ensureStarted() throws IOException {
        if (this.started) {
            return;
        }
        this.started = true;
        this.upsertSession = OdpsUtils.getOrCreateUpsertSession(this.identifier, this.partition, this.sessionId, this.settings);
        this.upsertWriter = this.upsertSession.createUpsertWriter(this.writerOptions);
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void addElement(RowData rowData) throws IOException {
        if (this.finished) {
            throw new IOException("Add element failed: upsert writer has been finished or aborted!");
        }
        ensureStarted();
        ArrayRecord newElement = this.upsertWriter.newElement();
        this.converter.convert(rowData, newElement);
        if (rowData.getRowKind().equals(RowKind.INSERT) || rowData.getRowKind().equals(RowKind.UPDATE_AFTER)) {
            this.upsertWriter.upsert(newElement);
        } else if (rowData.getRowKind().equals(RowKind.DELETE) || rowData.getRowKind().equals(RowKind.UPDATE_BEFORE)) {
            this.upsertWriter.delete(newElement);
        } else {
            LOG.error("Ignore element:" + rowData.getRowKind());
        }
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void flush() throws IOException {
        if (this.finished) {
            throw new IOException("Flush failed: upsert writer has been finished or aborted!");
        }
        ensureStarted();
        if (this.upsertWriter != null) {
            this.upsertWriter.flush();
        }
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public CommitInfo finish(long j) throws IOException {
        if (this.finished) {
            throw new IOException("Finish failed: upsert writer has been finished or aborted!");
        }
        try {
            if (this.upsertWriter != null) {
                this.upsertWriter.close();
                this.totalRecord += this.upsertWriter.currentMetricsValues().counter(MetricNames.RECORD_COUNT).get().getCount();
                this.upsertWriter = null;
            }
            PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo();
            partitionCommitInfo.setPartitionSpec(this.partition);
            partitionCommitInfo.setSessionId(this.sessionId);
            partitionCommitInfo.setTotalRecords(this.totalRecord);
            partitionCommitInfo.setCheckpointId(j);
            this.finished = true;
            return partitionCommitInfo;
        } finally {
            if (this.upsertSession != null) {
                this.upsertSession.close();
            }
        }
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void reset() {
        try {
            if (this.upsertWriter != null) {
                this.upsertWriter.reset();
            }
        } catch (IOException e) {
            LOG.error("Reset upsert writer error", e);
        }
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void abort() {
        try {
            if (this.finished) {
                return;
            }
            try {
                if (this.upsertWriter != null) {
                    this.upsertWriter.reset();
                    this.upsertWriter = null;
                }
                if (this.upsertSession != null) {
                    this.upsertSession.close();
                    this.upsertSession = null;
                }
            } catch (IOException e) {
                LOG.error("Abort upsert writer error", e);
                if (this.upsertSession != null) {
                    this.upsertSession.close();
                    this.upsertSession = null;
                }
            }
            this.finished = true;
        } catch (Throwable th) {
            if (this.upsertSession != null) {
                this.upsertSession.close();
                this.upsertSession = null;
            }
            throw th;
        }
    }
}
