node.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/mhsanaei/3x-ui/v3/database"
  12. "github.com/mhsanaei/3x-ui/v3/database/model"
  13. "github.com/mhsanaei/3x-ui/v3/util/common"
  14. "github.com/mhsanaei/3x-ui/v3/web/runtime"
  15. )
  16. type HeartbeatPatch struct {
  17. Status string
  18. LastHeartbeat int64
  19. LatencyMs int
  20. XrayVersion string
  21. CpuPct float64
  22. MemPct float64
  23. UptimeSecs uint64
  24. LastError string
  25. }
// NodeService groups the node operations defined in this file: CRUD on
// model.Node rows, heartbeat bookkeeping and probing of remote panels.
// It has no fields.
type NodeService struct{}
  27. var nodeHTTPClient = &http.Client{
  28. Transport: &http.Transport{
  29. MaxIdleConns: 64,
  30. MaxIdleConnsPerHost: 4,
  31. IdleConnTimeout: 60 * time.Second,
  32. },
  33. }
  34. func (s *NodeService) GetAll() ([]*model.Node, error) {
  35. db := database.GetDB()
  36. var nodes []*model.Node
  37. err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error
  38. return nodes, err
  39. }
  40. func (s *NodeService) GetById(id int) (*model.Node, error) {
  41. db := database.GetDB()
  42. n := &model.Node{}
  43. if err := db.Model(model.Node{}).Where("id = ?", id).First(n).Error; err != nil {
  44. return nil, err
  45. }
  46. return n, nil
  47. }
  48. func (s *NodeService) normalize(n *model.Node) error {
  49. n.Name = strings.TrimSpace(n.Name)
  50. n.Address = strings.TrimSpace(n.Address)
  51. n.ApiToken = strings.TrimSpace(n.ApiToken)
  52. if n.Name == "" {
  53. return common.NewError("node name is required")
  54. }
  55. if n.Address == "" {
  56. return common.NewError("node address is required")
  57. }
  58. if n.Port <= 0 || n.Port > 65535 {
  59. return common.NewError("node port must be 1-65535")
  60. }
  61. if n.Scheme != "http" && n.Scheme != "https" {
  62. n.Scheme = "https"
  63. }
  64. if n.BasePath == "" {
  65. n.BasePath = "/"
  66. }
  67. if !strings.HasPrefix(n.BasePath, "/") {
  68. n.BasePath = "/" + n.BasePath
  69. }
  70. if !strings.HasSuffix(n.BasePath, "/") {
  71. n.BasePath = n.BasePath + "/"
  72. }
  73. return nil
  74. }
  75. func (s *NodeService) Create(n *model.Node) error {
  76. if err := s.normalize(n); err != nil {
  77. return err
  78. }
  79. db := database.GetDB()
  80. return db.Create(n).Error
  81. }
  82. func (s *NodeService) Update(id int, in *model.Node) error {
  83. if err := s.normalize(in); err != nil {
  84. return err
  85. }
  86. db := database.GetDB()
  87. existing := &model.Node{}
  88. if err := db.Where("id = ?", id).First(existing).Error; err != nil {
  89. return err
  90. }
  91. updates := map[string]any{
  92. "name": in.Name,
  93. "remark": in.Remark,
  94. "scheme": in.Scheme,
  95. "address": in.Address,
  96. "port": in.Port,
  97. "base_path": in.BasePath,
  98. "api_token": in.ApiToken,
  99. "enable": in.Enable,
  100. }
  101. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  102. return err
  103. }
  104. if mgr := runtime.GetManager(); mgr != nil {
  105. mgr.InvalidateNode(id)
  106. }
  107. return nil
  108. }
  109. func (s *NodeService) Delete(id int) error {
  110. db := database.GetDB()
  111. if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
  112. return err
  113. }
  114. if mgr := runtime.GetManager(); mgr != nil {
  115. mgr.InvalidateNode(id)
  116. }
  117. nodeMetrics.drop(nodeMetricKey(id, "cpu"))
  118. nodeMetrics.drop(nodeMetricKey(id, "mem"))
  119. return nil
  120. }
  121. func (s *NodeService) SetEnable(id int, enable bool) error {
  122. db := database.GetDB()
  123. return db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error
  124. }
  125. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
  126. db := database.GetDB()
  127. updates := map[string]any{
  128. "status": p.Status,
  129. "last_heartbeat": p.LastHeartbeat,
  130. "latency_ms": p.LatencyMs,
  131. "xray_version": p.XrayVersion,
  132. "cpu_pct": p.CpuPct,
  133. "mem_pct": p.MemPct,
  134. "uptime_secs": p.UptimeSecs,
  135. "last_error": p.LastError,
  136. }
  137. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  138. return err
  139. }
  140. if p.Status == "online" {
  141. now := time.Unix(p.LastHeartbeat, 0)
  142. nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct)
  143. nodeMetrics.append(nodeMetricKey(id, "mem"), now, p.MemPct)
  144. }
  145. return nil
  146. }
  147. func nodeMetricKey(id int, metric string) string {
  148. return "node:" + strconv.Itoa(id) + ":" + metric
  149. }
  150. func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any {
  151. return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints)
  152. }
  153. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) {
  154. patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()}
  155. url := fmt.Sprintf("%s://%s:%d%spanel/api/server/status",
  156. n.Scheme, n.Address, n.Port, n.BasePath)
  157. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  158. if err != nil {
  159. patch.LastError = err.Error()
  160. return patch, err
  161. }
  162. if n.ApiToken != "" {
  163. req.Header.Set("Authorization", "Bearer "+n.ApiToken)
  164. }
  165. req.Header.Set("Accept", "application/json")
  166. start := time.Now()
  167. resp, err := nodeHTTPClient.Do(req)
  168. if err != nil {
  169. patch.LastError = err.Error()
  170. return patch, err
  171. }
  172. defer resp.Body.Close()
  173. patch.LatencyMs = int(time.Since(start) / time.Millisecond)
  174. if resp.StatusCode != http.StatusOK {
  175. patch.LastError = fmt.Sprintf("HTTP %d from remote panel", resp.StatusCode)
  176. return patch, errors.New(patch.LastError)
  177. }
  178. var envelope struct {
  179. Success bool `json:"success"`
  180. Msg string `json:"msg"`
  181. Obj *struct {
  182. CpuPct float64 `json:"cpu"`
  183. Mem struct {
  184. Current uint64 `json:"current"`
  185. Total uint64 `json:"total"`
  186. } `json:"mem"`
  187. Xray struct {
  188. Version string `json:"version"`
  189. } `json:"xray"`
  190. Uptime uint64 `json:"uptime"`
  191. } `json:"obj"`
  192. }
  193. if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
  194. patch.LastError = "decode response: " + err.Error()
  195. return patch, err
  196. }
  197. if !envelope.Success || envelope.Obj == nil {
  198. patch.LastError = "remote returned success=false: " + envelope.Msg
  199. return patch, errors.New(patch.LastError)
  200. }
  201. o := envelope.Obj
  202. patch.CpuPct = o.CpuPct
  203. if o.Mem.Total > 0 {
  204. patch.MemPct = float64(o.Mem.Current) * 100.0 / float64(o.Mem.Total)
  205. }
  206. patch.XrayVersion = o.Xray.Version
  207. patch.UptimeSecs = o.Uptime
  208. return patch, nil
  209. }
// ProbeResultUI is the JSON shape of a probe outcome as produced by
// HeartbeatPatch.ToUI. Status is "online" or "offline"; Error carries the
// patch's LastError text.
type ProbeResultUI struct {
	Status      string  `json:"status"`
	LatencyMs   int     `json:"latencyMs"`
	XrayVersion string  `json:"xrayVersion"`
	CpuPct      float64 `json:"cpuPct"`
	MemPct      float64 `json:"memPct"`
	UptimeSecs  uint64  `json:"uptimeSecs"`
	Error       string  `json:"error"`
}
  219. func (p HeartbeatPatch) ToUI(ok bool) ProbeResultUI {
  220. r := ProbeResultUI{
  221. LatencyMs: p.LatencyMs,
  222. XrayVersion: p.XrayVersion,
  223. CpuPct: p.CpuPct,
  224. MemPct: p.MemPct,
  225. UptimeSecs: p.UptimeSecs,
  226. Error: p.LastError,
  227. }
  228. if ok {
  229. r.Status = "online"
  230. } else {
  231. r.Status = "offline"
  232. }
  233. return r
  234. }