bus.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package eventbus
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  6. )
  7. // DefaultBufferSize is the number of events the bus can hold before Publish starts dropping.
  8. const DefaultBufferSize = 256
  9. // subscriber pairs an ID with its event handler.
  10. type subscriber struct {
  11. id string
  12. handler func(Event)
  13. }
  14. // Bus is a minimal in-process pub/sub event bus backed by a buffered channel.
  15. // Producers call Publish (non-blocking) and every event is fanned out to all
  16. // subscribers; per-event filtering is the subscriber's responsibility.
  17. type Bus struct {
  18. ch chan Event
  19. subs []subscriber
  20. mu sync.RWMutex
  21. done chan struct{}
  22. wg sync.WaitGroup
  23. }
  24. // New creates a Bus with the given buffer size. Use 0 for DefaultBufferSize.
  25. func New(bufSize int) *Bus {
  26. if bufSize <= 0 {
  27. bufSize = DefaultBufferSize
  28. }
  29. b := &Bus{
  30. ch: make(chan Event, bufSize),
  31. done: make(chan struct{}),
  32. }
  33. b.wg.Add(1)
  34. go b.dispatch()
  35. return b
  36. }
  37. // Subscribe registers a handler that receives every published event.
  38. // The id is used for Unsubscribe; it must be unique across active subscribers.
  39. // Subscribing with an already-registered id replaces the previous handler.
  40. func (b *Bus) Subscribe(id string, handler func(Event)) {
  41. b.mu.Lock()
  42. defer b.mu.Unlock()
  43. for i, s := range b.subs {
  44. if s.id == id {
  45. b.subs[i].handler = handler
  46. return
  47. }
  48. }
  49. b.subs = append(b.subs, subscriber{id: id, handler: handler})
  50. }
  51. // Unsubscribe removes a subscriber by id. Safe to call with unknown id.
  52. func (b *Bus) Unsubscribe(id string) {
  53. b.mu.Lock()
  54. defer b.mu.Unlock()
  55. for i, s := range b.subs {
  56. if s.id == id {
  57. b.subs = append(b.subs[:i], b.subs[i+1:]...)
  58. return
  59. }
  60. }
  61. }
  62. // Publish sends an event to all subscribers. Non-blocking — if the buffer is
  63. // full the event is dropped and a warning is logged.
  64. func (b *Bus) Publish(e Event) {
  65. if e.Timestamp.IsZero() {
  66. e.Timestamp = time.Now()
  67. }
  68. select {
  69. case b.ch <- e:
  70. default:
  71. logger.Warning("eventbus: buffer full, dropping event ", e.Type)
  72. }
  73. }
  74. // dispatch is the fan-out loop. It reads events from the channel and calls
  75. // every subscriber's handler sequentially. Handlers run on the dispatch
  76. // goroutine — they must not block.
  77. func (b *Bus) dispatch() {
  78. defer b.wg.Done()
  79. for {
  80. select {
  81. case e, ok := <-b.ch:
  82. if !ok {
  83. return
  84. }
  85. b.mu.RLock()
  86. subs := make([]subscriber, len(b.subs))
  87. copy(subs, b.subs)
  88. b.mu.RUnlock()
  89. for _, s := range subs {
  90. safeCall(s.handler, e)
  91. }
  92. case <-b.done:
  93. return
  94. }
  95. }
  96. }
  97. // safeCall invokes handler with panic recovery.
  98. func safeCall(fn func(Event), e Event) {
  99. defer func() {
  100. if r := recover(); r != nil {
  101. logger.Errorf("eventbus: subscriber panicked on %s: %v", e.Type, r)
  102. }
  103. }()
  104. fn(e)
  105. }
  106. // Stop shuts down the bus: the dispatch goroutine exits, in-flight handlers
  107. // finish, and any events still buffered may be dropped. Safe to call once.
  108. func (b *Bus) Stop() {
  109. close(b.done)
  110. b.wg.Wait()
  111. }