1
0

traffic_writer.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/logger"
  9. )
  10. const (
  11. trafficWriterQueueSize = 256
  12. trafficWriterSubmitTimeout = 5 * time.Second
  13. )
  14. type trafficWriteRequest struct {
  15. apply func() error
  16. done chan error
  17. }
  18. var (
  19. twMu sync.Mutex
  20. twQueue chan *trafficWriteRequest
  21. twCtx context.Context
  22. twCancel context.CancelFunc
  23. twDone chan struct{}
  24. )
  25. // StartTrafficWriter spins up the serial writer goroutine. Safe to call again
  26. // after StopTrafficWriter — each Start/Stop cycle gets fresh channels. The
  27. // previous sync.Once-based implementation deadlocked after a SIGHUP-driven
  28. // panel restart: Stop killed the consumer goroutine but Once prevented Start
  29. // from spawning a new one, so every later submitTrafficWrite blocked forever
  30. // on <-req.done with no consumer (including the AddTraffic call inside
  31. // XrayService.GetXrayConfig that runs from startTask).
  32. func StartTrafficWriter() {
  33. twMu.Lock()
  34. defer twMu.Unlock()
  35. if twCancel != nil && twDone != nil {
  36. select {
  37. case <-twDone:
  38. clearTrafficWriterState()
  39. default:
  40. return
  41. }
  42. }
  43. queue := make(chan *trafficWriteRequest, trafficWriterQueueSize)
  44. ctx, cancel := context.WithCancel(context.Background())
  45. done := make(chan struct{})
  46. twQueue = queue
  47. twCtx = ctx
  48. twCancel = cancel
  49. twDone = done
  50. go runTrafficWriter(ctx, queue, done)
  51. }
  52. // StopTrafficWriter cancels the writer context and waits for the goroutine to
  53. // drain any pending requests before returning. Resets the package state so a
  54. // subsequent StartTrafficWriter can spawn a fresh consumer.
  55. func StopTrafficWriter() {
  56. twMu.Lock()
  57. cancel := twCancel
  58. done := twDone
  59. if cancel == nil || done == nil {
  60. twMu.Unlock()
  61. return
  62. }
  63. cancel()
  64. twMu.Unlock()
  65. <-done
  66. twMu.Lock()
  67. if twDone == done {
  68. clearTrafficWriterState()
  69. }
  70. twMu.Unlock()
  71. }
  72. func clearTrafficWriterState() {
  73. twQueue = nil
  74. twCtx = nil
  75. twCancel = nil
  76. twDone = nil
  77. }
  78. func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done chan struct{}) {
  79. defer close(done)
  80. for {
  81. select {
  82. case req := <-queue:
  83. req.done <- safeApply(req.apply)
  84. case <-ctx.Done():
  85. for {
  86. select {
  87. case req := <-queue:
  88. req.done <- safeApply(req.apply)
  89. default:
  90. return
  91. }
  92. }
  93. }
  94. }
  95. }
  96. func safeApply(fn func() error) (err error) {
  97. defer func() {
  98. if r := recover(); r != nil {
  99. err = fmt.Errorf("traffic writer panic: %v", r)
  100. logger.Error(err.Error())
  101. }
  102. }()
  103. return fn()
  104. }
  105. func submitTrafficWrite(fn func() error) error {
  106. req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
  107. twMu.Lock()
  108. queue := twQueue
  109. ctx := twCtx
  110. done := twDone
  111. if queue == nil || ctx == nil || done == nil {
  112. twMu.Unlock()
  113. return safeApply(fn)
  114. }
  115. select {
  116. case <-ctx.Done():
  117. twMu.Unlock()
  118. return safeApply(fn)
  119. default:
  120. }
  121. timer := time.NewTimer(trafficWriterSubmitTimeout)
  122. defer timer.Stop()
  123. select {
  124. case queue <- req:
  125. twMu.Unlock()
  126. case <-timer.C:
  127. twMu.Unlock()
  128. return errors.New("traffic writer queue full")
  129. }
  130. select {
  131. case err := <-req.done:
  132. return err
  133. case <-done:
  134. select {
  135. case err := <-req.done:
  136. return err
  137. default:
  138. return errors.New("traffic writer stopped before write completed")
  139. }
  140. }
  141. }