node.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/mhsanaei/3x-ui/v3/database"
  12. "github.com/mhsanaei/3x-ui/v3/database/model"
  13. "github.com/mhsanaei/3x-ui/v3/util/common"
  14. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  15. )
  16. type HeartbeatPatch struct {
  17. Status string
  18. LastHeartbeat int64
  19. LatencyMs int
  20. XrayVersion string
  21. CpuPct float64
  22. MemPct float64
  23. UptimeSecs uint64
  24. LastError string
  25. }
  26. type NodeService struct{}
  27. var nodeHTTPClient = &http.Client{
  28. Transport: &http.Transport{
  29. MaxIdleConns: 64,
  30. MaxIdleConnsPerHost: 4,
  31. IdleConnTimeout: 60 * time.Second,
  32. },
  33. }
  34. func (s *NodeService) GetAll() ([]*model.Node, error) {
  35. db := database.GetDB()
  36. var nodes []*model.Node
  37. err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error
  38. return nodes, err
  39. }
  40. func (s *NodeService) GetById(id int) (*model.Node, error) {
  41. db := database.GetDB()
  42. n := &model.Node{}
  43. if err := db.Model(model.Node{}).Where("id = ?", id).First(n).Error; err != nil {
  44. return nil, err
  45. }
  46. return n, nil
  47. }
  48. func normalizeBasePath(p string) string {
  49. p = strings.TrimSpace(p)
  50. if p == "" {
  51. return "/"
  52. }
  53. if !strings.HasPrefix(p, "/") {
  54. p = "/" + p
  55. }
  56. if !strings.HasSuffix(p, "/") {
  57. p = p + "/"
  58. }
  59. return p
  60. }
  61. func (s *NodeService) normalize(n *model.Node) error {
  62. n.Name = strings.TrimSpace(n.Name)
  63. n.Address = strings.TrimSpace(n.Address)
  64. n.ApiToken = strings.TrimSpace(n.ApiToken)
  65. if n.Name == "" {
  66. return common.NewError("node name is required")
  67. }
  68. if n.Address == "" {
  69. return common.NewError("node address is required")
  70. }
  71. if n.Port <= 0 || n.Port > 65535 {
  72. return common.NewError("node port must be 1-65535")
  73. }
  74. if n.Scheme != "http" && n.Scheme != "https" {
  75. n.Scheme = "https"
  76. }
  77. n.BasePath = normalizeBasePath(n.BasePath)
  78. return nil
  79. }
  80. func (s *NodeService) Create(n *model.Node) error {
  81. if err := s.normalize(n); err != nil {
  82. return err
  83. }
  84. db := database.GetDB()
  85. return db.Create(n).Error
  86. }
  87. func (s *NodeService) Update(id int, in *model.Node) error {
  88. if err := s.normalize(in); err != nil {
  89. return err
  90. }
  91. db := database.GetDB()
  92. existing := &model.Node{}
  93. if err := db.Where("id = ?", id).First(existing).Error; err != nil {
  94. return err
  95. }
  96. updates := map[string]any{
  97. "name": in.Name,
  98. "remark": in.Remark,
  99. "scheme": in.Scheme,
  100. "address": in.Address,
  101. "port": in.Port,
  102. "base_path": in.BasePath,
  103. "api_token": in.ApiToken,
  104. "enable": in.Enable,
  105. }
  106. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  107. return err
  108. }
  109. if mgr := runtime.GetManager(); mgr != nil {
  110. mgr.InvalidateNode(id)
  111. }
  112. return nil
  113. }
  114. func (s *NodeService) Delete(id int) error {
  115. db := database.GetDB()
  116. if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
  117. return err
  118. }
  119. if mgr := runtime.GetManager(); mgr != nil {
  120. mgr.InvalidateNode(id)
  121. }
  122. nodeMetrics.drop(nodeMetricKey(id, "cpu"))
  123. nodeMetrics.drop(nodeMetricKey(id, "mem"))
  124. return nil
  125. }
  126. func (s *NodeService) SetEnable(id int, enable bool) error {
  127. db := database.GetDB()
  128. return db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error
  129. }
  130. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
  131. db := database.GetDB()
  132. updates := map[string]any{
  133. "status": p.Status,
  134. "last_heartbeat": p.LastHeartbeat,
  135. "latency_ms": p.LatencyMs,
  136. "xray_version": p.XrayVersion,
  137. "cpu_pct": p.CpuPct,
  138. "mem_pct": p.MemPct,
  139. "uptime_secs": p.UptimeSecs,
  140. "last_error": p.LastError,
  141. }
  142. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  143. return err
  144. }
  145. if p.Status == "online" {
  146. now := time.Unix(p.LastHeartbeat, 0)
  147. nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct)
  148. nodeMetrics.append(nodeMetricKey(id, "mem"), now, p.MemPct)
  149. }
  150. return nil
  151. }
  152. func nodeMetricKey(id int, metric string) string {
  153. return "node:" + strconv.Itoa(id) + ":" + metric
  154. }
  155. func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any {
  156. return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints)
  157. }
  158. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) {
  159. patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()}
  160. url := fmt.Sprintf("%s://%s:%d%spanel/api/server/status",
  161. n.Scheme, n.Address, n.Port, normalizeBasePath(n.BasePath))
  162. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  163. if err != nil {
  164. patch.LastError = err.Error()
  165. return patch, err
  166. }
  167. if n.ApiToken != "" {
  168. req.Header.Set("Authorization", "Bearer "+n.ApiToken)
  169. }
  170. req.Header.Set("Accept", "application/json")
  171. start := time.Now()
  172. resp, err := nodeHTTPClient.Do(req)
  173. if err != nil {
  174. patch.LastError = err.Error()
  175. return patch, err
  176. }
  177. defer resp.Body.Close()
  178. patch.LatencyMs = int(time.Since(start) / time.Millisecond)
  179. if resp.StatusCode != http.StatusOK {
  180. patch.LastError = fmt.Sprintf("HTTP %d from remote panel", resp.StatusCode)
  181. return patch, errors.New(patch.LastError)
  182. }
  183. var envelope struct {
  184. Success bool `json:"success"`
  185. Msg string `json:"msg"`
  186. Obj *struct {
  187. CpuPct float64 `json:"cpu"`
  188. Mem struct {
  189. Current uint64 `json:"current"`
  190. Total uint64 `json:"total"`
  191. } `json:"mem"`
  192. Xray struct {
  193. Version string `json:"version"`
  194. } `json:"xray"`
  195. Uptime uint64 `json:"uptime"`
  196. } `json:"obj"`
  197. }
  198. if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
  199. patch.LastError = "decode response: " + err.Error()
  200. return patch, err
  201. }
  202. if !envelope.Success || envelope.Obj == nil {
  203. patch.LastError = "remote returned success=false: " + envelope.Msg
  204. return patch, errors.New(patch.LastError)
  205. }
  206. o := envelope.Obj
  207. patch.CpuPct = o.CpuPct
  208. if o.Mem.Total > 0 {
  209. patch.MemPct = float64(o.Mem.Current) * 100.0 / float64(o.Mem.Total)
  210. }
  211. patch.XrayVersion = o.Xray.Version
  212. patch.UptimeSecs = o.Uptime
  213. return patch, nil
  214. }
  215. type ProbeResultUI struct {
  216. Status string `json:"status"`
  217. LatencyMs int `json:"latencyMs"`
  218. XrayVersion string `json:"xrayVersion"`
  219. CpuPct float64 `json:"cpuPct"`
  220. MemPct float64 `json:"memPct"`
  221. UptimeSecs uint64 `json:"uptimeSecs"`
  222. Error string `json:"error"`
  223. }
  224. func (p HeartbeatPatch) ToUI(ok bool) ProbeResultUI {
  225. r := ProbeResultUI{
  226. LatencyMs: p.LatencyMs,
  227. XrayVersion: p.XrayVersion,
  228. CpuPct: p.CpuPct,
  229. MemPct: p.MemPct,
  230. UptimeSecs: p.UptimeSecs,
  231. Error: p.LastError,
  232. }
  233. if ok {
  234. r.Status = "online"
  235. } else {
  236. r.Status = "offline"
  237. }
  238. return r
  239. }