1
0

node_tree.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package service
import (
	"context"
	"sort"
	"sync"

	"github.com/mhsanaei/3x-ui/v3/internal/database"
	"github.com/mhsanaei/3x-ui/v3/internal/database/model"
	"github.com/mhsanaei/3x-ui/v3/internal/web/runtime"
)
  9. // LocalDescendants returns this panel's read-only summaries of the nodes it
  10. // directly manages, so a parent panel can surface them as transitive sub-nodes
  11. // (#4983). Only nodes with a known GUID are included — a stable identity is
  12. // required to attribute them one hop up. Not recursive: each panel reports its
  13. // own direct nodes, and a master walks one level via each direct node's
  14. // endpoint, which covers the Node1 -> Node2 -> Node3 case.
  15. func (s *NodeService) LocalDescendants() ([]model.NodeSummary, error) {
  16. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  17. db := database.GetDB()
  18. var nodes []*model.Node
  19. if err := db.Model(model.Node{}).Order("id asc").Find(&nodes).Error; err != nil {
  20. return nil, err
  21. }
  22. out := make([]model.NodeSummary, 0, len(nodes))
  23. for _, n := range nodes {
  24. if n.Guid == "" {
  25. continue
  26. }
  27. out = append(out, model.NodeSummary{
  28. Guid: n.Guid,
  29. ParentGuid: selfGuid,
  30. Name: n.Name,
  31. Address: n.Address,
  32. Scheme: n.Scheme,
  33. Port: n.Port,
  34. Status: n.Status,
  35. LastHeartbeat: n.LastHeartbeat,
  36. LatencyMs: n.LatencyMs,
  37. PanelVersion: n.PanelVersion,
  38. XrayVersion: n.XrayVersion,
  39. XrayState: n.XrayState,
  40. XrayError: n.XrayError,
  41. })
  42. }
  43. return out, nil
  44. }
  45. var (
  46. nodeDescendantsMu sync.RWMutex
  47. nodeDescendantsCache = map[int][]model.NodeSummary{}
  48. )
  49. // RefreshDescendants pulls a direct node's published sub-node summaries and
  50. // caches them keyed by node id. Best-effort: a fetch error keeps the last good
  51. // set (the node may be briefly unreachable). Called from the heartbeat job.
  52. func (s *NodeService) RefreshDescendants(ctx context.Context, n *model.Node) {
  53. if n == nil {
  54. return
  55. }
  56. mgr := runtime.GetManager()
  57. if mgr == nil {
  58. return
  59. }
  60. rt, err := mgr.RemoteFor(n)
  61. if err != nil {
  62. return
  63. }
  64. summaries, err := rt.GetDescendants(ctx)
  65. if err != nil {
  66. return
  67. }
  68. nodeDescendantsMu.Lock()
  69. if len(summaries) == 0 {
  70. delete(nodeDescendantsCache, n.Id)
  71. } else {
  72. nodeDescendantsCache[n.Id] = summaries
  73. }
  74. nodeDescendantsMu.Unlock()
  75. }
  76. // ClearDescendants drops a node's cached sub-node summaries (its probe failed).
  77. func (s *NodeService) ClearDescendants(nodeID int) {
  78. nodeDescendantsMu.Lock()
  79. delete(nodeDescendantsCache, nodeID)
  80. nodeDescendantsMu.Unlock()
  81. }
  82. func cachedDescendants() []model.NodeSummary {
  83. nodeDescendantsMu.RLock()
  84. defer nodeDescendantsMu.RUnlock()
  85. out := make([]model.NodeSummary, 0)
  86. for _, list := range nodeDescendantsCache {
  87. out = append(out, list...)
  88. }
  89. return out
  90. }
  91. // GetNodeTree returns the direct nodes plus any transitive sub-nodes learned
  92. // from them, with per-GUID counts so each node shows only the inbounds/online
  93. // it physically hosts (#4983). Direct nodes carry the master's own GUID as
  94. // ParentGuid; a transitive node carries its parent node's GUID. Transitive
  95. // nodes are read-only projections (Id == 0). Used by the Nodes page and the
  96. // heartbeat broadcast — never for probing/syncing, which stay on GetAll.
  97. func (s *NodeService) GetNodeTree() ([]*model.Node, error) {
  98. nodes, err := s.GetAll()
  99. if err != nil {
  100. return nodes, err
  101. }
  102. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  103. directGuids := make(map[string]struct{}, len(nodes))
  104. for _, n := range nodes {
  105. n.ParentGuid = selfGuid
  106. if n.Guid != "" {
  107. directGuids[n.Guid] = struct{}{}
  108. }
  109. }
  110. seen := make(map[string]struct{})
  111. var transitive []*model.Node
  112. for _, sum := range cachedDescendants() {
  113. if sum.Guid == "" {
  114. continue
  115. }
  116. if _, ok := directGuids[sum.Guid]; ok {
  117. continue // already shown as a direct node
  118. }
  119. if _, ok := seen[sum.Guid]; ok {
  120. continue
  121. }
  122. seen[sum.Guid] = struct{}{}
  123. transitive = append(transitive, &model.Node{
  124. Guid: sum.Guid,
  125. ParentGuid: sum.ParentGuid,
  126. Name: sum.Name,
  127. Address: sum.Address,
  128. Scheme: sum.Scheme,
  129. Port: sum.Port,
  130. Status: sum.Status,
  131. LastHeartbeat: sum.LastHeartbeat,
  132. LatencyMs: sum.LatencyMs,
  133. PanelVersion: sum.PanelVersion,
  134. XrayVersion: sum.XrayVersion,
  135. XrayState: sum.XrayState,
  136. XrayError: sum.XrayError,
  137. Transitive: true,
  138. })
  139. }
  140. if len(transitive) == 0 {
  141. return nodes, nil
  142. }
  143. all := make([]*model.Node, 0, len(nodes)+len(transitive))
  144. all = append(all, nodes...)
  145. all = append(all, transitive...)
  146. s.recountByGuid(all, selfGuid)
  147. return all, nil
  148. }
  149. // recountByGuid recomputes InboundCount/OnlineCount/DepletedCount for every node
  150. // in the tree, keyed by the GUID that physically hosts each inbound, so a direct
  151. // node shows only its own inbounds and each transitive node shows its own
  152. // (#4983). In a flat topology the per-GUID and per-node-id counts coincide, so
  153. // this only changes behaviour once a transitive node exists.
  154. func (s *NodeService) recountByGuid(nodes []*model.Node, selfGuid string) {
  155. db := database.GetDB()
  156. type ibRow struct {
  157. Id int
  158. NodeID *int `gorm:"column:node_id"`
  159. OriginNodeGuid string `gorm:"column:origin_node_guid"`
  160. }
  161. var ibRows []ibRow
  162. if err := db.Table("inbounds").Select("id, node_id, origin_node_guid").Scan(&ibRows).Error; err != nil {
  163. return
  164. }
  165. ambiguous := ambiguousNodeGuids(nodes, selfGuid)
  166. effByInbound := make(map[int]string, len(ibRows))
  167. inboundCountByGuid := make(map[string]int)
  168. for _, r := range ibRows {
  169. guid := r.OriginNodeGuid
  170. if guid == "" {
  171. if r.NodeID != nil {
  172. guid = synthNodeGuid(*r.NodeID)
  173. } else {
  174. guid = selfGuid
  175. }
  176. } else if r.NodeID != nil {
  177. // Origin still holds an ambiguous GUID (cloned server / master-shared,
  178. // not yet re-attributed): bucket under the hosting node's unique id so
  179. // the clones don't merge.
  180. if _, bad := ambiguous[guid]; bad {
  181. guid = synthNodeGuid(*r.NodeID)
  182. }
  183. }
  184. effByInbound[r.Id] = guid
  185. inboundCountByGuid[guid]++
  186. }
  187. // Classify by EMAIL (not the stale client_traffics.inbound_id) and bucket
  188. // each client under its inbound's effective attribution GUID, deduping a
  189. // client attached to several inbounds under the same GUID.
  190. depletedByGuid := make(map[string]int)
  191. disabledByGuid := make(map[string]int)
  192. activeByGuid := make(map[string]int)
  193. if statuses, err := s.nodeClientStatuses(); err == nil {
  194. seen := make(map[string]map[int]struct{})
  195. for _, st := range statuses {
  196. guid, ok := effByInbound[st.InboundID]
  197. if !ok {
  198. continue
  199. }
  200. clientsSeen := seen[guid]
  201. if clientsSeen == nil {
  202. clientsSeen = make(map[int]struct{})
  203. seen[guid] = clientsSeen
  204. }
  205. if _, dup := clientsSeen[st.ClientID]; dup {
  206. continue
  207. }
  208. clientsSeen[st.ClientID] = struct{}{}
  209. switch {
  210. case st.Depleted:
  211. depletedByGuid[guid]++
  212. case st.Disabled:
  213. disabledByGuid[guid]++
  214. default:
  215. activeByGuid[guid]++
  216. }
  217. }
  218. }
  219. onlineByGuid := s.onlineEmailsByGuid()
  220. for _, n := range nodes {
  221. guid := effectiveNodeGuid(n, ambiguous)
  222. n.InboundCount = inboundCountByGuid[guid]
  223. n.OnlineCount = len(onlineByGuid[guid])
  224. n.DepletedCount = depletedByGuid[guid]
  225. n.DisabledCount = disabledByGuid[guid]
  226. n.ActiveCount = activeByGuid[guid]
  227. }
  228. }