traffic_writer.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package service
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. "gorm.io/gorm"
  11. )
  // Tuning values for the serial traffic writer.
  const (
  	// trafficWriterQueueSize is the buffer capacity of the request queue that
  	// StartTrafficWriter allocates for each writer goroutine.
  	trafficWriterQueueSize = 256

  	// trafficWriterSubmitTimeout bounds how long submitTrafficWrite waits for
  	// free space in the queue before giving up with a "queue full" error.
  	trafficWriterSubmitTimeout = 5 * time.Second
  )
  // trafficWriteRequest is one unit of work handed to the traffic writer
  // goroutine together with the channel on which its outcome is reported.
  type trafficWriteRequest struct {
  	// apply is the work to execute; its return value is the request's result.
  	apply func() error
  	// done receives the result of apply exactly once. submitTrafficWrite
  	// allocates it with a buffer of one so the writer can post the result
  	// without waiting for the submitter to be receiving.
  	done chan error
  }
  20. var (
  21. twMu sync.Mutex
  22. twQueue chan *trafficWriteRequest
  23. twCtx context.Context
  24. twCancel context.CancelFunc
  25. twDone chan struct{}
  26. )
  27. // StartTrafficWriter spins up the serial writer goroutine. Safe to call again
  28. // after StopTrafficWriter — each Start/Stop cycle gets fresh channels. The
  29. // previous sync.Once-based implementation deadlocked after a SIGHUP-driven
  30. // panel restart: Stop killed the consumer goroutine but Once prevented Start
  31. // from spawning a new one, so every later submitTrafficWrite blocked forever
  32. // on <-req.done with no consumer (including the AddTraffic call inside
  33. // XrayService.GetXrayConfig that runs from startTask).
  34. func StartTrafficWriter() {
  35. twMu.Lock()
  36. defer twMu.Unlock()
  37. if twCancel != nil && twDone != nil {
  38. select {
  39. case <-twDone:
  40. clearTrafficWriterState()
  41. default:
  42. return
  43. }
  44. }
  45. queue := make(chan *trafficWriteRequest, trafficWriterQueueSize)
  46. ctx, cancel := context.WithCancel(context.Background())
  47. done := make(chan struct{})
  48. twQueue = queue
  49. twCtx = ctx
  50. twCancel = cancel
  51. twDone = done
  52. go runTrafficWriter(ctx, queue, done)
  53. }
  54. // StopTrafficWriter cancels the writer context and waits for the goroutine to
  55. // drain any pending requests before returning. Resets the package state so a
  56. // subsequent StartTrafficWriter can spawn a fresh consumer.
  57. func StopTrafficWriter() {
  58. twMu.Lock()
  59. cancel := twCancel
  60. done := twDone
  61. if cancel == nil || done == nil {
  62. twMu.Unlock()
  63. return
  64. }
  65. cancel()
  66. twMu.Unlock()
  67. <-done
  68. twMu.Lock()
  69. if twDone == done {
  70. clearTrafficWriterState()
  71. }
  72. twMu.Unlock()
  73. }
  74. func clearTrafficWriterState() {
  75. twQueue = nil
  76. twCtx = nil
  77. twCancel = nil
  78. twDone = nil
  79. }
  80. func runTrafficWriter(ctx context.Context, queue chan *trafficWriteRequest, done chan struct{}) {
  81. defer close(done)
  82. for {
  83. select {
  84. case req := <-queue:
  85. req.done <- safeApply(req.apply)
  86. case <-ctx.Done():
  87. for {
  88. select {
  89. case req := <-queue:
  90. req.done <- safeApply(req.apply)
  91. default:
  92. return
  93. }
  94. }
  95. }
  96. }
  97. }
  98. // runSerializedTx runs fn inside one DB transaction on the shared serial
  99. // traffic-writer goroutine, so it can never execute concurrently with the
  100. // @every 5s traffic poll (AddTraffic). Both touch the hot client_traffics and
  101. // inbounds rows, and they acquire them in opposite order (the poll locks
  102. // inbounds then client_traffics; an admin client/inbound mutation does the
  103. // reverse), which Postgres aborts as a deadlock (SQLSTATE 40P01). Routing every
  104. // such mutation through this single writer removes that contention entirely.
  105. //
  106. // Keep network I/O (node pushes) OUT of fn: holding the single writer across a
  107. // remote node call would stall all traffic accounting for up to the remote
  108. // timeout. Apply runtime changes after this returns.
  109. func runSerializedTx(fn func(tx *gorm.DB) error) error {
  110. return submitTrafficWrite(func() error {
  111. return database.GetDB().Transaction(fn)
  112. })
  113. }
  114. func safeApply(fn func() error) (err error) {
  115. defer func() {
  116. if r := recover(); r != nil {
  117. err = fmt.Errorf("traffic writer panic: %v", r)
  118. logger.Error(err.Error())
  119. }
  120. }()
  121. return fn()
  122. }
  123. func submitTrafficWrite(fn func() error) error {
  124. req := &trafficWriteRequest{apply: fn, done: make(chan error, 1)}
  125. twMu.Lock()
  126. queue := twQueue
  127. ctx := twCtx
  128. done := twDone
  129. if queue == nil || ctx == nil || done == nil {
  130. twMu.Unlock()
  131. return safeApply(fn)
  132. }
  133. select {
  134. case <-ctx.Done():
  135. twMu.Unlock()
  136. return safeApply(fn)
  137. default:
  138. }
  139. timer := time.NewTimer(trafficWriterSubmitTimeout)
  140. defer timer.Stop()
  141. select {
  142. case queue <- req:
  143. twMu.Unlock()
  144. case <-timer.C:
  145. twMu.Unlock()
  146. return errors.New("traffic writer queue full")
  147. }
  148. select {
  149. case err := <-req.done:
  150. return err
  151. case <-done:
  152. select {
  153. case err := <-req.done:
  154. return err
  155. default:
  156. return errors.New("traffic writer stopped before write completed")
  157. }
  158. }
  159. }