node.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/mhsanaei/3x-ui/v3/database"
  12. "github.com/mhsanaei/3x-ui/v3/database/model"
  13. "github.com/mhsanaei/3x-ui/v3/util/common"
  14. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  15. )
  16. type HeartbeatPatch struct {
  17. Status string
  18. LastHeartbeat int64
  19. LatencyMs int
  20. XrayVersion string
  21. CpuPct float64
  22. MemPct float64
  23. UptimeSecs uint64
  24. LastError string
  25. }
  26. type NodeService struct{}
  27. var nodeHTTPClient = &http.Client{
  28. Transport: &http.Transport{
  29. MaxIdleConns: 64,
  30. MaxIdleConnsPerHost: 4,
  31. IdleConnTimeout: 60 * time.Second,
  32. },
  33. }
  34. func (s *NodeService) GetAll() ([]*model.Node, error) {
  35. db := database.GetDB()
  36. var nodes []*model.Node
  37. err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error
  38. return nodes, err
  39. }
  40. func (s *NodeService) GetById(id int) (*model.Node, error) {
  41. db := database.GetDB()
  42. n := &model.Node{}
  43. if err := db.Model(model.Node{}).Where("id = ?", id).First(n).Error; err != nil {
  44. return nil, err
  45. }
  46. return n, nil
  47. }
  48. func normalizeBasePath(p string) string {
  49. p = strings.TrimSpace(p)
  50. if p == "" {
  51. return "/"
  52. }
  53. if !strings.HasPrefix(p, "/") {
  54. p = "/" + p
  55. }
  56. if !strings.HasSuffix(p, "/") {
  57. p = p + "/"
  58. }
  59. return p
  60. }
  61. func (s *NodeService) normalize(n *model.Node) error {
  62. n.Name = strings.TrimSpace(n.Name)
  63. n.Address = strings.TrimSpace(n.Address)
  64. n.ApiToken = strings.TrimSpace(n.ApiToken)
  65. if n.Name == "" {
  66. return common.NewError("node name is required")
  67. }
  68. if n.Address == "" {
  69. return common.NewError("node address is required")
  70. }
  71. if n.Port <= 0 || n.Port > 65535 {
  72. return common.NewError("node port must be 1-65535")
  73. }
  74. if n.Scheme != "http" && n.Scheme != "https" {
  75. n.Scheme = "https"
  76. }
  77. n.BasePath = normalizeBasePath(n.BasePath)
  78. return nil
  79. }
  80. func (s *NodeService) Create(n *model.Node) error {
  81. if err := s.normalize(n); err != nil {
  82. return err
  83. }
  84. db := database.GetDB()
  85. return db.Create(n).Error
  86. }
  87. func (s *NodeService) Update(id int, in *model.Node) error {
  88. if err := s.normalize(in); err != nil {
  89. return err
  90. }
  91. db := database.GetDB()
  92. existing := &model.Node{}
  93. if err := db.Where("id = ?", id).First(existing).Error; err != nil {
  94. return err
  95. }
  96. updates := map[string]any{
  97. "name": in.Name,
  98. "remark": in.Remark,
  99. "scheme": in.Scheme,
  100. "address": in.Address,
  101. "port": in.Port,
  102. "base_path": in.BasePath,
  103. "api_token": in.ApiToken,
  104. "enable": in.Enable,
  105. "allow_private_address": in.AllowPrivateAddress,
  106. }
  107. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  108. return err
  109. }
  110. if mgr := runtime.GetManager(); mgr != nil {
  111. mgr.InvalidateNode(id)
  112. }
  113. return nil
  114. }
  115. func (s *NodeService) Delete(id int) error {
  116. db := database.GetDB()
  117. if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
  118. return err
  119. }
  120. if mgr := runtime.GetManager(); mgr != nil {
  121. mgr.InvalidateNode(id)
  122. }
  123. nodeMetrics.drop(nodeMetricKey(id, "cpu"))
  124. nodeMetrics.drop(nodeMetricKey(id, "mem"))
  125. return nil
  126. }
  127. func (s *NodeService) SetEnable(id int, enable bool) error {
  128. db := database.GetDB()
  129. return db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error
  130. }
  131. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
  132. db := database.GetDB()
  133. updates := map[string]any{
  134. "status": p.Status,
  135. "last_heartbeat": p.LastHeartbeat,
  136. "latency_ms": p.LatencyMs,
  137. "xray_version": p.XrayVersion,
  138. "cpu_pct": p.CpuPct,
  139. "mem_pct": p.MemPct,
  140. "uptime_secs": p.UptimeSecs,
  141. "last_error": p.LastError,
  142. }
  143. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  144. return err
  145. }
  146. if p.Status == "online" {
  147. now := time.Unix(p.LastHeartbeat, 0)
  148. nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct)
  149. nodeMetrics.append(nodeMetricKey(id, "mem"), now, p.MemPct)
  150. }
  151. return nil
  152. }
  153. func nodeMetricKey(id int, metric string) string {
  154. return "node:" + strconv.Itoa(id) + ":" + metric
  155. }
  156. func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any {
  157. return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints)
  158. }
  159. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) {
  160. patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()}
  161. url := fmt.Sprintf("%s://%s:%d%spanel/api/server/status",
  162. n.Scheme, n.Address, n.Port, normalizeBasePath(n.BasePath))
  163. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  164. if err != nil {
  165. patch.LastError = err.Error()
  166. return patch, err
  167. }
  168. if n.ApiToken != "" {
  169. req.Header.Set("Authorization", "Bearer "+n.ApiToken)
  170. }
  171. req.Header.Set("Accept", "application/json")
  172. start := time.Now()
  173. resp, err := nodeHTTPClient.Do(req)
  174. if err != nil {
  175. patch.LastError = err.Error()
  176. return patch, err
  177. }
  178. defer resp.Body.Close()
  179. patch.LatencyMs = int(time.Since(start) / time.Millisecond)
  180. if resp.StatusCode != http.StatusOK {
  181. patch.LastError = fmt.Sprintf("HTTP %d from remote panel", resp.StatusCode)
  182. return patch, errors.New(patch.LastError)
  183. }
  184. var envelope struct {
  185. Success bool `json:"success"`
  186. Msg string `json:"msg"`
  187. Obj *struct {
  188. CpuPct float64 `json:"cpu"`
  189. Mem struct {
  190. Current uint64 `json:"current"`
  191. Total uint64 `json:"total"`
  192. } `json:"mem"`
  193. Xray struct {
  194. Version string `json:"version"`
  195. } `json:"xray"`
  196. Uptime uint64 `json:"uptime"`
  197. } `json:"obj"`
  198. }
  199. if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
  200. patch.LastError = "decode response: " + err.Error()
  201. return patch, err
  202. }
  203. if !envelope.Success || envelope.Obj == nil {
  204. patch.LastError = "remote returned success=false: " + envelope.Msg
  205. return patch, errors.New(patch.LastError)
  206. }
  207. o := envelope.Obj
  208. patch.CpuPct = o.CpuPct
  209. if o.Mem.Total > 0 {
  210. patch.MemPct = float64(o.Mem.Current) * 100.0 / float64(o.Mem.Total)
  211. }
  212. patch.XrayVersion = o.Xray.Version
  213. patch.UptimeSecs = o.Uptime
  214. return patch, nil
  215. }
  216. type ProbeResultUI struct {
  217. Status string `json:"status"`
  218. LatencyMs int `json:"latencyMs"`
  219. XrayVersion string `json:"xrayVersion"`
  220. CpuPct float64 `json:"cpuPct"`
  221. MemPct float64 `json:"memPct"`
  222. UptimeSecs uint64 `json:"uptimeSecs"`
  223. Error string `json:"error"`
  224. }
  225. func (p HeartbeatPatch) ToUI(ok bool) ProbeResultUI {
  226. r := ProbeResultUI{
  227. LatencyMs: p.LatencyMs,
  228. XrayVersion: p.XrayVersion,
  229. CpuPct: p.CpuPct,
  230. MemPct: p.MemPct,
  231. UptimeSecs: p.UptimeSecs,
  232. Error: p.LastError,
  233. }
  234. if ok {
  235. r.Status = "online"
  236. } else {
  237. r.Status = "offline"
  238. }
  239. return r
  240. }