1
0

node_traffic_sync_job.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
  12. const (
  13. nodeTrafficSyncConcurrency = 8
  14. nodeTrafficSyncRequestTimeout = 4 * time.Second
  15. nodeReconcileTimeout = 30 * time.Second
  16. nodeClientIpSyncInterval = 10 * time.Second
  17. )
  18. type NodeTrafficSyncJob struct {
  19. nodeService service.NodeService
  20. inboundService service.InboundService
  21. settingService service.SettingService
  22. xrayService service.XrayService
  23. running sync.Mutex
  24. structural atomicBool
  25. ipSyncMu sync.Mutex
  26. lastIpSync int64
  27. }
  28. type atomicBool struct {
  29. mu sync.Mutex
  30. v bool
  31. }
  32. func (a *atomicBool) set() {
  33. a.mu.Lock()
  34. a.v = true
  35. a.mu.Unlock()
  36. }
  37. func (a *atomicBool) takeAndReset() bool {
  38. a.mu.Lock()
  39. v := a.v
  40. a.v = false
  41. a.mu.Unlock()
  42. return v
  43. }
  44. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  45. return &NodeTrafficSyncJob{}
  46. }
  47. func (j *NodeTrafficSyncJob) Run() {
  48. if !j.running.TryLock() {
  49. return
  50. }
  51. defer j.running.Unlock()
  52. mgr := runtime.GetManager()
  53. if mgr == nil {
  54. return
  55. }
  56. nodes, err := j.nodeService.GetAll()
  57. if err != nil {
  58. logger.Warning("node traffic sync: load nodes failed:", err)
  59. return
  60. }
  61. if len(nodes) == 0 {
  62. return
  63. }
  64. // Decide once per tick whether this run also syncs client IPs, and stamp the
  65. // clock before the loop so two back-to-back 5s ticks can't both qualify.
  66. doIpSync := false
  67. j.ipSyncMu.Lock()
  68. if now := time.Now().Unix(); now-j.lastIpSync >= int64(nodeClientIpSyncInterval/time.Second) {
  69. doIpSync = true
  70. j.lastIpSync = now
  71. }
  72. j.ipSyncMu.Unlock()
  73. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  74. var wg sync.WaitGroup
  75. for _, n := range nodes {
  76. if !n.Enable || n.Status != "online" {
  77. continue
  78. }
  79. wg.Add(1)
  80. sem <- struct{}{}
  81. go func(n *model.Node) {
  82. defer wg.Done()
  83. defer func() { <-sem }()
  84. j.syncOne(mgr, n, doIpSync)
  85. }(n)
  86. }
  87. wg.Wait()
  88. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  89. if err != nil {
  90. logger.Warning("node traffic sync: depletion check failed:", err)
  91. }
  92. if clientsDisabled {
  93. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  94. if err := j.xrayService.RestartXray(true); err != nil {
  95. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  96. j.xrayService.SetToNeedRestart()
  97. }
  98. } else if settingErr != nil {
  99. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  100. }
  101. j.structural.set()
  102. }
  103. lastOnline, err := j.inboundService.GetClientsLastOnline()
  104. if err != nil {
  105. logger.Warning("node traffic sync: get last-online failed:", err)
  106. }
  107. if lastOnline == nil {
  108. lastOnline = map[string]int64{}
  109. }
  110. // Prune stale local-online entries (no local active emails or inbound tags
  111. // to add here — only the local xray poll feeds those) so a stopped local
  112. // xray's clients and inbounds still age out between traffic polls.
  113. j.inboundService.RefreshLocalOnlineClients(nil, nil)
  114. if !websocket.HasClients() {
  115. return
  116. }
  117. online := j.inboundService.GetOnlineClients()
  118. if online == nil {
  119. online = []string{}
  120. }
  121. websocket.BroadcastTraffic(map[string]any{
  122. "onlineClients": online,
  123. "onlineByGuid": j.inboundService.GetOnlineClientsByGuid(),
  124. "activeInbounds": j.inboundService.GetActiveInboundsByGuid(),
  125. "lastOnlineMap": lastOnline,
  126. })
  127. clientStats := map[string]any{}
  128. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  129. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  130. } else if len(stats) > 0 {
  131. clientStats["clients"] = stats
  132. }
  133. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  134. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  135. } else if len(summary) > 0 {
  136. clientStats["inbounds"] = summary
  137. }
  138. if len(clientStats) > 0 {
  139. websocket.BroadcastClientStats(clientStats)
  140. }
  141. if j.structural.takeAndReset() {
  142. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  143. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  144. }
  145. }
  146. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, doIpSync bool) {
  147. rt, err := mgr.RemoteFor(n)
  148. if err != nil {
  149. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  150. return
  151. }
  152. if n.ConfigDirty {
  153. reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout)
  154. reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n.Id)
  155. reconcileCancel()
  156. if reconcileErr != nil {
  157. logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr)
  158. return
  159. }
  160. if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
  161. logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr)
  162. }
  163. j.structural.set()
  164. }
  165. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  166. defer cancel()
  167. snap, err := rt.FetchTrafficSnapshot(ctx)
  168. if err != nil {
  169. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  170. j.inboundService.ClearNodeOnlineClients(n.Id)
  171. return
  172. }
  173. _, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
  174. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
  175. if err != nil {
  176. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  177. return
  178. }
  179. if changed {
  180. j.structural.set()
  181. }
  182. if !doIpSync {
  183. return
  184. }
  185. nodeIps, err := rt.FetchAllClientIps(ctx)
  186. if err == nil && len(nodeIps) > 0 {
  187. if err := j.inboundService.MergeInboundClientIps(nodeIps); err != nil {
  188. logger.Warning("node traffic sync: merge client ips from", n.Name, "failed:", err)
  189. }
  190. } else if err != nil {
  191. logger.Warning("node traffic sync: fetch client ips from", n.Name, "failed:", err)
  192. }
  193. masterIps, err := j.inboundService.GetAllInboundClientIps()
  194. if err != nil {
  195. logger.Warning("node traffic sync: load client ips for push to", n.Name, "failed:", err)
  196. return
  197. }
  198. if len(masterIps) > 0 {
  199. if err := rt.PushAllClientIps(ctx, masterIps); err != nil {
  200. logger.Warning("node traffic sync: push client ips to", n.Name, "failed:", err)
  201. }
  202. }
  203. }