node_heartbeat_job.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. package job
  2. import (
  3. "context"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "github.com/mhsanaei/3x-ui/v3/internal/eventbus"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  11. "github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
  12. )
  // Tuning values for the node heartbeat job.
  const (
  	// nodeHeartbeatConcurrency is the capacity of the semaphore channel Run
  	// uses to cap how many node probes are in flight at the same time.
  	nodeHeartbeatConcurrency = 32

  	// nodeHeartbeatRequestTimeout is the deadline applied to the probe
  	// context and, separately, to the descendants-refresh context.
  	nodeHeartbeatRequestTimeout time.Duration = 4 * time.Second
  )
  17. type NodeHeartbeatJob struct {
  18. nodeService service.NodeService
  19. running sync.Mutex
  20. }
  21. func NewNodeHeartbeatJob() *NodeHeartbeatJob {
  22. return &NodeHeartbeatJob{}
  23. }
  24. func (j *NodeHeartbeatJob) Run() {
  25. if !j.running.TryLock() {
  26. return
  27. }
  28. defer j.running.Unlock()
  29. nodes, err := j.nodeService.GetAll()
  30. if err != nil {
  31. logger.Warning("node heartbeat: load nodes failed:", err)
  32. return
  33. }
  34. if len(nodes) == 0 {
  35. return
  36. }
  37. sem := make(chan struct{}, nodeHeartbeatConcurrency)
  38. var wg sync.WaitGroup
  39. for _, n := range nodes {
  40. if !n.Enable {
  41. continue
  42. }
  43. wg.Add(1)
  44. sem <- struct{}{}
  45. go func(n *model.Node) {
  46. defer wg.Done()
  47. defer func() { <-sem }()
  48. j.probeOne(n)
  49. }(n)
  50. }
  51. wg.Wait()
  52. if !websocket.HasClients() {
  53. return
  54. }
  55. updated, err := j.nodeService.GetNodeTree()
  56. if err != nil {
  57. logger.Warning("node heartbeat: load nodes for broadcast failed:", err)
  58. return
  59. }
  60. websocket.BroadcastNodes(updated)
  61. }
  62. func (j *NodeHeartbeatJob) probeOne(n *model.Node) {
  63. ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
  64. defer cancel()
  65. prevStatus := n.Status
  66. patch, err := j.nodeService.Probe(ctx, n)
  67. if err != nil {
  68. patch.Status = "offline"
  69. } else {
  70. patch.Status = "online"
  71. }
  72. if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil {
  73. logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr)
  74. }
  75. publishNodeTransition(n, prevStatus, patch)
  76. // Learn the nodes this node manages so the panel can surface them as
  77. // transitive sub-nodes (#4983). Fresh context — the probe budget above may
  78. // be spent. Drop them when the node is unreachable.
  79. if patch.Status == "online" {
  80. dctx, dcancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
  81. j.nodeService.RefreshDescendants(dctx, n)
  82. dcancel()
  83. } else {
  84. j.nodeService.ClearDescendants(n.Id)
  85. }
  86. }
  87. // publishNodeTransition emits node.down / node.up only on a genuine state change.
  88. // An "unknown"/empty previous status (fresh start) is treated as not-online, so a
  89. // node coming up for the first time fires node.up but never a spurious node.down.
  90. func publishNodeTransition(n *model.Node, prevStatus string, patch service.HeartbeatPatch) {
  91. if EventBus == nil {
  92. return
  93. }
  94. var eventType eventbus.EventType
  95. switch {
  96. case prevStatus == "online" && patch.Status == "offline":
  97. eventType = eventbus.EventNodeDown
  98. case prevStatus != "online" && patch.Status == "online":
  99. eventType = eventbus.EventNodeUp
  100. default:
  101. return
  102. }
  103. source := n.Name
  104. if source == "" {
  105. source = "node-" + strconv.Itoa(n.Id)
  106. }
  107. EventBus.Publish(eventbus.Event{
  108. Type: eventType,
  109. Source: source,
  110. Data: &eventbus.NodeHealthData{
  111. NodeId: n.Id,
  112. LatencyMs: patch.LatencyMs,
  113. CpuPct: patch.CpuPct,
  114. MemPct: patch.MemPct,
  115. XrayState: patch.XrayState,
  116. XrayError: patch.XrayError,
  117. },
  118. })
  119. }