1
0

node_traffic_sync_job.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
  12. const (
  13. nodeTrafficSyncConcurrency = 8
  14. nodeTrafficSyncRequestTimeout = 4 * time.Second
  15. )
  16. type NodeTrafficSyncJob struct {
  17. nodeService service.NodeService
  18. inboundService service.InboundService
  19. settingService service.SettingService
  20. xrayService service.XrayService
  21. running sync.Mutex
  22. structural atomicBool
  23. }
  24. type atomicBool struct {
  25. mu sync.Mutex
  26. v bool
  27. }
  28. func (a *atomicBool) set() {
  29. a.mu.Lock()
  30. a.v = true
  31. a.mu.Unlock()
  32. }
  33. func (a *atomicBool) takeAndReset() bool {
  34. a.mu.Lock()
  35. v := a.v
  36. a.v = false
  37. a.mu.Unlock()
  38. return v
  39. }
  40. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  41. return &NodeTrafficSyncJob{}
  42. }
  43. func (j *NodeTrafficSyncJob) Run() {
  44. if !j.running.TryLock() {
  45. return
  46. }
  47. defer j.running.Unlock()
  48. mgr := runtime.GetManager()
  49. if mgr == nil {
  50. return
  51. }
  52. nodes, err := j.nodeService.GetAll()
  53. if err != nil {
  54. logger.Warning("node traffic sync: load nodes failed:", err)
  55. return
  56. }
  57. if len(nodes) == 0 {
  58. return
  59. }
  60. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  61. var wg sync.WaitGroup
  62. for _, n := range nodes {
  63. if !n.Enable || n.Status != "online" {
  64. continue
  65. }
  66. wg.Add(1)
  67. sem <- struct{}{}
  68. go func(n *model.Node) {
  69. defer wg.Done()
  70. defer func() { <-sem }()
  71. j.syncOne(mgr, n)
  72. }(n)
  73. }
  74. wg.Wait()
  75. _, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
  76. if err != nil {
  77. logger.Warning("node traffic sync: depletion check failed:", err)
  78. }
  79. if clientsDisabled {
  80. if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
  81. if err := j.xrayService.RestartXray(true); err != nil {
  82. logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
  83. j.xrayService.SetToNeedRestart()
  84. }
  85. } else if settingErr != nil {
  86. logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
  87. }
  88. j.structural.set()
  89. }
  90. if !websocket.HasClients() {
  91. return
  92. }
  93. lastOnline, err := j.inboundService.GetClientsLastOnline()
  94. if err != nil {
  95. logger.Warning("node traffic sync: get last-online failed:", err)
  96. }
  97. if lastOnline == nil {
  98. lastOnline = map[string]int64{}
  99. }
  100. j.inboundService.RefreshOnlineClientsFromMap(lastOnline)
  101. online := j.inboundService.GetOnlineClients()
  102. if online == nil {
  103. online = []string{}
  104. }
  105. websocket.BroadcastTraffic(map[string]any{
  106. "onlineClients": online,
  107. "lastOnlineMap": lastOnline,
  108. })
  109. clientStats := map[string]any{}
  110. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  111. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  112. } else if len(stats) > 0 {
  113. clientStats["clients"] = stats
  114. }
  115. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  116. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  117. } else if len(summary) > 0 {
  118. clientStats["inbounds"] = summary
  119. }
  120. if len(clientStats) > 0 {
  121. websocket.BroadcastClientStats(clientStats)
  122. }
  123. if j.structural.takeAndReset() {
  124. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  125. websocket.BroadcastInvalidate(websocket.MessageTypeClients)
  126. }
  127. }
  128. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
  129. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  130. defer cancel()
  131. rt, err := mgr.RemoteFor(n)
  132. if err != nil {
  133. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  134. return
  135. }
  136. snap, err := rt.FetchTrafficSnapshot(ctx)
  137. if err != nil {
  138. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  139. j.inboundService.ClearNodeOnlineClients(n.Id)
  140. return
  141. }
  142. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
  143. if err != nil {
  144. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  145. return
  146. }
  147. if changed {
  148. j.structural.set()
  149. }
  150. }