package com.aliyun.odps.table.read.impl.batch;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.table.DataFormat;
import com.aliyun.odps.table.DataSchema;
import com.aliyun.odps.table.SessionStatus;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ReaderOptions;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.table.enviroment.ExecutionEnvironment;
import com.aliyun.odps.table.read.SplitReader;
import com.aliyun.odps.table.read.split.InputSplit;
import com.aliyun.odps.table.read.split.impl.IndexedInputSplitAssigner;
import com.aliyun.odps.table.read.split.impl.RowRangeInputSplitAssigner;
import com.aliyun.odps.table.utils.ConfigConstants;
import com.aliyun.odps.table.utils.HttpUtils;
import com.aliyun.odps.table.utils.Preconditions;
import com.aliyun.odps.table.utils.SchemaUtils;
import com.aliyun.odps.table.utils.SessionUtils;
import com.aliyun.odps.tunnel.TunnelConstants;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import p000flinkconnectorodps.io.netty.handler.codec.http.HttpHeaders;
import p000flinkconnectorodps.org.apache.arrow.vector.VectorSchemaRoot;

/* loaded from: input_file:com/aliyun/odps/table/read/impl/batch/TableBatchReadSessionImpl.class */
public class TableBatchReadSessionImpl extends TableBatchReadSessionBase {
    private static final Logger logger = LoggerFactory.getLogger(TableBatchReadSessionImpl.class.getName());
    private transient RestClient restClient;

    /**
     * Creates a batch read session by delegating all arguments to the superclass constructor.
     *
     * @param tableIdentifier identifier of the table this session reads
     * @param str forwarded unchanged to the superclass; NOTE(review): presumably the id of an
     *     existing read session to re-attach to — confirm against TableBatchReadSessionBase
     * @param environmentSettings environment settings forwarded to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public TableBatchReadSessionImpl(TableIdentifier tableIdentifier, String str, EnvironmentSettings environmentSettings) throws IOException {
        super(tableIdentifier, str, environmentSettings);
    }

    /**
     * Creates a batch read session by delegating all arguments to the superclass constructor.
     *
     * <p>NOTE(review): the meaning of {@code list2}, {@code list3} and {@code list4} is not visible
     * here; they presumably correspond to the required data columns, required partition columns
     * and required bucket ids that this class later serializes into the session request — confirm
     * the order against TableBatchReadSessionBase.
     *
     * @param tableIdentifier identifier of the table this session reads
     * @param list partition specs forwarded to the superclass
     * @param list2 string list forwarded to the superclass
     * @param list3 string list forwarded to the superclass
     * @param list4 integer list forwarded to the superclass
     * @param splitOptions split options forwarded to the superclass
     * @param arrowOptions Arrow options forwarded to the superclass
     * @param environmentSettings environment settings forwarded to the superclass
     * @throws IOException if the superclass constructor fails
     */
    public TableBatchReadSessionImpl(TableIdentifier tableIdentifier, List<PartitionSpec> list, List<String> list2, List<String> list3, List<Integer> list4, SplitOptions splitOptions, ArrowOptions arrowOptions, EnvironmentSettings environmentSettings) throws IOException {
        super(tableIdentifier, list, list2, list3, list4, splitOptions, arrowOptions, environmentSettings);
    }

    /**
     * Builds an Arrow reader for one input split of this session.
     *
     * <p>The session identifier, the split and the reader options are each passed through
     * {@code Preconditions.checkNotNull} (in that order) before the reader is constructed.
     *
     * @param inputSplit the split to read
     * @param readerOptions options handed to the reader
     * @return a new {@code SplitArrowReaderImpl} for the split
     * @throws IOException declared by the interface
     */
    @Override // com.aliyun.odps.table.read.TableBatchReadSession
    public SplitReader<VectorSchemaRoot> createArrowReader(InputSplit inputSplit, ReaderOptions readerOptions) throws IOException {
        // Keep the validation order: it decides which argument is reported first.
        Preconditions.checkNotNull(this.identifier, "Table read identifier");
        Preconditions.checkNotNull(inputSplit, "Input split");
        Preconditions.checkNotNull(readerOptions, "Reader options");

        final SplitReader<VectorSchemaRoot> arrowReader =
                new SplitArrowReaderImpl(this.identifier, inputSplit, readerOptions);
        return arrowReader;
    }

    /**
     * Builds a record reader for one input split by wrapping the Arrow reader for that split.
     *
     * @param inputSplit the split to read
     * @param readerOptions options handed to both the Arrow reader and the record reader
     * @return a new {@code SplitRecordReaderImpl} over the split's Arrow reader and this
     *     session's read schema
     * @throws IOException if creating the underlying Arrow reader fails
     */
    @Override // com.aliyun.odps.table.read.TableBatchReadSession
    public SplitReader<ArrayRecord> createRecordReader(InputSplit inputSplit, ReaderOptions readerOptions) throws IOException {
        final SplitReader<VectorSchemaRoot> arrowReader = createArrowReader(inputSplit, readerOptions);
        return new SplitRecordReaderImpl(arrowReader, this.readSchema, readerOptions);
    }

    /**
     * Reports whether the given data format is among the formats recorded for this session.
     *
     * @param dataFormat the format to look up
     * @return {@code true} if the set of supported formats has been populated and contains
     *     {@code dataFormat}; {@code false} otherwise, including when the set is still unset
     */
    @Override // com.aliyun.odps.table.read.TableReadSession
    public boolean supportsDataFormat(DataFormat dataFormat) {
        return this.supportDataFormats != null && this.supportDataFormats.contains(dataFormat);
    }

    /**
     * Creates the read session on the server and waits until it leaves the {@code INIT} state.
     *
     * <p>The request built by {@link #generateReadSessionRequest()} is POSTed to the table-session
     * resource and the JSON response is loaded into this session's fields. While the session
     * status is {@code INIT}, the session is re-fetched via {@link #reloadInputSplits()} at the
     * configured polling interval until the status changes or the configured timeout elapses.
     *
     * @throws IOException if the request fails, the session does not reach {@code NORMAL}, the
     *     wait times out, or the waiting thread is interrupted (the interrupt flag is restored)
     */
    @Override // com.aliyun.odps.table.read.impl.batch.TableBatchReadSessionBase
    protected void planInputSplits() throws IOException {
        ensureClientInitialized();

        Map<String, String> headers = HttpUtils.createCommonHeader();
        headers.put("Content-Type", HttpHeaders.Values.APPLICATION_JSON);

        Map<String, String> params = new HashMap<>();
        params.put(ConfigConstants.SESSION_TYPE, getType().toString());
        if (this.settings != null && this.settings.getQuotaName().isPresent()) {
            params.put(TunnelConstants.PARAM_QUOTA_NAME, this.settings.getQuotaName().get());
        }

        try {
            String request = generateReadSessionRequest();
            logger.debug("Read table '{}'.\nSession request:\n{}", this.identifier, request);

            Response response = this.restClient.stringRequest(
                    ResourceBuilder.buildTableSessionResource("v1", this.identifier.getProject(), this.identifier.getSchema(), this.identifier.getTable(), null),
                    "POST", params, headers, request);
            if (!response.isOK()) {
                throw new TunnelException(response.getHeader("x-odps-request-id"), new ByteArrayInputStream(response.getBody()), Integer.valueOf(response.getStatus()));
            }

            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String responseBody = new String(response.getBody(), StandardCharsets.UTF_8);
            loadResultFromJson(responseBody);

            if (this.sessionStatus != SessionStatus.NORMAL) {
                long pollIntervalMillis = HttpUtils.getAsyncIntervalInMills(this.settings);
                // 1000L forces the multiplication to be done in long arithmetic.
                long timeoutMillis = HttpUtils.getAsyncTimeoutInSeconds(this.settings) * 1000L;
                long startMillis = System.currentTimeMillis();
                while (this.sessionStatus == SessionStatus.INIT) {
                    Thread.sleep(pollIntervalMillis);
                    logger.trace("Async read table: '{}', session id: {}", this.identifier, this.sessionId);
                    responseBody = reloadInputSplits();
                    // Only report a timeout while the session is still initializing. If the last
                    // reload moved it out of INIT, fall through so a ready session is accepted
                    // and a failed one is reported with its real status and error message.
                    if (this.sessionStatus == SessionStatus.INIT
                            && System.currentTimeMillis() - startMillis >= timeoutMillis) {
                        throw new IOException(String.format("Create table read session timeout.\nTable identifier: %s.\nSession status: %s.\nSession id: %s.\nError message: %s.", this.identifier.toString(), this.sessionStatus, this.sessionId, this.errorMessage));
                    }
                }
            }

            if (this.sessionStatus != SessionStatus.NORMAL) {
                throw new IOException(String.format("Create table read session failed.\nTable identifier: %s.\nSession status: %s.\nSession id: %s.\nError message: %s.", this.identifier.toString(), this.sessionStatus, this.sessionId, this.errorMessage));
            }
            logger.debug("Read table '{}'.\nSession response:\n{}", this.identifier, responseBody);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new IOException(e.getMessage(), e);
        } catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Re-fetches this session from the server and refreshes the session's fields from the
     * response.
     *
     * <p>Issues a GET against the table-session resource for the current session id, parses the
     * JSON body via {@code loadResultFromJson}, and always disconnects the connection afterwards,
     * whether the call succeeded or failed.
     *
     * @return the raw response body
     * @throws IOException if the request fails, the server answers with a non-OK status, the
     *     response cannot be parsed, or disconnecting fails
     */
    @Override // com.aliyun.odps.table.read.impl.batch.TableBatchReadSessionBase
    protected String reloadInputSplits() throws IOException {
        ensureClientInitialized();
        Preconditions.checkString(this.sessionId, "Table read session id");

        Map<String, String> headers = HttpUtils.createCommonHeader();
        Map<String, String> params = new HashMap<>();
        params.put(ConfigConstants.SESSION_TYPE, getType().toString());
        if (this.settings != null && this.settings.getQuotaName().isPresent()) {
            params.put(TunnelConstants.PARAM_QUOTA_NAME, this.settings.getQuotaName().get());
        }

        Connection connection = null;
        try {
            connection = this.restClient.connect(
                    ResourceBuilder.buildTableSessionResource("v1", this.identifier.getProject(), this.identifier.getSchema(), this.identifier.getTable(), this.sessionId),
                    "GET", params, headers);
            Response response = connection.getResponse();
            if (!response.isOK()) {
                throw new TunnelException(response.getHeader("x-odps-request-id"), connection.getInputStream(), Integer.valueOf(response.getStatus()));
            }
            String responseBody = IOUtils.readStreamAsString(connection.getInputStream());
            loadResultFromJson(responseBody);
            return responseBody;
        } catch (OdpsException e) {
            throw new IOException(e);
        } catch (IOException e) {
            throw new IOException("Failed to reload table read session with endpoint: " + this.restClient.getEndpoint(), e);
        } finally {
            // Release the connection on every path; previously it was only released on success.
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * Lazily creates the REST client on first use.
     *
     * <p>Does nothing when a client is already present; otherwise builds one from the execution
     * environment for this session's settings, scoped to the identifier's project.
     */
    private void ensureClientInitialized() {
        if (this.restClient != null) {
            return;
        }
        this.restClient = ExecutionEnvironment.create(this.settings)
                .createHttpClient(this.identifier.getProject());
    }

    /**
     * Serializes this session's read requirements into the JSON body of the create-session
     * request.
     *
     * <p>The document contains the required data columns, partition columns, partitions and
     * bucket ids as arrays, the split options and Arrow options as nested objects, and an empty
     * {@code FilterPredicate}. HTML escaping is disabled when serializing.
     *
     * @return the JSON request body
     */
    private String generateReadSessionRequest() {
        JsonObject request = new JsonObject();

        JsonArray dataColumns = new JsonArray();
        this.requiredDataColumns.stream().map(JsonPrimitive::new).forEach(dataColumns::add);
        request.add("RequiredDataColumns", dataColumns);

        JsonArray partitionColumns = new JsonArray();
        this.requiredPartitionColumns.stream().map(JsonPrimitive::new).forEach(partitionColumns::add);
        request.add("RequiredPartitionColumns", partitionColumns);

        JsonArray partitions = new JsonArray();
        this.requiredPartitions.stream()
                .map(partitionSpec -> partitionSpec.toString(false, true))
                .map(JsonPrimitive::new)
                .forEach(partitions::add);
        request.add("RequiredPartitions", partitions);

        JsonArray bucketIds = new JsonArray();
        this.requiredBucketIds.stream()
                .map(bucketId -> new JsonPrimitive(bucketId))
                .forEach(bucketIds::add);
        request.add("RequiredBucketIds", bucketIds);

        JsonObject splitOptionsJson = new JsonObject();
        splitOptionsJson.addProperty("SplitMode", this.splitOptions.getSplitMode().toString());
        splitOptionsJson.addProperty("SplitNumber", this.splitOptions.getSplitNumber());
        splitOptionsJson.addProperty("CrossPartition", this.splitOptions.isCrossPartition());
        request.add("SplitOptions", splitOptionsJson);

        JsonObject arrowOptionsJson = new JsonObject();
        arrowOptionsJson.addProperty("TimestampUnit", this.arrowOptions.getTimestampUnit().toString());
        arrowOptionsJson.addProperty("DatetimeUnit", this.arrowOptions.getDateTimeUnit().toString());
        request.add("ArrowOptions", arrowOptionsJson);

        request.add("FilterPredicate", new JsonPrimitive(""));

        return new GsonBuilder().disableHtmlEscaping().create().toJson(request);
    }

    /**
     * Parses a session response and copies every field that is present into this session.
     *
     * <p>Fields absent from the JSON leave the corresponding session state untouched. The fields
     * read are {@code SessionId}, {@code ExpirationTime}, {@code SessionType} (validated against
     * this session's type), {@code SessionStatus}, {@code Message}, {@code DataSchema},
     * {@code SupportedDataFormat}, {@code RecordCount} and {@code SplitsCount}. The last two
     * select the input split assigner depending on the configured split mode.
     *
     * @param str the raw JSON response body
     * @throws TunnelException if the body is not valid JSON, a field has an unexpected shape or
     *     value, or the session type does not match; the original failure is attached as cause
     */
    private void loadResultFromJson(String str) throws TunnelException {
        try {
            JsonObject response = new JsonParser().parse(str).getAsJsonObject();

            if (response.has("SessionId")) {
                this.sessionId = response.get("SessionId").getAsString();
            }
            if (response.has("ExpirationTime")) {
                this.expirationTime = response.get("ExpirationTime").getAsLong();
            }
            if (response.has("SessionType")) {
                String sessionType = response.get("SessionType").getAsString();
                // Locale.ROOT keeps case mapping independent of the JVM default locale
                // (e.g. Turkish dotted/dotless 'i').
                if (!getType().toString().equals(sessionType.toLowerCase(Locale.ROOT))) {
                    throw new UnsupportedOperationException("Unsupported session type: " + sessionType);
                }
            }
            if (response.has("SessionStatus")) {
                String status = response.get("SessionStatus").getAsString();
                // With the default locale, "init".toUpperCase() is not "INIT" under tr/az locales
                // and valueOf would reject a perfectly valid status.
                this.sessionStatus = SessionStatus.valueOf(status.toUpperCase(Locale.ROOT));
            }
            if (response.has("Message")) {
                this.errorMessage = response.get("Message").getAsString();
            }

            if (response.has("DataSchema")) {
                JsonObject schemaJson = response.get("DataSchema").getAsJsonObject();
                List<Column> columns = new ArrayList<>();
                List<String> partitionKeys = new ArrayList<>();
                if (schemaJson.has("DataColumns")) {
                    for (JsonElement columnJson : schemaJson.get("DataColumns").getAsJsonArray()) {
                        columns.add(SchemaUtils.parseColumn(columnJson.getAsJsonObject()));
                    }
                }
                if (schemaJson.has("PartitionColumns")) {
                    for (JsonElement columnJson : schemaJson.get("PartitionColumns").getAsJsonArray()) {
                        Column partitionColumn = SchemaUtils.parseColumn(columnJson.getAsJsonObject());
                        // Partition columns are appended after the data columns and also
                        // recorded by name as partition keys.
                        columns.add(partitionColumn);
                        partitionKeys.add(partitionColumn.getName());
                    }
                }
                this.readSchema = DataSchema.newBuilder().columns(columns).partitionBy(partitionKeys).build();
            }

            if (response.has("SupportedDataFormat")) {
                this.supportDataFormats = new HashSet<>();
                for (JsonElement formatJson : response.get("SupportedDataFormat").getAsJsonArray()) {
                    this.supportDataFormats.add(SessionUtils.parseDataFormat(formatJson.getAsJsonObject()));
                }
            }

            if (response.has("RecordCount")) {
                long recordCount = response.get("RecordCount").getAsLong();
                if (recordCount >= 0 && this.splitOptions.getSplitMode().equals(SplitOptions.SplitMode.ROW_OFFSET)) {
                    this.inputSplitAssigner = new RowRangeInputSplitAssigner(this.sessionId, recordCount);
                }
            }
            if (response.has("SplitsCount")) {
                int splitsCount = response.get("SplitsCount").getAsInt();
                if (splitsCount >= 0 && !this.splitOptions.getSplitMode().equals(SplitOptions.SplitMode.BUCKET)) {
                    this.inputSplitAssigner = new IndexedInputSplitAssigner(this.sessionId, splitsCount);
                }
            }
        } catch (Exception e) {
            throw new TunnelException("Invalid session response: \n" + str, e);
        }
    }
}
