node_heartbeat_job.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package job
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/database/model"
  7. "github.com/mhsanaei/3x-ui/v3/logger"
  8. "github.com/mhsanaei/3x-ui/v3/web/service"
  9. "github.com/mhsanaei/3x-ui/v3/web/websocket"
  10. )
  11. const (
  12. nodeHeartbeatConcurrency = 32
  13. nodeHeartbeatRequestTimeout = 4 * time.Second
  14. )
  15. type NodeHeartbeatJob struct {
  16. nodeService service.NodeService
  17. running sync.Mutex
  18. }
  19. func NewNodeHeartbeatJob() *NodeHeartbeatJob {
  20. return &NodeHeartbeatJob{}
  21. }
  22. func (j *NodeHeartbeatJob) Run() {
  23. if !j.running.TryLock() {
  24. return
  25. }
  26. defer j.running.Unlock()
  27. nodes, err := j.nodeService.GetAll()
  28. if err != nil {
  29. logger.Warning("node heartbeat: load nodes failed:", err)
  30. return
  31. }
  32. if len(nodes) == 0 {
  33. return
  34. }
  35. sem := make(chan struct{}, nodeHeartbeatConcurrency)
  36. var wg sync.WaitGroup
  37. for _, n := range nodes {
  38. if !n.Enable {
  39. continue
  40. }
  41. wg.Add(1)
  42. sem <- struct{}{}
  43. go func(n *model.Node) {
  44. defer wg.Done()
  45. defer func() { <-sem }()
  46. j.probeOne(n)
  47. }(n)
  48. }
  49. wg.Wait()
  50. if !websocket.HasClients() {
  51. return
  52. }
  53. updated, err := j.nodeService.GetAll()
  54. if err != nil {
  55. logger.Warning("node heartbeat: load nodes for broadcast failed:", err)
  56. return
  57. }
  58. websocket.BroadcastNodes(updated)
  59. }
  60. func (j *NodeHeartbeatJob) probeOne(n *model.Node) {
  61. ctx, cancel := context.WithTimeout(context.Background(), nodeHeartbeatRequestTimeout)
  62. defer cancel()
  63. patch, err := j.nodeService.Probe(ctx, n)
  64. if err != nil {
  65. patch.Status = "offline"
  66. } else {
  67. patch.Status = "online"
  68. }
  69. if updErr := j.nodeService.UpdateHeartbeat(n.Id, patch); updErr != nil {
  70. logger.Warning("node heartbeat: update node", n.Id, "failed:", updErr)
  71. }
  72. }