ws.go 8.5 KB


  1. package ws
  2. import (
  3. "context"
  4. "device-manage/tools/app"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "sync"
  9. "time"
  10. "github.com/gin-gonic/gin"
  11. "github.com/gorilla/websocket"
  12. uuid "github.com/satori/go.uuid"
  13. )
  14. // Manager 所有 websocket 信息
  15. type Manager struct {
  16. Group map[string]map[string]*Client
  17. groupCount, clientCount uint
  18. Lock sync.Mutex
  19. Register, UnRegister chan *Client
  20. Message chan *MessageData
  21. GroupMessage chan *GroupMessageData
  22. BroadCastMessage chan *BroadCastMessageData
  23. }
  24. // Client 单个 websocket 信息
  25. type Client struct {
  26. Id, Group string
  27. Context context.Context
  28. CancelFunc context.CancelFunc
  29. Socket *websocket.Conn
  30. Message chan []byte
  31. }
  32. // messageData 单个发送数据信息
  33. type MessageData struct {
  34. Id, Group string
  35. Context context.Context
  36. Message []byte
  37. }
  38. // groupMessageData 组广播数据信息
  39. type GroupMessageData struct {
  40. Group string
  41. Message []byte
  42. }
  43. // 广播发送数据信息
  44. type BroadCastMessageData struct {
  45. Message []byte
  46. }
  47. // 读信息,从 websocket 连接直接读取数据
  48. func (c *Client) Read(cxt context.Context) {
  49. defer func(cxt context.Context) {
  50. WebsocketManager.UnRegister <- c
  51. log.Printf("client [%s] disconnect", c.Id)
  52. if err := c.Socket.Close(); err != nil {
  53. log.Printf("client [%s] disconnect err: %s", c.Id, err)
  54. }
  55. }(cxt)
  56. for {
  57. if cxt.Err() != nil {
  58. break
  59. }
  60. messageType, message, err := c.Socket.ReadMessage()
  61. if err != nil || messageType == websocket.CloseMessage {
  62. break
  63. }
  64. log.Printf("client [%s] receive message: %s", c.Id, string(message))
  65. c.Message <- message
  66. }
  67. }
  68. // 写信息,从 channel 变量 Send 中读取数据写入 websocket 连接
  69. func (c *Client) Write(cxt context.Context) {
  70. defer func(cxt context.Context) {
  71. log.Printf("client [%s] disconnect", c.Id)
  72. if err := c.Socket.Close(); err != nil {
  73. log.Printf("client [%s] disconnect err: %s", c.Id, err)
  74. }
  75. }(cxt)
  76. for {
  77. if cxt.Err() != nil {
  78. break
  79. }
  80. select {
  81. case message, ok := <-c.Message:
  82. if !ok {
  83. _ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
  84. return
  85. }
  86. log.Printf("client [%s] write message: %s", c.Id, string(message))
  87. err := c.Socket.WriteMessage(websocket.TextMessage, message)
  88. if err != nil {
  89. log.Printf("client [%s] writemessage err: %s", c.Id, err)
  90. }
  91. case _ = <-c.Context.Done():
  92. break
  93. }
  94. }
  95. }
  96. // 启动 websocket 管理器
  97. func (manager *Manager) Start() {
  98. fmt.Println("websocket manage start")
  99. for {
  100. select {
  101. // 注册
  102. case client := <-manager.Register:
  103. log.Printf("client [%s] connect", client.Id)
  104. log.Printf("register client [%s] to group [%s]", client.Id, client.Group)
  105. manager.Lock.Lock()
  106. if manager.Group[client.Group] == nil {
  107. manager.Group[client.Group] = make(map[string]*Client)
  108. manager.groupCount += 1
  109. }
  110. manager.Group[client.Group][client.Id] = client
  111. manager.clientCount += 1
  112. manager.Lock.Unlock()
  113. // 注销
  114. case client := <-manager.UnRegister:
  115. log.Printf("unregister client [%s] from group [%s]", client.Id, client.Group)
  116. manager.Lock.Lock()
  117. if mGroup, ok := manager.Group[client.Group]; ok {
  118. if mClient, ok := mGroup[client.Id]; ok {
  119. close(mClient.Message)
  120. delete(mGroup, client.Id)
  121. manager.clientCount -= 1
  122. if len(mGroup) == 0 {
  123. //log.Printf("delete empty group [%s]", client.Group)
  124. delete(manager.Group, client.Group)
  125. manager.groupCount -= 1
  126. }
  127. mClient.CancelFunc()
  128. }
  129. }
  130. manager.Lock.Unlock()
  131. // 发送广播数据到某个组的 channel 变量 Send 中
  132. //case data := <-manager.boardCast:
  133. // if groupMap, ok := manager.wsGroup[data.GroupId]; ok {
  134. // for _, conn := range groupMap {
  135. // conn.Send <- data.Data
  136. // }
  137. // }
  138. }
  139. }
  140. }
  141. // 处理单个 client 发送数据
  142. func (manager *Manager) SendService() {
  143. for {
  144. select {
  145. case data := <-manager.Message:
  146. if groupMap, ok := manager.Group[data.Group]; ok {
  147. if conn, ok := groupMap[data.Id]; ok {
  148. conn.Message <- data.Message
  149. }
  150. }
  151. }
  152. }
  153. }
  154. // 处理 group 广播数据
  155. func (manager *Manager) SendGroupService() {
  156. for {
  157. select {
  158. // 发送广播数据到某个组的 channel 变量 Send 中
  159. case data := <-manager.GroupMessage:
  160. if groupMap, ok := manager.Group[data.Group]; ok {
  161. for _, conn := range groupMap {
  162. conn.Message <- data.Message
  163. }
  164. }
  165. }
  166. }
  167. }
  168. // 处理广播数据
  169. func (manager *Manager) SendAllService() {
  170. for {
  171. select {
  172. case data := <-manager.BroadCastMessage:
  173. for _, v := range manager.Group {
  174. for _, conn := range v {
  175. conn.Message <- data.Message
  176. }
  177. }
  178. }
  179. }
  180. }
  181. // 向指定的 client 发送数据
  182. func (manager *Manager) Send(cxt context.Context, id string, group string, message []byte) {
  183. data := &MessageData{
  184. Id: id,
  185. Context: cxt,
  186. Group: group,
  187. Message: message,
  188. }
  189. manager.Message <- data
  190. }
  191. // 向指定的 Group 广播
  192. func (manager *Manager) SendGroup(group string, message []byte) {
  193. data := &GroupMessageData{
  194. Group: group,
  195. Message: message,
  196. }
  197. manager.GroupMessage <- data
  198. }
  199. // 广播
  200. func (manager *Manager) SendAll(message []byte) {
  201. data := &BroadCastMessageData{
  202. Message: message,
  203. }
  204. manager.BroadCastMessage <- data
  205. }
  206. // 注册
  207. func (manager *Manager) RegisterClient(client *Client) {
  208. manager.Register <- client
  209. }
  210. // 注销
  211. func (manager *Manager) UnRegisterClient(client *Client) {
  212. manager.UnRegister <- client
  213. }
  214. // 当前组个数
  215. func (manager *Manager) LenGroup() uint {
  216. return manager.groupCount
  217. }
  218. // 当前连接个数
  219. func (manager *Manager) LenClient() uint {
  220. return manager.clientCount
  221. }
  222. // 获取 wsManager 管理器信息
  223. func (manager *Manager) Info() map[string]interface{} {
  224. managerInfo := make(map[string]interface{})
  225. managerInfo["groupLen"] = manager.LenGroup()
  226. managerInfo["clientLen"] = manager.LenClient()
  227. managerInfo["chanRegisterLen"] = len(manager.Register)
  228. managerInfo["chanUnregisterLen"] = len(manager.UnRegister)
  229. managerInfo["chanMessageLen"] = len(manager.Message)
  230. managerInfo["chanGroupMessageLen"] = len(manager.GroupMessage)
  231. managerInfo["chanBroadCastMessageLen"] = len(manager.BroadCastMessage)
  232. return managerInfo
  233. }
  234. // 初始化 wsManager 管理器
  235. var WebsocketManager = Manager{
  236. Group: make(map[string]map[string]*Client),
  237. Register: make(chan *Client, 128),
  238. UnRegister: make(chan *Client, 128),
  239. GroupMessage: make(chan *GroupMessageData, 128),
  240. Message: make(chan *MessageData, 128),
  241. BroadCastMessage: make(chan *BroadCastMessageData, 128),
  242. groupCount: 0,
  243. clientCount: 0,
  244. }
  245. // gin 处理 websocket handler
  246. func (manager *Manager) WsClient(c *gin.Context) {
  247. ctx, cancel := context.WithCancel(context.Background())
  248. upGrader := websocket.Upgrader{
  249. // cross origin domain
  250. CheckOrigin: func(r *http.Request) bool {
  251. return true
  252. },
  253. // 处理 Sec-WebSocket-Protocol Header
  254. Subprotocols: []string{c.GetHeader("Sec-WebSocket-Protocol")},
  255. }
  256. conn, err := upGrader.Upgrade(c.Writer, c.Request, nil)
  257. if err != nil {
  258. log.Printf("websocket connect error: %s", c.Param("channel"))
  259. return
  260. }
  261. //fmt.Println("token: ", c.Query("token"))
  262. client := &Client{
  263. Id: uuid.NewV4().String(),
  264. Group: c.Param("channel"),
  265. Context: ctx,
  266. CancelFunc: cancel,
  267. Socket: conn,
  268. Message: make(chan []byte, 1024),
  269. }
  270. manager.RegisterClient(client)
  271. go client.Read(ctx)
  272. go client.Write(ctx)
  273. time.Sleep(time.Second * 3)
  274. // 测试单个 client 发送数据
  275. //manager.Send(ctx, client.Id, client.Group, []byte("Send message ----"+time.Now().Format("2006-01-02 15:04:05")))
  276. //manager.SendAll([]byte("Send message ----" + time.Now().Format("2006-01-02 15:04:05")))
  277. //tools.FileMonitoringById(ctx, "temp/logs/job/db-20200820.log", c.Param("id"), c.Param("channel"), SendOne)
  278. }
  279. func (manager *Manager) UnWsClient(c *gin.Context) {
  280. id := c.Param("id")
  281. group := c.Param("channel")
  282. WsLogout(id, group)
  283. app.OK(c, "ws close success", "success")
  284. }
  285. func SendGroup(msg []byte) {
  286. WebsocketManager.SendGroup("leffss", []byte("{\"code\":200,\"data\":"+string(msg)+"}"))
  287. fmt.Println(WebsocketManager.Info())
  288. }
  289. func SendAll(msg []byte) {
  290. WebsocketManager.SendAll([]byte("{\"code\":200,\"data\":" + string(msg) + "}"))
  291. fmt.Println(WebsocketManager.Info())
  292. }
  293. func SendOne(ctx context.Context, id string, group string, msg []byte) {
  294. WebsocketManager.Send(ctx, id, group, []byte("{\"code\":200,\"data\":"+string(msg)+"}"))
  295. fmt.Println(WebsocketManager.Info())
  296. }
  297. func WsLogout(id string, group string) {
  298. WebsocketManager.UnRegisterClient(&Client{Id: id, Group: group})
  299. fmt.Println(WebsocketManager.Info())
  300. }