1
0

websocket.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. // Package service: WebSocketService owns the per-connection pump goroutines
  2. // and bridges the HTTP-layer controller to the broadcast hub. The controller
  3. // handles the upgrade handshake and authentication, then hands the raw
  4. // connection to this service which takes ownership of its lifecycle.
  5. package service
  6. import (
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v2/logger"
  9. "github.com/mhsanaei/3x-ui/v2/util/common"
  10. "github.com/mhsanaei/3x-ui/v2/web/websocket"
  11. "github.com/google/uuid"
  12. ws "github.com/gorilla/websocket"
  13. )
  14. const (
  15. wsWriteWait = 10 * time.Second
  16. wsPongWait = 60 * time.Second
  17. wsPingPeriod = (wsPongWait * 9) / 10
  18. wsClientReadLimit = 512
  19. )
  20. // WebSocketService manages WebSocket client connections. It owns the
  21. // read/write pumps for each accepted connection and registers/unregisters
  22. // clients with the hub.
  23. type WebSocketService struct {
  24. hub *websocket.Hub
  25. }
  26. // NewWebSocketService creates a service backed by the given hub.
  27. func NewWebSocketService(hub *websocket.Hub) *WebSocketService {
  28. return &WebSocketService{hub: hub}
  29. }
  30. // HandleConnection takes ownership of an upgraded WebSocket connection:
  31. // registers a new client, starts the read/write pumps, and returns
  32. // immediately. The connection is closed when both pumps exit.
  33. func (s *WebSocketService) HandleConnection(conn *ws.Conn, remoteIP string) {
  34. if s == nil || s.hub == nil || conn == nil {
  35. if conn != nil {
  36. conn.Close()
  37. }
  38. return
  39. }
  40. client := websocket.NewClient(uuid.New().String())
  41. s.hub.Register(client)
  42. logger.Debugf("WebSocket client %s registered from %s", client.ID, remoteIP)
  43. go s.writePump(client, conn)
  44. go s.readPump(client, conn)
  45. }
  46. // readPump consumes inbound frames so the gorilla deadline/pong machinery keeps
  47. // running. Clients send no commands today; frames are discarded.
  48. func (s *WebSocketService) readPump(client *websocket.Client, conn *ws.Conn) {
  49. defer func() {
  50. if r := common.Recover("WebSocket readPump panic"); r != nil {
  51. logger.Error("WebSocket readPump panic recovered:", r)
  52. }
  53. s.hub.Unregister(client)
  54. conn.Close()
  55. }()
  56. conn.SetReadLimit(wsClientReadLimit)
  57. conn.SetReadDeadline(time.Now().Add(wsPongWait))
  58. conn.SetPongHandler(func(string) error {
  59. return conn.SetReadDeadline(time.Now().Add(wsPongWait))
  60. })
  61. for {
  62. if _, _, err := conn.ReadMessage(); err != nil {
  63. if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
  64. logger.Debugf("WebSocket read error for client %s: %v", client.ID, err)
  65. }
  66. return
  67. }
  68. }
  69. }
  70. // writePump pushes hub messages to the connection and emits keepalive pings.
  71. func (s *WebSocketService) writePump(client *websocket.Client, conn *ws.Conn) {
  72. ticker := time.NewTicker(wsPingPeriod)
  73. defer func() {
  74. if r := common.Recover("WebSocket writePump panic"); r != nil {
  75. logger.Error("WebSocket writePump panic recovered:", r)
  76. }
  77. ticker.Stop()
  78. conn.Close()
  79. }()
  80. for {
  81. select {
  82. case msg, ok := <-client.Send:
  83. conn.SetWriteDeadline(time.Now().Add(wsWriteWait))
  84. if !ok {
  85. conn.WriteMessage(ws.CloseMessage, []byte{})
  86. return
  87. }
  88. if err := conn.WriteMessage(ws.TextMessage, msg); err != nil {
  89. logger.Debugf("WebSocket write error for client %s: %v", client.ID, err)
  90. return
  91. }
  92. case <-ticker.C:
  93. conn.SetWriteDeadline(time.Now().Add(wsWriteWait))
  94. if err := conn.WriteMessage(ws.PingMessage, nil); err != nil {
  95. logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err)
  96. return
  97. }
  98. }
  99. }
  100. }