traffic_writer.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/logger"
  9. )
  10. const (
  11. trafficWriterQueueSize = 256
  12. trafficWriterSubmitTimeout = 5 * time.Second
  13. )
  14. type trafficWriteRequest struct {
  15. apply func() error
  16. done chan error
  17. }
  18. var (
  19. twQueue chan *trafficWriteRequest
  20. twCtx context.Context
  21. twCancel context.CancelFunc
  22. twDone chan struct{}
  23. twOnce sync.Once
  24. )
  25. func StartTrafficWriter() {
  26. twOnce.Do(func() {
  27. twQueue = make(chan *trafficWriteRequest, trafficWriterQueueSize)
  28. twCtx, twCancel = context.WithCancel(context.Background())
  29. twDone = make(chan struct{})
  30. go runTrafficWriter()
  31. })
  32. }
  33. func StopTrafficWriter() {
  34. if twCancel != nil {
  35. twCancel()
  36. <-twDone
  37. }
  38. }
  39. func runTrafficWriter() {
  40. defer close(twDone)
  41. for {
  42. select {
  43. case req := <-twQueue:
  44. req.done <- safeApply(req.apply)
  45. case <-twCtx.Done():
  46. for {
  47. select {
  48. case req := <-twQueue:
  49. req.done <- safeApply(req.apply)
  50. default:
  51. return
  52. }
  53. }
  54. }
  55. }
  56. }
  57. func safeApply(fn func() error) (err error) {
  58. defer func() {
  59. if r := recover(); r != nil {
  60. err = fmt.Errorf("traffic writer panic: %v", r)
  61. logger.Error(err.Error())
  62. }
  63. }()
  64. return fn()
  65. }
  66. func submitTrafficWrite(fn func() error) error {
  67. if twQueue == nil {
  68. return safeApply(fn)
  69. }
  70. req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
  71. select {
  72. case twQueue <- req:
  73. case <-time.After(trafficWriterSubmitTimeout):
  74. return errors.New("traffic writer queue full")
  75. }
  76. return <-req.done
  77. }