hub.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. // Package websocket provides a WebSocket hub for real-time updates and notifications.
  2. package websocket
  3. import (
  4. "context"
  5. "encoding/json"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v2/logger"
  9. )
  10. // MessageType identifies the kind of WebSocket message.
  11. type MessageType string
  12. const (
  13. MessageTypeStatus MessageType = "status"
  14. MessageTypeTraffic MessageType = "traffic"
  15. MessageTypeInbounds MessageType = "inbounds"
  16. MessageTypeOutbounds MessageType = "outbounds"
  17. MessageTypeNodes MessageType = "nodes"
  18. MessageTypeNotification MessageType = "notification"
  19. MessageTypeXrayState MessageType = "xray_state"
  20. // MessageTypeClientStats carries absolute traffic counters for the clients
  21. // that had activity in the latest collection window. Frontend applies these
  22. // in-place — far smaller than re-broadcasting the full inbound list and
  23. // scales to 10k+ clients without falling back to REST.
  24. MessageTypeClientStats MessageType = "client_stats"
  25. MessageTypeInvalidate MessageType = "invalidate" // Tells frontend to re-fetch via REST (last-resort).
  26. // maxMessageSize caps the WebSocket payload. Beyond this the hub sends a
  27. // lightweight invalidate signal and the frontend re-fetches via REST.
  28. // 10MB lets typical 2k–8k-client deployments push directly via WS (low
  29. // latency); larger installs fall back to invalidate.
  30. maxMessageSize = 10 * 1024 * 1024 // 10MB
  31. enqueueTimeout = 100 * time.Millisecond
  32. clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser.
  33. hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts.
  34. hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms).
  35. // minBroadcastInterval throttles per-type broadcasts so cron storms or
  36. // rapid mutations cannot drown the hub. Bursts within the interval are
  37. // dropped (not coalesced); the next broadcast outside the window delivers
  38. // the latest state. Only message types in throttledMessageTypes are gated —
  39. // heartbeat and real-time signals (status, traffic, client_stats,
  40. // notification, xray_state, invalidate) bypass this so they are never delayed.
  41. minBroadcastInterval = 250 * time.Millisecond
  42. // hubRestartAttempts caps panic-recovery restarts. After this many
  43. // consecutive failures we stop trying and log; the panel keeps running
  44. // (frontend falls back to REST polling) and the operator can investigate.
  45. hubRestartAttempts = 3
  46. )
  47. // NewClient builds a Client ready for hub registration.
  48. func NewClient(id string) *Client {
  49. return &Client{
  50. ID: id,
  51. Send: make(chan []byte, clientSendQueue),
  52. }
  53. }
  54. // Message is the wire format sent to clients.
  55. type Message struct {
  56. Type MessageType `json:"type"`
  57. Payload any `json:"payload"`
  58. Time int64 `json:"time"`
  59. }
  60. // Client represents a single WebSocket connection.
  61. type Client struct {
  62. ID string
  63. Send chan []byte
  64. closeOnce sync.Once
  65. }
  66. // Hub fan-outs messages to all connected clients.
  67. type Hub struct {
  68. clients map[*Client]struct{}
  69. broadcast chan []byte
  70. register chan *Client
  71. unregister chan *Client
  72. mu sync.RWMutex
  73. ctx context.Context
  74. cancel context.CancelFunc
  75. throttleMu sync.Mutex
  76. lastBroadcast map[MessageType]time.Time
  77. }
  78. // NewHub creates a hub. Call Run in a goroutine to start its event loop.
  79. func NewHub() *Hub {
  80. ctx, cancel := context.WithCancel(context.Background())
  81. return &Hub{
  82. clients: make(map[*Client]struct{}),
  83. broadcast: make(chan []byte, hubBroadcastQueue),
  84. register: make(chan *Client, hubControlQueue),
  85. unregister: make(chan *Client, hubControlQueue),
  86. ctx: ctx,
  87. cancel: cancel,
  88. lastBroadcast: make(map[MessageType]time.Time),
  89. }
  90. }
  91. // throttledMessageTypes is the explicit allow-list of message types subject to
  92. // the per-type rate limit. Everything else (status, traffic, client_stats,
  93. // notification, xray_state, invalidate) is heartbeat- or signal-class and must
  94. // not be delayed. Keeping the set explicit (vs. an exclusion list) makes the
  95. // intent obvious when new message types are added — by default they bypass.
  96. var throttledMessageTypes = map[MessageType]struct{}{
  97. MessageTypeInbounds: {},
  98. MessageTypeOutbounds: {},
  99. }
  100. // shouldThrottle returns true if a broadcast of msgType is rate-limited and
  101. // happened within minBroadcastInterval of the previous one. Only message types
  102. // in throttledMessageTypes are gated.
  103. func (h *Hub) shouldThrottle(msgType MessageType) bool {
  104. if _, gated := throttledMessageTypes[msgType]; !gated {
  105. return false
  106. }
  107. h.throttleMu.Lock()
  108. defer h.throttleMu.Unlock()
  109. now := time.Now()
  110. if last, ok := h.lastBroadcast[msgType]; ok && now.Sub(last) < minBroadcastInterval {
  111. return true
  112. }
  113. h.lastBroadcast[msgType] = now
  114. return false
  115. }
  116. // Run drives the hub. The inner loop is wrapped in a panic-recovery harness
  117. // that retries up to hubRestartAttempts times with backoff so a transient
  118. // panic doesn't permanently kill real-time updates for commercial deployments.
  119. // After the cap, the hub stays down and the frontend falls back to REST polling.
  120. func (h *Hub) Run() {
  121. for attempt := 0; attempt < hubRestartAttempts; attempt++ {
  122. stopped := h.runOnce()
  123. if stopped {
  124. return
  125. }
  126. if attempt < hubRestartAttempts-1 {
  127. wait := time.Duration(1<<attempt) * time.Second // 1s, 2s, 4s
  128. logger.Errorf("WebSocket hub crashed, restarting in %s (%d/%d)", wait, attempt+1, hubRestartAttempts-1)
  129. select {
  130. case <-time.After(wait):
  131. case <-h.ctx.Done():
  132. return
  133. }
  134. }
  135. }
  136. logger.Error("WebSocket hub stopped after exhausting restart attempts")
  137. }
  138. // runOnce drives the event loop once and returns true if the hub stopped
  139. // cleanly (context cancelled). On panic, recover logs and returns false so
  140. // Run can decide whether to retry.
  141. func (h *Hub) runOnce() (stopped bool) {
  142. defer func() {
  143. if r := recover(); r != nil {
  144. logger.Errorf("WebSocket hub panic recovered: %v", r)
  145. stopped = false
  146. }
  147. }()
  148. for {
  149. select {
  150. case <-h.ctx.Done():
  151. h.shutdown()
  152. return true
  153. case c := <-h.register:
  154. if c == nil {
  155. continue
  156. }
  157. h.mu.Lock()
  158. h.clients[c] = struct{}{}
  159. n := len(h.clients)
  160. h.mu.Unlock()
  161. logger.Debugf("WebSocket client connected: %s (total: %d)", c.ID, n)
  162. case c := <-h.unregister:
  163. if c == nil {
  164. continue
  165. }
  166. h.removeClient(c)
  167. case msg := <-h.broadcast:
  168. h.fanout(msg)
  169. }
  170. }
  171. }
  172. // shutdown closes all client send channels and clears the registry.
  173. func (h *Hub) shutdown() {
  174. h.mu.Lock()
  175. for c := range h.clients {
  176. c.closeOnce.Do(func() { close(c.Send) })
  177. }
  178. h.clients = make(map[*Client]struct{})
  179. h.mu.Unlock()
  180. logger.Info("WebSocket hub stopped")
  181. }
  182. // removeClient deletes a client and closes its send channel exactly once.
  183. func (h *Hub) removeClient(c *Client) {
  184. h.mu.Lock()
  185. if _, ok := h.clients[c]; ok {
  186. delete(h.clients, c)
  187. c.closeOnce.Do(func() { close(c.Send) })
  188. }
  189. n := len(h.clients)
  190. h.mu.Unlock()
  191. logger.Debugf("WebSocket client disconnected: %s (total: %d)", c.ID, n)
  192. }
  193. // fanout delivers msg to every client. Each send is non-blocking — a client
  194. // whose buffer is full is collected for direct removal at the end. We do NOT
  195. // route slow-client unregistrations through the unregister channel: under
  196. // burst load (panel restart, network blip) that channel can fill up while the
  197. // hub itself is the consumer, causing a self-deadlock.
  198. func (h *Hub) fanout(msg []byte) {
  199. if msg == nil {
  200. return
  201. }
  202. h.mu.RLock()
  203. if len(h.clients) == 0 {
  204. h.mu.RUnlock()
  205. return
  206. }
  207. targets := make([]*Client, 0, len(h.clients))
  208. for c := range h.clients {
  209. targets = append(targets, c)
  210. }
  211. h.mu.RUnlock()
  212. var dead []*Client
  213. for _, c := range targets {
  214. if !trySend(c, msg) {
  215. dead = append(dead, c)
  216. }
  217. }
  218. if len(dead) == 0 {
  219. return
  220. }
  221. h.mu.Lock()
  222. for _, c := range dead {
  223. if _, ok := h.clients[c]; ok {
  224. delete(h.clients, c)
  225. c.closeOnce.Do(func() { close(c.Send) })
  226. logger.Debugf("WebSocket client %s send buffer full, disconnected", c.ID)
  227. }
  228. }
  229. h.mu.Unlock()
  230. }
  231. // trySend performs a non-blocking write to the client's Send channel.
  232. // Returns false if the client should be evicted (full buffer or closed channel).
  233. // A defer-recover guards against the rare race where the channel was closed
  234. // concurrently — sending on a closed channel always panics, even with select+default.
  235. func trySend(c *Client, msg []byte) (ok bool) {
  236. defer func() {
  237. if r := recover(); r != nil {
  238. ok = false
  239. }
  240. }()
  241. select {
  242. case c.Send <- msg:
  243. return true
  244. default:
  245. return false
  246. }
  247. }
  248. // Broadcast serializes payload and queues it for delivery to all clients.
  249. // If the serialized message exceeds maxMessageSize, an invalidate signal is
  250. // queued instead so the frontend re-fetches via REST. Broadcasts of throttled
  251. // message types (see throttledMessageTypes) within minBroadcastInterval of
  252. // the previous one are dropped — the next legitimate mutation will push the
  253. // fresh state.
  254. func (h *Hub) Broadcast(messageType MessageType, payload any) {
  255. if h == nil || payload == nil || h.GetClientCount() == 0 {
  256. return
  257. }
  258. if h.shouldThrottle(messageType) {
  259. return
  260. }
  261. data, err := json.Marshal(Message{
  262. Type: messageType,
  263. Payload: payload,
  264. Time: time.Now().UnixMilli(),
  265. })
  266. if err != nil {
  267. logger.Error("WebSocket marshal failed:", err)
  268. return
  269. }
  270. if len(data) > maxMessageSize {
  271. logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", len(data), messageType)
  272. h.broadcastInvalidate(messageType)
  273. return
  274. }
  275. h.enqueue(data)
  276. }
  277. // broadcastInvalidate queues a lightweight signal telling clients to re-fetch
  278. // the named data type via REST.
  279. func (h *Hub) broadcastInvalidate(originalType MessageType) {
  280. data, err := json.Marshal(Message{
  281. Type: MessageTypeInvalidate,
  282. Payload: map[string]string{"type": string(originalType)},
  283. Time: time.Now().UnixMilli(),
  284. })
  285. if err != nil {
  286. logger.Error("WebSocket invalidate marshal failed:", err)
  287. return
  288. }
  289. h.enqueue(data)
  290. }
  291. // enqueue submits raw bytes to the broadcast channel. Dropped on backpressure
  292. // (channel full for >100ms) or shutdown.
  293. func (h *Hub) enqueue(data []byte) {
  294. select {
  295. case h.broadcast <- data:
  296. case <-time.After(enqueueTimeout):
  297. logger.Warning("WebSocket broadcast channel full, dropping message")
  298. case <-h.ctx.Done():
  299. }
  300. }
  301. // GetClientCount returns the number of connected clients.
  302. func (h *Hub) GetClientCount() int {
  303. if h == nil {
  304. return 0
  305. }
  306. h.mu.RLock()
  307. defer h.mu.RUnlock()
  308. return len(h.clients)
  309. }
  310. // Register adds a client to the hub.
  311. func (h *Hub) Register(c *Client) {
  312. if h == nil || c == nil {
  313. return
  314. }
  315. select {
  316. case h.register <- c:
  317. case <-h.ctx.Done():
  318. }
  319. }
  320. // Unregister removes a client from the hub. Fast path queues for the hub
  321. // goroutine; if the channel is saturated (disconnect storm) we fall back
  322. // to a direct removal under the write lock so dead clients aren't left in
  323. // the registry waiting for their Send buffer to fill (minutes of wasted
  324. // fanout work at low broadcast rates).
  325. //
  326. // Direct removal is safe from any caller: external goroutines (read/write
  327. // pumps) hold no hub locks, and the hub goroutine itself never holds h.mu
  328. // when it calls Unregister — fanout releases its RLock before per-client
  329. // sends, so we can't self-deadlock here.
  330. func (h *Hub) Unregister(c *Client) {
  331. if h == nil || c == nil {
  332. return
  333. }
  334. select {
  335. case h.unregister <- c:
  336. default:
  337. h.removeClient(c)
  338. }
  339. }
  340. // Stop signals the hub to shut down and close all client connections.
  341. func (h *Hub) Stop() {
  342. if h != nil && h.cancel != nil {
  343. h.cancel()
  344. }
  345. }