package com.aliyun.odps.table.write.impl.batch;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.arrow.ArrowWriter;
import com.aliyun.odps.table.arrow.ArrowWriterFactory;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.WriterOptions;
import com.aliyun.odps.table.enviroment.ExecutionEnvironment;
import com.aliyun.odps.table.metrics.Metrics;
import com.aliyun.odps.table.metrics.count.BytesCount;
import com.aliyun.odps.table.metrics.count.RecordCount;
import com.aliyun.odps.table.utils.ConfigConstants;
import com.aliyun.odps.table.utils.SchemaUtils;
import com.aliyun.odps.table.write.BatchWriter;
import com.aliyun.odps.table.write.WriterAttemptId;
import com.aliyun.odps.table.write.WriterCommitMessage;
import com.aliyun.odps.tunnel.TunnelConstants;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import javax.annotation.Nullable;
import p000flinkconnectorodps.io.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import p000flinkconnectorodps.org.apache.arrow.vector.VectorSchemaRoot;
import p000flinkconnectorodps.org.apache.arrow.vector.types.pojo.Schema;

/* loaded from: input_file:com/aliyun/odps/table/write/impl/batch/ArrowWriterImpl.class */
/**
 * {@link BatchWriter} that sends Arrow {@link VectorSchemaRoot} batches for one block of a table
 * write session over a chunked HTTP {@code POST} connection.
 *
 * <p>The connection is opened lazily by the first {@link #write(VectorSchemaRoot)} call. Closing
 * the writer (directly or through {@link #commit()}) finishes the stream, reads the commit message
 * from the response and releases the connection. {@link #abort()} releases the connection without
 * reading a commit message.
 *
 * <p>This class performs no synchronization of its own.
 */
public class ArrowWriterImpl implements BatchWriter<VectorSchemaRoot> {
    /** Response header carrying the request id that is attached to server-side errors. */
    private static final String REQUEST_ID_HEADER = "x-odps-request-id";
    /** Key of the commit message inside the JSON body returned when the stream is closed. */
    private static final String COMMIT_MESSAGE_KEY = "CommitMessage";
    /** Message used when a write fails and the server did not report an error itself. */
    private static final String SERIALIZE_FAILURE_MESSAGE = "ArrowHttpOutputStream Serialize Exception";

    private final long blockNumber;
    private final WriterOptions writerOptions;
    private final Schema arrowSchema;
    private final String sessionId;
    private final TableIdentifier identifier;
    private final WriterAttemptId attemptId;
    private final Metrics metrics;
    private final BytesCount bytesCount;
    private final RecordCount recordCount;

    private boolean isClosed = false;
    private Connection connection;
    private ArrowWriter batchWriter;
    private WriterCommitMessage commitMessage;

    /**
     * Creates a writer for one block of a write session. No connection is opened here.
     *
     * @param str             id of the write session
     * @param tableIdentifier table the session writes to
     * @param dataSchema      schema whose columns are converted to the Arrow schema
     * @param j               block number sent with the upload request
     * @param writerAttemptId attempt whose attempt number is sent with the upload request
     * @param writerOptions   writer configuration (allocator, data format, chunk size, settings)
     * @param arrowOptions    options used for the Arrow schema conversion
     */
    public ArrowWriterImpl(String str, TableIdentifier tableIdentifier, DataSchema dataSchema, long j, WriterAttemptId writerAttemptId, WriterOptions writerOptions, ArrowOptions arrowOptions) {
        this.sessionId = str;
        this.identifier = tableIdentifier;
        this.blockNumber = j;
        this.attemptId = writerAttemptId;
        this.writerOptions = writerOptions;
        this.arrowSchema = SchemaUtils.toArrowSchema(dataSchema.getColumns(), arrowOptions);
        this.bytesCount = new BytesCount();
        this.recordCount = new RecordCount();
        this.metrics = new Metrics();
        this.metrics.register(this.bytesCount);
        this.metrics.register(this.recordCount);
    }

    /**
     * Creates an empty {@link VectorSchemaRoot} for this writer's Arrow schema, allocated from the
     * buffer allocator configured in the writer options.
     */
    @Override
    public VectorSchemaRoot newElement() {
        return VectorSchemaRoot.create(this.arrowSchema, this.writerOptions.getBufferAllocator());
    }

    /**
     * Writes one batch, opening the upload connection on first use, and updates the record and
     * byte metrics.
     *
     * @param vectorSchemaRoot batch to send
     * @throws IOException if the writer is already closed or aborted, the connection cannot be
     *                     opened, or the batch cannot be written; when the server answered with a
     *                     non-OK response the cause is a {@link TunnelException}
     */
    @Override
    public void write(VectorSchemaRoot vectorSchemaRoot) throws IOException {
        if (this.isClosed) {
            throw new IOException("Arrow writer is closed");
        }
        if (this.batchWriter == null) {
            openWriterConnection(this.sessionId, this.identifier, this.blockNumber, this.attemptId);
            this.batchWriter = ArrowWriterFactory.getRecordBatchWriter(this.connection.getOutputStream(), this.writerOptions);
        }
        try {
            this.batchWriter.writeBatch(vectorSchemaRoot);
            this.recordCount.inc(vectorSchemaRoot.getRowCount());
            this.bytesCount.setValue(Long.valueOf(this.batchWriter.bytesWritten()));
        } catch (IOException e) {
            throw translateWriteFailure(e);
        }
    }

    /**
     * Releases the connection without committing and marks the writer closed, so that later
     * {@link #write(VectorSchemaRoot)} calls fail fast and {@link #close()} becomes a no-op
     * instead of operating on a disconnected stream.
     *
     * @throws IOException if disconnecting fails
     */
    @Override
    public void abort() throws IOException {
        try {
            disconnect();
        } finally {
            this.isClosed = true;
        }
    }

    /**
     * Closes the writer and returns the commit message obtained while closing.
     *
     * @return the commit message, or {@code null} if no batch was ever written or the writer was
     *         aborted
     * @throws IOException if closing fails
     */
    @Override
    @Nullable
    public WriterCommitMessage commit() throws IOException {
        close();
        return this.commitMessage;
    }

    /**
     * Finishes the upload (if one was started), stores the commit message from the response and
     * releases the connection. Calling this on an already closed writer does nothing.
     *
     * @throws IOException if finishing the stream fails, the server answers with a non-OK
     *                     response, no response is available, or the response cannot be parsed
     */
    @Override
    public void close() throws IOException {
        if (this.isClosed) {
            return;
        }
        Throwable primary = null;
        try {
            if (this.batchWriter != null) {
                this.batchWriter.close();
                Response response = this.connection.getResponse();
                if (response == null) {
                    throw new IOException("No response received when closing arrow writer for block " + this.blockNumber);
                }
                if (!response.isOK()) {
                    throw newServerError(response);
                }
                this.commitMessage = new WriterCommitMessageImpl(this.blockNumber, loadResultFromJson(this.connection.getInputStream()));
            }
        } catch (IOException | RuntimeException e) {
            primary = e;
            throw e;
        } finally {
            // Mark closed before disconnecting so a disconnect failure cannot leave the writer
            // in a state where close() would finish the stream a second time.
            this.isClosed = true;
            releaseConnection(primary);
        }
    }

    /** Returns the metrics object holding this writer's byte and record counters. */
    @Override
    public Metrics currentMetricsValues() {
        return this.metrics;
    }

    /**
     * Opens the HTTP {@code POST} connection used to upload the block and stores it in
     * {@link #connection}.
     *
     * @throws IOException if the HTTP client cannot be created or the connection cannot be opened
     */
    private void openWriterConnection(String session, TableIdentifier table, long block, WriterAttemptId attempt) throws IOException {
        HashMap<String, String> headers = new HashMap<>();
        headers.put("Transfer-Encoding", "chunked");
        headers.put("Content-Type", HttpPostBodyUtil.DEFAULT_BINARY_CONTENT_TYPE);

        HashMap<String, String> params = new HashMap<>();
        params.put(ConfigConstants.BLOCK_NUMBER, Long.toString(block));
        params.put(ConfigConstants.ATTEMPT_NUMBER, Integer.toString(attempt.getAttemptNumber()));
        params.put(ConfigConstants.DATA_FORMAT_TYPE, this.writerOptions.getDataFormat().getType().toString());
        params.put(ConfigConstants.DATA_FORMAT_VERSION, this.writerOptions.getDataFormat().getVersion().toString());
        if (this.writerOptions.getSettings() != null && this.writerOptions.getSettings().getQuotaName().isPresent()) {
            params.put(TunnelConstants.PARAM_QUOTA_NAME, this.writerOptions.getSettings().getQuotaName().get());
        }

        String resource = ResourceBuilder.buildTableSessionDataResource("v1", table.getProject(), table.getSchema(), table.getTable(), session);
        try {
            RestClient client = ExecutionEnvironment.create(this.writerOptions.getSettings()).createHttpClient(table.getProject());
            client.setChunkSize(this.writerOptions.getChunkSize());
            this.connection = client.connect(resource, "POST", params, headers);
        } catch (OdpsException | IOException e) {
            // Attach a disconnect failure to the connect error rather than letting it replace it.
            releaseConnection(e);
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Reads the stream as a JSON object and returns its {@code CommitMessage} member, or an empty
     * string when that member is absent. The stream is always closed.
     *
     * @throws IOException if the body cannot be read or parsed, or closing the stream fails
     */
    private String loadResultFromJson(InputStream inputStream) throws IOException {
        try (InputStream in = inputStream) {
            try {
                JsonObject body = new JsonParser().parse(IOUtils.readStreamAsString(in)).getAsJsonObject();
                return body.has(COMMIT_MESSAGE_KEY) ? body.get(COMMIT_MESSAGE_KEY).getAsString() : "";
            } catch (Exception e) {
                throw new IOException("Parse writer commit response failed", e);
            }
        }
    }

    /**
     * Converts a failed batch write into the exception reported to the caller. Never throws: if
     * the response itself cannot be inspected, that secondary failure is attached as suppressed.
     */
    private IOException translateWriteFailure(IOException cause) {
        try {
            return describeWriteFailure(cause);
        } catch (IOException secondary) {
            IOException fallback = new IOException(SERIALIZE_FAILURE_MESSAGE, cause);
            fallback.addSuppressed(secondary);
            return fallback;
        }
    }

    /**
     * Builds the exception for a failed write: a server error when the response is non-OK,
     * otherwise a generic serialization failure wrapping {@code cause}.
     */
    private IOException describeWriteFailure(IOException cause) throws IOException {
        Response response = this.connection.getResponse();
        if (response == null || response.isOK()) {
            return new IOException(SERIALIZE_FAILURE_MESSAGE, cause);
        }
        IOException serverError = newServerError(response);
        // Keep the local write failure visible alongside the server's error.
        serverError.addSuppressed(cause);
        return serverError;
    }

    /** Wraps a non-OK response in an {@link IOException} whose cause is a {@link TunnelException}. */
    private IOException newServerError(Response response) throws IOException {
        TunnelException tunnelException = new TunnelException(response.getHeader(REQUEST_ID_HEADER), this.connection.getInputStream(), Integer.valueOf(response.getStatus()));
        return new IOException(tunnelException.getMessage(), tunnelException);
    }

    /**
     * Disconnects; when {@code primary} is non-null a disconnect failure is added to it as a
     * suppressed exception, otherwise the disconnect failure is thrown.
     */
    private void releaseConnection(Throwable primary) throws IOException {
        try {
            disconnect();
        } catch (IOException e) {
            if (primary == null) {
                throw e;
            }
            primary.addSuppressed(e);
        }
    }

    /** Disconnects the upload connection if one was opened. */
    private void disconnect() throws IOException {
        if (this.connection != null) {
            this.connection.disconnect();
        }
    }
}
