package org.apache.flink.odps.sink.insert;

import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.WriterOptions;
import com.aliyun.odps.table.metrics.MetricNames;
import com.aliyun.odps.table.write.BatchWriter;
import com.aliyun.odps.table.write.TableBatchWriteSession;
import com.aliyun.odps.table.write.WriterAttemptId;
import com.aliyun.odps.table.write.WriterCommitMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.odps.sink.common.OdpsWriteFunction;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.common.WriteOperationType;
import org.apache.flink.odps.sink.event.SinkTaskEvent;
import org.apache.flink.odps.sink.event.TaskAckEvent;
import org.apache.flink.odps.sink.partition.PartitionAssigner;
import org.apache.flink.odps.sink.table.TunnelWriterOptions;
import org.apache.flink.odps.sink.utils.DataBucket;
import org.apache.flink.odps.sink.utils.DataItem;
import org.apache.flink.odps.sink.utils.WriterStatus;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.ArrowUtils;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.odps.util.RowDataToOdpsConverters;
import org.apache.flink.odps.vectorized.ArrowWriter;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import p000flinkconnectorodps.org.apache.arrow.vector.VectorSchemaRoot;

/* loaded from: input_file:org/apache/flink/odps/sink/insert/InsertFunction.class */
/**
 * Write function that inserts {@link RowData} rows into ODPS through {@link TableBatchWriteSession}s,
 * keeping one session and one writer per partition spec.
 *
 * <p>For every row the target partition is resolved first. If no session id is known for the
 * partition yet, a session is either created locally (group-partition mode) or requested from the
 * coordinator via a bootstrap event. While the request is pending (session id is the empty string)
 * rows are parked in a {@link DataBucket}; once the id arrives through a {@link TaskAckEvent} the
 * bucket is drained and rows are written directly.
 *
 * <p>On checkpoint and on end of input all writers are closed, their commit messages are sent to
 * the coordinator, local state is reset and the function yields until the coordinator acknowledges
 * the commit.
 */
public class InsertFunction extends OdpsWriteFunction<RowData> implements ProcessingTimeCallback {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(InsertFunction.class);

    /** Number of buffered rows per partition above which {@link #processElement} starts backing off. */
    private static final int BUFFERED_RECORDS_BACKOFF_THRESHOLD = 10000;

    /** Back-off applied per row once the buffered-rows threshold is exceeded, in milliseconds. */
    private static final long BUFFERED_RECORDS_BACKOFF_MILLIS = 1000L;

    /** When true, sessions are created locally instead of being requested from the coordinator. */
    protected final boolean isGroupPartition;
    protected final boolean supportsGrouping;
    /** Converts a {@link RowData} into an ODPS {@link ArrayRecord} (record-writer path only). */
    protected transient RowDataToOdpsConverters.RowDataToOdpsRecordConverter converter;
    /** Rows parked per partition while the session request for that partition is still pending. */
    protected transient Map<String, DataBucket> buckets;
    /** Write session per partition. */
    protected transient Map<String, TableBatchWriteSession> odpsSessionMap;
    protected transient boolean useArrowWriter;
    protected transient long bufferSize;
    /** Cached-row count above which an arrow batch is flushed to the ODPS writer. */
    protected transient long batchSize;
    /** Interval of the processing-time timer that flushes / closes idle arrow writers. */
    protected transient long writerCheckInterval;
    /** ODPS record writer per partition (record-writer path). */
    protected transient Map<String, BatchWriter<ArrayRecord>> odpsRecordWriterMap;
    /** Row-to-arrow writer per partition (arrow path). */
    protected transient Map<String, ArrowWriter<RowData>> arrowWriterMap;
    /** ODPS arrow batch writer per partition (arrow path). */
    protected transient Map<String, BatchWriter<VectorSchemaRoot>> odpsArrowWriterMap;
    /** Wall-clock time of the last init / flush of the arrow writer, per partition. */
    protected transient Map<String, Long> odpsArrowWriterUpdateTime;
    /** Last writer id handed to {@code createArrowWriter}, per partition. */
    protected transient Map<String, Long> odpsArrowWriterBlockNumber;
    /** Session id per partition; the empty string marks a request that has not been answered yet. */
    protected transient ConcurrentHashMap<String, String> sessionRequest;
    /** Commit information of the writers closed since the last commit event. */
    protected transient List<WriterStatus> statuses;
    /** True between sending a commit event and receiving a committed {@link TaskAckEvent}. */
    protected volatile transient boolean commitConfirming;

    /**
     * Creates the function.
     *
     * @param isGroupPartition stored in {@link #isGroupPartition}
     * @param supportsGrouping stored in {@link #supportsGrouping}
     *     (all remaining arguments are forwarded unchanged to the superclass constructor)
     */
    public InsertFunction(Configuration configuration, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str, DataSchema dataSchema, boolean z, boolean isGroupPartition, boolean supportsGrouping, OdpsWriteOptions odpsWriteOptions, PartitionAssigner<RowData> partitionAssigner, WriteOperationType writeOperationType, RowType rowType) {
        super(configuration, odpsConf, tableIdentifier, str, dataSchema, z, odpsWriteOptions, partitionAssigner, writeOperationType, rowType);
        this.isGroupPartition = isGroupPartition;
        this.supportsGrouping = supportsGrouping;
    }

    /**
     * Initializes all transient state, reads the writer options from the configuration and, when
     * the arrow writer is enabled, registers the first idle-writer check timer.
     *
     * @throws IOException propagated from the superclass {@code open}
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteFunction
    public void open(Configuration configuration) throws IOException {
        super.open(configuration);
        this.converter = RowDataToOdpsConverters.createRecordConverter(this.rowType);
        this.buckets = new HashMap<>();
        this.odpsSessionMap = new HashMap<>();
        this.odpsRecordWriterMap = new HashMap<>();
        this.arrowWriterMap = new HashMap<>();
        this.odpsArrowWriterMap = new HashMap<>();
        this.odpsArrowWriterUpdateTime = new HashMap<>();
        this.odpsArrowWriterBlockNumber = new HashMap<>();
        this.sessionRequest = new ConcurrentHashMap<>();
        // NOTE: close() relies on statuses being the last collection assigned here.
        this.statuses = new ArrayList<>();
        this.commitConfirming = false;
        this.batchSize = this.config.getLong(OdpsOptions.INSERT_ARROW_WRITE_BATCH_SIZE);
        this.useArrowWriter = this.config.getBoolean(OdpsOptions.INSERT_ARROW_WRITER_ENABLE);
        this.bufferSize = ((MemorySize) this.config.get(OdpsOptions.INSERT_WRITER_BUFFER_SIZE)).getBytes();
        this.writerCheckInterval = this.config.getLong(OdpsOptions.INSERT_ARROW_WRITER_FLUSH_INTERVAL);
        if (this.useArrowWriter) {
            long firstCheck = this.processingTimeService.getCurrentProcessingTime() + this.writerCheckInterval;
            this.processingTimeService.registerTimer(firstCheck, this);
        }
    }

    /**
     * Flushes and commits everything written so far for the given checkpoint and waits until the
     * coordinator has acknowledged the commit.
     *
     * @param checkpointId id of the checkpoint being taken
     * @throws IOException if flushing fails or the wait is interrupted
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteFunction
    protected void snapshotState(long checkpointId) throws IOException {
        LOG.info("Snapshot state for checkpoint: {}", checkpointId);
        flushRemaining(false, checkpointId);
        awaitCommitConfirmed("Snapshot state error: ");
    }

    /**
     * Flushes and commits everything written so far as the final commit (checkpoint id
     * {@link Long#MAX_VALUE}) and waits until the coordinator has acknowledged it.
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteFunction
    public void endInput() throws Exception {
        super.endInput();
        flushRemaining(true, Long.MAX_VALUE);
        awaitCommitConfirmed("End input error: ");
    }

    /**
     * Drops all local writer state and always fails the task for an aborted checkpoint.
     *
     * @throws IOException always, after the state has been reset
     */
    @Override // org.apache.flink.odps.sink.common.OdpsWriteFunction
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
        reset();
        throw new IOException("Task failed for checkpoint " + checkpointId + " aborted");
    }

    /**
     * Waits for pending session requests, drains all buckets, closes every writer, sends the
     * collected writer statuses to the coordinator and resets the local state.
     *
     * @param endInput whether this is the final flush at end of input
     * @param checkpointId checkpoint id attached to the commit event
     * @throws IOException if writing fails or waiting for a session id is interrupted
     */
    protected void flushRemaining(boolean endInput, long checkpointId) throws IOException {
        // Buffered rows can only be written once every requested session id has arrived.
        while (hasPendingRequest()) {
            yieldExecutor("Wait for pending session request error: ");
        }
        if (hasBufferedData()) {
            for (Map.Entry<String, DataBucket> bucket : this.buckets.entrySet()) {
                flushBucketData(bucket.getKey(), bucket.getValue());
            }
        }
        commitWriter();
        boolean nothingWritten = this.statuses.isEmpty();
        if (!this.isDynamicPartition && nothingWritten && this.sessionRequest.containsKey(this.partition)) {
            // A session exists for the static partition but no writer was closed:
            // report it with an empty status (no commit message, zero records).
            WriterStatus emptyStatus = new WriterStatus();
            emptyStatus.setSessionId(this.sessionRequest.get(this.partition));
            emptyStatus.setPartitionSpec(this.partition);
            emptyStatus.setWriterMessage(null);
            emptyStatus.setTotalRecords(0L);
            this.statuses.add(emptyStatus);
        }
        sendCommitEvent(endInput, checkpointId);
        reset();
        // NOTE(review): the flag is raised only after the event was sent; this is safe only if
        // handleOperatorEvent cannot run concurrently with this method - confirm.
        this.commitConfirming = true;
    }

    /**
     * Rejects every row that is not a plain insert.
     *
     * @throws UnsupportedOperationException if the row kind is not {@link RowKind#INSERT}
     */
    protected void validDateRowData(RowData rowData) throws Exception {
        RowKind rowKind = rowData.getRowKind();
        if (rowKind != RowKind.INSERT) {
            throw new UnsupportedOperationException("Only INSERT rows are supported, but got row kind: " + rowKind);
        }
    }

    /**
     * Routes one row to its partition: writes it directly when the partition's session id is
     * known, otherwise parks it in the partition's bucket until the id arrives.
     */
    public void processElement(RowData rowData, ProcessFunction<RowData, Object>.Context context, Collector<Object> collector) throws Exception {
        validDateRowData(rowData);
        String partitionSpec = this.isDynamicPartition ? this.partitionAssigner.getPartitionSpec(rowData, null) : this.partition;
        if (!this.sessionRequest.containsKey(partitionSpec)) {
            if (this.isGroupPartition) {
                bootstrapForGroupPartition(partitionSpec);
            } else {
                sendBootstrapEvent(partitionSpec);
            }
        }
        if (this.sessionRequest.get(partitionSpec).isEmpty()) {
            // Session id still pending: park the row.
            DataBucket bucket = this.buckets.computeIfAbsent(partitionSpec, key -> new DataBucket(key));
            bucket.add(new DataItem(partitionSpec, rowData));
            if (bucket.getRecords().size() > BUFFERED_RECORDS_BACKOFF_THRESHOLD) {
                // Slow the producer down while the bucket keeps growing.
                Thread.sleep(BUFFERED_RECORDS_BACKOFF_MILLIS);
            }
            return;
        }
        // Session id known: drain rows parked earlier first so they are written before this row.
        DataBucket parked = this.buckets.get(partitionSpec);
        if (parked != null) {
            flushBucketData(partitionSpec, parked);
        }
        writeData(partitionSpec, rowData);
    }

    /**
     * Creates the partition if needed and opens a write session for it locally, recording the
     * session and its id.
     */
    protected void bootstrapForGroupPartition(String str) throws IOException {
        OdpsUtils.createPartitionIfNeeded(getTableMetaProvider(), this.tableIdentifier, str);
        TableBatchWriteSession session = OdpsUtils.getOrCreateWriteSession(this.tableIdentifier, str, null, this.settings);
        this.odpsSessionMap.put(str, session);
        this.sessionRequest.put(str, session.getId());
    }

    /**
     * Writes one row to the given partition through the arrow or the record writer, creating the
     * writer on first use.
     */
    protected void writeData(String str, RowData rowData) throws IOException {
        String sessionId = this.sessionRequest.get(str);
        if (this.useArrowWriter) {
            if (!this.odpsArrowWriterMap.containsKey(str)) {
                initArrowWriter(str, sessionId);
            }
            doWriteArrowBatch(str, rowData);
            return;
        }
        if (!this.odpsRecordWriterMap.containsKey(str)) {
            initRecordWriter(str, sessionId);
        }
        ArrayRecord record = this.odpsRecordWriterMap.get(str).newElement();
        this.converter.convert(rowData, record);
        doWriteRecord(str, record);
    }

    /** Releases all writers. Safe to call when {@link #open} did not complete. */
    public void close() {
        LOG.info("Close function");
        if (this.statuses == null) {
            // open() failed before the transient collections were created; reset() would throw
            // a NullPointerException and hide the original failure.
            return;
        }
        reset();
    }

    /**
     * Timer callback of the arrow path: flushes every arrow writer that has not been updated for
     * longer than the check interval, closes it when it held no cached rows, and re-registers the
     * timer.
     *
     * @param timestamp the time the timer fired for
     */
    public void onProcessingTime(long timestamp) throws Exception {
        long now = this.processingTimeService.getCurrentProcessingTime();
        // Collect first: closing a writer removes entries from the map being scanned.
        List<String> idlePartitions = new ArrayList<>();
        for (Map.Entry<String, Long> lastUpdate : this.odpsArrowWriterUpdateTime.entrySet()) {
            if (timestamp - lastUpdate.getValue() > this.writerCheckInterval) {
                idlePartitions.add(lastUpdate.getKey());
            }
        }
        for (String partition : idlePartitions) {
            LOG.info("Partition {} flush arrow writer {}", partition, this.odpsArrowWriterBlockNumber.get(partition));
            ArrowWriter<RowData> arrowWriter = this.arrowWriterMap.get(partition);
            // Must be read before flushing, which resets the arrow writer.
            boolean hadCachedRows = arrowWriter.getCachedCount() > 0;
            flushArrowWriter(partition, arrowWriter);
            if (!hadCachedRows) {
                arrowWriter.close();
                closeOdpsWriter(partition, this.odpsArrowWriterMap.get(partition));
                this.odpsArrowWriterMap.remove(partition);
                this.odpsArrowWriterUpdateTime.remove(partition);
                this.arrowWriterMap.remove(partition);
                // odpsArrowWriterBlockNumber is kept: initArrowWriter derives the next writer id from it.
            }
        }
        this.processingTimeService.registerTimer(now + this.writerCheckInterval, this);
    }

    /**
     * Handles a {@link TaskAckEvent}: clears the commit wait flag when the event reports a commit
     * and records the session id for the event's partition when one is carried.
     *
     * @throws IllegalArgumentException if the event is not a {@link TaskAckEvent}
     */
    @Override // org.apache.flink.odps.sink.common.AbstractWriteFunction
    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        Preconditions.checkArgument(operatorEvent instanceof TaskAckEvent, "The write function can only handle TaskAckEvent");
        TaskAckEvent ack = (TaskAckEvent) operatorEvent;
        LOG.info("Handle operator event for task[{}], {}", this.taskID, ack);
        if (ack.isCommitted()) {
            this.commitConfirming = false;
        }
        String sessionId = ack.getSessionId();
        if (!sessionId.isEmpty()) {
            this.sessionRequest.put(ack.getPartition(), sessionId);
        }
    }

    /** Asks the coordinator for a session for the partition and marks the request as pending. */
    protected void sendBootstrapEvent(String str) {
        LOG.info("Send bootstrap event to coordinator, task[{}], partition: {}.", this.taskID, str);
        SinkTaskEvent bootstrapEvent = SinkTaskEvent.builder().taskID(this.taskID).bootstrap(true).requiredPartition(str).build();
        this.eventGateway.sendEventToCoordinator(bootstrapEvent);
        // Empty session id == request sent, answer not yet received.
        this.sessionRequest.put(str, "");
    }

    /** Sends the collected writer statuses to the coordinator as a commit event. */
    protected void sendCommitEvent(boolean endInput, long checkpointId) {
        LOG.info("Send commit event to coordinator, task[{}], checkpointId[{}].", this.taskID, checkpointId);
        SinkTaskEvent commitEvent = SinkTaskEvent.builder().taskID(this.taskID).checkpointID(checkpointId).bootstrap(false).endInput(endInput).writeStatus(this.statuses).writeSuccess(true).build();
        this.eventGateway.sendEventToCoordinator(commitEvent);
    }

    /** Writes one converted record with the partition's record writer. */
    private void doWriteRecord(String partition, ArrayRecord record) throws IOException {
        this.odpsRecordWriterMap.get(partition).write(record);
    }

    /** Appends one row to the partition's arrow writer and flushes once the batch size is exceeded. */
    private void doWriteArrowBatch(String partition, RowData rowData) throws IOException {
        ArrowWriter<RowData> arrowWriter = this.arrowWriterMap.get(partition);
        arrowWriter.write(rowData);
        if (arrowWriter.getCachedCount() > this.batchSize) {
            flushArrowWriter(partition, arrowWriter);
        }
    }

    /** Returns the cached write session of the partition, creating and caching it when absent. */
    private TableBatchWriteSession resolveWriteSession(String partition, String sessionId) throws IOException {
        if (this.odpsSessionMap.containsKey(partition)) {
            return this.odpsSessionMap.get(partition);
        }
        TableBatchWriteSession session = OdpsUtils.getOrCreateWriteSession(this.tableIdentifier, partition, sessionId, this.settings);
        this.odpsSessionMap.put(partition, session);
        return session;
    }

    /** Creates and registers the buffered record writer of the partition. */
    private BatchWriter<ArrayRecord> initRecordWriter(String partition, String sessionId) throws IOException {
        TableBatchWriteSession session = resolveWriteSession(partition, sessionId);
        TunnelWriterOptions.Builder options = TunnelWriterOptions.newTunnelWriterOptionsBuilder().withSettings(this.settings).withBufferWriter(true).withBufferSize(this.bufferSize);
        if (this.isGroupPartition) {
            options.withNumberOfParallel(1);
        } else {
            options.withNumberOfParallel(this.taskParallel);
        }
        LOG.info("Init buffer writer for partition {}, session id {}, writer id {}", partition, sessionId, this.taskID);
        BatchWriter<ArrayRecord> recordWriter = session.createRecordWriter(this.taskID, WriterAttemptId.of(0), options.build());
        this.odpsRecordWriterMap.put(partition, recordWriter);
        return recordWriter;
    }

    /**
     * Creates and registers the arrow writer pair of the partition. The first writer of a
     * partition uses the task id as writer id; every later one adds the task parallelism to the
     * previously used id.
     */
    private BatchWriter<VectorSchemaRoot> initArrowWriter(String partition, String sessionId) throws IOException {
        TableBatchWriteSession session = resolveWriteSession(partition, sessionId);
        long writerId = this.taskID;
        if (this.odpsArrowWriterBlockNumber.containsKey(partition)) {
            writerId = this.odpsArrowWriterBlockNumber.get(partition) + this.taskParallel;
        }
        BatchWriter<VectorSchemaRoot> odpsWriter = session.createArrowWriter(writerId, WriterAttemptId.of(0), WriterOptions.newBuilder().withSettings(this.settings).build());
        this.odpsArrowWriterMap.put(partition, odpsWriter);
        this.arrowWriterMap.put(partition, ArrowUtils.createRowDataArrowWriter(odpsWriter.newElement(), this.rowType));
        this.odpsArrowWriterUpdateTime.put(partition, System.currentTimeMillis());
        this.odpsArrowWriterBlockNumber.put(partition, writerId);
        LOG.info("Init arrow writer for partition {}, session id {}, writer id {}", partition, sessionId, writerId);
        return odpsWriter;
    }

    /**
     * Writes all rows parked in the bucket to the partition's writer (creating it on first use)
     * and empties the bucket. Does nothing for an empty bucket.
     */
    protected void flushBucketData(String str, DataBucket dataBucket) throws IOException {
        if (dataBucket.isEmpty()) {
            return;
        }
        if (this.useArrowWriter) {
            if (!this.odpsArrowWriterMap.containsKey(str)) {
                initArrowWriter(str, this.sessionRequest.get(str));
            }
            for (DataItem item : dataBucket.getRecords()) {
                doWriteArrowBatch(str, item.getRowData());
            }
        } else {
            if (!this.odpsRecordWriterMap.containsKey(str)) {
                initRecordWriter(str, this.sessionRequest.get(str));
            }
            // One record instance is reused for the whole bucket and cleared after each write.
            ArrayRecord reusedRecord = this.odpsRecordWriterMap.get(str).newElement();
            for (DataItem item : dataBucket.getRecords()) {
                this.converter.convert(item.getRowData(), reusedRecord);
                doWriteRecord(str, reusedRecord);
                reusedRecord.clear();
            }
        }
        dataBucket.reset();
    }

    /**
     * Closes every open ODPS writer and records its commit status. On the arrow path the arrow
     * writers are flushed and closed first.
     */
    protected void commitWriter() throws IOException {
        if (!this.useArrowWriter) {
            for (Map.Entry<String, BatchWriter<ArrayRecord>> recordWriter : this.odpsRecordWriterMap.entrySet()) {
                closeOdpsWriter(recordWriter.getKey(), recordWriter.getValue());
            }
            return;
        }
        for (Map.Entry<String, ArrowWriter<RowData>> entry : this.arrowWriterMap.entrySet()) {
            ArrowWriter<RowData> arrowWriter = entry.getValue();
            flushArrowWriter(entry.getKey(), arrowWriter);
            arrowWriter.close();
        }
        for (Map.Entry<String, BatchWriter<VectorSchemaRoot>> odpsWriter : this.odpsArrowWriterMap.entrySet()) {
            closeOdpsWriter(odpsWriter.getKey(), odpsWriter.getValue());
        }
    }

    /**
     * Clears all per-partition state and closes every writer still registered. Failures while
     * closing an ODPS writer are logged and do not stop the reset.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void reset() {
        this.buckets.clear();
        this.statuses.clear();
        this.sessionRequest.clear();
        this.odpsSessionMap.clear();
        for (BatchWriter<ArrayRecord> recordWriter : this.odpsRecordWriterMap.values()) {
            try {
                recordWriter.close();
            } catch (IOException e) {
                LOG.error("Close record writer failed: ", e);
            }
        }
        this.odpsRecordWriterMap.clear();
        for (BatchWriter<VectorSchemaRoot> odpsArrowWriter : this.odpsArrowWriterMap.values()) {
            try {
                odpsArrowWriter.close();
            } catch (IOException e) {
                LOG.error("Close arrow writer failed: ", e);
            }
        }
        this.odpsArrowWriterMap.clear();
        for (ArrowWriter<RowData> arrowWriter : this.arrowWriterMap.values()) {
            arrowWriter.close();
        }
        this.arrowWriterMap.clear();
        this.odpsArrowWriterUpdateTime.clear();
        this.odpsArrowWriterBlockNumber.clear();
    }

    /** Returns true when at least one bucket still holds parked rows. */
    private boolean hasBufferedData() {
        return this.buckets.values().stream().anyMatch(bucket -> !bucket.isEmpty());
    }

    /** Returns true when at least one session request has not been answered yet. */
    private boolean hasPendingRequest() {
        return this.sessionRequest.values().stream().anyMatch(String::isEmpty);
    }

    /** Hands the arrow writer's current batch to the partition's ODPS writer and resets the arrow writer. */
    private void flushArrowWriter(String partition, ArrowWriter<RowData> arrowWriter) throws IOException {
        arrowWriter.finish();
        this.odpsArrowWriterMap.get(partition).write(arrowWriter.getRecordBatch());
        arrowWriter.reset();
        this.odpsArrowWriterUpdateTime.put(partition, System.currentTimeMillis());
    }

    /** Closes and commits an ODPS writer and appends its status to {@link #statuses}. */
    private void closeOdpsWriter(String partition, BatchWriter<?> batchWriter) throws IOException {
        LOG.info("Close Odps writer for partition {}", partition);
        String sessionId = this.odpsSessionMap.get(partition).getId();
        batchWriter.close();
        WriterStatus status = new WriterStatus();
        WriterCommitMessage commitMessage = batchWriter.commit();
        status.setSessionId(sessionId);
        status.setPartitionSpec(partition);
        status.setWriterMessage(commitMessage);
        status.setTotalRecords(batchWriter.currentMetricsValues().counter(MetricNames.RECORD_COUNT).get().getCount());
        this.statuses.add(status);
    }

    /** Yields to the executor until the coordinator has acknowledged the pending commit. */
    private void awaitCommitConfirmed(String errorMessage) throws IOException {
        while (this.commitConfirming) {
            yieldExecutor(errorMessage);
        }
    }

    /**
     * Yields once to the executor. An interruption is logged with the given message, the thread's
     * interrupt flag is restored and the interruption is rethrown as {@link IOException}.
     */
    private void yieldExecutor(String errorMessage) throws IOException {
        try {
            this.executor.yield();
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            LOG.error(errorMessage, e);
            throw new IOException(e);
        }
    }

    // NOTE(review): decompiler-emitted synthetic bridge; javac normally generates it, so the
    // hand-written copy may clash when recompiling - confirm before removing.
    public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
        processElement((RowData) obj, (ProcessFunction<RowData, Object>.Context) context, (Collector<Object>) collector);
    }
}
