node_traffic_sync_job.go

package job

import (
	"context"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v3/database/model"
	"github.com/mhsanaei/3x-ui/v3/logger"
	"github.com/mhsanaei/3x-ui/v3/web/runtime"
	"github.com/mhsanaei/3x-ui/v3/web/service"
	"github.com/mhsanaei/3x-ui/v3/web/websocket"
)

// nodeTrafficSyncConcurrency caps how many nodes we sync simultaneously.
// Each sync does three HTTP calls in series, so the wall-clock budget
// per node is the request timeout below; keeping the cap modest avoids
// flooding the network while still getting through dozens of nodes
// inside a 10s tick.
const nodeTrafficSyncConcurrency = 8

// nodeTrafficSyncRequestTimeout bounds the per-node sync. Three probes
// in series at 8s each would blow past the cron interval, so the budget
// here covers the whole snapshot; FetchTrafficSnapshot internally caps
// each HTTP call at the runtime's own 10s ceiling but uses ctx for the
// outer total.
const nodeTrafficSyncRequestTimeout = 8 * time.Second

// NodeTrafficSyncJob pulls absolute traffic + online stats from every
// enabled, currently-online remote node and merges them into the central
// DB. Mirrors NodeHeartbeatJob's structure: TryLock to skip pile-ups,
// errgroup-style fan-out with a concurrency cap, per-node ctx timeout.
//
// Offline nodes are skipped entirely: the heartbeat job already owns
// status tracking, and we'd just waste sockets retrying a node we know
// is unreachable. As soon as heartbeat marks a node online again, the
// next traffic tick picks it up.
type NodeTrafficSyncJob struct {
	nodeService    service.NodeService
	inboundService service.InboundService

	// Coarse mutex prevents two ticks running concurrently if a single
	// sync stalls past the 10s cron interval (rare but possible when
	// many nodes are slow simultaneously).
	running sync.Mutex
}

// NewNodeTrafficSyncJob builds a singleton sync job. Cron hands the same
// instance to every tick so the running mutex is preserved across runs.
func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
	return &NodeTrafficSyncJob{}
}

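// Run executes one sync tick. It is safe under overlapping cron ticks:
// if the previous run is still in flight, TryLock turns this one into a
// no-op instead of stacking goroutines.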
func (j *NodeTrafficSyncJob) Run() {
	if !j.running.TryLock() {
		return
	}
	defer j.running.Unlock()

	mgr := runtime.GetManager()
	if mgr == nil {
		// Server still booting; pre-Manager runs are normal during
		// the first few seconds of startup.
		return
	}

	nodes, err := j.nodeService.GetAll()
	if err != nil {
		logger.Warning("node traffic sync: load nodes failed:", err)
		return
	}
	if len(nodes) == 0 {
		return
	}

	sem := make(chan struct{}, nodeTrafficSyncConcurrency)
	var wg sync.WaitGroup
	for _, n := range nodes {
		if !n.Enable || n.Status != "online" {
			continue
		}
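		// Reserve a semaphore slot before spawning so at most
		// nodeTrafficSyncConcurrency syncs are in flight at once.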
		wg.Add(1)
		sem <- struct{}{}
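		// Hand n to the goroutine as an argument so each worker gets
		// its own copy of the loop variable.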
		go func(n *model.Node) {
			defer wg.Done()
			defer func() { <-sem }()
			j.syncOne(mgr, n)
		}(n)
	}
	wg.Wait()

	// One broadcast per tick, batched across all nodes; frontend code
	// is invariant to whether the rows came from local xray or a node,
	// so we reuse the same WebSocket envelope XrayTrafficJob uses.
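	// Skip the lookups entirely when there is no WebSocket client to
	// receive the broadcast.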
	if websocket.HasClients() {
		online := j.inboundService.GetOnlineClients()
		if online == nil {
			online = []string{}
		}
		lastOnline, err := j.inboundService.GetClientsLastOnline()
		if err != nil {
			logger.Warning("node traffic sync: get last-online failed:", err)
		}
		if lastOnline == nil {
			lastOnline = map[string]int64{}
		}
		websocket.BroadcastTraffic(map[string]any{
			"onlineClients": online,
			"lastOnlineMap": lastOnline,
		})
	}
}

// syncOne fetches and merges one node's snapshot. Errors are logged
// per-node and don't propagate; one slow node shouldn't keep the rest
// from running.
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
	ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
	defer cancel()

	rt, err := mgr.RemoteFor(n)
	if err != nil {
		logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
		return
	}

	snap, err := rt.FetchTrafficSnapshot(ctx)
	if err != nil {
		logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
		// Drop node-online contribution so a hiccup doesn't leave the
		// online filter showing stale clients indefinitely.
		j.inboundService.ClearNodeOnlineClients(n.Id)
		return
	}

	if err := j.inboundService.SetRemoteTraffic(n.Id, snap); err != nil {
		logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
	}
}