package com.aliyun.odps.tunnel.io;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.ArrowUtils;
import com.aliyun.odps.data.ArrowRecordReader;
import com.aliyun.odps.rest.ResourceBuilder;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.tunnel.HttpHeaders;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelConstants;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.utils.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.ArrowMessage;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageChannelReader;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

/* loaded from: input_file:com/aliyun/odps/tunnel/io/ArrowTunnelRecordReader.class */
/**
 * {@link ArrowRecordReader} that downloads a row range of a tunnel download session and exposes
 * it as a sequence of Arrow {@link VectorSchemaRoot} batches.
 *
 * <p>The HTTP connection is opened eagerly in the constructor; the response body is only wrapped
 * in a stream and decoded lazily on the first call to {@link #read()}. Callers must invoke
 * {@link #close()} to release the stream and the connection.
 */
public class ArrowTunnelRecordReader implements ArrowRecordReader {
    private final RestClient tunnelServiceClient;
    private final long start;
    private final long count;
    private final List<Column> columnList;
    private final TableTunnel.DownloadSession tableSession;
    private final BufferAllocator allocator;
    private final Schema arrowSchema;
    private boolean isClosed;
    /** Lazily created on the first batch read; stays {@code null} until then. */
    private ArrowHttpInputStream inputStream;
    private Connection connection;
    /** Lazily created together with {@link #inputStream}. */
    private MessageChannelReader messageReader;
    /**
     * Before the connection is opened: the compression requested by the caller. Afterwards: the
     * compression announced by the server's {@code Content-Encoding} header ({@code null} if the
     * header is absent).
     */
    private CompressOption compression;

    /**
     * Opens a reader over the given row range of a download session.
     *
     * @param start first value placed in the row-range request parameter
     * @param count second value placed in the row-range request parameter
     * @param columns columns to request; {@code null} or empty sends no column filter
     * @param restClient client used to open the tunnel connection
     * @param downloadSession session that supplies the id, schema, table and partition
     * @param bufferAllocator allocator for Arrow buffers; when {@code null} a new
     *     {@link RootAllocator} with no practical limit is created
     * @param compressOption requested compression; must not be {@code null}
     * @throws TunnelException if the request fails or the server answers with an error or an
     *     unsupported content encoding
     * @throws IOException declared for interface compatibility
     */
    public ArrowTunnelRecordReader(long start, long count, List<Column> columns, RestClient restClient, TableTunnel.DownloadSession downloadSession, BufferAllocator bufferAllocator, CompressOption compressOption) throws TunnelException, IOException {
        this.start = start;
        this.count = count;
        this.columnList = columns;
        this.allocator = bufferAllocator != null ? bufferAllocator : new RootAllocator(Long.MAX_VALUE);
        this.tunnelServiceClient = restClient;
        this.tableSession = downloadSession;
        this.isClosed = false;
        this.arrowSchema = ArrowUtils.tableSchemaToArrowSchema(downloadSession.getSchema(), columns);
        this.compression = compressOption;
        openReaderConnection(this.start, this.count, this.columnList, this.tunnelServiceClient, this.tableSession);
    }

    /**
     * Deserializes the next record batch from the response stream, creating the stream and the
     * message reader on first use.
     *
     * @return the next batch, or {@code null} when the deserializer reports no further message
     * @throws IOException if the reader has been closed or the stream cannot be read
     */
    private ArrowRecordBatch readBatch() throws IOException {
        if (this.isClosed) {
            throw new IOException("Arrow reader is closed");
        }
        if (this.inputStream == null) {
            this.inputStream = new ArrowHttpInputStream(this.connection.getInputStream(), this.compression);
            this.messageReader = new MessageChannelReader(new ReadChannel(this.inputStream), this.allocator);
        }
        // Casting a null reference is legal, so end-of-stream passes through as null.
        return (ArrowRecordBatch) MessageSerializer.deserializeMessageBatch(this.messageReader);
    }

    /**
     * Reads the next batch into a freshly allocated {@link VectorSchemaRoot}.
     *
     * <p>The returned root is owned by the caller, who is responsible for closing it.
     *
     * @return a root holding the next batch, or {@code null} when there are no more batches
     * @throws IOException if the reader has been closed or the stream cannot be read
     */
    @Override
    public VectorSchemaRoot read() throws IOException {
        // Fetch the batch first so no vectors are created when the stream is exhausted.
        ArrowRecordBatch batch = readBatch();
        if (batch == null) {
            return null;
        }
        try {
            VectorSchemaRoot root = VectorSchemaRoot.create(this.arrowSchema, this.allocator);
            boolean loaded = false;
            try {
                new VectorLoader(root).load(batch);
                loaded = true;
                return root;
            } finally {
                if (!loaded) {
                    // Loading failed part-way: release whatever the root already holds.
                    root.close();
                }
            }
        } finally {
            // The loader retains the buffers it needs; the batch itself is always released.
            batch.close();
        }
    }

    /**
     * Returns the number of bytes consumed by the Arrow message reader so far.
     *
     * @return bytes read, or {@code 0} if no batch has been requested yet
     */
    @Override
    public long bytesRead() {
        return this.messageReader != null ? this.messageReader.bytesRead() : 0L;
    }

    /**
     * Closes the response stream (if one was created) and disconnects the connection. Calling it
     * again after the first call is a no-op.
     *
     * @throws IOException if closing the stream or disconnecting fails
     */
    @Override
    public void close() throws IOException {
        if (this.isClosed) {
            return;
        }
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } finally {
            try {
                // Disconnect even when closing the stream failed.
                this.connection.disconnect();
            } finally {
                this.isClosed = true;
            }
        }
    }

    /**
     * Issues the download request and, on success, stores the connection and the compression
     * announced by the server. On any failure the newly opened connection is disconnected
     * before the exception propagates.
     *
     * @param start first value of the row-range parameter
     * @param count second value of the row-range parameter
     * @param columns columns to request; {@code null} or empty sends no column filter
     * @param restClient client used to open the connection
     * @param session download session that supplies id, table and partition
     * @throws TunnelException on a non-OK response, an unsupported compression or content
     *     encoding, or any wrapped {@link OdpsException}/{@link IOException}
     * @throws IOException declared for signature compatibility
     */
    private void openReaderConnection(long start, long count, List<Column> columns, RestClient restClient, TableTunnel.DownloadSession session) throws IOException, TunnelException {
        HashMap<String, String> params = new HashMap<>();
        HashMap<String, String> headers = new HashMap<>();
        headers.put("Content-Length", String.valueOf(0));
        headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(5));
        switch (this.compression.algorithm) {
            case ODPS_RAW:
                break;
            case ODPS_ZLIB:
                headers.put("Accept-Encoding", "deflate");
                break;
            case ODPS_SNAPPY:
                headers.put("Accept-Encoding", "x-snappy-framed");
                break;
            case ODPS_ARROW_LZ4_FRAME:
                headers.put("Accept-Encoding", "x-odps-lz4-frame");
                break;
            default:
                throw new TunnelException("invalid compression option.");
        }
        if (columns != null && !columns.isEmpty()) {
            StringBuilder names = new StringBuilder();
            for (Column column : columns) {
                if (names.length() > 0) {
                    names.append(StringUtils.COMMA_STR);
                }
                names.append(column.getName());
            }
            params.put(TunnelConstants.RES_COLUMNS, names.toString());
        }
        params.put(TunnelConstants.DOWNLOADID, session.getId());
        params.put("data", null);
        params.put(TunnelConstants.ROW_RANGE, "(" + start + StringUtils.COMMA_STR + count + ")");
        String partitionSpec = session.getPartitionSpec();
        if (partitionSpec != null && partitionSpec.length() > 0) {
            params.put(TunnelConstants.RES_PARTITION, partitionSpec);
        }
        params.put(TunnelConstants.PARAM_ARROW, "");

        Connection conn = null;
        boolean opened = false;
        try {
            conn = restClient.connect(ResourceBuilder.buildTableResource(session.getProjectName(), session.getSchemaName(), session.getTableName()), "GET", params, headers);
            Response response = conn.getResponse();
            if (!response.isOK()) {
                TunnelException failure = new TunnelException(conn.getInputStream());
                failure.setRequestId(response.getHeader("x-odps-request-id"));
                throw failure;
            }
            this.compression = compressionForEncoding(response.getHeader("Content-Encoding"));
            this.connection = conn;
            opened = true;
        } catch (TunnelException e) {
            throw e;
        } catch (OdpsException e) {
            throw new TunnelException(e.getMessage(), e);
        } catch (IOException e) {
            throw new TunnelException(e.getMessage(), e);
        } finally {
            if (!opened && conn != null) {
                disconnectQuietly(conn);
            }
        }
    }

    /**
     * Maps a {@code Content-Encoding} header value to the matching compression option.
     *
     * @param encoding header value, or {@code null} when the header is absent
     * @return the compression option, or {@code null} when {@code encoding} is {@code null}
     * @throws TunnelException if the encoding is not one of the supported values
     */
    private static CompressOption compressionForEncoding(String encoding) throws TunnelException {
        if (encoding == null) {
            return null;
        }
        if (encoding.equals("deflate")) {
            return new CompressOption(CompressOption.CompressAlgorithm.ODPS_ZLIB, -1, 0);
        }
        if (encoding.equals("x-snappy-framed")) {
            return new CompressOption(CompressOption.CompressAlgorithm.ODPS_SNAPPY, -1, 0);
        }
        if (encoding.equals("x-odps-lz4-frame")) {
            return new CompressOption(CompressOption.CompressAlgorithm.ODPS_ARROW_LZ4_FRAME, -1, 0);
        }
        throw new TunnelException("invalid content encoding");
    }

    /** Best-effort disconnect used on failure paths so the original error is not masked. */
    private static void disconnectQuietly(Connection conn) {
        try {
            conn.disconnect();
        } catch (Exception ignored) {
            // Intentionally ignored: the caller is already propagating the root cause.
        }
    }
}
