node_traffic_sync_job.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
const (
	// nodeTrafficSyncConcurrency is the capacity of the semaphore channel
	// used by Run, i.e. the maximum number of syncOne goroutines that may
	// be in flight at the same time.
	nodeTrafficSyncConcurrency = 8
	// nodeTrafficSyncRequestTimeout is the timeout of the per-node context
	// that syncOne creates and passes to FetchTrafficSnapshot.
	nodeTrafficSyncRequestTimeout = 4 * time.Second
)
// NodeTrafficSyncJob fetches a traffic snapshot from every enabled, online
// node, merges it through the inbound service and, when websocket clients
// are connected, broadcasts the refreshed traffic data to them.
type NodeTrafficSyncJob struct {
	// nodeService is used by Run to load the list of nodes.
	nodeService service.NodeService
	// inboundService merges remote snapshots and supplies the online-client,
	// last-online and traffic data that Run broadcasts.
	inboundService service.InboundService
	// running is acquired with TryLock at the top of Run so that a run
	// starting while another is still in progress returns immediately.
	running sync.Mutex
	// structural is set by syncOne when SetRemoteTraffic reports a change;
	// Run consumes it to decide whether to broadcast an inbounds invalidate.
	structural atomicBool
}
  22. type atomicBool struct {
  23. mu sync.Mutex
  24. v bool
  25. }
  26. func (a *atomicBool) set() {
  27. a.mu.Lock()
  28. a.v = true
  29. a.mu.Unlock()
  30. }
  31. func (a *atomicBool) takeAndReset() bool {
  32. a.mu.Lock()
  33. v := a.v
  34. a.v = false
  35. a.mu.Unlock()
  36. return v
  37. }
  38. type emailSet struct {
  39. mu sync.Mutex
  40. m map[string]struct{}
  41. }
  42. func newEmailSet() *emailSet { return &emailSet{m: make(map[string]struct{})} }
  43. func (s *emailSet) addAll(emails []string) {
  44. if len(emails) == 0 {
  45. return
  46. }
  47. s.mu.Lock()
  48. for _, e := range emails {
  49. if e != "" {
  50. s.m[e] = struct{}{}
  51. }
  52. }
  53. s.mu.Unlock()
  54. }
  55. func (s *emailSet) slice() []string {
  56. s.mu.Lock()
  57. defer s.mu.Unlock()
  58. out := make([]string, 0, len(s.m))
  59. for e := range s.m {
  60. out = append(out, e)
  61. }
  62. return out
  63. }
  64. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  65. return &NodeTrafficSyncJob{}
  66. }
  67. func (j *NodeTrafficSyncJob) Run() {
  68. if !j.running.TryLock() {
  69. return
  70. }
  71. defer j.running.Unlock()
  72. mgr := runtime.GetManager()
  73. if mgr == nil {
  74. return
  75. }
  76. nodes, err := j.nodeService.GetAll()
  77. if err != nil {
  78. logger.Warning("node traffic sync: load nodes failed:", err)
  79. return
  80. }
  81. if len(nodes) == 0 {
  82. return
  83. }
  84. touched := newEmailSet()
  85. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  86. var wg sync.WaitGroup
  87. for _, n := range nodes {
  88. if !n.Enable || n.Status != "online" {
  89. continue
  90. }
  91. wg.Add(1)
  92. sem <- struct{}{}
  93. go func(n *model.Node) {
  94. defer wg.Done()
  95. defer func() { <-sem }()
  96. j.syncOne(mgr, n, touched)
  97. }(n)
  98. }
  99. wg.Wait()
  100. if !websocket.HasClients() {
  101. return
  102. }
  103. online := j.inboundService.GetOnlineClients()
  104. if online == nil {
  105. online = []string{}
  106. }
  107. lastOnline, err := j.inboundService.GetClientsLastOnline()
  108. if err != nil {
  109. logger.Warning("node traffic sync: get last-online failed:", err)
  110. }
  111. if lastOnline == nil {
  112. lastOnline = map[string]int64{}
  113. }
  114. websocket.BroadcastTraffic(map[string]any{
  115. "onlineClients": online,
  116. "lastOnlineMap": lastOnline,
  117. })
  118. clientStats := map[string]any{}
  119. if emails := touched.slice(); len(emails) > 0 {
  120. if stats, err := j.inboundService.GetActiveClientTraffics(emails); err != nil {
  121. logger.Warning("node traffic sync: get client traffics for websocket failed:", err)
  122. } else if len(stats) > 0 {
  123. clientStats["clients"] = stats
  124. }
  125. }
  126. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  127. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  128. } else if len(summary) > 0 {
  129. clientStats["inbounds"] = summary
  130. }
  131. if len(clientStats) > 0 {
  132. websocket.BroadcastClientStats(clientStats)
  133. }
  134. if j.structural.takeAndReset() {
  135. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  136. }
  137. }
  138. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node, touched *emailSet) {
  139. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  140. defer cancel()
  141. rt, err := mgr.RemoteFor(n)
  142. if err != nil {
  143. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  144. return
  145. }
  146. snap, err := rt.FetchTrafficSnapshot(ctx)
  147. if err != nil {
  148. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  149. j.inboundService.ClearNodeOnlineClients(n.Id)
  150. return
  151. }
  152. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
  153. if err != nil {
  154. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  155. return
  156. }
  157. if changed {
  158. j.structural.set()
  159. }
  160. for _, ib := range snap.Inbounds {
  161. if ib == nil {
  162. continue
  163. }
  164. emails := make([]string, 0, len(ib.ClientStats))
  165. for _, cs := range ib.ClientStats {
  166. if cs.Email != "" {
  167. emails = append(emails, cs.Email)
  168. }
  169. }
  170. touched.addAll(emails)
  171. }
  172. }