node.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package service
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/mhsanaei/3x-ui/v3/database"
	"github.com/mhsanaei/3x-ui/v3/database/model"
	"github.com/mhsanaei/3x-ui/v3/util/common"
	"github.com/mhsanaei/3x-ui/v3/util/netsafe"
	"github.com/mhsanaei/3x-ui/v3/web/runtime"
)
  19. type HeartbeatPatch struct {
  20. Status string
  21. LastHeartbeat int64
  22. LatencyMs int
  23. XrayVersion string
  24. CpuPct float64
  25. MemPct float64
  26. UptimeSecs uint64
  27. LastError string
  28. }
  29. type NodeService struct{}
  30. var nodeHTTPClient = &http.Client{
  31. Transport: &http.Transport{
  32. MaxIdleConns: 64,
  33. MaxIdleConnsPerHost: 4,
  34. IdleConnTimeout: 60 * time.Second,
  35. DialContext: netsafe.SSRFGuardedDialContext,
  36. },
  37. }
  38. func (s *NodeService) GetAll() ([]*model.Node, error) {
  39. db := database.GetDB()
  40. var nodes []*model.Node
  41. err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error
  42. return nodes, err
  43. }
  44. func (s *NodeService) GetById(id int) (*model.Node, error) {
  45. db := database.GetDB()
  46. n := &model.Node{}
  47. if err := db.Model(model.Node{}).Where("id = ?", id).First(n).Error; err != nil {
  48. return nil, err
  49. }
  50. return n, nil
  51. }
  52. func normalizeBasePath(p string) string {
  53. p = strings.TrimSpace(p)
  54. if p == "" {
  55. return "/"
  56. }
  57. if !strings.HasPrefix(p, "/") {
  58. p = "/" + p
  59. }
  60. if !strings.HasSuffix(p, "/") {
  61. p = p + "/"
  62. }
  63. return p
  64. }
  65. func (s *NodeService) normalize(n *model.Node) error {
  66. n.Name = strings.TrimSpace(n.Name)
  67. n.ApiToken = strings.TrimSpace(n.ApiToken)
  68. if n.Name == "" {
  69. return common.NewError("node name is required")
  70. }
  71. addr, err := netsafe.NormalizeHost(n.Address)
  72. if err != nil {
  73. return common.NewError(err.Error())
  74. }
  75. n.Address = addr
  76. if n.Port <= 0 || n.Port > 65535 {
  77. return common.NewError("node port must be 1-65535")
  78. }
  79. if n.Scheme != "http" && n.Scheme != "https" {
  80. n.Scheme = "https"
  81. }
  82. n.BasePath = normalizeBasePath(n.BasePath)
  83. return nil
  84. }
  85. func (s *NodeService) Create(n *model.Node) error {
  86. if err := s.normalize(n); err != nil {
  87. return err
  88. }
  89. db := database.GetDB()
  90. return db.Create(n).Error
  91. }
  92. func (s *NodeService) Update(id int, in *model.Node) error {
  93. if err := s.normalize(in); err != nil {
  94. return err
  95. }
  96. db := database.GetDB()
  97. existing := &model.Node{}
  98. if err := db.Where("id = ?", id).First(existing).Error; err != nil {
  99. return err
  100. }
  101. updates := map[string]any{
  102. "name": in.Name,
  103. "remark": in.Remark,
  104. "scheme": in.Scheme,
  105. "address": in.Address,
  106. "port": in.Port,
  107. "base_path": in.BasePath,
  108. "api_token": in.ApiToken,
  109. "enable": in.Enable,
  110. "allow_private_address": in.AllowPrivateAddress,
  111. }
  112. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  113. return err
  114. }
  115. if mgr := runtime.GetManager(); mgr != nil {
  116. mgr.InvalidateNode(id)
  117. }
  118. return nil
  119. }
  120. func (s *NodeService) Delete(id int) error {
  121. db := database.GetDB()
  122. if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
  123. return err
  124. }
  125. if mgr := runtime.GetManager(); mgr != nil {
  126. mgr.InvalidateNode(id)
  127. }
  128. nodeMetrics.drop(nodeMetricKey(id, "cpu"))
  129. nodeMetrics.drop(nodeMetricKey(id, "mem"))
  130. return nil
  131. }
  132. func (s *NodeService) SetEnable(id int, enable bool) error {
  133. db := database.GetDB()
  134. return db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error
  135. }
  136. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
  137. db := database.GetDB()
  138. updates := map[string]any{
  139. "status": p.Status,
  140. "last_heartbeat": p.LastHeartbeat,
  141. "latency_ms": p.LatencyMs,
  142. "xray_version": p.XrayVersion,
  143. "cpu_pct": p.CpuPct,
  144. "mem_pct": p.MemPct,
  145. "uptime_secs": p.UptimeSecs,
  146. "last_error": p.LastError,
  147. }
  148. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  149. return err
  150. }
  151. if p.Status == "online" {
  152. now := time.Unix(p.LastHeartbeat, 0)
  153. nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct)
  154. nodeMetrics.append(nodeMetricKey(id, "mem"), now, p.MemPct)
  155. }
  156. return nil
  157. }
  158. func nodeMetricKey(id int, metric string) string {
  159. return "node:" + strconv.Itoa(id) + ":" + metric
  160. }
  161. func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any {
  162. return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints)
  163. }
  164. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) {
  165. patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()}
  166. addr, err := netsafe.NormalizeHost(n.Address)
  167. if err != nil {
  168. patch.LastError = err.Error()
  169. return patch, err
  170. }
  171. scheme := n.Scheme
  172. if scheme != "http" && scheme != "https" {
  173. scheme = "https"
  174. }
  175. if n.Port <= 0 || n.Port > 65535 {
  176. patch.LastError = "node port must be 1-65535"
  177. return patch, errors.New(patch.LastError)
  178. }
  179. probeURL := &url.URL{
  180. Scheme: scheme,
  181. Host: net.JoinHostPort(addr, strconv.Itoa(n.Port)),
  182. Path: normalizeBasePath(n.BasePath) + "panel/api/server/status",
  183. }
  184. req, err := http.NewRequestWithContext(
  185. netsafe.ContextWithAllowPrivate(ctx, n.AllowPrivateAddress),
  186. http.MethodGet, probeURL.String(), nil)
  187. if err != nil {
  188. patch.LastError = err.Error()
  189. return patch, err
  190. }
  191. if n.ApiToken != "" {
  192. req.Header.Set("Authorization", "Bearer "+n.ApiToken)
  193. }
  194. req.Header.Set("Accept", "application/json")
  195. start := time.Now()
  196. resp, err := nodeHTTPClient.Do(req)
  197. if err != nil {
  198. patch.LastError = err.Error()
  199. return patch, err
  200. }
  201. defer resp.Body.Close()
  202. patch.LatencyMs = int(time.Since(start) / time.Millisecond)
  203. if resp.StatusCode != http.StatusOK {
  204. patch.LastError = fmt.Sprintf("HTTP %d from remote panel", resp.StatusCode)
  205. return patch, errors.New(patch.LastError)
  206. }
  207. var envelope struct {
  208. Success bool `json:"success"`
  209. Msg string `json:"msg"`
  210. Obj *struct {
  211. CpuPct float64 `json:"cpu"`
  212. Mem struct {
  213. Current uint64 `json:"current"`
  214. Total uint64 `json:"total"`
  215. } `json:"mem"`
  216. Xray struct {
  217. Version string `json:"version"`
  218. } `json:"xray"`
  219. Uptime uint64 `json:"uptime"`
  220. } `json:"obj"`
  221. }
  222. if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
  223. patch.LastError = "decode response: " + err.Error()
  224. return patch, err
  225. }
  226. if !envelope.Success || envelope.Obj == nil {
  227. patch.LastError = "remote returned success=false: " + envelope.Msg
  228. return patch, errors.New(patch.LastError)
  229. }
  230. o := envelope.Obj
  231. patch.CpuPct = o.CpuPct
  232. if o.Mem.Total > 0 {
  233. patch.MemPct = float64(o.Mem.Current) * 100.0 / float64(o.Mem.Total)
  234. }
  235. patch.XrayVersion = o.Xray.Version
  236. patch.UptimeSecs = o.Uptime
  237. return patch, nil
  238. }
  239. type ProbeResultUI struct {
  240. Status string `json:"status"`
  241. LatencyMs int `json:"latencyMs"`
  242. XrayVersion string `json:"xrayVersion"`
  243. CpuPct float64 `json:"cpuPct"`
  244. MemPct float64 `json:"memPct"`
  245. UptimeSecs uint64 `json:"uptimeSecs"`
  246. Error string `json:"error"`
  247. }
  248. func (p HeartbeatPatch) ToUI(ok bool) ProbeResultUI {
  249. r := ProbeResultUI{
  250. LatencyMs: p.LatencyMs,
  251. XrayVersion: p.XrayVersion,
  252. CpuPct: p.CpuPct,
  253. MemPct: p.MemPct,
  254. UptimeSecs: p.UptimeSecs,
  255. Error: p.LastError,
  256. }
  257. if ok {
  258. r.Status = "online"
  259. } else {
  260. r.Status = "offline"
  261. }
  262. return r
  263. }