node_traffic_sync_job.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
  // Tunables for the node traffic sync job.
  const (
  	// nodeTrafficSyncConcurrency is the capacity of the semaphore that
  	// bounds how many nodes are synced at the same time.
  	nodeTrafficSyncConcurrency = 8

  	// nodeTrafficSyncRequestTimeout is the deadline applied to a single
  	// traffic snapshot fetch from a node.
  	nodeTrafficSyncRequestTimeout = time.Second * 4

  	// nodeReconcileTimeout is the deadline applied to reconciling a node
  	// whose config is flagged dirty.
  	nodeReconcileTimeout = time.Second * 30
  )
  // NodeTrafficSyncJob syncs traffic from every enabled, online node and then
  // pushes the resulting state to connected websocket clients.
  //
  // The service fields are used at their zero value: NewNodeTrafficSyncJob
  // does not assign them.
  type NodeTrafficSyncJob struct {
  	// nodeService lists nodes, clears their dirty flag and reports sync state.
  	nodeService service.NodeService
  	// inboundService reconciles nodes, merges remote traffic and exposes the
  	// online/traffic views that get broadcast.
  	inboundService service.InboundService
  	// settingService is read for the RestartXrayOnClientDisable setting.
  	settingService service.SettingService
  	// xrayService restarts xray (or marks it as needing a restart) after
  	// clients were disabled.
  	xrayService service.XrayService
  	// running is taken with TryLock in Run so overlapping runs are skipped
  	// instead of queued.
  	running sync.Mutex
  	// structural records that a run changed something that requires the
  	// inbounds/clients invalidation broadcast; Run consumes it.
  	structural atomicBool
  }
  // atomicBool is a mutex-guarded boolean flag. Its zero value is an unset
  // flag that is ready to use.
  type atomicBool struct {
  	mu sync.Mutex
  	v  bool
  }

  // set raises the flag.
  func (a *atomicBool) set() {
  	a.mu.Lock()
  	defer a.mu.Unlock()
  	a.v = true
  }

  // takeAndReset returns the current flag value and clears it, both under the
  // same lock acquisition.
  func (a *atomicBool) takeAndReset() bool {
  	a.mu.Lock()
  	defer a.mu.Unlock()
  	previous := a.v
  	a.v = false
  	return previous
  }
  41. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  42. return &NodeTrafficSyncJob{}
  43. }
  44. func (j *NodeTrafficSyncJob) Run() {
  45. if !j.running.TryLock() {
  46. return
  47. }
  48. defer j.running.Unlock()
  49. mgr := runtime.GetManager()
  50. if mgr == nil {
  51. return
  52. }
  53. nodes, err := j.nodeService.GetAll()
  54. if err != nil {
  55. logger.Warning("node traffic sync: load nodes failed:", err)
  56. return
  57. }
  58. if len(nodes) == 0 {
  59. return
  60. }
  61. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  62. var wg sync.WaitGroup
  63. for _, n := range nodes {
  64. if !n.Enable || n.Status != "online" {
  65. continue
  66. }
  67. wg.Add(1)
  68. sem <- struct{}{}
  69. go func(n *model.Node) {
  70. defer wg.Done()
  71. defer func() { <-sem }()
  72. j.syncOne(mgr, n)
  73. }(n)
  74. }
  75. wg.Wait()
  76. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  77. if err != nil {
  78. logger.Warning("node traffic sync: depletion check failed:", err)
  79. }
  80. if clientsDisabled {
  81. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  82. if err := j.xrayService.RestartXray(true); err != nil {
  83. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  84. j.xrayService.SetToNeedRestart()
  85. }
  86. } else if settingErr != nil {
  87. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  88. }
  89. j.structural.set()
  90. }
  91. lastOnline, err := j.inboundService.GetClientsLastOnline()
  92. if err != nil {
  93. logger.Warning("node traffic sync: get last-online failed:", err)
  94. }
  95. if lastOnline == nil {
  96. lastOnline = map[string]int64{}
  97. }
  98. // Prune stale local-online entries (no local active emails or inbound tags
  99. // to add here — only the local xray poll feeds those) so a stopped local
  100. // xray's clients and inbounds still age out between traffic polls.
  101. j.inboundService.RefreshLocalOnlineClients(nil, nil)
  102. if !websocket.HasClients() {
  103. return
  104. }
  105. online := j.inboundService.GetOnlineClients()
  106. if online == nil {
  107. online = []string{}
  108. }
  109. websocket.BroadcastTraffic(map[string]any{
  110. "onlineClients": online,
  111. "onlineByNode": j.inboundService.GetOnlineClientsByNode(),
  112. "activeInbounds": j.inboundService.GetActiveInboundsByNode(),
  113. "lastOnlineMap": lastOnline,
  114. })
  115. clientStats := map[string]any{}
  116. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  117. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  118. } else if len(stats) > 0 {
  119. clientStats["clients"] = stats
  120. }
  121. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  122. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  123. } else if len(summary) > 0 {
  124. clientStats["inbounds"] = summary
  125. }
  126. if len(clientStats) > 0 {
  127. websocket.BroadcastClientStats(clientStats)
  128. }
  129. if j.structural.takeAndReset() {
  130. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  131. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  132. }
  133. }
  134. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
  135. rt, err := mgr.RemoteFor(n)
  136. if err != nil {
  137. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  138. return
  139. }
  140. if n.ConfigDirty {
  141. reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout)
  142. reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n.Id)
  143. reconcileCancel()
  144. if reconcileErr != nil {
  145. logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr)
  146. return
  147. }
  148. if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
  149. logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr)
  150. }
  151. j.structural.set()
  152. }
  153. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  154. defer cancel()
  155. snap, err := rt.FetchTrafficSnapshot(ctx)
  156. if err != nil {
  157. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  158. j.inboundService.ClearNodeOnlineClients(n.Id)
  159. return
  160. }
  161. _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
  162. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
  163. if err != nil {
  164. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  165. return
  166. }
  167. if changed {
  168. j.structural.set()
  169. }
  170. }