package org.apache.flink.odps.sink.upsert;

import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.metrics.MetricNames;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.odps.FlinkOdpsException;
import org.apache.flink.odps.sink.common.OdpsWriteOptions;
import org.apache.flink.odps.sink.common.WriteOperationType;
import org.apache.flink.odps.sink.insert.InsertFunction;
import org.apache.flink.odps.sink.partition.PartitionAssigner;
import org.apache.flink.odps.sink.table.TableUpsertSessionImpl;
import org.apache.flink.odps.sink.table.TableUpsertWriterOptions;
import org.apache.flink.odps.sink.table.UpsertWriter;
import org.apache.flink.odps.sink.utils.DataBucket;
import org.apache.flink.odps.sink.utils.DataItem;
import org.apache.flink.odps.sink.utils.WriterStatus;
import org.apache.flink.odps.table.OdpsOptions;
import org.apache.flink.odps.util.OdpsConf;
import org.apache.flink.odps.util.OdpsUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Write function that applies Flink changelog rows to an ODPS table through upsert sessions.
 *
 * <p>One {@link TableUpsertSessionImpl} and one {@link UpsertWriter} are kept per partition
 * spec. {@code INSERT} and {@code UPDATE_AFTER} rows are written as upserts, while
 * {@code DELETE} and {@code UPDATE_BEFORE} rows are written as deletes.
 *
 * <p>All per-partition state is {@code transient} and is (re)created in
 * {@link #open(Configuration)}.
 */
public class UpsertFunction extends InsertFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(UpsertFunction.class);

    /** Upsert session keyed by partition spec. */
    protected transient Map<String, TableUpsertSessionImpl> odpsUpsertSessionMap;

    /** Upsert writer keyed by partition spec; every key also has an entry in the session map. */
    protected transient Map<String, UpsertWriter<ArrayRecord>> odpsUpsertWriterMap;

    /** Writer tuning (retries and buffer sizes) read from the configuration in {@code open}. */
    private transient TableUpsertWriterOptions writerOptions;

    /**
     * Creates the function; every argument is forwarded unchanged to {@link InsertFunction}.
     */
    public UpsertFunction(Configuration configuration, OdpsConf odpsConf, TableIdentifier tableIdentifier, String str, DataSchema dataSchema, boolean z, boolean z2, boolean z3, OdpsWriteOptions odpsWriteOptions, PartitionAssigner<RowData> partitionAssigner, WriteOperationType writeOperationType, RowType rowType) {
        super(configuration, odpsConf, tableIdentifier, str, dataSchema, z, z2, z3, odpsWriteOptions, partitionAssigner, writeOperationType, rowType);
    }

    /**
     * Opens the parent function, then creates the empty per-partition maps and builds the
     * writer options from the {@code UPSERT_WRITER_*} settings.
     *
     * @param configuration passed through to the parent {@code open}
     * @throws IOException if the parent {@code open} fails
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction, org.apache.flink.odps.sink.common.OdpsWriteFunction
    public void open(Configuration configuration) throws IOException {
        super.open(configuration);
        this.odpsUpsertWriterMap = new HashMap<>();
        this.odpsUpsertSessionMap = new HashMap<>();
        this.writerOptions = TableUpsertWriterOptions.builder()
                .withMaxRetries(this.config.getInteger(OdpsOptions.UPSERT_WRITER_FLUSH_MAX_RETRIES))
                .withMaxBufferSize(((MemorySize) this.config.get(OdpsOptions.UPSERT_WRITER_MAX_BUFFER_SIZE)).getBytes())
                .withSlotBufferSize(((MemorySize) this.config.get(OdpsOptions.UPSERT_WRITER_BUCKET_BUFFER_SIZE)).getBytes())
                .build();
    }

    /**
     * Ensures the partition exists, obtains an upsert session for it and records the session
     * id in {@code sessionRequest}.
     *
     * @param partition partition spec to bootstrap
     * @throws IOException if the partition or session cannot be created
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction
    protected void bootstrapForGroupPartition(String partition) throws IOException {
        OdpsUtils.createPartitionIfNeeded(getTableMetaProvider(), this.tableIdentifier, partition);
        // No session id is supplied here; presumably this makes the helper start a new
        // session rather than re-attach to an existing one -- confirm against OdpsUtils.
        TableUpsertSessionImpl session = OdpsUtils.getOrCreateUpsertSession(this.tableIdentifier, partition, null, this.settings);
        this.odpsUpsertSessionMap.put(partition, session);
        this.sessionRequest.put(partition, session.getId());
    }

    /**
     * Writes every buffered row of {@code dataBucket} to the partition's upsert writer and
     * then resets the bucket. An empty bucket is a no-op.
     *
     * @param partition partition spec the bucket belongs to
     * @param dataBucket buffered rows to flush
     * @throws FlinkOdpsException wrapping any {@link IOException} raised while writing
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction
    protected void flushBucketData(String partition, DataBucket dataBucket) throws IOException {
        if (dataBucket.isEmpty()) {
            return;
        }
        try {
            UpsertWriter<ArrayRecord> writer = writerFor(partition);
            // A single record instance is reused for the whole bucket and cleared per row.
            ArrayRecord reusableRecord = writer.newElement();
            Iterator<DataItem> items = dataBucket.getRecords().iterator();
            while (items.hasNext()) {
                RowData row = items.next().getRowData();
                this.converter.convert(row, reusableRecord);
                doWriteRecord(writer, reusableRecord, row.getRowKind());
                reusableRecord.clear();
            }
            dataBucket.reset();
        } catch (IOException e) {
            throw new FlinkOdpsException("Flush data error: ", e);
        }
    }

    /**
     * Closes every open writer together with its session and appends one
     * {@link WriterStatus} per partition to {@code statuses}.
     *
     * <p>The session is closed exactly once per partition, even when closing the writer
     * fails; in that case the failure propagates and no status is recorded for the partition.
     *
     * @throws IOException if closing a writer fails
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction
    protected void commitWriter() throws IOException {
        for (Map.Entry<String, UpsertWriter<ArrayRecord>> entry : this.odpsUpsertWriterMap.entrySet()) {
            String partition = entry.getKey();
            LOG.info("Close Odps writer for partition {}", partition);
            UpsertWriter<ArrayRecord> writer = entry.getValue();
            TableUpsertSessionImpl session = this.odpsUpsertSessionMap.get(partition);
            String sessionId = session.getId();
            try {
                writer.close();
            } finally {
                session.close();
            }
            WriterStatus status = new WriterStatus();
            status.setSessionId(sessionId);
            status.setPartitionSpec(partition);
            status.setTotalRecords(writer.currentMetricsValues().counter(MetricNames.RECORD_COUNT).get().getCount());
            this.statuses.add(status);
        }
    }

    /**
     * Resets the parent state, drops all writers, and closes and drops all sessions.
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction
    public void reset() {
        super.reset();
        this.odpsUpsertWriterMap.clear();
        this.odpsUpsertSessionMap.values().forEach(TableUpsertSessionImpl::close);
        this.odpsUpsertSessionMap.clear();
    }

    /**
     * Intentionally empty override: this function performs no row validation of its own.
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction
    protected void validDateRowData(RowData rowData) throws Exception {
    }

    /**
     * Converts a single row and writes it straight to the partition's upsert writer,
     * creating the writer first if the partition has none yet.
     *
     * @param partition partition spec the row belongs to
     * @param rowData row to write; its {@link RowKind} selects upsert or delete
     * @throws IOException if the writer cannot be created or the write fails
     */
    @Override // org.apache.flink.odps.sink.insert.InsertFunction
    protected void writeData(String partition, RowData rowData) throws IOException {
        UpsertWriter<ArrayRecord> writer = writerFor(partition);
        ArrayRecord record = writer.newElement();
        this.converter.convert(rowData, record);
        doWriteRecord(writer, record, rowData.getRowKind());
    }

    /** Returns the writer registered for {@code partition}, creating it on first use. */
    private UpsertWriter<ArrayRecord> writerFor(String partition) throws IOException {
        UpsertWriter<ArrayRecord> writer = this.odpsUpsertWriterMap.get(partition);
        if (writer == null) {
            writer = initUpsertWriter(partition, this.sessionRequest.get(partition));
        }
        return writer;
    }

    /** Routes a converted record to {@code upsert} or {@code delete} according to its row kind. */
    private void doWriteRecord(UpsertWriter<ArrayRecord> writer, ArrayRecord arrayRecord, RowKind rowKind) throws IOException {
        switch (rowKind) {
            case INSERT:
            case UPDATE_AFTER:
                writer.upsert(arrayRecord);
                return;
            case DELETE:
            case UPDATE_BEFORE:
                writer.delete(arrayRecord);
                return;
            default:
                throw new UnsupportedOperationException("Unsupported row kind: " + rowKind);
        }
    }

    /**
     * Creates and registers an upsert writer for {@code partition}, reusing the partition's
     * cached session or obtaining one for {@code sessionId} when none is cached.
     */
    private UpsertWriter<ArrayRecord> initUpsertWriter(String partition, String sessionId) throws IOException {
        TableUpsertSessionImpl session = this.odpsUpsertSessionMap.get(partition);
        if (session == null) {
            session = OdpsUtils.getOrCreateUpsertSession(this.tableIdentifier, partition, sessionId, this.settings);
            this.odpsUpsertSessionMap.put(partition, session);
        }
        UpsertWriter<ArrayRecord> writer = session.createUpsertWriter(this.writerOptions);
        this.odpsUpsertWriterMap.put(partition, writer);
        return writer;
    }
}
