node_tree.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package service
import (
	"context"
	"sort"
	"sync"
	"time"

	"github.com/mhsanaei/3x-ui/v3/database"
	"github.com/mhsanaei/3x-ui/v3/database/model"
	"github.com/mhsanaei/3x-ui/v3/web/runtime"
)
  10. // LocalDescendants returns this panel's read-only summaries of the nodes it
  11. // directly manages, so a parent panel can surface them as transitive sub-nodes
  12. // (#4983). Only nodes with a known GUID are included — a stable identity is
  13. // required to attribute them one hop up. Not recursive: each panel reports its
  14. // own direct nodes, and a master walks one level via each direct node's
  15. // endpoint, which covers the Node1 -> Node2 -> Node3 case.
  16. func (s *NodeService) LocalDescendants() ([]model.NodeSummary, error) {
  17. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  18. db := database.GetDB()
  19. var nodes []*model.Node
  20. if err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error; err != nil {
  21. return nil, err
  22. }
  23. out := make([]model.NodeSummary, 0, len(nodes))
  24. for _, n := range nodes {
  25. if n.Guid == "" {
  26. continue
  27. }
  28. out = append(out, model.NodeSummary{
  29. Guid: n.Guid,
  30. ParentGuid: selfGuid,
  31. Name: n.Name,
  32. Address: n.Address,
  33. Scheme: n.Scheme,
  34. Port: n.Port,
  35. Status: n.Status,
  36. LastHeartbeat: n.LastHeartbeat,
  37. LatencyMs: n.LatencyMs,
  38. PanelVersion: n.PanelVersion,
  39. XrayVersion: n.XrayVersion,
  40. })
  41. }
  42. return out, nil
  43. }
  44. var (
  45. nodeDescendantsMu sync.RWMutex
  46. nodeDescendantsCache = map[int][]model.NodeSummary{}
  47. )
  48. // RefreshDescendants pulls a direct node's published sub-node summaries and
  49. // caches them keyed by node id. Best-effort: a fetch error keeps the last good
  50. // set (the node may be briefly unreachable). Called from the heartbeat job.
  51. func (s *NodeService) RefreshDescendants(ctx context.Context, n *model.Node) {
  52. if n == nil {
  53. return
  54. }
  55. mgr := runtime.GetManager()
  56. if mgr == nil {
  57. return
  58. }
  59. rt, err := mgr.RemoteFor(n)
  60. if err != nil {
  61. return
  62. }
  63. summaries, err := rt.GetDescendants(ctx)
  64. if err != nil {
  65. return
  66. }
  67. nodeDescendantsMu.Lock()
  68. if len(summaries) == 0 {
  69. delete(nodeDescendantsCache, n.Id)
  70. } else {
  71. nodeDescendantsCache[n.Id] = summaries
  72. }
  73. nodeDescendantsMu.Unlock()
  74. }
  75. // ClearDescendants drops a node's cached sub-node summaries (its probe failed).
  76. func (s *NodeService) ClearDescendants(nodeID int) {
  77. nodeDescendantsMu.Lock()
  78. delete(nodeDescendantsCache, nodeID)
  79. nodeDescendantsMu.Unlock()
  80. }
  81. func cachedDescendants() []model.NodeSummary {
  82. nodeDescendantsMu.RLock()
  83. defer nodeDescendantsMu.RUnlock()
  84. out := make([]model.NodeSummary, 0)
  85. for _, list := range nodeDescendantsCache {
  86. out = append(out, list...)
  87. }
  88. return out
  89. }
  90. // GetNodeTree returns the direct nodes plus any transitive sub-nodes learned
  91. // from them, with per-GUID counts so each node shows only the inbounds/online
  92. // it physically hosts (#4983). Direct nodes carry the master's own GUID as
  93. // ParentGuid; a transitive node carries its parent node's GUID. Transitive
  94. // nodes are read-only projections (Id == 0). Used by the Nodes page and the
  95. // heartbeat broadcast — never for probing/syncing, which stay on GetAll.
  96. func (s *NodeService) GetNodeTree() ([]*model.Node, error) {
  97. nodes, err := s.GetAll()
  98. if err != nil {
  99. return nodes, err
  100. }
  101. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  102. directGuids := make(map[string]struct{}, len(nodes))
  103. for _, n := range nodes {
  104. n.ParentGuid = selfGuid
  105. if n.Guid != "" {
  106. directGuids[n.Guid] = struct{}{}
  107. }
  108. }
  109. seen := make(map[string]struct{})
  110. var transitive []*model.Node
  111. for _, sum := range cachedDescendants() {
  112. if sum.Guid == "" {
  113. continue
  114. }
  115. if _, ok := directGuids[sum.Guid]; ok {
  116. continue // already shown as a direct node
  117. }
  118. if _, ok := seen[sum.Guid]; ok {
  119. continue
  120. }
  121. seen[sum.Guid] = struct{}{}
  122. transitive = append(transitive, &model.Node{
  123. Guid: sum.Guid,
  124. ParentGuid: sum.ParentGuid,
  125. Name: sum.Name,
  126. Address: sum.Address,
  127. Scheme: sum.Scheme,
  128. Port: sum.Port,
  129. Status: sum.Status,
  130. LastHeartbeat: sum.LastHeartbeat,
  131. LatencyMs: sum.LatencyMs,
  132. PanelVersion: sum.PanelVersion,
  133. XrayVersion: sum.XrayVersion,
  134. Transitive: true,
  135. })
  136. }
  137. if len(transitive) == 0 {
  138. return nodes, nil
  139. }
  140. all := make([]*model.Node, 0, len(nodes)+len(transitive))
  141. all = append(all, nodes...)
  142. all = append(all, transitive...)
  143. s.recountByGuid(all, selfGuid)
  144. return all, nil
  145. }
  146. // recountByGuid recomputes InboundCount/OnlineCount/DepletedCount for every node
  147. // in the tree, keyed by the GUID that physically hosts each inbound, so a direct
  148. // node shows only its own inbounds and each transitive node shows its own
  149. // (#4983). In a flat topology the per-GUID and per-node-id counts coincide, so
  150. // this only changes behaviour once a transitive node exists.
  151. func (s *NodeService) recountByGuid(nodes []*model.Node, selfGuid string) {
  152. db := database.GetDB()
  153. type ibRow struct {
  154. Id int
  155. NodeID *int `gorm:"column:node_id"`
  156. OriginNodeGuid string `gorm:"column:origin_node_guid"`
  157. }
  158. var ibRows []ibRow
  159. if err := db.Table("inbounds").Select("id, node_id, origin_node_guid").Scan(&ibRows).Error; err != nil {
  160. return
  161. }
  162. effByInbound := make(map[int]string, len(ibRows))
  163. inboundCountByGuid := make(map[string]int)
  164. ids := make([]int, 0, len(ibRows))
  165. for _, r := range ibRows {
  166. guid := r.OriginNodeGuid
  167. if guid == "" {
  168. if r.NodeID != nil {
  169. guid = synthNodeGuid(*r.NodeID)
  170. } else {
  171. guid = selfGuid
  172. }
  173. }
  174. effByInbound[r.Id] = guid
  175. inboundCountByGuid[guid]++
  176. ids = append(ids, r.Id)
  177. }
  178. now := time.Now().UnixMilli()
  179. depletedByGuid := make(map[string]int)
  180. if len(ids) > 0 {
  181. type tRow struct {
  182. InboundID int `gorm:"column:inbound_id"`
  183. Enable bool
  184. Total int64
  185. Up int64
  186. Down int64
  187. ExpiryTime int64 `gorm:"column:expiry_time"`
  188. }
  189. var tRows []tRow
  190. if err := db.Table("client_traffics").
  191. Select("inbound_id, enable, total, up, down, expiry_time").
  192. Where("inbound_id IN ?", ids).Scan(&tRows).Error; err == nil {
  193. for _, row := range tRows {
  194. guid, ok := effByInbound[row.InboundID]
  195. if !ok {
  196. continue
  197. }
  198. expired := row.ExpiryTime > 0 && row.ExpiryTime <= now
  199. exhausted := row.Total > 0 && row.Up+row.Down >= row.Total
  200. if expired || exhausted || !row.Enable {
  201. depletedByGuid[guid]++
  202. }
  203. }
  204. }
  205. }
  206. onlineByGuid := s.onlineEmailsByGuid()
  207. for _, n := range nodes {
  208. guid := effectiveNodeGuid(n)
  209. n.InboundCount = inboundCountByGuid[guid]
  210. n.OnlineCount = len(onlineByGuid[guid])
  211. n.DepletedCount = depletedByGuid[guid]
  212. }
  213. }