package mqttcli import ( "crypto/rand" "crypto/tls" "crypto/x509" "device-manage/tools/config" "errors" "fmt" "io/ioutil" "net/url" "strings" "time" flathandler "device-manage/common/mqttcli/flatbuffer" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/google/uuid" log "github.com/sirupsen/logrus" ) var MqttClientList map[string]*MQTTClient type MQTTClient struct { ClientID string Client mqtt.Client Subscribed map[string]byte Url *url.URL } func MqttSetup() { var err error MqttClientList = make(map[string]*MQTTClient, len(config.MqttCfgInfo.ServerUrl)) for _, addr := range config.MqttCfgInfo.ServerUrl { mqClient := new(MQTTClient) if mqClient.Url, err = url.Parse(addr); err != nil { fmt.Println(err) continue } mqClient.ClientID = config.MqttCfgInfo.ClientId mqClient.Subscribed = make(map[string]byte) //默认只监听设备主动上报事件. mqClient.Subscribed[mqClient.ClientID+TopicListenerEvent] = 0x01 MqttClientList[mqClient.Url.Hostname()] = mqClient mqClient.MqttConnTask() } } // Connects connect to the MQTT broker with Options. func (m *MQTTClient) Connect() (mqtt.Client, error) { if !m.Client.IsConnected() { log.Info("connecting...") if token := m.Client.Connect(); token.Wait() && token.Error() != nil { return nil, token.Error() } } return m.Client, nil } //mqtt connect timer task func (m *MQTTClient) MqttConnTask() { go func() { for true { if m.Client != nil { if !m.Client.IsConnected() { log.Info("retry connecting...") //Opts := m.createClientOptions() //m.Client = mqtt.NewClient(Opts) if token := m.Client.Connect(); token.Wait() && token.Error() != nil { log.Info("retry connect fail.") } else { log.Info("mqtt connected.") m.SubscribeOnConnect() } } else { log.Info("mqtt alived.") //ws.WebsocketManager.SendAll([]byte("device online")) } } else { log.Info("first connect...") Opts := m.createClientOptions() m.Client = mqtt.NewClient(Opts) if token := m.Client.Connect(); token.Wait() && token.Error() != nil { log.Info("connection fail.") } else { log.Info("mqtt connected.") m.SubscribeOnConnect() } } time.Sleep(30 * time.Second) } }() } func (m *MQTTClient) Disconnect() error { if m.Client.IsConnected() { m.Client.Disconnect(20) log.Info("client disconnected") } return nil } func (m *MQTTClient) SubscribeOnConnect() { if len(m.Subscribed) > 0 { token := m.Client.SubscribeMultiple(m.Subscribed, m.onMessageReceived) token.Wait() if token.Error() != nil { log.Error(token.Error()) } } } //connect call back func OnConnect(client mqtt.Client) { } //disconnect call back func OnConnectionLost(client mqtt.Client, reason error) { log.Errorf("client disconnected: %s", reason) } func (m *MQTTClient) onMessageReceived(client mqtt.Client, message mqtt.Message) { fmt.Println("recv topic:", message.Topic()) topicArry := strings.Split(message.Topic(), "/") devId := topicArry[len(topicArry)-1] flathandler.FlatbufferHandler(devId, message.Payload()) } func getCertPool(pemPath string) (*x509.CertPool, error) { certs := x509.NewCertPool() pemData, err := ioutil.ReadFile(pemPath) if err != nil { return nil, err } certs.AppendCertsFromPEM(pemData) return certs, nil } // getRandomClientId returns randomized ClientId. func getRandomClientId() string { const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" var bytes = make([]byte, MaxClientIdLen) rand.Read(bytes) for i, b := range bytes { bytes[i] = alphanum[b%byte(len(alphanum))] } return "mqttcli-" + string(bytes) } // createClientOptions returns ClientOptions via parsing config options. func (m *MQTTClient) createClientOptions() *mqtt.ClientOptions { opts := mqtt.NewClientOptions() opts.SetClientID(m.ClientID) opts.SetUsername(m.Url.User.Username()) password, _ := m.Url.User.Password() opts.SetPassword(password) opts.AddBroker(fmt.Sprintf("tcp://%s", m.Url.Host)) opts.SetConnectRetry(false) opts.SetOnConnectHandler(OnConnect) opts.SetConnectionLostHandler(OnConnectionLost) return opts } func (m *MQTTClient) Publish(topic string, msg []byte) { if m.Client.IsConnected() { m.Client.Publish(topic, 2, false, msg) } else { token := m.Client.Connect() for !token.WaitTimeout(5 * time.Second) { } if err := token.Error(); err != nil { log.Debug("cli connect:", err.Error()) //log.Fatal(err) } } } // makeTlsConfig creats new tls.Config. If returned ok is false, does not need set to MQTToption. func makeTlsConfig(cafile, cert, key string, insecure bool) (*tls.Config, bool, error) { TLSConfig := &tls.Config{InsecureSkipVerify: false} var ok bool if insecure { TLSConfig.InsecureSkipVerify = true ok = true } if cafile != "" { certPool, err := getCertPool(cafile) if err != nil { return nil, false, err } TLSConfig.RootCAs = certPool ok = true } if cert != "" { certPool, err := getCertPool(cert) if err != nil { return nil, false, err } TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert TLSConfig.ClientCAs = certPool ok = true } if key != "" { if cert == "" { return nil, false, fmt.Errorf("key specified but cert is not specified") } cert, err := tls.LoadX509KeyPair(cert, key) if err != nil { return nil, false, err } TLSConfig.Certificates = []tls.Certificate{cert} ok = true } return TLSConfig, ok, nil } func (m *MQTTClient) CollectLog(userId, deviceSn string, collectUrl string) error { uuid := uuid.New().String() topic := fmt.Sprintf("%v/control[-flat]/%v", deviceSn, m.ClientID) resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceSn, uuid) fmt.Println("Publish topic[%s]", topic) fmt.Println("Subscribe resptopic[%s]", resptopic) buf := flathandler.CreateLogCollectBuffer(userId, collectUrl, uuid) timer := time.NewTimer(10 * time.Second) defer timer.Stop() result := make(chan error) defer close(result) m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) { go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) { fmt.Println("CollectLog recv topic :%s", msg.Topic()) c.Client.Unsubscribe(resptopic) result <- flathandler.FlatbufferHandler(deviceSn, msg.Payload()) }(m, client, msg) }) m.Publish(topic, buf) select { case <-timer.C: m.Client.Unsubscribe(resptopic) fmt.Println("CollectLog timeout.") return errors.New("CollectLog timeout.") case res := <-result: fmt.Println("开始获取日志") return res } } func (m *MQTTClient) FileUpgrade(userId, deviceID, appKey, downloadUrl, fileUid string) error { reply := uuid.New().String() topic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, m.ClientID) resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, reply) fmt.Println("Publish topic[%s]", topic) fmt.Println("Subscribe resptopic[%s]", resptopic) buf := flathandler.CreateFileUpgradeBuffer(userId, downloadUrl, appKey, reply, fileUid) timer := time.NewTimer(10 * time.Second) defer timer.Stop() result := make(chan error) defer close(result) m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) { go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) { fmt.Println("upgrade recv topic :%s", msg.Topic()) c.Client.Unsubscribe(resptopic) result <- flathandler.FlatbufferHandler(deviceID, msg.Payload()) }(m, client, msg) }) m.Publish(topic, buf) select { case <-timer.C: m.Client.Unsubscribe(resptopic) fmt.Println("upgrade timeout.") return errors.New("upgrade timeout.") case res := <-result: fmt.Println("开始安装应用") return res } } func (m *MQTTClient) Reboot(deviceID string) error { uuid := uuid.New().String() topic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, m.ClientID) resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, uuid) log.Info("Publish topic[%s]\n", topic) log.Info("Subscribe resptopic[%s]\n", resptopic) buf := flathandler.CreateRebootBuffer(0, uuid) timer := time.NewTimer(10 * time.Second) defer timer.Stop() result := make(chan error) defer close(result) m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) { go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) { fmt.Println("Reboot recv topic :%s\n", msg.Topic()) c.Client.Unsubscribe(resptopic) result <- flathandler.FlatbufferHandler(deviceID, msg.Payload()) }(m, client, msg) }) m.Publish(topic, buf) select { case <-timer.C: m.Client.Unsubscribe(resptopic) log.Info("Reboot timeout.") return errors.New("Reboot timeout.") case res := <-result: fmt.Println("开始重启") return res } }