mqttcli.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package mqttcli
  2. import (
  3. "crypto/rand"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "device-manage/tools/config"
  7. "errors"
  8. "fmt"
  9. "io/ioutil"
  10. "net/url"
  11. "strings"
  12. "time"
  13. flathandler "device-manage/common/mqttcli/flatbuffer"
  14. mqtt "github.com/eclipse/paho.mqtt.golang"
  15. "github.com/google/uuid"
  16. log "github.com/sirupsen/logrus"
  17. )
  18. var MqttClientList map[string]*MQTTClient
  19. type MQTTClient struct {
  20. ClientID string
  21. Client mqtt.Client
  22. Subscribed map[string]byte
  23. Url *url.URL
  24. }
  25. func MqttSetup() {
  26. var err error
  27. MqttClientList = make(map[string]*MQTTClient, len(config.MqttCfgInfo.ServerUrl))
  28. for _, addr := range config.MqttCfgInfo.ServerUrl {
  29. mqClient := new(MQTTClient)
  30. if mqClient.Url, err = url.Parse(addr); err != nil {
  31. fmt.Println(err)
  32. continue
  33. }
  34. mqClient.ClientID = config.MqttCfgInfo.ClientId
  35. mqClient.Subscribed = make(map[string]byte)
  36. //默认只监听设备主动上报事件.
  37. mqClient.Subscribed[mqClient.ClientID+TopicListenerEvent] = 0x01
  38. MqttClientList[mqClient.Url.Hostname()] = mqClient
  39. mqClient.MqttConnTask()
  40. }
  41. }
  42. // Connects connect to the MQTT broker with Options.
  43. func (m *MQTTClient) Connect() (mqtt.Client, error) {
  44. if !m.Client.IsConnected() {
  45. log.Info("connecting...")
  46. if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
  47. return nil, token.Error()
  48. }
  49. }
  50. return m.Client, nil
  51. }
  52. //mqtt connect timer task
  53. func (m *MQTTClient) MqttConnTask() {
  54. go func() {
  55. for true {
  56. if m.Client != nil {
  57. if !m.Client.IsConnected() {
  58. log.Info("retry connecting...")
  59. //Opts := m.createClientOptions()
  60. //m.Client = mqtt.NewClient(Opts)
  61. if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
  62. log.Info("retry connect fail.")
  63. } else {
  64. log.Info("mqtt connected.")
  65. m.SubscribeOnConnect()
  66. }
  67. } else {
  68. log.Info("mqtt alived.")
  69. //ws.WebsocketManager.SendAll([]byte("device online"))
  70. }
  71. } else {
  72. log.Info("first connect...")
  73. Opts := m.createClientOptions()
  74. m.Client = mqtt.NewClient(Opts)
  75. if token := m.Client.Connect(); token.Wait() && token.Error() != nil {
  76. log.Info("connection fail.")
  77. } else {
  78. log.Info("mqtt connected.")
  79. m.SubscribeOnConnect()
  80. }
  81. }
  82. time.Sleep(30 * time.Second)
  83. }
  84. }()
  85. }
  86. func (m *MQTTClient) Disconnect() error {
  87. if m.Client.IsConnected() {
  88. m.Client.Disconnect(20)
  89. log.Info("client disconnected")
  90. }
  91. return nil
  92. }
  93. func (m *MQTTClient) SubscribeOnConnect() {
  94. if len(m.Subscribed) > 0 {
  95. token := m.Client.SubscribeMultiple(m.Subscribed, m.onMessageReceived)
  96. token.Wait()
  97. if token.Error() != nil {
  98. log.Error(token.Error())
  99. }
  100. }
  101. }
  102. //connect call back
  103. func OnConnect(client mqtt.Client) {
  104. }
  105. //disconnect call back
  106. func OnConnectionLost(client mqtt.Client, reason error) {
  107. log.Errorf("client disconnected: %s", reason)
  108. }
  109. func (m *MQTTClient) onMessageReceived(client mqtt.Client, message mqtt.Message) {
  110. fmt.Println("recv topic:", message.Topic())
  111. topicArry := strings.Split(message.Topic(), "/")
  112. devId := topicArry[len(topicArry)-1]
  113. flathandler.FlatbufferHandler(devId, message.Payload())
  114. }
  115. func getCertPool(pemPath string) (*x509.CertPool, error) {
  116. certs := x509.NewCertPool()
  117. pemData, err := ioutil.ReadFile(pemPath)
  118. if err != nil {
  119. return nil, err
  120. }
  121. certs.AppendCertsFromPEM(pemData)
  122. return certs, nil
  123. }
  124. // getRandomClientId returns randomized ClientId.
  125. func getRandomClientId() string {
  126. const alphanum = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
  127. var bytes = make([]byte, MaxClientIdLen)
  128. rand.Read(bytes)
  129. for i, b := range bytes {
  130. bytes[i] = alphanum[b%byte(len(alphanum))]
  131. }
  132. return "mqttcli-" + string(bytes)
  133. }
  134. // createClientOptions returns ClientOptions via parsing config options.
  135. func (m *MQTTClient) createClientOptions() *mqtt.ClientOptions {
  136. opts := mqtt.NewClientOptions()
  137. opts.SetClientID(m.ClientID)
  138. opts.SetUsername(m.Url.User.Username())
  139. password, _ := m.Url.User.Password()
  140. opts.SetPassword(password)
  141. opts.AddBroker(fmt.Sprintf("tcp://%s", m.Url.Host))
  142. opts.SetConnectRetry(false)
  143. opts.SetOnConnectHandler(OnConnect)
  144. opts.SetConnectionLostHandler(OnConnectionLost)
  145. return opts
  146. }
  147. func (m *MQTTClient) Publish(topic string, msg []byte) {
  148. if m.Client.IsConnected() {
  149. m.Client.Publish(topic, 2, false, msg)
  150. } else {
  151. token := m.Client.Connect()
  152. for !token.WaitTimeout(5 * time.Second) {
  153. }
  154. if err := token.Error(); err != nil {
  155. log.Debug("cli connect:", err.Error())
  156. //log.Fatal(err)
  157. }
  158. }
  159. }
  160. // makeTlsConfig creats new tls.Config. If returned ok is false, does not need set to MQTToption.
  161. func makeTlsConfig(cafile, cert, key string, insecure bool) (*tls.Config, bool, error) {
  162. TLSConfig := &tls.Config{InsecureSkipVerify: false}
  163. var ok bool
  164. if insecure {
  165. TLSConfig.InsecureSkipVerify = true
  166. ok = true
  167. }
  168. if cafile != "" {
  169. certPool, err := getCertPool(cafile)
  170. if err != nil {
  171. return nil, false, err
  172. }
  173. TLSConfig.RootCAs = certPool
  174. ok = true
  175. }
  176. if cert != "" {
  177. certPool, err := getCertPool(cert)
  178. if err != nil {
  179. return nil, false, err
  180. }
  181. TLSConfig.ClientAuth = tls.RequireAndVerifyClientCert
  182. TLSConfig.ClientCAs = certPool
  183. ok = true
  184. }
  185. if key != "" {
  186. if cert == "" {
  187. return nil, false, fmt.Errorf("key specified but cert is not specified")
  188. }
  189. cert, err := tls.LoadX509KeyPair(cert, key)
  190. if err != nil {
  191. return nil, false, err
  192. }
  193. TLSConfig.Certificates = []tls.Certificate{cert}
  194. ok = true
  195. }
  196. return TLSConfig, ok, nil
  197. }
  198. func (m *MQTTClient) CollectLog(userId, deviceSn string, collectUrl string) error {
  199. uuid := uuid.New().String()
  200. topic := fmt.Sprintf("%v/control[-flat]/%v", deviceSn, m.ClientID)
  201. resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceSn, uuid)
  202. fmt.Println("Publish topic[%s]", topic)
  203. fmt.Println("Subscribe resptopic[%s]", resptopic)
  204. buf := flathandler.CreateLogCollectBuffer(userId, collectUrl, uuid)
  205. timer := time.NewTimer(10 * time.Second)
  206. defer timer.Stop()
  207. result := make(chan error)
  208. defer close(result)
  209. m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) {
  210. go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) {
  211. fmt.Println("CollectLog recv topic :%s", msg.Topic())
  212. c.Client.Unsubscribe(resptopic)
  213. result <- flathandler.FlatbufferHandler(deviceSn, msg.Payload())
  214. }(m, client, msg)
  215. })
  216. m.Publish(topic, buf)
  217. select {
  218. case <-timer.C:
  219. m.Client.Unsubscribe(resptopic)
  220. fmt.Println("CollectLog timeout.")
  221. return errors.New("CollectLog timeout.")
  222. case res := <-result:
  223. fmt.Println("开始获取日志")
  224. return res
  225. }
  226. }
  227. func (m *MQTTClient) FileUpgrade(userId, deviceID, appKey, downloadUrl, fileUid string) error {
  228. reply := uuid.New().String()
  229. topic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, m.ClientID)
  230. resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, reply)
  231. fmt.Println("Publish topic[%s]", topic)
  232. fmt.Println("Subscribe resptopic[%s]", resptopic)
  233. buf := flathandler.CreateFileUpgradeBuffer(userId, downloadUrl, appKey, reply, fileUid)
  234. timer := time.NewTimer(10 * time.Second)
  235. defer timer.Stop()
  236. result := make(chan error)
  237. defer close(result)
  238. m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) {
  239. go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) {
  240. fmt.Println("upgrade recv topic :%s", msg.Topic())
  241. c.Client.Unsubscribe(resptopic)
  242. result <- flathandler.FlatbufferHandler(deviceID, msg.Payload())
  243. }(m, client, msg)
  244. })
  245. m.Publish(topic, buf)
  246. select {
  247. case <-timer.C:
  248. m.Client.Unsubscribe(resptopic)
  249. fmt.Println("upgrade timeout.")
  250. return errors.New("upgrade timeout.")
  251. case res := <-result:
  252. fmt.Println("开始安装应用")
  253. return res
  254. }
  255. }
  256. func (m *MQTTClient) Reboot(deviceID string) error {
  257. uuid := uuid.New().String()
  258. topic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, m.ClientID)
  259. resptopic := fmt.Sprintf("%v/control[-flat]/%v", deviceID, uuid)
  260. log.Info("Publish topic[%s]\n", topic)
  261. log.Info("Subscribe resptopic[%s]\n", resptopic)
  262. buf := flathandler.CreateRebootBuffer(0, uuid)
  263. timer := time.NewTimer(10 * time.Second)
  264. defer timer.Stop()
  265. result := make(chan error)
  266. defer close(result)
  267. m.Client.Subscribe(resptopic, 0, func(client mqtt.Client, msg mqtt.Message) {
  268. go func(c *MQTTClient, client mqtt.Client, msg mqtt.Message) {
  269. fmt.Println("Reboot recv topic :%s\n", msg.Topic())
  270. c.Client.Unsubscribe(resptopic)
  271. result <- flathandler.FlatbufferHandler(deviceID, msg.Payload())
  272. }(m, client, msg)
  273. })
  274. m.Publish(topic, buf)
  275. select {
  276. case <-timer.C:
  277. m.Client.Unsubscribe(resptopic)
  278. log.Info("Reboot timeout.")
  279. return errors.New("Reboot timeout.")
  280. case res := <-result:
  281. fmt.Println("开始重启")
  282. return res
  283. }
  284. }