node_traffic_sync_job.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  9. "github.com/mhsanaei/3x-ui/v3/web/service"
  10. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  11. )
  // Tuning values for the node traffic sync job.
  const (
  	// nodeTrafficSyncConcurrency is the capacity of the semaphore channel
  	// created in Run, i.e. the maximum number of nodes synced at once.
  	nodeTrafficSyncConcurrency = 8

  	// nodeTrafficSyncRequestTimeout is the deadline put on the context that
  	// syncOne passes to the remote traffic snapshot fetch.
  	nodeTrafficSyncRequestTimeout = 4 * time.Second
  )
  16. type NodeTrafficSyncJob struct {
  17. nodeService service.NodeService
  18. inboundService service.InboundService
  19. running sync.Mutex
  20. structural atomicBool
  21. }
  // atomicBool is a mutex-guarded boolean flag. Its zero value is an unset
  // (false) flag that is ready to use.
  type atomicBool struct {
  	mu sync.Mutex
  	v  bool
  }

  // set raises the flag. Calling it on an already raised flag is a no-op.
  func (b *atomicBool) set() {
  	b.mu.Lock()
  	defer b.mu.Unlock()

  	b.v = true
  }

  // takeAndReset returns the current value of the flag and lowers it, both
  // under the same lock acquisition.
  func (b *atomicBool) takeAndReset() bool {
  	b.mu.Lock()
  	defer b.mu.Unlock()

  	previous := b.v
  	b.v = false
  	return previous
  }
  38. func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
  39. return &NodeTrafficSyncJob{}
  40. }
  41. func (j *NodeTrafficSyncJob) Run() {
  42. if !j.running.TryLock() {
  43. return
  44. }
  45. defer j.running.Unlock()
  46. mgr := runtime.GetManager()
  47. if mgr == nil {
  48. return
  49. }
  50. nodes, err := j.nodeService.GetAll()
  51. if err != nil {
  52. logger.Warning("node traffic sync: load nodes failed:", err)
  53. return
  54. }
  55. if len(nodes) == 0 {
  56. return
  57. }
  58. sem := make(chan struct{}, nodeTrafficSyncConcurrency)
  59. var wg sync.WaitGroup
  60. for _, n := range nodes {
  61. if !n.Enable || n.Status != "online" {
  62. continue
  63. }
  64. wg.Add(1)
  65. sem <- struct{}{}
  66. go func(n *model.Node) {
  67. defer wg.Done()
  68. defer func() { <-sem }()
  69. j.syncOne(mgr, n)
  70. }(n)
  71. }
  72. wg.Wait()
  73. if !websocket.HasClients() {
  74. return
  75. }
  76. lastOnline, err := j.inboundService.GetClientsLastOnline()
  77. if err != nil {
  78. logger.Warning("node traffic sync: get last-online failed:", err)
  79. }
  80. if lastOnline == nil {
  81. lastOnline = map[string]int64{}
  82. }
  83. j.inboundService.RefreshOnlineClientsFromMap(lastOnline)
  84. online := j.inboundService.GetOnlineClients()
  85. if online == nil {
  86. online = []string{}
  87. }
  88. websocket.BroadcastTraffic(map[string]any{
  89. "onlineClients": online,
  90. "lastOnlineMap": lastOnline,
  91. })
  92. clientStats := map[string]any{}
  93. if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
  94. logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
  95. } else if len(stats) > 0 {
  96. clientStats["clients"] = stats
  97. }
  98. if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
  99. logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
  100. } else if len(summary) > 0 {
  101. clientStats["inbounds"] = summary
  102. }
  103. if len(clientStats) > 0 {
  104. websocket.BroadcastClientStats(clientStats)
  105. }
  106. if j.structural.takeAndReset() {
  107. websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
  108. }
  109. }
  110. func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
  111. ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
  112. defer cancel()
  113. rt, err := mgr.RemoteFor(n)
  114. if err != nil {
  115. logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
  116. return
  117. }
  118. snap, err := rt.FetchTrafficSnapshot(ctx)
  119. if err != nil {
  120. logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
  121. j.inboundService.ClearNodeOnlineClients(n.Id)
  122. return
  123. }
  124. changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
  125. if err != nil {
  126. logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
  127. return
  128. }
  129. if changed {
  130. j.structural.set()
  131. }
  132. }