| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- package mqttcli
- import (
- "crypto/rand"
- "crypto/tls"
- "crypto/x509"
- "device-manage/tools/config"
- "errors"
- "fmt"
- "io/ioutil"
- "net/url"
- "strings"
- "time"
- flathandler "device-manage/common/mqttcli/flatbuffer"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/google/uuid"
- log "github.com/sirupsen/logrus"
- )
- var MqttClientList map[string]*MQTTClient
- type MQTTClient struct {
- ClientID string
- Client mqtt.Client
- Subscribed map[string]byte
- Url *url.URL
- }
- func MqttSetup() {
- var err error
- MqttClientList = make(map[string]*MQTTClient, len(config.MqttCfgInfo.ServerUrl))
- for _, addr := range config.MqttCfgInfo.ServerUrl {
- mqClient := new(MQTTClient)
- if mqClient.Url, err = url.Parse(addr); err != nil {
- fmt.Println(err)
- continue
- }
- mqClient.ClientID = config.MqttCfgInfo.ClientId
- mqClient.Subscribed = make(map[string]byte)
- //默认只监听设备主动上报事件.
- mqClient.Subscribed[mqClient.ClientID+TopicListenerEvent] = 0x01
- MqttClientList[mqClient.Url.Hostname()] = mqClient
- mqClient.MqttConnTask()
- }
- }
- // Connects connect to the MQTT broker with Options.
- func (m *MQTTClient) Connect() (mqtt.Client, error) {
- if !m.Client.IsConnected() {
- log.Info("connecting...")
- if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
- return nil, token.Error()
- }
- }
- return m.Client, nil
- }
- //mqtt connect timer task
- func (m *MQTTClient) MqttConnTask() {
- go func() {
- for true {
- if m.Client != nil {
- if !m.Client.IsConnected() {
- log.Info("retry connecting...")
- //Opts := m.createClientOptions()
- //m.Client = mqtt.NewClient(Opts)
- if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
- log.Info("retry connect fail.")
- } else {
- log.Info("mqtt connected.")
- m.SubscribeOnConnect()
- }
- } else {
- log.Info("mqtt alived.")
- //ws.WebsocketManager.SendAll([]byte("device online"))
- }
- } else {
- log.Info("first connect...")
- Opts := m.createClientOptions()
- m.Client = mqtt.NewClient(Opts)
- if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
- log.Info("connection fail.")
- } else {
- log.Info("mqtt connected.")
- m.SubscribeOnConnect()
- }
- }
- time.Sleep(30 * time.Second)
- }
- }()
- }
- func (m *MQTTClient) Disconnect() error {
- if m.Client.IsConnected() {
- m.Client.Disconnect(20)
- log.Info("client disconnected")
- }
- return nil
- }
- func (m *MQTTClient) SubscribeOnConnect() {
- if len(m.Subscribed) > 0 {
- token := m.Client.SubscribeMultiple(m.Subscribed, m.onMessageReceived)
- token.Wait()
- if token.Error() != nil {
- log.Error(token.Error())
- }
- }
- }
- //connect call back
- func OnConnect(client mqtt.Client) {
- }
- //disconnect call back
- func OnConnectionLost(client mqtt.Client, reason error) {
- log.Errorf("client disconnected: %s", reason)
- }
- func (m *MQTTClient) onMessageReceived(client mqtt.Client, message mqtt.Message) {
- fmt.Println("recv topic:", message.Topic())
- topicArry := strings.Split(message.Topic(), "/")
- devId := topicArry[len(topicArry)-1]
- flathandler.FlatbufferHandler(devId, message.Payload())
- }
- func getCertPool(pemPath string) (*x509.CertPool, error) {
- certs := x509.NewCertPool()
- pemData, err := ioutil.ReadFile(pemPath)
- if err != nil {
- return nil, err
- }
- certs.AppendCertsFromPEM(pemData)
- return certs, nil
- }
- // getRandomClientId returns randomized ClientId.
- func getRandomClientId() string {
- const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
- var bytes = make([]byte, MaxClientIdLen)
- rand.Read(bytes)
- for i, b := range bytes {
- bytes[i] = alphanum[b%byte(len(alphanum))]
- }
- return "mqttcli-" + string(bytes)
- }
- // createClientOptions returns ClientOptions via parsing config options.
- func (m *MQTTClient) createClientOptions() *mqtt.ClientOptions {
- opts := mqtt.NewClientOptions()
- opts.SetClientID(m.ClientID)
- opts.SetUsername(m.Url.User.Username())
- password, _ := m.Url.User.Password()
- opts.SetPassword(password)
- opts.AddBroker(fmt.Sprintf("tcp://%s", m.Url.Host))
- opts.SetConnectRetry(false)
- opts.SetOnConnectHandler(OnConnect)
- opts.SetConnectionLostHandler(OnConnectionLost)
- return opts
- }
- func (m *MQTTClient) Publish(topic string, msg []byte) {
- if m.Client.IsConnected() {
- m.Client.Publish(topic, 2, false, msg)
- } else {
- token := m.Client.Connect()
- for !token.WaitTimeout(5 * time.Second) {
- }
- if err := token.Error(); err != nil {
- log.Debug("cli connect:", err.Error())
- //log.Fatal(err)
- }
- }
- }
- // makeTlsConfig creats new tls.Config. If returned ok is false, does not need set to MQTToption.
- func makeTlsConfig(cafile, cert, key string, insecure bool) (*tls.Config, bool, error) {
- TLSConfig := &tls.Config{InsecureSkipVerify: false}
- var ok bool
- if insecure {
- TLSConfig.InsecureSkipVerify = true
- ok = true
- }
- if cafile != "" {
- certPool, err := getCertPool(cafile)
- if err != nil {
- return nil, false, err
- }
- TLSConfig.RootCAs = certPool
- ok = true
- }
- if cert != "" {
- certPool, err := getCertPool(cert)
- if err != nil {
- return nil, false, err
- }
- TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
- TLSConfig.ClientCAs = certPool
- ok = true
- }
- if key != "" {
- if cert == "" {
- return nil, false, fmt.Errorf("key specified but cert is not specified")
- }
- cert, err := tls.LoadX509KeyPair(cert, key)
- if err != nil {
- return nil, false, err
- }
- TLSConfig.Certificates = []tls.Certificate{cert}
- ok = true
- }
- return TLSConfig, ok, nil
- }
- func (m *MQTTClient) CollectLog(userId, deviceSn string, collectUrl string) error {
- uuid := uuid.New().String()
- topic := fmt.Sprintf("%v/control[-flat]/%v", deviceSn, m.ClientID)
- resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceSn, uuid)
- fmt.Println("Publish topic[%s]", topic)
- fmt.Println("Subscribe resptopic[%s]", resptopic)
- buf := flathandler.CreateLogCollectBuffer(userId, collectUrl, uuid)
- timer := time.NewTimer(10 * time.Second)
- defer timer.Stop()
- result := make(chan error)
- defer close(result)
- m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) {
- go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) {
- fmt.Println("CollectLog recv topic :%s", msg.Topic())
- c.Client.Unsubscribe(resptopic)
- result <- flathandler.FlatbufferHandler(deviceSn, msg.Payload())
- }(m, client, msg)
- })
- m.Publish(topic, buf)
- select {
- case <-timer.C:
- m.Client.Unsubscribe(resptopic)
- fmt.Println("CollectLog timeout.")
- return errors.New("CollectLog timeout.")
- case res := <-result:
- fmt.Println("开始获取日志")
- return res
- }
- }
- func (m *MQTTClient) FileUpgrade(userId, deviceID, appKey, downloadUrl, fileUid string) error {
- reply := uuid.New().String()
- topic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, m.ClientID)
- resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, reply)
- fmt.Println("Publish topic[%s]", topic)
- fmt.Println("Subscribe resptopic[%s]", resptopic)
- buf := flathandler.CreateFileUpgradeBuffer(userId, downloadUrl, appKey, reply, fileUid)
- timer := time.NewTimer(10 * time.Second)
- defer timer.Stop()
- result := make(chan error)
- defer close(result)
- m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) {
- go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) {
- fmt.Println("upgrade recv topic :%s", msg.Topic())
- c.Client.Unsubscribe(resptopic)
- result <- flathandler.FlatbufferHandler(deviceID, msg.Payload())
- }(m, client, msg)
- })
- m.Publish(topic, buf)
- select {
- case <-timer.C:
- m.Client.Unsubscribe(resptopic)
- fmt.Println("upgrade timeout.")
- return errors.New("upgrade timeout.")
- case res := <-result:
- fmt.Println("开始安装应用")
- return res
- }
- }
- func (m *MQTTClient) Reboot(deviceID string) error {
- uuid := uuid.New().String()
- topic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, m.ClientID)
- resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, uuid)
- log.Info("Publish topic[%s]\n", topic)
- log.Info("Subscribe resptopic[%s]\n", resptopic)
- buf := flathandler.CreateRebootBuffer(0, uuid)
- timer := time.NewTimer(10 * time.Second)
- defer timer.Stop()
- result := make(chan error)
- defer close(result)
- m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) {
- go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) {
- fmt.Println("Reboot recv topic :%s\n", msg.Topic())
- c.Client.Unsubscribe(resptopic)
- result <- flathandler.FlatbufferHandler(deviceID, msg.Payload())
- }(m, client, msg)
- })
- m.Publish(topic, buf)
- select {
- case <-timer.C:
- m.Client.Unsubscribe(resptopic)
- log.Info("Reboot timeout.")
- return errors.New("Reboot timeout.")
- case res := <-result:
- fmt.Println("开始重启")
- return res
- }
- }
|