package com.huaweicloud.sdk.iot.module.transport;

import com.huaweicloud.sdk.iot.module.ClientConfig;
import com.huaweicloud.sdk.iot.module.ConnectionStatusChangeCallback;
import com.huaweicloud.sdk.iot.module.exception.GeneraException;
import com.huaweicloud.sdk.iot.module.exception.TransportException;
import com.huaweicloud.sdk.iot.module.thread.AsyncTaskExcutor;
import com.huaweicloud.sdk.iot.module.transport.mqtt.MqttConnection;
import com.huaweicloud.sdk.iot.utils.ExceptionUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/huaweicloud/sdk/iot/module/transport/IotTransport.class */
public class IotTransport implements IotListener {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) IotTransport.class);
    private static final String MODULE_CUSTOMIZED_MESSAGE = "/modules/%s/customized/inputs";
    private static final int QUEUE_SIZE = 5000;
    private boolean opened;
    private IotConnection connection;
    private ClientConfig config;
    private List<String> topics;
    private ScheduledExecutorService taskScheduler;
    private ConnectionStatusChangeCallback connectionStatusChangeCallback;
    private volatile ConnectionStatus connectionStatus = ConnectionStatus.DISCONNECTED;
    private final Object reconnectionLock = new Object();
    private Queue<InnerMessage> messagesToSend = new ConcurrentLinkedQueue();
    private Map<String, InnerMessageCallback> messageCallbacks = new HashMap();
    private AsyncTaskExcutor recvTaskExecutor = new AsyncTaskExcutor("MQTT_RCV", 1, 3, 5000, "MqttRcvPool_");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/huaweicloud/sdk/iot/module/transport/IotTransport$SendTask.class */
    public class SendTask implements Runnable {
        SendTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            InnerMessage innerMessage;
            int maxPerPeriod = IotTransport.this.config.getMaxPerPeriod();
            while (IotTransport.this.connectionStatus == ConnectionStatus.CONNECTED) {
                int i = maxPerPeriod;
                maxPerPeriod--;
                if (i <= 0 || (innerMessage = (InnerMessage) IotTransport.this.messagesToSend.peek()) == null) {
                    return;
                }
                try {
                    IotTransport.this.connection.sendMessage(innerMessage);
                    IotTransport.this.messagesToSend.poll();
                    IotTransport.LOG.debug("Send message {}", innerMessage);
                } catch (Exception e) {
                    IotTransport.LOG.error("Send message fail, message = {}, error = {}", innerMessage, e.getMessage());
                    return;
                }
            }
        }
    }

    public IotTransport(ClientConfig clientConfig, List<String> list) {
        this.config = clientConfig;
        this.topics = list;
    }

    public void addTopic(List<String> list) {
        if (list == null || list.size() == 0) {
            return;
        }
        this.topics.addAll(list);
    }

    public void setMessageCallback(String str, InnerMessageCallback innerMessageCallback) {
        this.messageCallbacks.put(str, innerMessageCallback);
    }

    public void setConnectionStatusChangeCallback(ConnectionStatusChangeCallback connectionStatusChangeCallback) {
        this.connectionStatusChangeCallback = connectionStatusChangeCallback;
    }

    public void open() throws TransportException {
        if (this.opened) {
            return;
        }
        this.connection = new MqttConnection(this.config);
        this.connection.setListener(this);
        this.connection.open(this.topics);
        this.connectionStatus = ConnectionStatus.CONNECTED;
        this.taskScheduler = Executors.newScheduledThreadPool(1);
        this.taskScheduler.scheduleAtFixedRate(new SendTask(), 0L, this.config.getSendPeriod(), TimeUnit.MILLISECONDS);
        this.opened = true;
        LOG.info("Transport connection is opened");
    }

    public void close() throws TransportException {
        if (this.taskScheduler != null) {
            this.taskScheduler.shutdown();
        }
        this.connection.close();
        this.opened = false;
        LOG.info("Transport connection is closed");
    }

    public void sendMessage(InnerMessage innerMessage) {
        if (!this.opened) {
            throw new IllegalStateException("Client is closed");
        }
        if (this.messagesToSend.size() >= this.config.getSendQueueSize()) {
            throw new IllegalStateException("Sending queue is full");
        }
        this.messagesToSend.add(innerMessage);
    }

    private void reconnect(String str) {
        LOG.info("Reconnect begin,{}", str);
        long minReconnectPeriod = this.config.getMinReconnectPeriod();
        while (this.connectionStatus == ConnectionStatus.DISCONNECTED_RETRYING) {
            try {
                Thread.sleep(minReconnectPeriod);
            } catch (InterruptedException e) {
            }
            try {
                this.connection.open(this.topics);
                this.connectionStatus = ConnectionStatus.CONNECTED;
                LOG.info("Reconnect success");
                if (this.connectionStatusChangeCallback != null) {
                    this.connectionStatusChangeCallback.onConnectionStatusChanged(this.connectionStatus);
                    return;
                }
                return;
            } catch (TransportException e2) {
                if (!e2.isRetryable()) {
                    LOG.error("Reconnect fail");
                    forceClose();
                    return;
                } else {
                    minReconnectPeriod *= 2;
                    if (minReconnectPeriod > this.config.getMaxReconnectPeriod()) {
                        minReconnectPeriod = this.config.getMaxReconnectPeriod();
                    }
                }
            }
        }
    }

    private void forceClose() {
        LOG.error("Connection is lost and can'not be recovered, transport will be closed");
        this.connectionStatus = ConnectionStatus.DISCONNECTED;
        if (this.connectionStatusChangeCallback != null) {
            this.connectionStatusChangeCallback.onConnectionStatusChanged(this.connectionStatus);
        }
        try {
            close();
        } catch (GeneraException e) {
        }
    }

    @Override // com.huaweicloud.sdk.iot.module.transport.IotListener
    public void onMessageReceived(InnerMessage innerMessage) {
        LOG.debug("Receive message {}", innerMessage);
        this.recvTaskExecutor.execute(() -> {
            handlePublishedMessage(innerMessage);
        });
    }

    private void handlePublishedMessage(InnerMessage innerMessage) {
        InnerMessageCallback innerMessageCallback = innerMessage.getTopic().startsWith(String.format(MODULE_CUSTOMIZED_MESSAGE, this.config.getModuleId())) ? this.messageCallbacks.get(String.format(MODULE_CUSTOMIZED_MESSAGE, this.config.getModuleId())) : this.messageCallbacks.get(innerMessage.getTopic());
        if (innerMessageCallback != null) {
            try {
                innerMessageCallback.onMessageReceived(innerMessage);
            } catch (Throwable th) {
                LOG.error("MessageCallback exception, message = {}, error = {}", innerMessage, ExceptionUtil.getBriefExceptionStackTrace(th));
            }
        }
    }

    @Override // com.huaweicloud.sdk.iot.module.transport.IotListener
    public void onConnectionLost(String str, TransportException transportException) {
        LOG.info("Connection lost, error = {}", transportException.getMessage());
        synchronized (this.reconnectionLock) {
            if (this.connectionStatus != ConnectionStatus.CONNECTED) {
                return;
            }
            if (transportException.isRetryable()) {
                this.connectionStatus = ConnectionStatus.DISCONNECTED_RETRYING;
                if (this.connectionStatusChangeCallback != null) {
                    this.connectionStatusChangeCallback.onConnectionStatusChanged(this.connectionStatus);
                }
                reconnect(str);
            } else {
                forceClose();
            }
        }
    }

    public synchronized void sendSyncMessage(InnerMessage innerMessage) throws TransportException {
        if (!this.opened) {
            throw new IllegalStateException("Client is closed");
        }
        this.connection.sendSyncMessage(innerMessage, this.config.getReportTimeout());
        LOG.debug("Send message {}", innerMessage);
    }
}
