filter.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. package eventbus
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // RateLimiter prevents notification spam from flapping events.
  7. type RateLimiter struct {
  8. mu sync.Mutex
  9. lastSent map[string]time.Time
  10. cooldown time.Duration
  11. lastPrune time.Time
  12. }
  13. // NewRateLimiter creates a rate limiter with the given cooldown period.
  14. func NewRateLimiter(cooldown time.Duration) *RateLimiter {
  15. return &RateLimiter{
  16. lastSent: make(map[string]time.Time),
  17. cooldown: cooldown,
  18. }
  19. }
  20. // Allow returns true if the event should be sent (cooldown has elapsed).
  21. func (r *RateLimiter) Allow(eventType EventType, source string) bool {
  22. key := string(eventType) + ":" + source
  23. now := time.Now()
  24. r.mu.Lock()
  25. defer r.mu.Unlock()
  26. r.pruneLocked(now)
  27. if now.Sub(r.lastSent[key]) < r.cooldown {
  28. return false
  29. }
  30. r.lastSent[key] = now
  31. return true
  32. }
  33. // pruneLocked drops keys whose cooldown has elapsed. Such an entry no longer
  34. // affects Allow's result, so removing it is safe and keeps the map from
  35. // retaining one entry per (eventType, source) ever seen. Throttled to once per
  36. // cooldown so a busy bus doesn't sweep the whole map on every event.
  37. func (r *RateLimiter) pruneLocked(now time.Time) {
  38. if now.Sub(r.lastPrune) < r.cooldown {
  39. return
  40. }
  41. r.lastPrune = now
  42. for k, v := range r.lastSent {
  43. if now.Sub(v) >= r.cooldown {
  44. delete(r.lastSent, k)
  45. }
  46. }
  47. }