hub.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. // Package websocket provides a WebSocket hub for real-time updates and notifications.
  2. package websocket
  3. import (
  4. "context"
  5. "encoding/json"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v2/logger"
  9. )
  // MessageType names the category of a frame pushed over the WebSocket.
// It is serialized as the "type" field of Message.
type MessageType string
  12. const (
  13. MessageTypeStatus MessageType = "status"
  14. MessageTypeTraffic MessageType = "traffic"
  15. MessageTypeInbounds MessageType = "inbounds"
  16. MessageTypeOutbounds MessageType = "outbounds"
  17. MessageTypeNotification MessageType = "notification"
  18. MessageTypeXrayState MessageType = "xray_state"
  19. // MessageTypeClientStats carries absolute traffic counters for the clients
  20. // that had activity in the latest collection window. Frontend applies these
  21. // in-place — far smaller than re-broadcasting the full inbound list and
  22. // scales to 10k+ clients without falling back to REST.
  23. MessageTypeClientStats MessageType = "client_stats"
  24. MessageTypeInvalidate MessageType = "invalidate" // Tells frontend to re-fetch via REST (last-resort).
  25. // maxMessageSize caps the WebSocket payload. Beyond this the hub sends a
  26. // lightweight invalidate signal and the frontend re-fetches via REST.
  27. // 10MB lets typical 2k–8k-client deployments push directly via WS (low
  28. // latency); larger installs fall back to invalidate.
  29. maxMessageSize = 10 * 1024 * 1024 // 10MB
  30. enqueueTimeout = 100 * time.Millisecond
  31. clientSendQueue = 512 // ~50s of buffering for a momentarily slow browser.
  32. hubBroadcastQueue = 2048 // Headroom for cron-storm + admin-mutation bursts.
  33. hubControlQueue = 64 // Backlog for register/unregister bursts (page reloads, disconnect storms).
  34. // minBroadcastInterval throttles per-type broadcasts so cron storms or
  35. // rapid mutations cannot drown the hub. Bursts within the interval are
  36. // dropped (not coalesced); the next broadcast outside the window delivers
  37. // the latest state. Only message types in throttledMessageTypes are gated —
  38. // heartbeat and real-time signals (status, traffic, client_stats,
  39. // notification, xray_state, invalidate) bypass this so they are never delayed.
  40. minBroadcastInterval = 250 * time.Millisecond
  41. // hubRestartAttempts caps panic-recovery restarts. After this many
  42. // consecutive failures we stop trying and log; the panel keeps running
  43. // (frontend falls back to REST polling) and the operator can investigate.
  44. hubRestartAttempts = 3
  45. )
  46. // NewClient builds a Client ready for hub registration.
  47. func NewClient(id string) *Client {
  48. return &Client{
  49. ID: id,
  50. Send: make(chan []byte, clientSendQueue),
  51. }
  52. }
  53. // Message is the wire format sent to clients.
  54. type Message struct {
  55. Type MessageType `json:"type"`
  56. Payload any `json:"payload"`
  57. Time int64 `json:"time"`
  58. }
  // Client is one WebSocket connection as tracked by the hub. Outgoing frames
// are queued on Send; the hub closes Send, at most once via closeOnce, when
// the client is removed or the hub shuts down.
type Client struct {
	ID   string      // caller-supplied identifier; appears in hub log messages
	Send chan []byte // outbound frame queue

	closeOnce sync.Once // guards close(Send) so it happens exactly once
}
  65. // Hub fan-outs messages to all connected clients.
  66. type Hub struct {
  67. clients map[*Client]struct{}
  68. broadcast chan []byte
  69. register chan *Client
  70. unregister chan *Client
  71. mu sync.RWMutex
  72. ctx context.Context
  73. cancel context.CancelFunc
  74. throttleMu sync.Mutex
  75. lastBroadcast map[MessageType]time.Time
  76. }
  77. // NewHub creates a hub. Call Run in a goroutine to start its event loop.
  78. func NewHub() *Hub {
  79. ctx, cancel := context.WithCancel(context.Background())
  80. return &Hub{
  81. clients: make(map[*Client]struct{}),
  82. broadcast: make(chan []byte, hubBroadcastQueue),
  83. register: make(chan *Client, hubControlQueue),
  84. unregister: make(chan *Client, hubControlQueue),
  85. ctx: ctx,
  86. cancel: cancel,
  87. lastBroadcast: make(map[MessageType]time.Time),
  88. }
  89. }
  90. // throttledMessageTypes is the explicit allow-list of message types subject to
  91. // the per-type rate limit. Everything else (status, traffic, client_stats,
  92. // notification, xray_state, invalidate) is heartbeat- or signal-class and must
  93. // not be delayed. Keeping the set explicit (vs. an exclusion list) makes the
  94. // intent obvious when new message types are added — by default they bypass.
  95. var throttledMessageTypes = map[MessageType]struct{}{
  96. MessageTypeInbounds: {},
  97. MessageTypeOutbounds: {},
  98. }
  99. // shouldThrottle returns true if a broadcast of msgType is rate-limited and
  100. // happened within minBroadcastInterval of the previous one. Only message types
  101. // in throttledMessageTypes are gated.
  102. func (h *Hub) shouldThrottle(msgType MessageType) bool {
  103. if _, gated := throttledMessageTypes[msgType]; !gated {
  104. return false
  105. }
  106. h.throttleMu.Lock()
  107. defer h.throttleMu.Unlock()
  108. now := time.Now()
  109. if last, ok := h.lastBroadcast[msgType]; ok && now.Sub(last) < minBroadcastInterval {
  110. return true
  111. }
  112. h.lastBroadcast[msgType] = now
  113. return false
  114. }
  115. // Run drives the hub. The inner loop is wrapped in a panic-recovery harness
  116. // that retries up to hubRestartAttempts times with backoff so a transient
  117. // panic doesn't permanently kill real-time updates for commercial deployments.
  118. // After the cap, the hub stays down and the frontend falls back to REST polling.
  119. func (h *Hub) Run() {
  120. for attempt := 0; attempt < hubRestartAttempts; attempt++ {
  121. stopped := h.runOnce()
  122. if stopped {
  123. return
  124. }
  125. if attempt < hubRestartAttempts-1 {
  126. wait := time.Duration(1<<attempt) * time.Second // 1s, 2s, 4s
  127. logger.Errorf("WebSocket hub crashed, restarting in %s (%d/%d)", wait, attempt+1, hubRestartAttempts-1)
  128. select {
  129. case <-time.After(wait):
  130. case <-h.ctx.Done():
  131. return
  132. }
  133. }
  134. }
  135. logger.Error("WebSocket hub stopped after exhausting restart attempts")
  136. }
  137. // runOnce drives the event loop once and returns true if the hub stopped
  138. // cleanly (context cancelled). On panic, recover logs and returns false so
  139. // Run can decide whether to retry.
  140. func (h *Hub) runOnce() (stopped bool) {
  141. defer func() {
  142. if r := recover(); r != nil {
  143. logger.Errorf("WebSocket hub panic recovered: %v", r)
  144. stopped = false
  145. }
  146. }()
  147. for {
  148. select {
  149. case <-h.ctx.Done():
  150. h.shutdown()
  151. return true
  152. case c := <-h.register:
  153. if c == nil {
  154. continue
  155. }
  156. h.mu.Lock()
  157. h.clients[c] = struct{}{}
  158. n := len(h.clients)
  159. h.mu.Unlock()
  160. logger.Debugf("WebSocket client connected: %s (total: %d)", c.ID, n)
  161. case c := <-h.unregister:
  162. if c == nil {
  163. continue
  164. }
  165. h.removeClient(c)
  166. case msg := <-h.broadcast:
  167. h.fanout(msg)
  168. }
  169. }
  170. }
  171. // shutdown closes all client send channels and clears the registry.
  172. func (h *Hub) shutdown() {
  173. h.mu.Lock()
  174. for c := range h.clients {
  175. c.closeOnce.Do(func() { close(c.Send) })
  176. }
  177. h.clients = make(map[*Client]struct{})
  178. h.mu.Unlock()
  179. logger.Info("WebSocket hub stopped")
  180. }
  181. // removeClient deletes a client and closes its send channel exactly once.
  182. func (h *Hub) removeClient(c *Client) {
  183. h.mu.Lock()
  184. if _, ok := h.clients[c]; ok {
  185. delete(h.clients, c)
  186. c.closeOnce.Do(func() { close(c.Send) })
  187. }
  188. n := len(h.clients)
  189. h.mu.Unlock()
  190. logger.Debugf("WebSocket client disconnected: %s (total: %d)", c.ID, n)
  191. }
  192. // fanout delivers msg to every client. Each send is non-blocking — a client
  193. // whose buffer is full is collected for direct removal at the end. We do NOT
  194. // route slow-client unregistrations through the unregister channel: under
  195. // burst load (panel restart, network blip) that channel can fill up while the
  196. // hub itself is the consumer, causing a self-deadlock.
  197. func (h *Hub) fanout(msg []byte) {
  198. if msg == nil {
  199. return
  200. }
  201. h.mu.RLock()
  202. if len(h.clients) == 0 {
  203. h.mu.RUnlock()
  204. return
  205. }
  206. targets := make([]*Client, 0, len(h.clients))
  207. for c := range h.clients {
  208. targets = append(targets, c)
  209. }
  210. h.mu.RUnlock()
  211. var dead []*Client
  212. for _, c := range targets {
  213. if !trySend(c, msg) {
  214. dead = append(dead, c)
  215. }
  216. }
  217. if len(dead) == 0 {
  218. return
  219. }
  220. h.mu.Lock()
  221. for _, c := range dead {
  222. if _, ok := h.clients[c]; ok {
  223. delete(h.clients, c)
  224. c.closeOnce.Do(func() { close(c.Send) })
  225. logger.Debugf("WebSocket client %s send buffer full, disconnected", c.ID)
  226. }
  227. }
  228. h.mu.Unlock()
  229. }
  230. // trySend performs a non-blocking write to the client's Send channel.
  231. // Returns false if the client should be evicted (full buffer or closed channel).
  232. // A defer-recover guards against the rare race where the channel was closed
  233. // concurrently — sending on a closed channel always panics, even with select+default.
  234. func trySend(c *Client, msg []byte) (ok bool) {
  235. defer func() {
  236. if r := recover(); r != nil {
  237. ok = false
  238. }
  239. }()
  240. select {
  241. case c.Send <- msg:
  242. return true
  243. default:
  244. return false
  245. }
  246. }
  247. // Broadcast serializes payload and queues it for delivery to all clients.
  248. // If the serialized message exceeds maxMessageSize, an invalidate signal is
  249. // queued instead so the frontend re-fetches via REST. Broadcasts of throttled
  250. // message types (see throttledMessageTypes) within minBroadcastInterval of
  251. // the previous one are dropped — the next legitimate mutation will push the
  252. // fresh state.
  253. func (h *Hub) Broadcast(messageType MessageType, payload any) {
  254. if h == nil || payload == nil || h.GetClientCount() == 0 {
  255. return
  256. }
  257. if h.shouldThrottle(messageType) {
  258. return
  259. }
  260. data, err := json.Marshal(Message{
  261. Type: messageType,
  262. Payload: payload,
  263. Time: time.Now().UnixMilli(),
  264. })
  265. if err != nil {
  266. logger.Error("WebSocket marshal failed:", err)
  267. return
  268. }
  269. if len(data) > maxMessageSize {
  270. logger.Debugf("WebSocket payload %d bytes exceeds limit, sending invalidate for %s", len(data), messageType)
  271. h.broadcastInvalidate(messageType)
  272. return
  273. }
  274. h.enqueue(data)
  275. }
  276. // broadcastInvalidate queues a lightweight signal telling clients to re-fetch
  277. // the named data type via REST.
  278. func (h *Hub) broadcastInvalidate(originalType MessageType) {
  279. data, err := json.Marshal(Message{
  280. Type: MessageTypeInvalidate,
  281. Payload: map[string]string{"type": string(originalType)},
  282. Time: time.Now().UnixMilli(),
  283. })
  284. if err != nil {
  285. logger.Error("WebSocket invalidate marshal failed:", err)
  286. return
  287. }
  288. h.enqueue(data)
  289. }
  290. // enqueue submits raw bytes to the broadcast channel. Dropped on backpressure
  291. // (channel full for >100ms) or shutdown.
  292. func (h *Hub) enqueue(data []byte) {
  293. select {
  294. case h.broadcast <- data:
  295. case <-time.After(enqueueTimeout):
  296. logger.Warning("WebSocket broadcast channel full, dropping message")
  297. case <-h.ctx.Done():
  298. }
  299. }
  300. // GetClientCount returns the number of connected clients.
  301. func (h *Hub) GetClientCount() int {
  302. if h == nil {
  303. return 0
  304. }
  305. h.mu.RLock()
  306. defer h.mu.RUnlock()
  307. return len(h.clients)
  308. }
  309. // Register adds a client to the hub.
  310. func (h *Hub) Register(c *Client) {
  311. if h == nil || c == nil {
  312. return
  313. }
  314. select {
  315. case h.register <- c:
  316. case <-h.ctx.Done():
  317. }
  318. }
  319. // Unregister removes a client from the hub. Fast path queues for the hub
  320. // goroutine; if the channel is saturated (disconnect storm) we fall back
  321. // to a direct removal under the write lock so dead clients aren't left in
  322. // the registry waiting for their Send buffer to fill (minutes of wasted
  323. // fanout work at low broadcast rates).
  324. //
  325. // Direct removal is safe from any caller: external goroutines (read/write
  326. // pumps) hold no hub locks, and the hub goroutine itself never holds h.mu
  327. // when it calls Unregister — fanout releases its RLock before per-client
  328. // sends, so we can't self-deadlock here.
  329. func (h *Hub) Unregister(c *Client) {
  330. if h == nil || c == nil {
  331. return
  332. }
  333. select {
  334. case h.unregister <- c:
  335. default:
  336. h.removeClient(c)
  337. }
  338. }
  339. // Stop signals the hub to shut down and close all client connections.
  340. func (h *Hub) Stop() {
  341. if h != nil && h.cancel != nil {
  342. h.cancel()
  343. }
  344. }