node_traffic_sync_job.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
  12. const (
  13. nodeTrafficSyncConcurrency = 8
  14. nodeTrafficSyncRequestTimeout = 4 * time.Second
  15. )
  16. type NodeTrafficSyncJob struct {
  17. nodeService service.NodeService
  18. inboundService service.InboundService
  19. running sync.Mutex
  20. structural atomicBool
  21. }
  22. type atomicBool struct {
  23. mu sync.Mutex
  24. v bool
  25. }
  26. func (a *atomicBool) set() {
  27. a.mu.Lock()
  28. a.v = true
  29. a.mu.Unlock()
  30. }
  31. func (a *atomicBool) takeAndReset() bool {
  32. a.mu.Lock()
  33. v := a.v
  34. a.v = false
  35. a.mu.Unlock()
  36. return v
  37. }
  38. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  39. return &NodeTrafficSyncJob{}
  40. }
  41. func (j *NodeTrafficSyncJob) Run() {
  42. if !j.running.TryLock() {
  43. return
  44. }
  45. defer j.running.Unlock()
  46. mgr := runtime.GetManager()
  47. if mgr == nil {
  48. return
  49. }
  50. nodes, err := j.nodeService.GetAll()
  51. if err != nil {
  52. logger.Warning("node traffic sync: load nodes failed:", err)
  53. return
  54. }
  55. if len(nodes) == 0 {
  56. return
  57. }
  58. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  59. var wg sync.WaitGroup
  60. for _, n := range nodes {
  61. if !n.Enable || n.Status != "online" {
  62. continue
  63. }
  64. wg.Add(1)
  65. sem <- struct{}{}
  66. go func(n *model.Node) {
  67. defer wg.Done()
  68. defer func() { <-sem }()
  69. j.syncOne(mgr, n)
  70. }(n)
  71. }
  72. wg.Wait()
  73. if !websocket.HasClients() {
  74. return
  75. }
  76. online := j.inboundService.GetOnlineClients()
  77. if online == nil {
  78. online = []string{}
  79. }
  80. lastOnline, err := j.inboundService.GetClientsLastOnline()
  81. if err != nil {
  82. logger.Warning("node traffic sync: get last-online failed:", err)
  83. }
  84. if lastOnline == nil {
  85. lastOnline = map[string]int64{}
  86. }
  87. websocket.BroadcastTraffic(map[string]any{
  88. "onlineClients": online,
  89. "lastOnlineMap": lastOnline,
  90. })
  91. clientStats := map[string]any{}
  92. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  93. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  94. } else if len(stats) > 0 {
  95. clientStats["clients"] = stats
  96. }
  97. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  98. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  99. } else if len(summary) > 0 {
  100. clientStats["inbounds"] = summary
  101. }
  102. if len(clientStats) > 0 {
  103. websocket.BroadcastClientStats(clientStats)
  104. }
  105. if j.structural.takeAndReset() {
  106. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  107. }
  108. }
  109. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
  110. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  111. defer cancel()
  112. rt, err := mgr.RemoteFor(n)
  113. if err != nil {
  114. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  115. return
  116. }
  117. snap, err := rt.FetchTrafficSnapshot(ctx)
  118. if err != nil {
  119. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  120. j.inboundService.ClearNodeOnlineClients(n.Id)
  121. return
  122. }
  123. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
  124. if err != nil {
  125. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  126. return
  127. }
  128. if changed {
  129. j.structural.set()
  130. }
  131. }