1
0

node.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package service
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/mhsanaei/3x-ui/v3/database"
	"github.com/mhsanaei/3x-ui/v3/database/model"
	"github.com/mhsanaei/3x-ui/v3/util/common"
	"github.com/mhsanaei/3x-ui/v3/web/runtime"
)
  16. // HeartbeatPatch is the slice of fields a single Probe() result writes
  17. // back to a Node row. We pass it as a struct (not a *model.Node) so the
  18. // heartbeat path can't accidentally clobber configuration columns the
  19. // user just edited.
  20. type HeartbeatPatch struct {
  21. Status string
  22. LastHeartbeat int64
  23. LatencyMs int
  24. XrayVersion string
  25. CpuPct float64
  26. MemPct float64
  27. UptimeSecs uint64
  28. LastError string
  29. }
// NodeService manages remote 3x-ui nodes registered with this panel.
// It owns CRUD for the Node model and the HTTP probe used by both the
// heartbeat job and the on-demand "test connection" UI action.
//
// The type carries no fields; its methods obtain the database handle
// through database.GetDB() on each call.
type NodeService struct{}
// nodeHTTPClient is shared so repeated probes reuse TCP/TLS connections.
// No client-level Timeout is configured here; each request is expected to
// be bounded by the context attached to it before Do() is called.
var nodeHTTPClient = &http.Client{
	Transport: &http.Transport{
		// Idle keep-alive connections kept across all hosts.
		MaxIdleConns: 64,
		// Idle keep-alive connections kept per node host.
		MaxIdleConnsPerHost: 4,
		// Idle connections are closed after this long without use.
		IdleConnTimeout: 60 * time.Second,
	},
}
  43. func (s *NodeService) GetAll() ([]*model.Node, error) {
  44. db := database.GetDB()
  45. var nodes []*model.Node
  46. err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error
  47. return nodes, err
  48. }
  49. func (s *NodeService) GetById(id int) (*model.Node, error) {
  50. db := database.GetDB()
  51. n := &model.Node{}
  52. if err := db.Model(model.Node{}).Where("id = ?", id).First(n).Error; err != nil {
  53. return nil, err
  54. }
  55. return n, nil
  56. }
  57. // normalize fills in defaults and trims accidental whitespace before save.
  58. // Pulled out so Create and Update share the same rules.
  59. func (s *NodeService) normalize(n *model.Node) error {
  60. n.Name = strings.TrimSpace(n.Name)
  61. n.Address = strings.TrimSpace(n.Address)
  62. n.ApiToken = strings.TrimSpace(n.ApiToken)
  63. if n.Name == "" {
  64. return common.NewError("node name is required")
  65. }
  66. if n.Address == "" {
  67. return common.NewError("node address is required")
  68. }
  69. if n.Port <= 0 || n.Port > 65535 {
  70. return common.NewError("node port must be 1-65535")
  71. }
  72. if n.Scheme != "http" && n.Scheme != "https" {
  73. n.Scheme = "https"
  74. }
  75. if n.BasePath == "" {
  76. n.BasePath = "/"
  77. }
  78. if !strings.HasPrefix(n.BasePath, "/") {
  79. n.BasePath = "/" + n.BasePath
  80. }
  81. if !strings.HasSuffix(n.BasePath, "/") {
  82. n.BasePath = n.BasePath + "/"
  83. }
  84. return nil
  85. }
  86. func (s *NodeService) Create(n *model.Node) error {
  87. if err := s.normalize(n); err != nil {
  88. return err
  89. }
  90. db := database.GetDB()
  91. return db.Create(n).Error
  92. }
  93. func (s *NodeService) Update(id int, in *model.Node) error {
  94. if err := s.normalize(in); err != nil {
  95. return err
  96. }
  97. db := database.GetDB()
  98. existing := &model.Node{}
  99. if err := db.Where("id = ?", id).First(existing).Error; err != nil {
  100. return err
  101. }
  102. // Only persist user-controlled columns. Heartbeat fields stay where
  103. // the heartbeat job last wrote them so a no-op edit doesn't blank
  104. // the dashboard out for ten seconds.
  105. updates := map[string]any{
  106. "name": in.Name,
  107. "remark": in.Remark,
  108. "scheme": in.Scheme,
  109. "address": in.Address,
  110. "port": in.Port,
  111. "base_path": in.BasePath,
  112. "api_token": in.ApiToken,
  113. "enable": in.Enable,
  114. }
  115. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  116. return err
  117. }
  118. // Drop any cached Remote so the next inbound op picks up the fresh
  119. // address/token. Cheap to do unconditionally — the next miss rebuilds.
  120. if mgr := runtime.GetManager(); mgr != nil {
  121. mgr.InvalidateNode(id)
  122. }
  123. return nil
  124. }
  125. func (s *NodeService) Delete(id int) error {
  126. db := database.GetDB()
  127. if err := db.Where("id = ?", id).Delete(model.Node{}).Error; err != nil {
  128. return err
  129. }
  130. if mgr := runtime.GetManager(); mgr != nil {
  131. mgr.InvalidateNode(id)
  132. }
  133. // Drop in-memory series so a freshly created node with the same id
  134. // doesn't inherit stale points (sqlite reuses ids freely).
  135. nodeMetrics.drop(nodeMetricKey(id, "cpu"))
  136. nodeMetrics.drop(nodeMetricKey(id, "mem"))
  137. return nil
  138. }
  139. func (s *NodeService) SetEnable(id int, enable bool) error {
  140. db := database.GetDB()
  141. return db.Model(model.Node{}).Where("id = ?", id).Update("enable", enable).Error
  142. }
  143. // UpdateHeartbeat persists the slice of fields written by a probe. We
  144. // don't touch updated_at via gorm autoUpdateTime here — that field is
  145. // reserved for user-driven config edits.
  146. func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
  147. db := database.GetDB()
  148. updates := map[string]any{
  149. "status": p.Status,
  150. "last_heartbeat": p.LastHeartbeat,
  151. "latency_ms": p.LatencyMs,
  152. "xray_version": p.XrayVersion,
  153. "cpu_pct": p.CpuPct,
  154. "mem_pct": p.MemPct,
  155. "uptime_secs": p.UptimeSecs,
  156. "last_error": p.LastError,
  157. }
  158. if err := db.Model(model.Node{}).Where("id = ?", id).Updates(updates).Error; err != nil {
  159. return err
  160. }
  161. // Only record online ticks. Offline probes carry zeroed cpu/mem and
  162. // would draw a misleading dip on the chart; the gap on the x-axis is
  163. // the truthful representation of "we couldn't reach the node".
  164. if p.Status == "online" {
  165. now := time.Unix(p.LastHeartbeat, 0)
  166. nodeMetrics.append(nodeMetricKey(id, "cpu"), now, p.CpuPct)
  167. nodeMetrics.append(nodeMetricKey(id, "mem"), now, p.MemPct)
  168. }
  169. return nil
  170. }
  171. // nodeMetricKey is the namespacing used inside the singleton ring buffer
  172. // so per-node metrics don't collide with each other or with the system
  173. // metrics in the sibling singleton.
  174. func nodeMetricKey(id int, metric string) string {
  175. return "node:" + strconv.Itoa(id) + ":" + metric
  176. }
  177. // AggregateNodeMetric returns up to maxPoints averaged buckets for one
  178. // node's metric (currently "cpu" or "mem"). Output shape matches
  179. // AggregateSystemMetric: {"t": unixSec, "v": value}.
  180. func (s *NodeService) AggregateNodeMetric(id int, metric string, bucketSeconds int, maxPoints int) []map[string]any {
  181. return nodeMetrics.aggregate(nodeMetricKey(id, metric), bucketSeconds, maxPoints)
  182. }
  183. // Probe issues a single GET to the node's /panel/api/server/status and
  184. // returns a HeartbeatPatch. On error the patch is zero-valued except
  185. // for LastError; the caller is responsible for setting Status="offline".
  186. //
  187. // The remote endpoint requires authentication: we send the per-node
  188. // ApiToken as a Bearer token, which the remote APIController.checkAPIAuth
  189. // validates. Calls without a token would just get a 404, which masks
  190. // the existence of the API entirely.
  191. func (s *NodeService) Probe(ctx context.Context, n *model.Node) (HeartbeatPatch, error) {
  192. patch := HeartbeatPatch{LastHeartbeat: time.Now().Unix()}
  193. url := fmt.Sprintf("%s://%s:%d%spanel/api/server/status",
  194. n.Scheme, n.Address, n.Port, n.BasePath)
  195. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  196. if err != nil {
  197. patch.LastError = err.Error()
  198. return patch, err
  199. }
  200. if n.ApiToken != "" {
  201. req.Header.Set("Authorization", "Bearer "+n.ApiToken)
  202. }
  203. req.Header.Set("Accept", "application/json")
  204. start := time.Now()
  205. resp, err := nodeHTTPClient.Do(req)
  206. if err != nil {
  207. patch.LastError = err.Error()
  208. return patch, err
  209. }
  210. defer resp.Body.Close()
  211. patch.LatencyMs = int(time.Since(start) / time.Millisecond)
  212. if resp.StatusCode != http.StatusOK {
  213. patch.LastError = fmt.Sprintf("HTTP %d from remote panel", resp.StatusCode)
  214. return patch, errors.New(patch.LastError)
  215. }
  216. // The remote wraps Status in entity.Msg. We decode into a typed
  217. // envelope rather than map[string]any so a schema change on the
  218. // remote shows up as a Go error instead of a silent zero-fill.
  219. var envelope struct {
  220. Success bool `json:"success"`
  221. Msg string `json:"msg"`
  222. Obj *struct {
  223. Cpu uint64 `json:"-"`
  224. // Status fields we care about. Decode CPU/Mem nested
  225. // structs minimally — anything else gets discarded.
  226. CpuPct float64 `json:"cpu"`
  227. Mem struct {
  228. Current uint64 `json:"current"`
  229. Total uint64 `json:"total"`
  230. } `json:"mem"`
  231. Xray struct {
  232. Version string `json:"version"`
  233. } `json:"xray"`
  234. Uptime uint64 `json:"uptime"`
  235. } `json:"obj"`
  236. }
  237. if err := json.NewDecoder(resp.Body).Decode(&envelope); err != nil {
  238. patch.LastError = "decode response: " + err.Error()
  239. return patch, err
  240. }
  241. if !envelope.Success || envelope.Obj == nil {
  242. patch.LastError = "remote returned success=false: " + envelope.Msg
  243. return patch, errors.New(patch.LastError)
  244. }
  245. o := envelope.Obj
  246. patch.CpuPct = o.CpuPct
  247. if o.Mem.Total > 0 {
  248. patch.MemPct = float64(o.Mem.Current) * 100.0 / float64(o.Mem.Total)
  249. }
  250. patch.XrayVersion = o.Xray.Version
  251. patch.UptimeSecs = o.Uptime
  252. return patch, nil
  253. }
  254. // EnvelopeForUI is the shape a frontend test-connection action expects.
  255. // Pulling it out keeps the controller dumb.
  256. type ProbeResultUI struct {
  257. Status string `json:"status"`
  258. LatencyMs int `json:"latencyMs"`
  259. XrayVersion string `json:"xrayVersion"`
  260. CpuPct float64 `json:"cpuPct"`
  261. MemPct float64 `json:"memPct"`
  262. UptimeSecs uint64 `json:"uptimeSecs"`
  263. Error string `json:"error"`
  264. }
  265. func (p HeartbeatPatch) ToUI(ok bool) ProbeResultUI {
  266. r := ProbeResultUI{
  267. LatencyMs: p.LatencyMs,
  268. XrayVersion: p.XrayVersion,
  269. CpuPct: p.CpuPct,
  270. MemPct: p.MemPct,
  271. UptimeSecs: p.UptimeSecs,
  272. Error: p.LastError,
  273. }
  274. if ok {
  275. r.Status = "online"
  276. } else {
  277. r.Status = "offline"
  278. }
  279. return r
  280. }