// Package websocket provides a WebSocket hub for real-time updates and notifications. package websocket import ( "context" "encoding/json" "sync" "time" "github.com/mhsanaei/3x-ui/v3/logger" ) // MessageType identifies the kind of WebSocket message. type MessageType string const ( MessageTypeStatus MessageType = "status" MessageTypeTraffic MessageType = "traffic" MessageTypeInbounds MessageType = "inbounds" MessageTypeOutbounds MessageType = "outbounds" MessageTypeNodes MessageType = "nodes" MessageTypeNotification MessageType = "notification" MessageTypeXrayState MessageType = "xray_state" MessageTypeClientStats MessageType = "client_stats" MessageTypeClients MessageType = "clients" MessageTypeInvalidate MessageType = "invalidate" maxMessageSize = 10 * 1024 * 1024 // 10MB enqueueTimeout = 100 * time.Millisecond clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser. hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts. hubOpsQueue = 128 // Backlog for register+unregister bursts (page reloads, disconnect storms). minBroadcastInterval = 250 * time.Millisecond hubRestartAttempts = 3 ) type clientOpKind int const ( opRegister clientOpKind = iota opUnregister ) type clientOp struct { kind clientOpKind c *Client } // NewClient builds a Client ready for hub registration. func NewClient(id string) *Client { return &Client{ ID: id, Send: make(chan []byte, clientSendQueue), } } // Message is the wire format sent to clients. type Message struct { Type MessageType `json:"type"` Payload any `json:"payload"` Time int64 `json:"time"` } // Client represents a single WebSocket connection. type Client struct { ID string Send chan []byte closeOnce sync.Once } // Hub fan-outs messages to all connected clients. type Hub struct { clients map[*Client]struct{} broadcast chan []byte ops chan clientOp mu sync.RWMutex ctx context.Context cancel context.CancelFunc throttleMu sync.Mutex lastBroadcast map[MessageType]time.Time } // NewHub creates a hub. Call Run in a goroutine to start its event loop. func NewHub() *Hub { ctx, cancel := context.WithCancel(context.Background()) return &Hub{ clients: make(map[*Client]struct{}), broadcast: make(chan []byte, hubBroadcastQueue), ops: make(chan clientOp, hubOpsQueue), ctx: ctx, cancel: cancel, lastBroadcast: make(map[MessageType]time.Time), } } var throttledMessageTypes = map[MessageType]struct{}{ MessageTypeInbounds: {}, MessageTypeOutbounds: {}, MessageTypeTraffic: {}, MessageTypeClientStats: {}, } func (h *Hub) shouldThrottle(msgType MessageType) bool { if _, gated := throttledMessageTypes[msgType]; !gated { return false } h.throttleMu.Lock() defer h.throttleMu.Unlock() now := time.Now() if last, ok := h.lastBroadcast[msgType]; ok && now.Sub(last) < minBroadcastInterval { return true } h.lastBroadcast[msgType] = now return false } // Run drives the hub. The inner loop is wrapped in a panic-recovery harness // that retries up to hubRestartAttempts times with backoff so a transient // panic doesn't permanently kill real-time updates for commercial deployments. // After the cap, the hub stays down and the frontend falls back to REST polling. func (h *Hub) Run() { for attempt := range hubRestartAttempts { stopped := h.runOnce() if stopped { return } if attempt < hubRestartAttempts-1 { wait := time.Duration(1< maxMessageSize { logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", len(data), messageType) h.broadcastInvalidate(messageType) return } h.enqueue(data) } // broadcastInvalidate queues a lightweight signal telling clients to re-fetch // the named data type via REST. func (h *Hub) broadcastInvalidate(originalType MessageType) { data, err := json.Marshal(Message{ Type: MessageTypeInvalidate, Payload: map[string]string{"type": string(originalType)}, Time: time.Now().UnixMilli(), }) if err != nil { logger.Error("WebSocket invalidate marshal failed:", err) return } h.enqueue(data) } // enqueue submits raw bytes to the broadcast channel. Dropped on backpressure // (channel full for >100ms) or shutdown. func (h *Hub) enqueue(data []byte) { select { case h.broadcast <- data: case <-time.After(enqueueTimeout): logger.Warning("WebSocket broadcast channel full, dropping message") case <-h.ctx.Done(): } } // GetClientCount returns the number of connected clients. func (h *Hub) GetClientCount() int { if h == nil { return 0 } h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) } // Register adds a client to the hub. func (h *Hub) Register(c *Client) { if h == nil || c == nil { return } select { case h.ops <- clientOp{kind: opRegister, c: c}: case <-h.ctx.Done(): } } // Unregister removes a client from the hub. Sends through the same ordered // ops channel as Register so a register-then-unregister sequence from one // goroutine is processed in program order — otherwise an unregister could // land in the map before its register and silently no-op, leaking the entry. // // On a saturated ops channel (disconnect storm) we fall back to a bounded // timeout drop rather than direct removal: a direct delete on a not-yet- // registered client is precisely the ordering bug we fix here. Stragglers // get evicted by fanout when their Send buffer fills. func (h *Hub) Unregister(c *Client) { if h == nil || c == nil { return } select { case h.ops <- clientOp{kind: opUnregister, c: c}: case <-time.After(enqueueTimeout): logger.Warningf("WebSocket ops channel full, dropping unregister for %s", c.ID) case <-h.ctx.Done(): } } // Stop signals the hub to shut down and close all client connections. func (h *Hub) Stop() { if h != nil && h.cancel != nil { h.cancel() } }