hub_test.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package websocket
  2. import (
  3. "encoding/json"
  4. "os"
  5. "sync"
  6. "testing"
  7. "time"
  8. xuilogger "github.com/mhsanaei/3x-ui/v3/logger"
  9. "github.com/op/go-logging"
  10. )
  11. func TestMain(m *testing.M) {
  12. _ = os.Setenv("XUI_LOG_FOLDER", os.TempDir())
  13. xuilogger.InitLogger(logging.ERROR)
  14. os.Exit(m.Run())
  15. }
  16. func TestNewClient_HasBufferedSendChannel(t *testing.T) {
  17. c := NewClient("client-1")
  18. if c.ID != "client-1" {
  19. t.Fatalf("ID = %q, want client-1", c.ID)
  20. }
  21. if cap(c.Send) != clientSendQueue {
  22. t.Fatalf("Send cap = %d, want %d", cap(c.Send), clientSendQueue)
  23. }
  24. }
  25. func TestHub_NilReceiver_DoesNotPanic(t *testing.T) {
  26. var h *Hub
  27. if h.GetClientCount() != 0 {
  28. t.Fatal("nil hub GetClientCount should return 0")
  29. }
  30. h.Broadcast(MessageTypeStatus, "anything")
  31. h.Register(NewClient("x"))
  32. h.Unregister(NewClient("x"))
  33. h.Stop()
  34. }
  35. func TestHub_BroadcastDropsWhenNoClients(t *testing.T) {
  36. h := NewHub()
  37. defer h.Stop()
  38. go h.Run()
  39. h.Broadcast(MessageTypeStatus, "payload")
  40. select {
  41. case <-h.broadcast:
  42. t.Fatal("Broadcast should drop when client count is zero")
  43. case <-time.After(50 * time.Millisecond):
  44. }
  45. }
  46. func TestHub_BroadcastDropsNilPayload(t *testing.T) {
  47. h := NewHub()
  48. defer h.Stop()
  49. go h.Run()
  50. c := NewClient("c1")
  51. h.Register(c)
  52. waitClientCount(t, h, 1)
  53. h.Broadcast(MessageTypeStatus, nil)
  54. select {
  55. case <-c.Send:
  56. t.Fatal("nil payload should be dropped, not delivered")
  57. case <-time.After(50 * time.Millisecond):
  58. }
  59. }
  60. func TestHub_BroadcastDeliversToClient(t *testing.T) {
  61. h := NewHub()
  62. defer h.Stop()
  63. go h.Run()
  64. c := NewClient("c1")
  65. h.Register(c)
  66. waitClientCount(t, h, 1)
  67. h.Broadcast(MessageTypeStatus, map[string]string{"k": "v"})
  68. select {
  69. case raw := <-c.Send:
  70. var m Message
  71. if err := json.Unmarshal(raw, &m); err != nil {
  72. t.Fatalf("payload is not valid JSON: %v\n%s", err, raw)
  73. }
  74. if m.Type != MessageTypeStatus {
  75. t.Fatalf("Type = %q, want %q", m.Type, MessageTypeStatus)
  76. }
  77. if m.Time == 0 {
  78. t.Fatal("Time should be set to a non-zero unix-millis value")
  79. }
  80. case <-time.After(500 * time.Millisecond):
  81. t.Fatal("timed out waiting for broadcast to reach client")
  82. }
  83. }
  84. func TestHub_UnregisterClosesSendAndDecrementsCount(t *testing.T) {
  85. h := NewHub()
  86. defer h.Stop()
  87. go h.Run()
  88. c := NewClient("c1")
  89. h.Register(c)
  90. waitClientCount(t, h, 1)
  91. h.Unregister(c)
  92. waitClientCount(t, h, 0)
  93. select {
  94. case _, ok := <-c.Send:
  95. if ok {
  96. t.Fatal("expected Send channel to be closed after Unregister")
  97. }
  98. case <-time.After(500 * time.Millisecond):
  99. t.Fatal("Send channel was not closed after Unregister")
  100. }
  101. }
  102. func TestHub_StopClosesAllClients(t *testing.T) {
  103. h := NewHub()
  104. go h.Run()
  105. c1 := NewClient("c1")
  106. c2 := NewClient("c2")
  107. h.Register(c1)
  108. h.Register(c2)
  109. waitClientCount(t, h, 2)
  110. h.Stop()
  111. for _, c := range []*Client{c1, c2} {
  112. select {
  113. case _, ok := <-c.Send:
  114. if ok {
  115. t.Fatalf("client %s Send should be closed after Stop", c.ID)
  116. }
  117. case <-time.After(500 * time.Millisecond):
  118. t.Fatalf("client %s Send not closed after Stop", c.ID)
  119. }
  120. }
  121. }
  122. func TestHub_ShouldThrottle(t *testing.T) {
  123. h := NewHub()
  124. defer h.Stop()
  125. if h.shouldThrottle(MessageTypeStatus) {
  126. t.Fatal("non-gated message type should never throttle")
  127. }
  128. if h.shouldThrottle(MessageTypeStatus) {
  129. t.Fatal("non-gated message type should never throttle on second call")
  130. }
  131. if h.shouldThrottle(MessageTypeTraffic) {
  132. t.Fatal("first call for gated type should not throttle")
  133. }
  134. if !h.shouldThrottle(MessageTypeTraffic) {
  135. t.Fatal("immediate second call for gated type should throttle")
  136. }
  137. }
  138. func TestHub_ShouldThrottle_DistinctTypesIndependent(t *testing.T) {
  139. h := NewHub()
  140. defer h.Stop()
  141. if h.shouldThrottle(MessageTypeTraffic) {
  142. t.Fatal("first Traffic call should not throttle")
  143. }
  144. if h.shouldThrottle(MessageTypeInbounds) {
  145. t.Fatal("first Inbounds call should not throttle even after Traffic")
  146. }
  147. }
  148. func TestTrySend_SucceedsWithRoom(t *testing.T) {
  149. c := &Client{ID: "c", Send: make(chan []byte, 1)}
  150. if !trySend(c, []byte("hi")) {
  151. t.Fatal("trySend should succeed when buffer has room")
  152. }
  153. }
  154. func TestTrySend_FailsWhenFull(t *testing.T) {
  155. c := &Client{ID: "c", Send: make(chan []byte, 1)}
  156. c.Send <- []byte("first")
  157. if trySend(c, []byte("second")) {
  158. t.Fatal("trySend should fail when buffer is full")
  159. }
  160. }
  161. func TestTrySend_FailsOnClosedChannel(t *testing.T) {
  162. c := &Client{ID: "c", Send: make(chan []byte, 1)}
  163. close(c.Send)
  164. if trySend(c, []byte("after-close")) {
  165. t.Fatal("trySend should fail (not panic) when channel is closed")
  166. }
  167. }
  168. func TestHub_FanoutEvictsSlowClient(t *testing.T) {
  169. h := NewHub()
  170. defer h.Stop()
  171. go h.Run()
  172. slow := &Client{ID: "slow", Send: make(chan []byte, 1)}
  173. slow.Send <- []byte("buffer-already-full")
  174. h.Register(slow)
  175. waitClientCount(t, h, 1)
  176. h.Broadcast(MessageTypeStatus, "payload")
  177. waitClientCount(t, h, 0)
  178. select {
  179. case _, ok := <-slow.Send:
  180. if ok {
  181. _, ok = <-slow.Send
  182. if ok {
  183. t.Fatal("slow client Send should eventually be closed by fanout eviction")
  184. }
  185. }
  186. case <-time.After(500 * time.Millisecond):
  187. t.Fatal("slow client Send channel was not closed")
  188. }
  189. }
  190. func TestHub_ConcurrentRegisterUnregister(t *testing.T) {
  191. h := NewHub()
  192. defer h.Stop()
  193. go h.Run()
  194. const n = 50
  195. var wg sync.WaitGroup
  196. for i := range n {
  197. wg.Add(1)
  198. go func(idx int) {
  199. defer wg.Done()
  200. c := NewClient("c")
  201. h.Register(c)
  202. h.Unregister(c)
  203. }(i)
  204. }
  205. wg.Wait()
  206. waitClientCount(t, h, 0)
  207. }
  208. func waitClientCount(t *testing.T, h *Hub, want int) {
  209. t.Helper()
  210. deadline := time.Now().Add(time.Second)
  211. for time.Now().Before(deadline) {
  212. if h.GetClientCount() == want {
  213. return
  214. }
  215. time.Sleep(5 * time.Millisecond)
  216. }
  217. t.Fatalf("client count never reached %d (last seen %d)", want, h.GetClientCount())
  218. }