package com.aliyun.odps.tunnel;

import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Headers;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import com.aliyun.odps.tunnel.io.StreamRecordPackImpl;
import com.aliyun.odps.utils.ConnectionWatcher;
import com.aliyun.odps.volume.Path;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/* loaded from: input_file:com/aliyun/odps/tunnel/StreamUploadSessionImpl.class */
public class StreamUploadSessionImpl implements TableTunnel.StreamUploadSession {
    // Session identifier; assigned from the "session_name" field in loadFromJson.
    private String id;
    // Tunnel configuration; used to create the REST client and to build the resource path.
    private Configuration conf;
    // Project and table this session targets (constructor arguments).
    private String projectName;
    private String tableName;
    // Partition spec sent as a request parameter when non-empty; may be null.
    private String partitionSpec;
    // REST client used for every request issued by this session.
    private RestClient tunnelServiceClient;
    // Slot routes parsed from the server response; replaced on every (re)load.
    private Slots slots;
    // Table schema; replaced by the schema parsed from the session creation response.
    private TableSchema schema = new TableSchema();
    // When true, getConnection passes an endpoint built from the slot's ip to the REST client.
    private boolean p2pMode = false;

    /* loaded from: input_file:com/aliyun/odps/tunnel/StreamUploadSessionImpl$Slot.class */
    /**
     * A slot id together with the server ({@code ip:port}) it is currently routed to.
     */
    public class Slot {
        private String slot;
        private String ip;
        private int port;

        /**
         * Creates a slot and parses its routed server.
         *
         * @param str  slot id; must be non-empty
         * @param str2 routed server in {@code ip:port} form; must be non-empty
         * @param z    when {@code false}, an empty ip part in {@code str2} is rejected;
         *             when {@code true}, it is tolerated (the ip is left unset)
         * @throws TunnelException if either argument is null/empty or the server cannot be parsed
         */
        public Slot(String str, String str2, boolean z) throws TunnelException {
            if (str == null || str.isEmpty() || str2 == null || str2.isEmpty()) {
                throw new TunnelException("Slot or Routed server is empty");
            }
            this.slot = str;
            setServer(str2, !z);
        }

        /** @return the slot id */
        public String getSlot() {
            return this.slot;
        }

        /** @return the ip part of the routed server; may be null if it was never provided */
        public String getIp() {
            return this.ip;
        }

        /** @return the port part of the routed server */
        public int getPort() {
            return this.port;
        }

        /** @return the routed server rendered as {@code ip:port} */
        public String getServer() {
            return this.ip + ":" + this.port;
        }

        /**
         * Compares this slot with another one by value (slot id, ip and port).
         *
         * <p>Note: this is an overload taking {@code Slot}; it does not override
         * {@link Object#equals(Object)}.
         *
         * @param slot the slot to compare with; {@code null} yields {@code false}
         * @return {@code true} if slot id, ip and port are all equal
         */
        public boolean equals(Slot slot) {
            if (slot == null) {
                return false;
            }
            // Strings must be compared by content, not by reference.
            return sameText(this.slot, slot.slot) && sameText(this.ip, slot.ip) && this.port == slot.port;
        }

        /**
         * Updates the routed server of this slot.
         *
         * <p>The input is fully validated before any field is modified, so a failed call
         * leaves the slot unchanged. An empty ip part keeps the previous ip.
         *
         * @param str server in {@code ip:port} form
         * @param z   when {@code true}, an empty ip part is an error
         * @throws TunnelException if the format is invalid, the ip is empty while required,
         *                         or the port is not an integer
         */
        public void setServer(String str, boolean z) throws TunnelException {
            if (str == null) {
                throw new TunnelException("Invalid slot format: " + str);
            }
            String[] parts = str.split(":");
            if (parts.length != 2) {
                throw new TunnelException("Invalid slot format: " + str);
            }
            String host = parts[0];
            if (host.isEmpty() && z) {
                throw new TunnelException("Empty server ip: " + str);
            }
            int parsedPort;
            try {
                parsedPort = Integer.parseInt(parts[1]);
            } catch (NumberFormatException e) {
                throw new TunnelException((String) null, "Invalid server port: " + str, e);
            }
            if (!host.isEmpty()) {
                this.ip = host;
            }
            this.port = parsedPort;
        }

        /** Null-safe string equality. */
        private boolean sameText(String a, String b) {
            return a == null ? b == null : a.equals(b);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/odps/tunnel/StreamUploadSessionImpl$Slots.class */
    /**
     * The slot routes of a session, handed out in round-robin order.
     *
     * <p>{@link #iterator()} always returns the same shared iterator. As long as at least one
     * slot exists, that iterator never ends: it wraps around to the first slot after the last.
     */
    public class Slots implements Iterable<Slot> {
        private Random rand = new Random();
        private final List<Slot> slots = new ArrayList<>();
        // Index of the slot returned by the next call to next(); -1 when there are no slots.
        private int curSlotIndex;
        private Iterator<Slot> iter;

        /**
         * Parses slot routes from JSON.
         *
         * @param jsonArray array of two-element arrays: {@code [slotId, server]}
         * @param z         forwarded to each {@link Slot} constructor
         * @throws TunnelException if an element is not a two-element array, or a slot is invalid
         */
        public Slots(JsonArray jsonArray, boolean z) throws TunnelException {
            this.curSlotIndex = -1;
            for (JsonElement route : jsonArray) {
                if (!route.isJsonArray()) {
                    throw new TunnelException("Invalid slot routes");
                }
                JsonArray pair = route.getAsJsonArray();
                if (pair.size() != 2) {
                    throw new TunnelException("Invalid slot routes");
                }
                this.slots.add(new Slot(pair.get(0).getAsString(), pair.get(1).getAsString(), z));
            }
            if (!this.slots.isEmpty()) {
                // Start the rotation at a random position.
                this.curSlotIndex = this.rand.nextInt(this.slots.size());
            }
            this.iter = new Iterator<Slot>() {
                @Override
                public boolean hasNext() {
                    return Slots.this.curSlotIndex >= 0;
                }

                /**
                 * Returns the next slot in rotation, or {@code null} when there are no slots.
                 */
                @Override
                public synchronized Slot next() {
                    if (!hasNext()) {
                        return null;
                    }
                    if (Slots.this.curSlotIndex >= Slots.this.slots.size()) {
                        Slots.this.curSlotIndex = 0;
                    }
                    return Slots.this.slots.get(Slots.this.curSlotIndex++);
                }
            };
        }

        /** @return the shared round-robin iterator (the same instance on every call) */
        @Override
        public Iterator<Slot> iterator() {
            return this.iter;
        }

        /** @return the number of slots */
        public int getSlotNum() {
            return this.slots.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a stream upload session and immediately initiates it on the tunnel service.
     *
     * @param str           project name; also used to create the REST client
     * @param str2          table name
     * @param str3          partition spec; may be null or empty
     * @param configuration tunnel configuration
     * @param j             passed to {@code initiate} (sent as the slot-num header when positive)
     * @param z             passed to {@code initiate} (adds the create-partition parameter when true)
     * @throws TunnelException if the session cannot be initiated
     */
    public StreamUploadSessionImpl(String str, String str2, String str3, Configuration configuration, long j, boolean z) throws TunnelException {
        projectName = str;
        tableName = str2;
        partitionSpec = str3;
        conf = configuration;
        tunnelServiceClient = configuration.newRestClient(str);
        initiate(j, z);
    }

    /**
     * Creates the session on the tunnel service (POST) and loads its state from the response.
     *
     * @param j requested slot number; sent as a header only when positive
     * @param z when true, the create-partition parameter is added to the request
     * @throws TunnelException if the request fails, the response is not OK, or the body is invalid
     */
    private void initiate(long j, boolean z) throws TunnelException {
        HashMap<String, String> params = new HashMap<>();
        if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
            params.put(TunnelConstants.RES_PARTITION, this.partitionSpec);
        }
        if (z) {
            params.put(TunnelConstants.CREATE_PARTITION, "");
        }
        HashMap<String, String> headers = TableTunnel.getCommonHeader();
        if (j > 0) {
            headers.put(HttpHeaders.HEADER_ODPS_SLOT_NUM, String.valueOf(j));
        }
        Connection connection = null;
        try {
            connection = this.tunnelServiceClient.connect(getResource(), "POST", params, headers);
            Response response = connection.getResponse();
            String requestId = response.getHeader("x-odps-request-id");
            if (!response.isOK()) {
                throw new TunnelException(requestId, connection.getInputStream(), Integer.valueOf(response.getStatus()));
            }
            loadFromJson(requestId, connection.getInputStream(), false);
        } catch (IOException e) {
            throw new TunnelException((String) null, "Failed to create upload session with tunnel endpoint " + this.tunnelServiceClient.getEndpoint(), e);
        } catch (TunnelException e) {
            throw e;
        } catch (OdpsException e) {
            throw new TunnelException((String) null, e.getMessage(), e);
        } finally {
            // Always release the connection, including on every failure path.
            if (connection != null) {
                try {
                    connection.disconnect();
                } catch (IOException ignored) {
                    // Best-effort cleanup; a disconnect failure must not mask the real outcome.
                }
            }
        }
    }

    /**
     * Re-fetches the session state (GET) from the tunnel service and reloads the slot routes.
     *
     * @param str currently unused by this method
     * @throws TunnelException if the request fails, the response is not OK, or the body is invalid
     */
    private void reload(String str) throws TunnelException {
        HashMap<String, String> params = new HashMap<>();
        params.put(TunnelConstants.UPLOADID, this.id);
        if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
            params.put(TunnelConstants.RES_PARTITION, this.partitionSpec);
        }
        Connection connection = null;
        try {
            connection = this.tunnelServiceClient.connect(getResource(), "GET", params, TableTunnel.getCommonHeader());
            Response response = connection.getResponse();
            String requestId = response.getHeader("x-odps-request-id");
            if (!response.isOK()) {
                throw new TunnelException(requestId, connection.getInputStream(), Integer.valueOf(response.getStatus()));
            }
            loadFromJson(requestId, connection.getInputStream(), true);
        } catch (TunnelException e) {
            throw e;
        } catch (IOException e) {
            throw new TunnelException((String) null, "Failed to reload upload session with tunnel endpoint " + this.tunnelServiceClient.getEndpoint(), e);
        } catch (OdpsException e) {
            throw new TunnelException((String) null, e.getMessage(), e);
        } finally {
            // Always release the connection, including on every failure path.
            if (connection != null) {
                try {
                    connection.disconnect();
                } catch (IOException ignored) {
                    // Best-effort cleanup; a disconnect failure must not mask the real outcome.
                }
            }
        }
    }

    /**
     * Parses a session description from JSON and updates this session's state.
     *
     * <p>When {@code z} is false (initial load), "session_name" and "schema" are required and
     * stored as the session id and schema. In both modes "slots" and "status" are required,
     * a status of "init" is rejected, and the slot routes are replaced.
     *
     * @param str         request id attached to any exception raised here
     * @param inputStream response body to read
     * @param z           true when reloading an existing session, false on initial creation
     * @throws TunnelException if the body is unreadable, malformed, incomplete,
     *                         or the session is still initiating
     */
    private void loadFromJson(String str, InputStream inputStream, boolean z) throws TunnelException {
        // Declared outside the try block so the failure message can include what was read.
        String json = null;
        try {
            json = IOUtils.readStreamAsString(inputStream);
            JsonObject tree = new JsonParser().parse(json).getAsJsonObject();
            if (!z) {
                if (!tree.has("session_name") || !tree.has("schema")) {
                    throw new TunnelException(str, "Incomplete session info: '" + json + "'");
                }
                this.id = tree.get("session_name").getAsString();
                this.schema = new TunnelTableSchema(tree.get("schema").getAsJsonObject());
            }
            if (!tree.has("slots") || !tree.has("status")) {
                throw new TunnelException(str, "Incomplete session info: '" + json + "'");
            }
            if (tree.get("status").getAsString().equals("init")) {
                throw new TunnelException(str, "Session is initiating. Session name: " + this.id);
            }
            this.slots = new Slots(tree.getAsJsonArray("slots"), z);
        } catch (TunnelException e) {
            throw e;
        } catch (Exception e) {
            throw new TunnelException(str, "Invalid json content: '" + json + "'", e);
        }
    }

    /**
     * Reconciles local slot routing with what the server reported.
     *
     * <p>If the reported slot count differs from the local one, the whole session is reloaded.
     * Otherwise, if the reported routed server differs from the slot's current server, the
     * slot is re-pointed to it.
     *
     * @param slot the slot that was just used
     * @param str  routed server reported by the service ({@code ip:port})
     * @param i    slot count reported by the service
     * @throws TunnelException if the reload fails, or the routed server is missing or invalid
     */
    public void reloadSlots(Slot slot, String str, int i) throws TunnelException {
        if (this.slots.getSlotNum() != i) {
            reload(slot.getServer());
            return;
        }
        if (str == null) {
            // Fail with a descriptive checked exception instead of an NPE inside setServer.
            throw new TunnelException("Routed server is missing for slot " + slot.getSlot());
        }
        if (!slot.getServer().equals(str)) {
            slot.setServer(str, false);
        }
    }

    /**
     * Opens a PUT connection for uploading one block through the given slot.
     *
     * @param compressOption compression setting; determines the Content-Encoding header
     * @param slot           slot to upload through; its id and routed server are sent along
     * @param j              body length in bytes; a negative value selects chunked transfer
     * @param j2             record count; sent as a parameter only when positive
     * @return the opened connection
     * @throws OdpsException if the compression algorithm is unsupported, the endpoint is not
     *                       a valid URI (p2p mode), or the connect call fails
     * @throws IOException   if the connect call fails with an I/O error
     */
    private Connection getConnection(CompressOption compressOption, Slot slot, long j, long j2) throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<>();
        params.put(TunnelConstants.UPLOADID, this.id);
        params.put(TunnelConstants.SLOT_ID, slot.getSlot());
        if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
            params.put(TunnelConstants.RES_PARTITION, this.partitionSpec);
        }
        if (j2 > 0) {
            params.put(TunnelConstants.RECORD_COUNT, String.valueOf(j2));
        }

        HashMap<String, String> headers = new HashMap<>();
        if (j < 0) {
            headers.put("Transfer-Encoding", Headers.CHUNKED);
        } else {
            headers.put("Content-Length", String.valueOf(j));
        }
        headers.put("Content-Type", "application/octet-stream");
        headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(4));
        headers.put(HttpHeaders.HEADER_ODPS_SLOT_NUM, String.valueOf(this.slots.getSlotNum()));
        switch (compressOption.algorithm) {
            case ODPS_RAW:
                break;
            case ODPS_ZLIB:
                headers.put("Content-Encoding", "deflate");
                break;
            case ODPS_SNAPPY:
                headers.put("Content-Encoding", "x-snappy-framed");
                break;
            default:
                throw new TunnelException("unsupported compression option.");
        }
        headers.put(HttpHeaders.HEADER_ODPS_ROUTED_SERVER, slot.getServer());

        if (!this.p2pMode) {
            return this.tunnelServiceClient.connect(getResource(), "PUT", params, headers);
        }
        // p2p mode: pass an endpoint made of the configured endpoint's scheme and the slot's ip.
        String endpoint = this.tunnelServiceClient.getEndpoint();
        try {
            String slotEndpoint = new URI(endpoint).getScheme() + "://" + slot.getIp();
            return this.tunnelServiceClient.connect(getResource(), "PUT", params, headers, slotEndpoint);
        } catch (URISyntaxException e) {
            // Keep the parse failure as the cause so the offending part of the URI is not lost.
            throw new TunnelException((String) null, "Invalid endpoint: " + endpoint, e);
        }
    }

    /**
     * Builds the REST resource path of the streams endpoint for this session's table.
     *
     * @return the table resource from the configuration, followed by the path separator
     *         and the streams segment
     */
    private String getResource() {
        String tableResource = this.conf.getResource(this.projectName, this.tableName);
        return tableResource + Path.SEPARATOR + TunnelConstants.STREAMS;
    }

    /**
     * Uploads a record pack, passing a timeout of zero to
     * {@link #writeBlock(ProtobufRecordPack, long)}.
     *
     * @param protobufRecordPack the pack to upload
     * @return the value returned by the two-argument overload
     * @throws IOException if the upload fails
     */
    public String writeBlock(ProtobufRecordPack protobufRecordPack) throws IOException {
        final long timeout = 0L;
        return writeBlock(protobufRecordPack, timeout);
    }

    /**
     * Uploads a record pack through the next slot in rotation.
     *
     * @param protobufRecordPack the pack to upload
     * @param j                  timeout in milliseconds; connection watching is only enabled
     *                           when positive
     * @return the request id of the upload request
     * @throws IOException if no slot is available, the upload fails, or the connection
     *                     cannot be closed; tunnel/ODPS errors are wrapped as the cause
     */
    public String writeBlock(ProtobufRecordPack protobufRecordPack, long j) throws IOException {
        Connection connection = null;
        try {
            Slot slot = this.slots.iterator().next();
            if (slot == null) {
                // The slot iterator yields null when the session has no slots at all.
                throw new IOException("No slot available for session: " + this.id);
            }
            connection = getConnection(protobufRecordPack.getCompressOption(), slot, protobufRecordPack.getTotalBytes(), protobufRecordPack.getSize());
            return sendBlock(protobufRecordPack, connection, slot, j);
        } catch (OdpsException e) {
            throw new IOException(e.getMessage(), e);
        } finally {
            if (null != connection) {
                connection.disconnect();
            }
        }
    }

    /**
     * Writes the pack's serialized bytes to the connection and processes the response.
     *
     * <p>When {@code j} is positive the connection is registered with the
     * {@link ConnectionWatcher} for the duration of the request. If the request then fails and
     * the watcher reports that the connection timed out, the failure is reported as a
     * {@link SocketTimeoutException} carrying the original failure as its cause.
     *
     * @param protobufRecordPack pack whose protobuf stream is sent
     * @param connection         open connection to write to; must not be null
     * @param slot               slot used for this upload; updated from the response headers
     * @param j                  timeout in milliseconds; watching is enabled only when positive
     * @return the request id from the response
     * @throws IOException     if the connection is null, I/O fails, the response is not OK,
     *                         or the watched request timed out
     * @throws TunnelException if refreshing the slot routing fails
     */
    private String sendBlock(ProtobufRecordPack protobufRecordPack, Connection connection, Slot slot, long j) throws IOException, TunnelException {
        if (null == connection) {
            throw new IOException("Invalid connection");
        }
        ByteArrayOutputStream payload = protobufRecordPack.getProtobufStream();
        final boolean watched = j > 0;
        if (watched) {
            ConnectionWatcher.getInstance().mark(connection, j);
        }
        try {
            try {
                payload.writeTo(connection.getOutputStream());
                connection.getOutputStream().close();
                payload.close();
                Response response = connection.getResponse();
                if (watched) {
                    ConnectionWatcher.getInstance().release(connection);
                }
                if (response.isOK()) {
                    reloadSlots(slot, response.getHeader(HttpHeaders.HEADER_ODPS_ROUTED_SERVER), Integer.valueOf(response.getHeader(HttpHeaders.HEADER_ODPS_SLOT_NUM)).intValue());
                    return response.getHeader("x-odps-request-id");
                }
                TunnelException failure = new TunnelException(response.getHeader("x-odps-request-id"), connection.getInputStream(), Integer.valueOf(response.getStatus()));
                throw new IOException(failure.getMessage(), failure);
            } catch (Throwable th) {
                if (!watched || !ConnectionWatcher.getInstance().checkTimedOut(connection)) {
                    throw th;
                }
                SocketTimeoutException timeout = new SocketTimeoutException("Flush time exceeded timeout user set: " + j + "ms");
                // Keep the underlying failure for diagnosis instead of discarding it.
                timeout.initCause(th);
                throw timeout;
            }
        } catch (Throwable th2) {
            // On any failure make sure the watcher no longer tracks this connection.
            if (watched) {
                ConnectionWatcher.getInstance().release(connection);
            }
            throw th2;
        }
    }

    /**
     * @return the session id (the "session_name" value loaded from the service)
     */
    @Override
    public String getId() {
        return id;
    }

    /**
     * Enables or disables p2p mode for subsequent uploads.
     *
     * @param z true to enable p2p mode
     */
    @Override
    public void setP2pMode(boolean z) {
        p2pMode = z;
    }

    /**
     * @return the table schema held by this session
     */
    @Override
    public TableSchema getSchema() {
        return schema;
    }

    /**
     * Creates a record pack bound to this session using the raw (ODPS_RAW) compress option.
     *
     * @return a new record pack
     * @throws IOException if the pack cannot be created
     */
    @Override
    public TableTunnel.StreamRecordPack newRecordPack() throws IOException {
        CompressOption rawOption = new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
        return new StreamRecordPackImpl(this, rawOption);
    }

    /**
     * Creates a record pack bound to this session with the given compress option.
     *
     * @param compressOption compress option handed to the new pack
     * @return a new record pack
     * @throws IOException if the pack cannot be created
     */
    @Override
    public TableTunnel.StreamRecordPack newRecordPack(CompressOption compressOption) throws IOException {
        TableTunnel.StreamRecordPack pack = new StreamRecordPackImpl(this, compressOption);
        return pack;
    }

    /**
     * Creates an empty record whose columns are those of this session's schema.
     *
     * @return a new {@link ArrayRecord}
     */
    @Override
    public Record newRecord() {
        Column[] columns = (Column[]) this.schema.getColumns().toArray(new Column[0]);
        return new ArrayRecord(columns);
    }
}
