| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package eventbus
- import (
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- "github.com/op/go-logging"
- )
- func TestMain(m *testing.M) {
- logger.InitLogger(logging.ERROR)
- m.Run()
- }
- func TestBusPublishSubscribe(t *testing.T) {
- b := New(16)
- defer b.Stop()
- var received Event
- var wg sync.WaitGroup
- wg.Add(1)
- b.Subscribe("test", func(e Event) {
- received = e
- wg.Done()
- })
- b.Publish(Event{Type: EventOutboundDown, Source: "my-proxy"})
- select {
- case <-waitDone(&wg):
- case <-time.After(time.Second):
- t.Fatal("subscriber did not receive event")
- }
- if received.Type != EventOutboundDown {
- t.Errorf("got type %q, want %q", received.Type, EventOutboundDown)
- }
- if received.Source != "my-proxy" {
- t.Errorf("got source %q, want %q", received.Source, "my-proxy")
- }
- if received.Timestamp.IsZero() {
- t.Error("timestamp not set")
- }
- }
- func TestBusMultipleSubscribers(t *testing.T) {
- b := New(16)
- defer b.Stop()
- var count atomic.Int32
- var wg sync.WaitGroup
- wg.Add(2)
- b.Subscribe("a", func(e Event) {
- count.Add(1)
- wg.Done()
- })
- b.Subscribe("b", func(e Event) {
- count.Add(1)
- wg.Done()
- })
- b.Publish(Event{Type: EventXrayCrash})
- select {
- case <-waitDone(&wg):
- case <-time.After(time.Second):
- t.Fatal("subscribers did not receive event")
- }
- if count.Load() != 2 {
- t.Errorf("got %d calls, want 2", count.Load())
- }
- }
- func TestBusUnsubscribe(t *testing.T) {
- b := New(16)
- defer b.Stop()
- var count atomic.Int32
- b.Subscribe("test", func(e Event) {
- count.Add(1)
- })
- b.Unsubscribe("test")
- b.Publish(Event{Type: EventOutboundUp})
- time.Sleep(50 * time.Millisecond)
- if count.Load() != 0 {
- t.Errorf("got %d calls after unsubscribe, want 0", count.Load())
- }
- }
- func TestBusReplaceSubscriber(t *testing.T) {
- b := New(16)
- defer b.Stop()
- var last string
- var wg sync.WaitGroup
- wg.Add(1)
- b.Subscribe("test", func(e Event) {
- last = "old"
- })
- b.Subscribe("test", func(e Event) {
- last = "new"
- wg.Done()
- })
- b.Publish(Event{Type: EventOutboundDown})
- select {
- case <-waitDone(&wg):
- case <-time.After(time.Second):
- t.Fatal("subscriber did not receive event")
- }
- if last != "new" {
- t.Errorf("got %q, want %q", last, "new")
- }
- }
- func TestBusPanicRecovery(t *testing.T) {
- b := New(16)
- defer b.Stop()
- var wg sync.WaitGroup
- wg.Add(1)
- b.Subscribe("panicker", func(e Event) {
- panic("oops")
- })
- b.Subscribe("after", func(e Event) {
- wg.Done()
- })
- b.Publish(Event{Type: EventOutboundDown})
- select {
- case <-waitDone(&wg):
- case <-time.After(time.Second):
- t.Fatal("subscriber after panicker did not receive event")
- }
- }
- func TestBusBufferFull(t *testing.T) {
- b := New(2)
- defer b.Stop()
- b.Subscribe("slow", func(e Event) {
- time.Sleep(100 * time.Millisecond)
- })
- b.Publish(Event{Type: EventOutboundDown})
- b.Publish(Event{Type: EventOutboundUp})
- b.Publish(Event{Type: EventXrayCrash})
- time.Sleep(50 * time.Millisecond)
- }
- func TestBusZeroTimestamp(t *testing.T) {
- b := New(16)
- defer b.Stop()
- var received Event
- var wg sync.WaitGroup
- wg.Add(1)
- b.Subscribe("test", func(e Event) {
- received = e
- wg.Done()
- })
- b.Publish(Event{Type: EventOutboundDown})
- select {
- case <-waitDone(&wg):
- case <-time.After(time.Second):
- t.Fatal("subscriber did not receive event")
- }
- if received.Timestamp.IsZero() {
- t.Error("timestamp should be set automatically")
- }
- }
- func waitDone(wg *sync.WaitGroup) <-chan struct{} {
- ch := make(chan struct{})
- go func() {
- wg.Wait()
- close(ch)
- }()
- return ch
- }
|