// bus_test.go (3.4 KB)

  1. package eventbus
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "testing"
  6. "time"
  7. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  8. "github.com/op/go-logging"
  9. )
  10. func TestMain(m *testing.M) {
  11. logger.InitLogger(logging.ERROR)
  12. m.Run()
  13. }
  14. func TestBusPublishSubscribe(t *testing.T) {
  15. b := New(16)
  16. defer b.Stop()
  17. var received Event
  18. var wg sync.WaitGroup
  19. wg.Add(1)
  20. b.Subscribe("test", func(e Event) {
  21. received = e
  22. wg.Done()
  23. })
  24. b.Publish(Event{Type: EventOutboundDown, Source: "my-proxy"})
  25. select {
  26. case <-waitDone(&wg):
  27. case <-time.After(time.Second):
  28. t.Fatal("subscriber did not receive event")
  29. }
  30. if received.Type != EventOutboundDown {
  31. t.Errorf("got type %q, want %q", received.Type, EventOutboundDown)
  32. }
  33. if received.Source != "my-proxy" {
  34. t.Errorf("got source %q, want %q", received.Source, "my-proxy")
  35. }
  36. if received.Timestamp.IsZero() {
  37. t.Error("timestamp not set")
  38. }
  39. }
  40. func TestBusMultipleSubscribers(t *testing.T) {
  41. b := New(16)
  42. defer b.Stop()
  43. var count atomic.Int32
  44. var wg sync.WaitGroup
  45. wg.Add(2)
  46. b.Subscribe("a", func(e Event) {
  47. count.Add(1)
  48. wg.Done()
  49. })
  50. b.Subscribe("b", func(e Event) {
  51. count.Add(1)
  52. wg.Done()
  53. })
  54. b.Publish(Event{Type: EventXrayCrash})
  55. select {
  56. case <-waitDone(&wg):
  57. case <-time.After(time.Second):
  58. t.Fatal("subscribers did not receive event")
  59. }
  60. if count.Load() != 2 {
  61. t.Errorf("got %d calls, want 2", count.Load())
  62. }
  63. }
  64. func TestBusUnsubscribe(t *testing.T) {
  65. b := New(16)
  66. defer b.Stop()
  67. var count atomic.Int32
  68. b.Subscribe("test", func(e Event) {
  69. count.Add(1)
  70. })
  71. b.Unsubscribe("test")
  72. b.Publish(Event{Type: EventOutboundUp})
  73. time.Sleep(50 * time.Millisecond)
  74. if count.Load() != 0 {
  75. t.Errorf("got %d calls after unsubscribe, want 0", count.Load())
  76. }
  77. }
  78. func TestBusReplaceSubscriber(t *testing.T) {
  79. b := New(16)
  80. defer b.Stop()
  81. var last string
  82. var wg sync.WaitGroup
  83. wg.Add(1)
  84. b.Subscribe("test", func(e Event) {
  85. last = "old"
  86. })
  87. b.Subscribe("test", func(e Event) {
  88. last = "new"
  89. wg.Done()
  90. })
  91. b.Publish(Event{Type: EventOutboundDown})
  92. select {
  93. case <-waitDone(&wg):
  94. case <-time.After(time.Second):
  95. t.Fatal("subscriber did not receive event")
  96. }
  97. if last != "new" {
  98. t.Errorf("got %q, want %q", last, "new")
  99. }
  100. }
  101. func TestBusPanicRecovery(t *testing.T) {
  102. b := New(16)
  103. defer b.Stop()
  104. var wg sync.WaitGroup
  105. wg.Add(1)
  106. b.Subscribe("panicker", func(e Event) {
  107. panic("oops")
  108. })
  109. b.Subscribe("after", func(e Event) {
  110. wg.Done()
  111. })
  112. b.Publish(Event{Type: EventOutboundDown})
  113. select {
  114. case <-waitDone(&wg):
  115. case <-time.After(time.Second):
  116. t.Fatal("subscriber after panicker did not receive event")
  117. }
  118. }
  119. func TestBusBufferFull(t *testing.T) {
  120. b := New(2)
  121. defer b.Stop()
  122. b.Subscribe("slow", func(e Event) {
  123. time.Sleep(100 * time.Millisecond)
  124. })
  125. b.Publish(Event{Type: EventOutboundDown})
  126. b.Publish(Event{Type: EventOutboundUp})
  127. b.Publish(Event{Type: EventXrayCrash})
  128. time.Sleep(50 * time.Millisecond)
  129. }
  130. func TestBusZeroTimestamp(t *testing.T) {
  131. b := New(16)
  132. defer b.Stop()
  133. var received Event
  134. var wg sync.WaitGroup
  135. wg.Add(1)
  136. b.Subscribe("test", func(e Event) {
  137. received = e
  138. wg.Done()
  139. })
  140. b.Publish(Event{Type: EventOutboundDown})
  141. select {
  142. case <-waitDone(&wg):
  143. case <-time.After(time.Second):
  144. t.Fatal("subscriber did not receive event")
  145. }
  146. if received.Timestamp.IsZero() {
  147. t.Error("timestamp should be set automatically")
  148. }
  149. }
  // waitDone adapts a WaitGroup for use in a select statement. It returns a
  // channel that is closed once wg.Wait returns. The helper goroutine stays
  // blocked for as long as the WaitGroup counter does not reach zero.
  func waitDone(wg *sync.WaitGroup) <-chan struct{} {
  	done := make(chan struct{})
  	go func() {
  		defer close(done)
  		wg.Wait()
  	}()
  	return done
  }