node_heartbeat_job.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package job
  2. import (
  3. "context"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "github.com/mhsanaei/3x-ui/v3/internal/eventbus"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  11. "github.com/mhsanaei/3x-ui/v3/internal/web/service"
  12. "github.com/mhsanaei/3x-ui/v3/internal/web/websocket"
  13. )
  // Tuning knobs for a single heartbeat sweep.
const (
	// nodeHeartbeatConcurrency is the maximum number of node probes that one
	// Run keeps in flight at the same time.
	nodeHeartbeatConcurrency = 32

	// nodeHeartbeatRequestTimeout is the deadline given to each individual
	// network call made for a node: the probe itself, and separately the
	// descendant refresh that follows a successful probe.
	nodeHeartbeatRequestTimeout = 4 * time.Second
)
  18. type NodeHeartbeatJob struct {
  19. nodeService service.NodeService
  20. running sync.Mutex
  21. }
  22. func NewNodeHeartbeatJob() *NodeHeartbeatJob {
  23. return &NodeHeartbeatJob{}
  24. }
  25. func (j *NodeHeartbeatJob) Run() {
  26. if !j.running.TryLock() {
  27. return
  28. }
  29. defer j.running.Unlock()
  30. nodes, err := j.nodeService.GetAll()
  31. if err != nil {
  32. logger.Warning("node heartbeat: load nodes failed:", err)
  33. return
  34. }
  35. if len(nodes) == 0 {
  36. return
  37. }
  38. sem := make(chan struct{}, nodeHeartbeatConcurrency)
  39. var wg sync.WaitGroup
  40. for _, n := range nodes {
  41. if !n.Enable {
  42. continue
  43. }
  44. wg.Add(1)
  45. sem <- struct{}{}
  46. n := n
  47. common.GoRecover("node-heartbeat:"+n.Name, func() {
  48. defer wg.Done()
  49. defer func() { <-sem }()
  50. j.probeOne(n)
  51. })
  52. }
  53. wg.Wait()
  54. if !websocket.HasClients() {
  55. return
  56. }
  57. updated, err := j.nodeService.GetNodeTree()
  58. if err != nil {
  59. logger.Warning("node heartbeat: load nodes for broadcast failed:", err)
  60. return
  61. }
  62. websocket.BroadcastNodes(updated)
  63. }
  64. func (j *NodeHeartbeatJob) probeOne(n *model.Node) {
  65. ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
  66. defer cancel()
  67. prevStatus := n.Status
  68. patch, err := j.nodeService.Probe(ctx, n)
  69. if err != nil {
  70. patch.Status = "offline"
  71. } else {
  72. patch.Status = "online"
  73. }
  74. if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil {
  75. logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr)
  76. }
  77. publishNodeTransition(n, prevStatus, patch)
  78. // Learn the nodes this node manages so the panel can surface them as
  79. // transitive sub-nodes (#4983). Fresh context — the probe budget above may
  80. // be spent. Drop them when the node is unreachable.
  81. if patch.Status == "online" {
  82. dctx, dcancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
  83. j.nodeService.RefreshDescendants(dctx, n)
  84. dcancel()
  85. } else {
  86. j.nodeService.ClearDescendants(n.Id)
  87. }
  88. }
  89. // publishNodeTransition emits node.down / node.up only on a genuine state change.
  90. // An "unknown"/empty previous status (fresh start) is treated as not-online, so a
  91. // node coming up for the first time fires node.up but never a spurious node.down.
  92. func publishNodeTransition(n *model.Node, prevStatus string, patch service.HeartbeatPatch) {
  93. if EventBus == nil {
  94. return
  95. }
  96. var eventType eventbus.EventType
  97. switch {
  98. case prevStatus == "online" && patch.Status == "offline":
  99. eventType = eventbus.EventNodeDown
  100. case prevStatus != "online" && patch.Status == "online":
  101. eventType = eventbus.EventNodeUp
  102. default:
  103. return
  104. }
  105. source := n.Name
  106. if source == "" {
  107. source = "node-" + strconv.Itoa(n.Id)
  108. }
  109. EventBus.Publish(eventbus.Event{
  110. Type: eventType,
  111. Source: source,
  112. Data: &eventbus.NodeHealthData{
  113. NodeId: n.Id,
  114. LatencyMs: patch.LatencyMs,
  115. CpuPct: patch.CpuPct,
  116. MemPct: patch.MemPct,
  117. XrayState: patch.XrayState,
  118. XrayError: patch.XrayError,
  119. },
  120. })
  121. }