1
0

traffic_writer.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/logger"
  9. )
  10. const (
  11. trafficWriterQueueSize = 256
  12. trafficWriterSubmitTimeout = 5 * time.Second
  13. )
  14. type trafficWriteRequest struct {
  15. apply func() error
  16. done chan error
  17. }
  18. var (
  19. twMu sync.Mutex
  20. twQueue chan *trafficWriteRequest
  21. twCancel context.CancelFunc
  22. twDone chan struct{}
  23. )
  24. // StartTrafficWriter spins up the serial writer goroutine. Safe to call again
  25. // after StopTrafficWriter — each Start/Stop cycle gets fresh channels. The
  26. // previous sync.Once-based implementation deadlocked after a SIGHUP-driven
  27. // panel restart: Stop killed the consumer goroutine but Once prevented Start
  28. // from spawning a new one, so every later submitTrafficWrite blocked forever
  29. // on <-req.done with no consumer (including the AddTraffic call inside
  30. // XrayService.GetXrayConfig that runs from startTask).
  31. func StartTrafficWriter() {
  32. twMu.Lock()
  33. defer twMu.Unlock()
  34. if twQueue != nil {
  35. return
  36. }
  37. queue := make(chan *trafficWriteRequest, trafficWriterQueueSize)
  38. ctx, cancel := context.WithCancel(context.Background())
  39. done := make(chan struct{})
  40. twQueue = queue
  41. twCancel = cancel
  42. twDone = done
  43. go runTrafficWriter(queue, ctx, done)
  44. }
  45. // StopTrafficWriter cancels the writer context and waits for the goroutine to
  46. // drain any pending requests before returning. Resets the package state so a
  47. // subsequent StartTrafficWriter can spawn a fresh consumer.
  48. func StopTrafficWriter() {
  49. twMu.Lock()
  50. cancel := twCancel
  51. done := twDone
  52. twQueue = nil
  53. twCancel = nil
  54. twDone = nil
  55. twMu.Unlock()
  56. if cancel != nil {
  57. cancel()
  58. }
  59. if done != nil {
  60. <-done
  61. }
  62. }
  63. func runTrafficWriter(queue chan *trafficWriteRequest, ctx context.Context, done chan struct{}) {
  64. defer close(done)
  65. for {
  66. select {
  67. case req := <-queue:
  68. req.done <- safeApply(req.apply)
  69. case <-ctx.Done():
  70. for {
  71. select {
  72. case req := <-queue:
  73. req.done <- safeApply(req.apply)
  74. default:
  75. return
  76. }
  77. }
  78. }
  79. }
  80. }
  81. func safeApply(fn func() error) (err error) {
  82. defer func() {
  83. if r := recover(); r != nil {
  84. err = fmt.Errorf("traffic writer panic: %v", r)
  85. logger.Error(err.Error())
  86. }
  87. }()
  88. return fn()
  89. }
  90. func submitTrafficWrite(fn func() error) error {
  91. twMu.Lock()
  92. queue := twQueue
  93. twMu.Unlock()
  94. if queue == nil {
  95. return safeApply(fn)
  96. }
  97. req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
  98. select {
  99. case queue <- req:
  100. case <-time.After(trafficWriterSubmitTimeout):
  101. return errors.New("traffic writer queue full")
  102. }
  103. return <-req.done
  104. }