hub.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. // Package websocket provides a WebSocket hub for real-time updates and notifications.
  2. package websocket
  3. import (
  4. "context"
  5. "encoding/json"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/logger"
  9. )
  // MessageType identifies the kind of WebSocket message; it is serialized into
  // the "type" field of every Message sent to clients.
  type MessageType string

  // Message types pushed to the frontend.
  const (
  	MessageTypeStatus       MessageType = "status"
  	MessageTypeTraffic      MessageType = "traffic"
  	MessageTypeInbounds     MessageType = "inbounds"
  	MessageTypeOutbounds    MessageType = "outbounds"
  	MessageTypeNodes        MessageType = "nodes"
  	MessageTypeNotification MessageType = "notification"
  	MessageTypeXrayState    MessageType = "xray_state"
  	MessageTypeClientStats  MessageType = "client_stats"
  	MessageTypeClients      MessageType = "clients"
  	MessageTypeInvalidate   MessageType = "invalidate"
  )

  // Hub tuning knobs.
  const (
  	// maxMessageSize caps a serialized message (10MB); Broadcast replaces
  	// anything larger with an invalidate signal.
  	maxMessageSize = 10 * 1024 * 1024
  	// enqueueTimeout bounds how long enqueue waits for room in the broadcast
  	// queue before dropping the message.
  	enqueueTimeout = 100 * time.Millisecond
  	// clientSendQueue is the per-client Send buffer depth, giving a
  	// momentarily slow browser room before it is evicted.
  	clientSendQueue = 512
  	// hubBroadcastQueue is the broadcast channel depth, headroom for
  	// cron-storm + admin-mutation bursts.
  	hubBroadcastQueue = 2048
  	// hubControlQueue is the register/unregister channel depth, a backlog for
  	// page-reload and disconnect storms.
  	hubControlQueue = 64
  	// minBroadcastInterval is the minimum spacing between two broadcasts of
  	// the same throttled message type.
  	minBroadcastInterval = 250 * time.Millisecond
  	// hubRestartAttempts is the total number of event-loop runs Run performs
  	// before giving up after repeated panics.
  	hubRestartAttempts = 3
  )
  31. // NewClient builds a Client ready for hub registration.
  32. func NewClient(id string) *Client {
  33. return &Client{
  34. ID: id,
  35. Send: make(chan []byte, clientSendQueue),
  36. }
  37. }
  38. // Message is the wire format sent to clients.
  39. type Message struct {
  40. Type MessageType `json:"type"`
  41. Payload any `json:"payload"`
  42. Time int64 `json:"time"`
  43. }
  // Client is one WebSocket connection known to a Hub. The hub performs
  // non-blocking writes of serialized messages to Send and closes Send, at most
  // once, when the client is removed or the hub shuts down.
  type Client struct {
  	// ID identifies the connection in log output.
  	ID string
  	// Send carries serialized messages destined for this connection.
  	Send chan []byte

  	closeOnce sync.Once // makes close(Send) happen exactly once
  }
  50. // Hub fan-outs messages to all connected clients.
  51. type Hub struct {
  52. clients map[*Client]struct{}
  53. broadcast chan []byte
  54. register chan *Client
  55. unregister chan *Client
  56. mu sync.RWMutex
  57. ctx context.Context
  58. cancel context.CancelFunc
  59. throttleMu sync.Mutex
  60. lastBroadcast map[MessageType]time.Time
  61. }
  62. // NewHub creates a hub. Call Run in a goroutine to start its event loop.
  63. func NewHub() *Hub {
  64. ctx, cancel := context.WithCancel(context.Background())
  65. return &Hub{
  66. clients: make(map[*Client]struct{}),
  67. broadcast: make(chan []byte, hubBroadcastQueue),
  68. register: make(chan *Client, hubControlQueue),
  69. unregister: make(chan *Client, hubControlQueue),
  70. ctx: ctx,
  71. cancel: cancel,
  72. lastBroadcast: make(map[MessageType]time.Time),
  73. }
  74. }
  75. var throttledMessageTypes = map[MessageType]struct{}{
  76. MessageTypeInbounds: {},
  77. MessageTypeOutbounds: {},
  78. MessageTypeTraffic: {},
  79. MessageTypeClientStats: {},
  80. }
  81. func (h *Hub) shouldThrottle(msgType MessageType) bool {
  82. if _, gated := throttledMessageTypes[msgType]; !gated {
  83. return false
  84. }
  85. h.throttleMu.Lock()
  86. defer h.throttleMu.Unlock()
  87. now := time.Now()
  88. if last, ok := h.lastBroadcast[msgType]; ok && now.Sub(last) < minBroadcastInterval {
  89. return true
  90. }
  91. h.lastBroadcast[msgType] = now
  92. return false
  93. }
  94. // Run drives the hub. The inner loop is wrapped in a panic-recovery harness
  95. // that retries up to hubRestartAttempts times with backoff so a transient
  96. // panic doesn't permanently kill real-time updates for commercial deployments.
  97. // After the cap, the hub stays down and the frontend falls back to REST polling.
  98. func (h *Hub) Run() {
  99. for attempt := range hubRestartAttempts {
  100. stopped := h.runOnce()
  101. if stopped {
  102. return
  103. }
  104. if attempt < hubRestartAttempts-1 {
  105. wait := time.Duration(1<<attempt) * time.Second // 1s, 2s, 4s
  106. logger.Errorf("WebSocket hub crashed, restarting in %s (%d/%d)", wait, attempt+1, hubRestartAttempts-1)
  107. select {
  108. case <-time.After(wait):
  109. case <-h.ctx.Done():
  110. return
  111. }
  112. }
  113. }
  114. logger.Error("WebSocket hub stopped after exhausting restart attempts")
  115. }
  116. // runOnce drives the event loop once and returns true if the hub stopped
  117. // cleanly (context cancelled). On panic, recover logs and returns false so
  118. // Run can decide whether to retry.
  119. func (h *Hub) runOnce() (stopped bool) {
  120. defer func() {
  121. if r := recover(); r != nil {
  122. logger.Errorf("WebSocket hub panic recovered: %v", r)
  123. stopped = false
  124. }
  125. }()
  126. for {
  127. select {
  128. case <-h.ctx.Done():
  129. h.shutdown()
  130. return true
  131. case c := <-h.register:
  132. if c == nil {
  133. continue
  134. }
  135. h.mu.Lock()
  136. h.clients[c] = struct{}{}
  137. n := len(h.clients)
  138. h.mu.Unlock()
  139. logger.Debugf("WebSocket client connected: %s (total: %d)", c.ID, n)
  140. case c := <-h.unregister:
  141. if c == nil {
  142. continue
  143. }
  144. h.removeClient(c)
  145. case msg := <-h.broadcast:
  146. h.fanout(msg)
  147. }
  148. }
  149. }
  150. // shutdown closes all client send channels and clears the registry.
  151. func (h *Hub) shutdown() {
  152. h.mu.Lock()
  153. for c := range h.clients {
  154. c.closeOnce.Do(func() { close(c.Send) })
  155. }
  156. h.clients = make(map[*Client]struct{})
  157. h.mu.Unlock()
  158. logger.Info("WebSocket hub stopped")
  159. }
  160. // removeClient deletes a client and closes its send channel exactly once.
  161. func (h *Hub) removeClient(c *Client) {
  162. h.mu.Lock()
  163. if _, ok := h.clients[c]; ok {
  164. delete(h.clients, c)
  165. c.closeOnce.Do(func() { close(c.Send) })
  166. }
  167. n := len(h.clients)
  168. h.mu.Unlock()
  169. logger.Debugf("WebSocket client disconnected: %s (total: %d)", c.ID, n)
  170. }
  171. // fanout delivers msg to every client. Each send is non-blocking — a client
  172. // whose buffer is full is collected for direct removal at the end. We do NOT
  173. // route slow-client unregistrations through the unregister channel: under
  174. // burst load (panel restart, network blip) that channel can fill up while the
  175. // hub itself is the consumer, causing a self-deadlock.
  176. func (h *Hub) fanout(msg []byte) {
  177. if msg == nil {
  178. return
  179. }
  180. h.mu.RLock()
  181. if len(h.clients) == 0 {
  182. h.mu.RUnlock()
  183. return
  184. }
  185. targets := make([]*Client, 0, len(h.clients))
  186. for c := range h.clients {
  187. targets = append(targets, c)
  188. }
  189. h.mu.RUnlock()
  190. var dead []*Client
  191. for _, c := range targets {
  192. if !trySend(c, msg) {
  193. dead = append(dead, c)
  194. }
  195. }
  196. if len(dead) == 0 {
  197. return
  198. }
  199. h.mu.Lock()
  200. for _, c := range dead {
  201. if _, ok := h.clients[c]; ok {
  202. delete(h.clients, c)
  203. c.closeOnce.Do(func() { close(c.Send) })
  204. logger.Debugf("WebSocket client %s send buffer full, disconnected", c.ID)
  205. }
  206. }
  207. h.mu.Unlock()
  208. }
  209. // trySend performs a non-blocking write to the client's Send channel.
  210. // Returns false if the client should be evicted (full buffer or closed channel).
  211. // A defer-recover guards against the rare race where the channel was closed
  212. // concurrently — sending on a closed channel always panics, even with select+default.
  213. func trySend(c *Client, msg []byte) (ok bool) {
  214. defer func() {
  215. if r := recover(); r != nil {
  216. ok = false
  217. }
  218. }()
  219. select {
  220. case c.Send <- msg:
  221. return true
  222. default:
  223. return false
  224. }
  225. }
  226. // Broadcast serializes payload and queues it for delivery to all clients.
  227. // If the serialized message exceeds maxMessageSize, an invalidate signal is
  228. // queued instead so the frontend re-fetches via REST. Broadcasts of throttled
  229. // message types (see throttledMessageTypes) within minBroadcastInterval of
  230. // the previous one are dropped — the next legitimate mutation will push the
  231. // fresh state.
  232. func (h *Hub) Broadcast(messageType MessageType, payload any) {
  233. if h == nil || payload == nil || h.GetClientCount() == 0 {
  234. return
  235. }
  236. if h.shouldThrottle(messageType) {
  237. return
  238. }
  239. data, err := json.Marshal(Message{
  240. Type: messageType,
  241. Payload: payload,
  242. Time: time.Now().UnixMilli(),
  243. })
  244. if err != nil {
  245. logger.Error("WebSocket marshal failed:", err)
  246. return
  247. }
  248. if len(data) > maxMessageSize {
  249. logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", len(data), messageType)
  250. h.broadcastInvalidate(messageType)
  251. return
  252. }
  253. h.enqueue(data)
  254. }
  255. // broadcastInvalidate queues a lightweight signal telling clients to re-fetch
  256. // the named data type via REST.
  257. func (h *Hub) broadcastInvalidate(originalType MessageType) {
  258. data, err := json.Marshal(Message{
  259. Type: MessageTypeInvalidate,
  260. Payload: map[string]string{"type": string(originalType)},
  261. Time: time.Now().UnixMilli(),
  262. })
  263. if err != nil {
  264. logger.Error("WebSocket invalidate marshal failed:", err)
  265. return
  266. }
  267. h.enqueue(data)
  268. }
  269. // enqueue submits raw bytes to the broadcast channel. Dropped on backpressure
  270. // (channel full for >100ms) or shutdown.
  271. func (h *Hub) enqueue(data []byte) {
  272. select {
  273. case h.broadcast <- data:
  274. case <-time.After(enqueueTimeout):
  275. logger.Warning("WebSocket broadcast channel full, dropping message")
  276. case <-h.ctx.Done():
  277. }
  278. }
  279. // GetClientCount returns the number of connected clients.
  280. func (h *Hub) GetClientCount() int {
  281. if h == nil {
  282. return 0
  283. }
  284. h.mu.RLock()
  285. defer h.mu.RUnlock()
  286. return len(h.clients)
  287. }
  288. // Register adds a client to the hub.
  289. func (h *Hub) Register(c *Client) {
  290. if h == nil || c == nil {
  291. return
  292. }
  293. select {
  294. case h.register <- c:
  295. case <-h.ctx.Done():
  296. }
  297. }
  298. // Unregister removes a client from the hub. Fast path queues for the hub
  299. // goroutine; if the channel is saturated (disconnect storm) we fall back
  300. // to a direct removal under the write lock so dead clients aren't left in
  301. // the registry waiting for their Send buffer to fill (minutes of wasted
  302. // fanout work at low broadcast rates).
  303. //
  304. // Direct removal is safe from any caller: external goroutines (read/write
  305. // pumps) hold no hub locks, and the hub goroutine itself never holds h.mu
  306. // when it calls Unregister — fanout releases its RLock before per-client
  307. // sends, so we can't self-deadlock here.
  308. func (h *Hub) Unregister(c *Client) {
  309. if h == nil || c == nil {
  310. return
  311. }
  312. select {
  313. case h.unregister <- c:
  314. default:
  315. h.removeClient(c)
  316. }
  317. }
  318. // Stop signals the hub to shut down and close all client connections.
  319. func (h *Hub) Stop() {
  320. if h != nil && h.cancel != nil {
  321. h.cancel()
  322. }
  323. }