package org.apache.flink.odps.sink.writer;

import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.WriterOptions;
import com.aliyun.odps.table.metrics.MetricNames;
import com.aliyun.odps.table.write.BatchWriter;
import com.aliyun.odps.table.write.TableBatchWriteSession;
import com.aliyun.odps.table.write.WriterAttemptId;
import com.aliyun.odps.table.write.WriterCommitMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.odps.sink.table.TunnelCommitMessage;
import org.apache.flink.odps.sink.table.TunnelMultiBlockCommitMessage;
import org.apache.flink.odps.sink.table.TunnelWriterOptions;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.ArrowUtils;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.odps.vectorized.ArrowWriter;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import p000flinkconnectorodps.org.apache.arrow.vector.VectorSchemaRoot;

/* loaded from: input_file:org/apache/flink/odps/sink/writer/OdpsInsertWriter.class */
public class OdpsInsertWriter extends OdpsTableWriter {
    private static final Logger LOG = LoggerFactory.getLogger(OdpsInsertWriter.class);
    private TableBatchWriteSession batchWriteSession;
    private final String sessionId;
    private final boolean useArrowWriter;
    private final long batchSize;
    private final long bufferSize;
    private final int writerParallel;
    private final boolean isGroupPartition;
    private int currentWriterId;
    private ArrowWriter<RowData> currentArrowWriter;
    private BatchWriter<VectorSchemaRoot> currentOdpsArrowWriter;
    private BatchWriter<ArrayRecord> currentRecordWriter;
    private List<WriterCommitMessage> arrowCommitMessage;
    private WriterCommitMessage writerCommitMessage;
    private long totalRecord;
    private boolean finished;

    public OdpsInsertWriter(Configuration configuration, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str, DataSchema dataSchema, String str2, int i, int i2) throws IOException {
        super(configuration, odpsConf, tableIdentifier, str, dataSchema);
        this.isGroupPartition = ((Boolean) configuration.get(OdpsOptions.SINK_GROUP_PARTITION)).booleanValue();
        this.bufferSize = ((MemorySize) configuration.get(OdpsOptions.INSERT_WRITER_BUFFER_SIZE)).getBytes();
        this.batchSize = configuration.getLong(OdpsOptions.INSERT_ARROW_WRITE_BATCH_SIZE);
        this.useArrowWriter = configuration.getBoolean(OdpsOptions.INSERT_ARROW_WRITER_ENABLE);
        this.batchWriteSession = OdpsUtils.getOrCreateWriteSession(tableIdentifier, str, str2, this.settings);
        this.sessionId = this.batchWriteSession.getId();
        this.currentWriterId = i;
        this.writerParallel = i2;
        this.finished = false;
        this.arrowCommitMessage = new ArrayList();
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void addElement(RowData rowData) throws IOException {
        if (this.useArrowWriter) {
            doWriteArrowBatch(rowData);
        } else {
            doWriteRecord(rowData);
        }
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void flush() throws IOException {
        if (!this.useArrowWriter || this.currentOdpsArrowWriter == null) {
            return;
        }
        if (this.currentArrowWriter.getCachedCount() > 0) {
            this.currentArrowWriter.finish();
            this.currentOdpsArrowWriter.write(this.currentArrowWriter.getRecordBatch());
            this.currentArrowWriter.reset();
        }
        this.currentOdpsArrowWriter.close();
        this.arrowCommitMessage.add(this.currentOdpsArrowWriter.commit());
        this.totalRecord += this.currentOdpsArrowWriter.currentMetricsValues().counter(MetricNames.RECORD_COUNT).get().getCount();
        this.currentArrowWriter.close();
        this.currentOdpsArrowWriter = null;
        this.currentArrowWriter = null;
        this.currentWriterId += this.writerParallel;
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public CommitInfo finish(long j) throws IOException {
        if (this.finished) {
            throw new IOException("Write has been finished!");
        }
        flush();
        if (this.useArrowWriter) {
            ArrayList arrayList = new ArrayList();
            for (WriterCommitMessage writerCommitMessage : this.arrowCommitMessage) {
                if (writerCommitMessage instanceof TunnelCommitMessage) {
                    arrayList.add(Long.valueOf(((TunnelCommitMessage) writerCommitMessage).getBlockId()));
                }
            }
            this.writerCommitMessage = new TunnelMultiBlockCommitMessage(this.sessionId, arrayList);
        } else if (this.currentRecordWriter != null) {
            this.currentRecordWriter.close();
            this.writerCommitMessage = this.currentRecordWriter.commit();
            this.totalRecord += this.currentRecordWriter.currentMetricsValues().counter(MetricNames.RECORD_COUNT).get().getCount();
            this.currentRecordWriter = null;
        }
        PartitionCommitInfo partitionCommitInfo = new PartitionCommitInfo();
        partitionCommitInfo.setPartitionSpec(this.partition);
        partitionCommitInfo.setSessionId(this.sessionId);
        partitionCommitInfo.setTotalRecords(this.totalRecord);
        partitionCommitInfo.setWriterMessage(this.writerCommitMessage);
        partitionCommitInfo.setCheckpointId(j);
        this.finished = true;
        return partitionCommitInfo;
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void reset() {
    }

    @Override // org.apache.flink.odps.sink.writer.DataWriter
    public void abort() {
        if (this.finished) {
            return;
        }
        this.finished = true;
    }

    private void doWriteArrowBatch(RowData rowData) throws IOException {
        if (this.currentOdpsArrowWriter == null) {
            initArrowWriter(this.currentWriterId);
        }
        this.currentArrowWriter.write(rowData);
        if (this.currentArrowWriter.getCachedCount() >= this.batchSize) {
            this.currentArrowWriter.finish();
            this.currentOdpsArrowWriter.write(this.currentArrowWriter.getRecordBatch());
            this.currentArrowWriter.reset();
        }
    }

    private void doWriteRecord(RowData rowData) throws IOException {
        if (this.currentRecordWriter == null) {
            initRecordWriter(this.currentWriterId);
        }
        ArrayRecord newElement = this.currentRecordWriter.newElement();
        this.converter.convert(rowData, newElement);
        this.currentRecordWriter.write(newElement);
    }

    private BatchWriter<ArrayRecord> initRecordWriter(int i) throws IOException {
        TunnelWriterOptions.Builder withBufferSize = TunnelWriterOptions.newTunnelWriterOptionsBuilder().withSettings(this.settings).withBufferWriter(true).withBufferSize(this.bufferSize);
        if (this.isGroupPartition) {
            withBufferSize.withNumberOfParallel(1);
        } else {
            withBufferSize.withNumberOfParallel(this.writerParallel);
        }
        this.currentRecordWriter = this.batchWriteSession.createRecordWriter(i, WriterAttemptId.of(0), withBufferSize.build());
        return this.currentRecordWriter;
    }

    private BatchWriter<VectorSchemaRoot> initArrowWriter(int i) throws IOException {
        this.currentOdpsArrowWriter = this.batchWriteSession.createArrowWriter(i, WriterAttemptId.of(0), WriterOptions.newBuilder().withSettings(this.settings).build());
        this.currentArrowWriter = ArrowUtils.createRowDataArrowWriter(this.currentOdpsArrowWriter.newElement(), this.rowType);
        return this.currentOdpsArrowWriter;
    }
}
