hub.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. // Package websocket provides a WebSocket hub for real-time updates and notifications.
  2. package websocket
  3. import (
  4. "context"
  5. "encoding/json"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/logger"
  9. )
  // MessageType names the kind of payload carried by a WebSocket Message.
type MessageType string

// Message types understood by the frontend.
const (
	MessageTypeStatus       MessageType = "status"
	MessageTypeTraffic      MessageType = "traffic"
	MessageTypeInbounds     MessageType = "inbounds"
	MessageTypeOutbounds    MessageType = "outbounds"
	MessageTypeNodes        MessageType = "nodes"
	MessageTypeNotification MessageType = "notification"
	MessageTypeXrayState    MessageType = "xray_state"

	// MessageTypeClientStats carries absolute traffic counters for only the
	// clients that were active in the latest collection window. The frontend
	// patches them in place, which is far smaller than re-broadcasting the
	// whole inbound list and keeps 10k+ client installs on the WebSocket path
	// instead of falling back to REST.
	MessageTypeClientStats MessageType = "client_stats"

	// MessageTypeInvalidate tells the frontend to re-fetch over REST; it is
	// the last-resort fallback.
	MessageTypeInvalidate MessageType = "invalidate"
)

// Hub tuning knobs.
const (
	// maxMessageSize is the largest serialized message the hub pushes as-is.
	// Anything bigger is replaced by a lightweight invalidate signal and the
	// frontend re-fetches via REST. 10MB lets typical 2k–8k-client deployments
	// stay on the low-latency WebSocket path; larger installs fall back.
	maxMessageSize = 10 * 1024 * 1024 // 10MB

	// enqueueTimeout bounds how long a broadcast waits for room in the hub's
	// broadcast queue before the message is dropped.
	enqueueTimeout = 100 * time.Millisecond

	// Queue depths.
	clientSendQueue   = 512  // Per-client buffer: ~50s of slack for a momentarily slow browser.
	hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts.
	hubControlQueue   = 64   // Backlog for register/unregister bursts (page reloads, disconnect storms).

	// minBroadcastInterval is the per-type rate limit applied to the types
	// listed in throttledMessageTypes. A broadcast arriving inside the window
	// is dropped, not coalesced; the next one outside the window carries the
	// latest state. Heartbeats and one-shot signals (status, notification,
	// xray_state, invalidate) are not gated, so they are never delayed.
	minBroadcastInterval = 250 * time.Millisecond

	// hubRestartAttempts caps panic-recovery restarts of the hub loop. Past
	// the cap the hub stays down and logs; the panel keeps running (the
	// frontend falls back to REST polling) so the operator can investigate.
	hubRestartAttempts = 3
)
  47. // NewClient builds a Client ready for hub registration.
  48. func NewClient(id string) *Client {
  49. return &Client{
  50. ID: id,
  51. Send: make(chan []byte, clientSendQueue),
  52. }
  53. }
  // Message is the JSON envelope sent to clients. encoding/json emits its keys
// in field declaration order (type, payload, time), so field order here is
// part of the wire bytes.
type Message struct {
	// Type tells the frontend how to interpret Payload.
	Type MessageType `json:"type"`
	// Payload is marshalled as-is; its shape depends on Type.
	Payload any `json:"payload"`
	// Time is the send timestamp; the hub fills it with time.Now().UnixMilli()
	// (Unix milliseconds).
	Time int64 `json:"time"`
}
  // Client represents a single WebSocket connection as seen by the hub.
type Client struct {
	// ID labels the connection in the hub's log messages.
	ID string
	// Send is the outbound queue the hub writes serialized messages to
	// (buffered to clientSendQueue when built by NewClient). The hub closes
	// it when the client is removed or the hub shuts down.
	// NOTE(review): presumably the connection's writer treats a closed Send
	// as the signal to disconnect — confirm in the write pump.
	Send chan []byte
	// closeOnce ensures Send is closed at most once, whichever hub path
	// (removal, eviction, or shutdown) gets there first.
	closeOnce sync.Once
}
  66. // Hub fan-outs messages to all connected clients.
  67. type Hub struct {
  68. clients map[*Client]struct{}
  69. broadcast chan []byte
  70. register chan *Client
  71. unregister chan *Client
  72. mu sync.RWMutex
  73. ctx context.Context
  74. cancel context.CancelFunc
  75. throttleMu sync.Mutex
  76. lastBroadcast map[MessageType]time.Time
  77. }
  78. // NewHub creates a hub. Call Run in a goroutine to start its event loop.
  79. func NewHub() *Hub {
  80. ctx, cancel := context.WithCancel(context.Background())
  81. return &Hub{
  82. clients: make(map[*Client]struct{}),
  83. broadcast: make(chan []byte, hubBroadcastQueue),
  84. register: make(chan *Client, hubControlQueue),
  85. unregister: make(chan *Client, hubControlQueue),
  86. ctx: ctx,
  87. cancel: cancel,
  88. lastBroadcast: make(map[MessageType]time.Time),
  89. }
  90. }
  91. var throttledMessageTypes = map[MessageType]struct{}{
  92. MessageTypeInbounds: {},
  93. MessageTypeOutbounds: {},
  94. MessageTypeTraffic: {},
  95. MessageTypeClientStats: {},
  96. }
  97. func (h *Hub) shouldThrottle(msgType MessageType) bool {
  98. if _, gated := throttledMessageTypes[msgType]; !gated {
  99. return false
  100. }
  101. h.throttleMu.Lock()
  102. defer h.throttleMu.Unlock()
  103. now := time.Now()
  104. if last, ok := h.lastBroadcast[msgType]; ok && now.Sub(last) < minBroadcastInterval {
  105. return true
  106. }
  107. h.lastBroadcast[msgType] = now
  108. return false
  109. }
  110. // Run drives the hub. The inner loop is wrapped in a panic-recovery harness
  111. // that retries up to hubRestartAttempts times with backoff so a transient
  112. // panic doesn't permanently kill real-time updates for commercial deployments.
  113. // After the cap, the hub stays down and the frontend falls back to REST polling.
  114. func (h *Hub) Run() {
  115. for attempt := 0; attempt < hubRestartAttempts; attempt++ {
  116. stopped := h.runOnce()
  117. if stopped {
  118. return
  119. }
  120. if attempt < hubRestartAttempts-1 {
  121. wait := time.Duration(1<<attempt) * time.Second // 1s, 2s, 4s
  122. logger.Errorf("WebSocket hub crashed, restarting in %s (%d/%d)", wait, attempt+1, hubRestartAttempts-1)
  123. select {
  124. case <-time.After(wait):
  125. case <-h.ctx.Done():
  126. return
  127. }
  128. }
  129. }
  130. logger.Error("WebSocket hub stopped after exhausting restart attempts")
  131. }
  132. // runOnce drives the event loop once and returns true if the hub stopped
  133. // cleanly (context cancelled). On panic, recover logs and returns false so
  134. // Run can decide whether to retry.
  135. func (h *Hub) runOnce() (stopped bool) {
  136. defer func() {
  137. if r := recover(); r != nil {
  138. logger.Errorf("WebSocket hub panic recovered: %v", r)
  139. stopped = false
  140. }
  141. }()
  142. for {
  143. select {
  144. case <-h.ctx.Done():
  145. h.shutdown()
  146. return true
  147. case c := <-h.register:
  148. if c == nil {
  149. continue
  150. }
  151. h.mu.Lock()
  152. h.clients[c] = struct{}{}
  153. n := len(h.clients)
  154. h.mu.Unlock()
  155. logger.Debugf("WebSocket client connected: %s (total: %d)", c.ID, n)
  156. case c := <-h.unregister:
  157. if c == nil {
  158. continue
  159. }
  160. h.removeClient(c)
  161. case msg := <-h.broadcast:
  162. h.fanout(msg)
  163. }
  164. }
  165. }
  166. // shutdown closes all client send channels and clears the registry.
  167. func (h *Hub) shutdown() {
  168. h.mu.Lock()
  169. for c := range h.clients {
  170. c.closeOnce.Do(func() { close(c.Send) })
  171. }
  172. h.clients = make(map[*Client]struct{})
  173. h.mu.Unlock()
  174. logger.Info("WebSocket hub stopped")
  175. }
  176. // removeClient deletes a client and closes its send channel exactly once.
  177. func (h *Hub) removeClient(c *Client) {
  178. h.mu.Lock()
  179. if _, ok := h.clients[c]; ok {
  180. delete(h.clients, c)
  181. c.closeOnce.Do(func() { close(c.Send) })
  182. }
  183. n := len(h.clients)
  184. h.mu.Unlock()
  185. logger.Debugf("WebSocket client disconnected: %s (total: %d)", c.ID, n)
  186. }
  187. // fanout delivers msg to every client. Each send is non-blocking — a client
  188. // whose buffer is full is collected for direct removal at the end. We do NOT
  189. // route slow-client unregistrations through the unregister channel: under
  190. // burst load (panel restart, network blip) that channel can fill up while the
  191. // hub itself is the consumer, causing a self-deadlock.
  192. func (h *Hub) fanout(msg []byte) {
  193. if msg == nil {
  194. return
  195. }
  196. h.mu.RLock()
  197. if len(h.clients) == 0 {
  198. h.mu.RUnlock()
  199. return
  200. }
  201. targets := make([]*Client, 0, len(h.clients))
  202. for c := range h.clients {
  203. targets = append(targets, c)
  204. }
  205. h.mu.RUnlock()
  206. var dead []*Client
  207. for _, c := range targets {
  208. if !trySend(c, msg) {
  209. dead = append(dead, c)
  210. }
  211. }
  212. if len(dead) == 0 {
  213. return
  214. }
  215. h.mu.Lock()
  216. for _, c := range dead {
  217. if _, ok := h.clients[c]; ok {
  218. delete(h.clients, c)
  219. c.closeOnce.Do(func() { close(c.Send) })
  220. logger.Debugf("WebSocket client %s send buffer full, disconnected", c.ID)
  221. }
  222. }
  223. h.mu.Unlock()
  224. }
  225. // trySend performs a non-blocking write to the client's Send channel.
  226. // Returns false if the client should be evicted (full buffer or closed channel).
  227. // A defer-recover guards against the rare race where the channel was closed
  228. // concurrently — sending on a closed channel always panics, even with select+default.
  229. func trySend(c *Client, msg []byte) (ok bool) {
  230. defer func() {
  231. if r := recover(); r != nil {
  232. ok = false
  233. }
  234. }()
  235. select {
  236. case c.Send <- msg:
  237. return true
  238. default:
  239. return false
  240. }
  241. }
  242. // Broadcast serializes payload and queues it for delivery to all clients.
  243. // If the serialized message exceeds maxMessageSize, an invalidate signal is
  244. // queued instead so the frontend re-fetches via REST. Broadcasts of throttled
  245. // message types (see throttledMessageTypes) within minBroadcastInterval of
  246. // the previous one are dropped — the next legitimate mutation will push the
  247. // fresh state.
  248. func (h *Hub) Broadcast(messageType MessageType, payload any) {
  249. if h == nil || payload == nil || h.GetClientCount() == 0 {
  250. return
  251. }
  252. if h.shouldThrottle(messageType) {
  253. return
  254. }
  255. data, err := json.Marshal(Message{
  256. Type: messageType,
  257. Payload: payload,
  258. Time: time.Now().UnixMilli(),
  259. })
  260. if err != nil {
  261. logger.Error("WebSocket marshal failed:", err)
  262. return
  263. }
  264. if len(data) > maxMessageSize {
  265. logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", len(data), messageType)
  266. h.broadcastInvalidate(messageType)
  267. return
  268. }
  269. h.enqueue(data)
  270. }
  271. // broadcastInvalidate queues a lightweight signal telling clients to re-fetch
  272. // the named data type via REST.
  273. func (h *Hub) broadcastInvalidate(originalType MessageType) {
  274. data, err := json.Marshal(Message{
  275. Type: MessageTypeInvalidate,
  276. Payload: map[string]string{"type": string(originalType)},
  277. Time: time.Now().UnixMilli(),
  278. })
  279. if err != nil {
  280. logger.Error("WebSocket invalidate marshal failed:", err)
  281. return
  282. }
  283. h.enqueue(data)
  284. }
  285. // enqueue submits raw bytes to the broadcast channel. Dropped on backpressure
  286. // (channel full for >100ms) or shutdown.
  287. func (h *Hub) enqueue(data []byte) {
  288. select {
  289. case h.broadcast <- data:
  290. case <-time.After(enqueueTimeout):
  291. logger.Warning("WebSocket broadcast channel full, dropping message")
  292. case <-h.ctx.Done():
  293. }
  294. }
  295. // GetClientCount returns the number of connected clients.
  296. func (h *Hub) GetClientCount() int {
  297. if h == nil {
  298. return 0
  299. }
  300. h.mu.RLock()
  301. defer h.mu.RUnlock()
  302. return len(h.clients)
  303. }
  304. // Register adds a client to the hub.
  305. func (h *Hub) Register(c *Client) {
  306. if h == nil || c == nil {
  307. return
  308. }
  309. select {
  310. case h.register <- c:
  311. case <-h.ctx.Done():
  312. }
  313. }
  314. // Unregister removes a client from the hub. Fast path queues for the hub
  315. // goroutine; if the channel is saturated (disconnect storm) we fall back
  316. // to a direct removal under the write lock so dead clients aren't left in
  317. // the registry waiting for their Send buffer to fill (minutes of wasted
  318. // fanout work at low broadcast rates).
  319. //
  320. // Direct removal is safe from any caller: external goroutines (read/write
  321. // pumps) hold no hub locks, and the hub goroutine itself never holds h.mu
  322. // when it calls Unregister — fanout releases its RLock before per-client
  323. // sends, so we can't self-deadlock here.
  324. func (h *Hub) Unregister(c *Client) {
  325. if h == nil || c == nil {
  326. return
  327. }
  328. select {
  329. case h.unregister <- c:
  330. default:
  331. h.removeClient(c)
  332. }
  333. }
  334. // Stop signals the hub to shut down and close all client connections.
  335. func (h *Hub) Stop() {
  336. if h != nil && h.cancel != nil {
  337. h.cancel()
  338. }
  339. }