| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package eventbus
- import (
- "sync"
- "time"
- "github.com/mhsanaei/3x-ui/v3/internal/logger"
- )
- // DefaultBufferSize is the number of events the bus can hold before Publish starts dropping.
- const DefaultBufferSize = 256
- // subscriber pairs an ID with its event handler.
- type subscriber struct {
- id string
- handler func(Event)
- }
- // Bus is a minimal in-process pub/sub event bus backed by a buffered channel.
- // Producers call Publish (non-blocking) and every event is fanned out to all
- // subscribers; per-event filtering is the subscriber's responsibility.
- type Bus struct {
- ch chan Event
- subs []subscriber
- mu sync.RWMutex
- done chan struct{}
- wg sync.WaitGroup
- }
- // New creates a Bus with the given buffer size. Use 0 for DefaultBufferSize.
- func New(bufSize int) *Bus {
- if bufSize <= 0 {
- bufSize = DefaultBufferSize
- }
- b := &Bus{
- ch: make(chan Event, bufSize),
- done: make(chan struct{}),
- }
- b.wg.Add(1)
- go b.dispatch()
- return b
- }
- // Subscribe registers a handler that receives every published event.
- // The id is used for Unsubscribe; it must be unique across active subscribers.
- // Subscribing with an already-registered id replaces the previous handler.
- func (b *Bus) Subscribe(id string, handler func(Event)) {
- b.mu.Lock()
- defer b.mu.Unlock()
- for i, s := range b.subs {
- if s.id == id {
- b.subs[i].handler = handler
- return
- }
- }
- b.subs = append(b.subs, subscriber{id: id, handler: handler})
- }
- // Unsubscribe removes a subscriber by id. Safe to call with unknown id.
- func (b *Bus) Unsubscribe(id string) {
- b.mu.Lock()
- defer b.mu.Unlock()
- for i, s := range b.subs {
- if s.id == id {
- b.subs = append(b.subs[:i], b.subs[i+1:]...)
- return
- }
- }
- }
- // Publish sends an event to all subscribers. Non-blocking — if the buffer is
- // full the event is dropped and a warning is logged.
- func (b *Bus) Publish(e Event) {
- if e.Timestamp.IsZero() {
- e.Timestamp = time.Now()
- }
- select {
- case b.ch <- e:
- default:
- logger.Warning("eventbus: buffer full, dropping event ", e.Type)
- }
- }
- // dispatch is the fan-out loop. It reads events from the channel and calls
- // every subscriber's handler sequentially. Handlers run on the dispatch
- // goroutine — they must not block.
- func (b *Bus) dispatch() {
- defer b.wg.Done()
- for {
- select {
- case e, ok := <-b.ch:
- if !ok {
- return
- }
- b.mu.RLock()
- subs := make([]subscriber, len(b.subs))
- copy(subs, b.subs)
- b.mu.RUnlock()
- for _, s := range subs {
- safeCall(s.handler, e)
- }
- case <-b.done:
- return
- }
- }
- }
- // safeCall invokes handler with panic recovery.
- func safeCall(fn func(Event), e Event) {
- defer func() {
- if r := recover(); r != nil {
- logger.Errorf("eventbus: subscriber panicked on %s: %v", e.Type, r)
- }
- }()
- fn(e)
- }
- // Stop shuts down the bus: the dispatch goroutine exits, in-flight handlers
- // finish, and any events still buffered may be dropped. Safe to call once.
- func (b *Bus) Stop() {
- close(b.done)
- b.wg.Wait()
- }
|