浏览代码

refactor(websocket): split controller into service + thin controller

Move per-connection lifecycle out of the controller and into a new
service.WebSocketService. The controller is now HTTP-layer only:
authenticate, validate origin, upgrade, and hand the connection off.

- web/service/websocket.go (new): owns the read/write pumps, hub
  registration, and connection lifetime. Pump constants are prefixed
  (wsWriteWait, wsPongWait, wsPingPeriod, wsClientReadLimit) to avoid
  collisions in the larger service package namespace.
- web/controller/websocket.go: trimmed to the upgrader, same-origin
  check, auth gate, and hand-off to the service.
- web/web.go: wires controller.NewWebSocketController(service.NewWebSocketService(hub)).

The hub package (web/websocket) stays as low-level fan-out
infrastructure. Behavior is unchanged — this is a structural cleanup
to align with the rest of the codebase's controller/service split.

Also includes a small range-int modernization in login_limiter_test.go
that gopls flagged.

Co-Authored-By: Claude Opus 4.7 <[email protected]>
MHSanaei 2 天之前
父节点
当前提交
c394938f01
共有 4 个文件被更改,包括 133 次插入90 次删除
  1. 4 4
      web/controller/login_limiter_test.go
  2. 11 84
      web/controller/websocket.go
  3. 115 0
      web/service/websocket.go
  4. 3 2
      web/web.go

+ 4 - 4
web/controller/login_limiter_test.go

@@ -10,7 +10,7 @@ func TestLoginLimiterBlocksAfterConfiguredFailures(t *testing.T) {
 	limiter := newLoginLimiter(5, 5*time.Minute, 15*time.Minute)
 	limiter.now = func() time.Time { return now }
 
-	for i := 0; i < 4; i++ {
+	for i := range 4 {
 		if _, blocked := limiter.registerFailure("192.0.2.10", "Admin"); blocked {
 			t.Fatalf("failure %d should not block yet", i+1)
 		}
@@ -41,7 +41,7 @@ func TestLoginLimiterPrunesOldFailuresAndResetsOnSuccess(t *testing.T) {
 	limiter := newLoginLimiter(5, 5*time.Minute, 15*time.Minute)
 	limiter.now = func() time.Time { return now }
 
-	for i := 0; i < 4; i++ {
+	for range 4 {
 		limiter.registerFailure("192.0.2.10", "admin")
 	}
 	now = now.Add(6 * time.Minute)
@@ -50,7 +50,7 @@ func TestLoginLimiterPrunesOldFailuresAndResetsOnSuccess(t *testing.T) {
 	}
 
 	limiter.registerSuccess("192.0.2.10", "admin")
-	for i := 0; i < 4; i++ {
+	for i := range 4 {
 		if _, blocked := limiter.registerFailure("192.0.2.10", "admin"); blocked {
 			t.Fatalf("success should reset previous failures; failure %d blocked", i+1)
 		}
@@ -62,7 +62,7 @@ func TestLoginLimiterSeparatesIPAndUsername(t *testing.T) {
 	limiter := newLoginLimiter(5, 5*time.Minute, 15*time.Minute)
 	limiter.now = func() time.Time { return now }
 
-	for i := 0; i < 5; i++ {
+	for range 5 {
 		limiter.registerFailure("192.0.2.10", "admin")
 	}
 	if _, ok := limiter.allow("192.0.2.11", "admin"); !ok {

+ 11 - 84
web/controller/websocket.go

@@ -5,25 +5,15 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
-	"time"
 
-	"github.com/google/uuid"
 	"github.com/mhsanaei/3x-ui/v2/logger"
-	"github.com/mhsanaei/3x-ui/v2/util/common"
+	"github.com/mhsanaei/3x-ui/v2/web/service"
 	"github.com/mhsanaei/3x-ui/v2/web/session"
-	"github.com/mhsanaei/3x-ui/v2/web/websocket"
 
 	"github.com/gin-gonic/gin"
 	ws "github.com/gorilla/websocket"
 )
 
-const (
-	writeWait       = 10 * time.Second
-	pongWait        = 60 * time.Second
-	pingPeriod      = (pongWait * 9) / 10
-	clientReadLimit = 512
-)
-
 var upgrader = ws.Upgrader{
 	ReadBufferSize:    32768,
 	WriteBufferSize:   32768,
@@ -57,18 +47,21 @@ func checkSameOrigin(r *http.Request) bool {
 	return strings.EqualFold(u.Hostname(), host)
 }
 
-// WebSocketController handles WebSocket connections for real-time updates.
+// WebSocketController handles the HTTP→WebSocket upgrade for real-time updates.
+// All per-connection lifecycle (pumps, hub registration) lives in
+// service.WebSocketService — this controller is HTTP-layer only.
 type WebSocketController struct {
 	BaseController
-	hub *websocket.Hub
+	service *service.WebSocketService
 }
 
-// NewWebSocketController creates a new WebSocket controller.
-func NewWebSocketController(hub *websocket.Hub) *WebSocketController {
-	return &WebSocketController{hub: hub}
+// NewWebSocketController creates a controller wired to the given service.
+func NewWebSocketController(svc *service.WebSocketService) *WebSocketController {
+	return &WebSocketController{service: svc}
 }
 
-// HandleWebSocket upgrades the HTTP connection and starts the read/write pumps.
+// HandleWebSocket authenticates the request, upgrades the HTTP connection, and
+// hands ownership of the connection off to the service.
 func (w *WebSocketController) HandleWebSocket(c *gin.Context) {
 	if !session.IsLogin(c) {
 		logger.Warningf("Unauthorized WebSocket connection attempt from %s", getRemoteIp(c))
@@ -82,71 +75,5 @@ func (w *WebSocketController) HandleWebSocket(c *gin.Context) {
 		return
 	}
 
-	client := websocket.NewClient(uuid.New().String())
-	w.hub.Register(client)
-	logger.Debugf("WebSocket client %s registered from %s", client.ID, getRemoteIp(c))
-
-	go w.writePump(client, conn)
-	go w.readPump(client, conn)
-}
-
-// readPump consumes inbound frames so the gorilla deadline/pong machinery keeps
-// running. Clients send no commands today; frames are discarded.
-func (w *WebSocketController) readPump(client *websocket.Client, conn *ws.Conn) {
-	defer func() {
-		if r := common.Recover("WebSocket readPump panic"); r != nil {
-			logger.Error("WebSocket readPump panic recovered:", r)
-		}
-		w.hub.Unregister(client)
-		conn.Close()
-	}()
-
-	conn.SetReadLimit(clientReadLimit)
-	conn.SetReadDeadline(time.Now().Add(pongWait))
-	conn.SetPongHandler(func(string) error {
-		return conn.SetReadDeadline(time.Now().Add(pongWait))
-	})
-
-	for {
-		if _, _, err := conn.ReadMessage(); err != nil {
-			if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
-				logger.Debugf("WebSocket read error for client %s: %v", client.ID, err)
-			}
-			return
-		}
-	}
-}
-
-// writePump pushes hub messages to the connection and emits keepalive pings.
-func (w *WebSocketController) writePump(client *websocket.Client, conn *ws.Conn) {
-	ticker := time.NewTicker(pingPeriod)
-	defer func() {
-		if r := common.Recover("WebSocket writePump panic"); r != nil {
-			logger.Error("WebSocket writePump panic recovered:", r)
-		}
-		ticker.Stop()
-		conn.Close()
-	}()
-
-	for {
-		select {
-		case msg, ok := <-client.Send:
-			conn.SetWriteDeadline(time.Now().Add(writeWait))
-			if !ok {
-				conn.WriteMessage(ws.CloseMessage, []byte{})
-				return
-			}
-			if err := conn.WriteMessage(ws.TextMessage, msg); err != nil {
-				logger.Debugf("WebSocket write error for client %s: %v", client.ID, err)
-				return
-			}
-
-		case <-ticker.C:
-			conn.SetWriteDeadline(time.Now().Add(writeWait))
-			if err := conn.WriteMessage(ws.PingMessage, nil); err != nil {
-				logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err)
-				return
-			}
-		}
-	}
+	w.service.HandleConnection(conn, getRemoteIp(c))
 }

+ 115 - 0
web/service/websocket.go

@@ -0,0 +1,115 @@
+// Package service: WebSocketService owns the per-connection pump goroutines
+// and bridges the HTTP-layer controller to the broadcast hub. The controller
+// handles the upgrade handshake and authentication, then hands the raw
+// connection to this service which takes ownership of its lifecycle.
+package service
+
+import (
+	"time"
+
+	"github.com/mhsanaei/3x-ui/v2/logger"
+	"github.com/mhsanaei/3x-ui/v2/util/common"
+	"github.com/mhsanaei/3x-ui/v2/web/websocket"
+
+	"github.com/google/uuid"
+	ws "github.com/gorilla/websocket"
+)
+
+const (
+	wsWriteWait       = 10 * time.Second
+	wsPongWait        = 60 * time.Second
+	wsPingPeriod      = (wsPongWait * 9) / 10
+	wsClientReadLimit = 512
+)
+
+// WebSocketService manages WebSocket client connections. It owns the
+// read/write pumps for each accepted connection and registers/unregisters
+// clients with the hub.
+type WebSocketService struct {
+	hub *websocket.Hub
+}
+
+// NewWebSocketService creates a service backed by the given hub.
+func NewWebSocketService(hub *websocket.Hub) *WebSocketService {
+	return &WebSocketService{hub: hub}
+}
+
+// HandleConnection takes ownership of an upgraded WebSocket connection:
+// registers a new client, starts the read/write pumps, and returns
+// immediately. The connection is closed when both pumps exit.
+func (s *WebSocketService) HandleConnection(conn *ws.Conn, remoteIP string) {
+	if s == nil || s.hub == nil || conn == nil {
+		if conn != nil {
+			conn.Close()
+		}
+		return
+	}
+
+	client := websocket.NewClient(uuid.New().String())
+	s.hub.Register(client)
+	logger.Debugf("WebSocket client %s registered from %s", client.ID, remoteIP)
+
+	go s.writePump(client, conn)
+	go s.readPump(client, conn)
+}
+
+// readPump consumes inbound frames so the gorilla deadline/pong machinery keeps
+// running. Clients send no commands today; frames are discarded.
+func (s *WebSocketService) readPump(client *websocket.Client, conn *ws.Conn) {
+	defer func() {
+		if r := common.Recover("WebSocket readPump panic"); r != nil {
+			logger.Error("WebSocket readPump panic recovered:", r)
+		}
+		s.hub.Unregister(client)
+		conn.Close()
+	}()
+
+	conn.SetReadLimit(wsClientReadLimit)
+	conn.SetReadDeadline(time.Now().Add(wsPongWait))
+	conn.SetPongHandler(func(string) error {
+		return conn.SetReadDeadline(time.Now().Add(wsPongWait))
+	})
+
+	for {
+		if _, _, err := conn.ReadMessage(); err != nil {
+			if ws.IsUnexpectedCloseError(err, ws.CloseGoingAway, ws.CloseAbnormalClosure) {
+				logger.Debugf("WebSocket read error for client %s: %v", client.ID, err)
+			}
+			return
+		}
+	}
+}
+
+// writePump pushes hub messages to the connection and emits keepalive pings.
+func (s *WebSocketService) writePump(client *websocket.Client, conn *ws.Conn) {
+	ticker := time.NewTicker(wsPingPeriod)
+	defer func() {
+		if r := common.Recover("WebSocket writePump panic"); r != nil {
+			logger.Error("WebSocket writePump panic recovered:", r)
+		}
+		ticker.Stop()
+		conn.Close()
+	}()
+
+	for {
+		select {
+		case msg, ok := <-client.Send:
+			conn.SetWriteDeadline(time.Now().Add(wsWriteWait))
+			if !ok {
+				conn.WriteMessage(ws.CloseMessage, []byte{})
+				return
+			}
+			if err := conn.WriteMessage(ws.TextMessage, msg); err != nil {
+				logger.Debugf("WebSocket write error for client %s: %v", client.ID, err)
+				return
+			}
+
+		case <-ticker.C:
+			conn.SetWriteDeadline(time.Now().Add(wsWriteWait))
+			if err := conn.WriteMessage(ws.PingMessage, nil); err != nil {
+				logger.Debugf("WebSocket ping error for client %s: %v", client.ID, err)
+				return
+			}
+		}
+	}
+}

+ 3 - 2
web/web.go

@@ -289,8 +289,9 @@ func (s *Server) initRouter() (*gin.Engine, error) {
 	s.wsHub = websocket.NewHub()
 	go s.wsHub.Run()
 
-	// Initialize WebSocket controller
-	s.ws = controller.NewWebSocketController(s.wsHub)
+	// Initialize WebSocket controller — service owns per-connection pumps,
+	// controller is HTTP-layer only (auth + upgrade).
+	s.ws = controller.NewWebSocketController(service.NewWebSocketService(s.wsHub))
 	// Register WebSocket route with basePath (g already has basePath prefix)
 	g.GET("/ws", s.ws.HandleWebSocket)