1
0

node_traffic_sync_job.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
  12. const (
  13. nodeTrafficSyncConcurrency = 8
  14. nodeTrafficSyncRequestTimeout = 4 * time.Second
  15. )
  16. type NodeTrafficSyncJob struct {
  17. nodeService service.NodeService
  18. inboundService service.InboundService
  19. settingService service.SettingService
  20. xrayService service.XrayService
  21. running sync.Mutex
  22. structural atomicBool
  23. }
  24. type atomicBool struct {
  25. mu sync.Mutex
  26. v bool
  27. }
  28. func (a *atomicBool) set() {
  29. a.mu.Lock()
  30. a.v = true
  31. a.mu.Unlock()
  32. }
  33. func (a *atomicBool) takeAndReset() bool {
  34. a.mu.Lock()
  35. v := a.v
  36. a.v = false
  37. a.mu.Unlock()
  38. return v
  39. }
  40. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  41. return &NodeTrafficSyncJob{}
  42. }
  43. func (j *NodeTrafficSyncJob) Run() {
  44. if !j.running.TryLock() {
  45. return
  46. }
  47. defer j.running.Unlock()
  48. mgr := runtime.GetManager()
  49. if mgr == nil {
  50. return
  51. }
  52. nodes, err := j.nodeService.GetAll()
  53. if err != nil {
  54. logger.Warning("node traffic sync: load nodes failed:", err)
  55. return
  56. }
  57. if len(nodes) == 0 {
  58. return
  59. }
  60. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  61. var wg sync.WaitGroup
  62. for _, n := range nodes {
  63. if !n.Enable || n.Status != "online" {
  64. continue
  65. }
  66. wg.Add(1)
  67. sem <- struct{}{}
  68. go func(n *model.Node) {
  69. defer wg.Done()
  70. defer func() { <-sem }()
  71. j.syncOne(mgr, n)
  72. }(n)
  73. }
  74. wg.Wait()
  75. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  76. if err != nil {
  77. logger.Warning("node traffic sync: depletion check failed:", err)
  78. }
  79. if clientsDisabled {
  80. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  81. if err := j.xrayService.RestartXray(true); err != nil {
  82. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  83. j.xrayService.SetToNeedRestart()
  84. }
  85. } else if settingErr != nil {
  86. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  87. }
  88. j.structural.set()
  89. }
  90. lastOnline, err := j.inboundService.GetClientsLastOnline()
  91. if err != nil {
  92. logger.Warning("node traffic sync: get last-online failed:", err)
  93. }
  94. if lastOnline == nil {
  95. lastOnline = map[string]int64{}
  96. }
  97. // Prune stale local-online entries (no local active emails to add here —
  98. // only the local xray poll feeds those) so a stopped local xray's clients
  99. // still age out between traffic polls.
  100. j.inboundService.RefreshLocalOnlineClients(nil)
  101. if !websocket.HasClients() {
  102. return
  103. }
  104. online := j.inboundService.GetOnlineClients()
  105. if online == nil {
  106. online = []string{}
  107. }
  108. websocket.BroadcastTraffic(map[string]any{
  109. "onlineClients": online,
  110. "onlineByNode": j.inboundService.GetOnlineClientsByNode(),
  111. "lastOnlineMap": lastOnline,
  112. })
  113. clientStats := map[string]any{}
  114. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  115. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  116. } else if len(stats) > 0 {
  117. clientStats["clients"] = stats
  118. }
  119. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  120. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  121. } else if len(summary) > 0 {
  122. clientStats["inbounds"] = summary
  123. }
  124. if len(clientStats) > 0 {
  125. websocket.BroadcastClientStats(clientStats)
  126. }
  127. if j.structural.takeAndReset() {
  128. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  129. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  130. }
  131. }
  132. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
  133. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  134. defer cancel()
  135. rt, err := mgr.RemoteFor(n)
  136. if err != nil {
  137. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  138. return
  139. }
  140. snap, err := rt.FetchTrafficSnapshot(ctx)
  141. if err != nil {
  142. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  143. j.inboundService.ClearNodeOnlineClients(n.Id)
  144. return
  145. }
  146. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
  147. if err != nil {
  148. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  149. return
  150. }
  151. if changed {
  152. j.structural.set()
  153. }
  154. }