// node_heartbeat_job.go
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/service"
  9. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  10. )
  11. // nodeHeartbeatConcurrency caps how many remote panels we probe at once.
  12. // Plenty of headroom for typical deployments (tens of nodes) without
  13. // letting a misconfigured run open thousands of sockets at once.
  14. const nodeHeartbeatConcurrency = 32
  15. // nodeHeartbeatRequestTimeout bounds a single probe. The cron is @every 10s,
  16. // so this needs to stay well under that to avoid run pile-up.
  17. const nodeHeartbeatRequestTimeout = 6 * time.Second
// NodeHeartbeatJob probes every enabled remote node once per cron tick
// and persists the result. Disabled nodes are skipped entirely so a
// long-broken node can be parked without burning sockets every 10s.
//
// Methods use a pointer receiver: the struct embeds a sync.Mutex and
// must not be copied.
type NodeHeartbeatJob struct {
	// nodeService loads the node list (GetAll), probes individual nodes
	// (Probe), and persists probe results (UpdateHeartbeat). The zero
	// value is used as-is by NewNodeHeartbeatJob — presumably usable
	// without initialization; confirm against service.NodeService.
	nodeService service.NodeService
	// Coarse mutex prevents two ticks running concurrently if probes
	// pile up under network failure. The next tick simply skips when
	// the previous one is still draining.
	running sync.Mutex
}
  28. // NewNodeHeartbeatJob constructs a heartbeat job. The robfig/cron
  29. // scheduler will hand the same instance to every tick, so the
  30. // running mutex carries across runs as intended.
  31. func NewNodeHeartbeatJob() *NodeHeartbeatJob {
  32. return &NodeHeartbeatJob{}
  33. }
  34. func (j *NodeHeartbeatJob) Run() {
  35. if !j.running.TryLock() {
  36. // Previous tick still in flight — skip this one.
  37. return
  38. }
  39. defer j.running.Unlock()
  40. nodes, err := j.nodeService.GetAll()
  41. if err != nil {
  42. logger.Warning("node heartbeat: load nodes failed:", err)
  43. return
  44. }
  45. if len(nodes) == 0 {
  46. return
  47. }
  48. sem := make(chan struct{}, nodeHeartbeatConcurrency)
  49. var wg sync.WaitGroup
  50. for _, n := range nodes {
  51. if !n.Enable {
  52. continue
  53. }
  54. wg.Add(1)
  55. sem <- struct{}{}
  56. go func(n *model.Node) {
  57. defer wg.Done()
  58. defer func() { <-sem }()
  59. j.probeOne(n)
  60. }(n)
  61. }
  62. wg.Wait()
  63. // Push the fresh list to any open Nodes page over WebSocket so the
  64. // status / latency / cpu / mem cells update without the user clicking
  65. // refresh. Skip the DB read entirely when no browser is connected —
  66. // matches the gating pattern in xray_traffic_job.
  67. if !websocket.HasClients() {
  68. return
  69. }
  70. updated, err := j.nodeService.GetAll()
  71. if err != nil {
  72. logger.Warning("node heartbeat: load nodes for broadcast failed:", err)
  73. return
  74. }
  75. websocket.BroadcastNodes(updated)
  76. }
  77. // probeOne runs a single probe and persists the result. We deliberately
  78. // don't return errors — partial failures across the node set should not
  79. // abort other probes, and the LastError column carries the message for
  80. // the UI to surface.
  81. func (j *NodeHeartbeatJob) probeOne(n *model.Node) {
  82. ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
  83. defer cancel()
  84. patch, err := j.nodeService.Probe(ctx, n)
  85. if err != nil {
  86. patch.Status = "offline"
  87. } else {
  88. patch.Status = "online"
  89. }
  90. if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil {
  91. // A row deleted mid-tick produces "rows affected = 0", which
  92. // gorm reports as nil — so any error we get here is real.
  93. logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr)
  94. }
  95. }