inbound_node_ips.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package service
  2. import (
  3. "encoding/json"
  4. "sort"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "gorm.io/gorm/clause"
  9. )
// This file implements per-node client-IP attribution. The flat
// inbound_client_ips table is a cluster-wide union (used for IP-limit counting
// and pushed back to every node), so it cannot tell which node a given IP is
// on. NodeClientIp keeps that attribution: each panel records its own Xray
// observations under its panelGuid, and the master merges every node's
// guid-keyed report, never mixing in IPs that a parent pushed down.
  16. // mergeModelClientIpEntries unions old and incoming observations, drops anything
  17. // older than cutoff, keeps the newest timestamp per IP, and sorts newest-first.
  18. // It mirrors mergeClientIpEntries but operates on the exported wire type.
  19. func mergeModelClientIpEntries(old, incoming []model.ClientIpEntry, cutoff int64) []model.ClientIpEntry {
  20. ipMap := make(map[string]int64, len(old)+len(incoming))
  21. for _, e := range old {
  22. if e.IP == "" || e.Timestamp < cutoff {
  23. continue
  24. }
  25. ipMap[e.IP] = e.Timestamp
  26. }
  27. for _, e := range incoming {
  28. if e.IP == "" || e.Timestamp < cutoff {
  29. continue
  30. }
  31. if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
  32. ipMap[e.IP] = e.Timestamp
  33. }
  34. }
  35. out := make([]model.ClientIpEntry, 0, len(ipMap))
  36. for ip, ts := range ipMap {
  37. out = append(out, model.ClientIpEntry{IP: ip, Timestamp: ts})
  38. }
  39. sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
  40. return out
  41. }
  42. // upsertNodeClientIps folds a guid's per-email observations into NodeClientIp,
  43. // merging with whatever is already stored for that (guid, email) and dropping
  44. // stale entries. Empty merged results delete the row so the table stays bounded.
  45. func upsertNodeClientIps(guid string, perEmail map[string][]model.ClientIpEntry) error {
  46. if guid == "" || len(perEmail) == 0 {
  47. return nil
  48. }
  49. db := database.GetDB()
  50. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  51. var existing []model.NodeClientIp
  52. if err := db.Where("node_guid = ?", guid).Find(&existing).Error; err != nil {
  53. return err
  54. }
  55. existingByEmail := make(map[string]*model.NodeClientIp, len(existing))
  56. for i := range existing {
  57. existingByEmail[existing[i].Email] = &existing[i]
  58. }
  59. tx := db.Begin()
  60. defer func() {
  61. if r := recover(); r != nil {
  62. tx.Rollback()
  63. }
  64. }()
  65. for email, incoming := range perEmail {
  66. if email == "" {
  67. continue
  68. }
  69. var old []model.ClientIpEntry
  70. if cur, ok := existingByEmail[email]; ok && cur.Ips != "" {
  71. _ = json.Unmarshal([]byte(cur.Ips), &old)
  72. }
  73. merged := mergeModelClientIpEntries(old, incoming, cutoff)
  74. if len(merged) == 0 {
  75. // Nothing fresh: drop any stale row so attribution doesn't linger.
  76. if _, ok := existingByEmail[email]; ok {
  77. if err := tx.Where("node_guid = ? AND email = ?", guid, email).
  78. Delete(&model.NodeClientIp{}).Error; err != nil {
  79. tx.Rollback()
  80. return err
  81. }
  82. }
  83. continue
  84. }
  85. b, _ := json.Marshal(merged)
  86. row := model.NodeClientIp{NodeGuid: guid, Email: email, Ips: string(b)}
  87. if err := tx.Clauses(clause.OnConflict{
  88. Columns: []clause.Column{{Name: "node_guid"}, {Name: "email"}},
  89. DoUpdates: clause.AssignmentColumns([]string{"ips"}),
  90. }).Create(&row).Error; err != nil {
  91. tx.Rollback()
  92. return err
  93. }
  94. }
  95. return tx.Commit().Error
  96. }
// RecordLocalClientIps stores this panel's own Xray observations under its
// panelGuid. Called by check_client_ip_job each scan with the live per-email IPs
// the local core reported.
//
// observed maps a client email to the IP entries seen for it. The call simply
// delegates to upsertNodeClientIps and returns its error unchanged.
func (s *InboundService) RecordLocalClientIps(panelGuid string, observed map[string][]model.ClientIpEntry) error {
	return upsertNodeClientIps(panelGuid, observed)
}
  103. // MergeClientIpsByGuid folds a node's guid-keyed attribution report (its own
  104. // panelGuid subtree plus any descendants) into the local table, preserving which
  105. // physical node each IP is on across a chain. When node is non-nil and its own
  106. // panelGuid is ambiguous (shared with another node or the master — a cloned
  107. // server), the node's own subtree is remapped to its node-unique key so two
  108. // clones don't collapse into one attribution row; descendant subtrees keep their
  109. // distinct GUIDs. A nil node merges the report verbatim.
  110. func (s *InboundService) MergeClientIpsByGuid(node *model.Node, trees map[string]map[string][]model.ClientIpEntry) error {
  111. if node != nil && node.Guid != "" {
  112. if eff := effectiveNodeKey(node); eff != node.Guid {
  113. if sub, ok := trees[node.Guid]; ok {
  114. delete(trees, node.Guid)
  115. if existing, ok := trees[eff]; ok {
  116. for email, ips := range sub {
  117. existing[email] = append(existing[email], ips...)
  118. }
  119. } else {
  120. trees[eff] = sub
  121. }
  122. }
  123. }
  124. }
  125. for guid, perEmail := range trees {
  126. if err := upsertNodeClientIps(guid, perEmail); err != nil {
  127. return err
  128. }
  129. }
  130. return nil
  131. }
  132. // GetClientIpsByGuid returns this panel's full attribution subtree (guid -> email
  133. // -> fresh IPs), dropping stale entries. It is what the clientIpsByGuid endpoint
  134. // serves to a parent panel.
  135. func (s *InboundService) GetClientIpsByGuid() (map[string]map[string][]model.ClientIpEntry, error) {
  136. db := database.GetDB()
  137. var rows []model.NodeClientIp
  138. if err := db.Find(&rows).Error; err != nil {
  139. return nil, err
  140. }
  141. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  142. out := make(map[string]map[string][]model.ClientIpEntry)
  143. for _, row := range rows {
  144. if row.NodeGuid == "" || row.Email == "" || row.Ips == "" {
  145. continue
  146. }
  147. var entries []model.ClientIpEntry
  148. if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
  149. continue
  150. }
  151. fresh := mergeModelClientIpEntries(nil, entries, cutoff)
  152. if len(fresh) == 0 {
  153. continue
  154. }
  155. if out[row.NodeGuid] == nil {
  156. out[row.NodeGuid] = make(map[string][]model.ClientIpEntry)
  157. }
  158. out[row.NodeGuid][row.Email] = fresh
  159. }
  160. return out, nil
  161. }
  162. // GetClientIpNodeAttribution returns, for one client email, a map of IP -> the
  163. // guid that most recently observed it (within the stale window). Used to label
  164. // each IP in the panel with the node it is connecting to.
  165. func (s *InboundService) GetClientIpNodeAttribution(email string) (map[string]string, error) {
  166. db := database.GetDB()
  167. var rows []model.NodeClientIp
  168. if err := db.Where("email = ?", email).Find(&rows).Error; err != nil {
  169. return nil, err
  170. }
  171. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  172. ipGuid := make(map[string]string)
  173. ipTs := make(map[string]int64)
  174. for _, row := range rows {
  175. if row.NodeGuid == "" || row.Ips == "" {
  176. continue
  177. }
  178. var entries []model.ClientIpEntry
  179. if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
  180. continue
  181. }
  182. for _, e := range entries {
  183. if e.IP == "" || e.Timestamp < cutoff {
  184. continue
  185. }
  186. if cur, ok := ipTs[e.IP]; !ok || e.Timestamp > cur {
  187. ipTs[e.IP] = e.Timestamp
  188. ipGuid[e.IP] = row.NodeGuid
  189. }
  190. }
  191. }
  192. return ipGuid, nil
  193. }
// ClientIpInfo is one IP shown in the panel's per-client IP log, labelled with
// the node it is connecting through ("" = this local panel).
type ClientIpInfo struct {
	// IP is the client address as recorded.
	IP string `json:"ip"`
	// Time is the observation time formatted as "2006-01-02 15:04:05" in the
	// local zone; GetClientIpsWithNodes leaves it empty when the entry has no
	// positive timestamp.
	Time string `json:"time"`
	// Node is the display name of the node the IP is attributed to; empty for
	// the local panel or when no attribution is known.
	Node string `json:"node"`
}
  201. // GetClientIpsWithNodes returns a client's recorded IPs (from the flat
  202. // inbound_client_ips display set) annotated with the node each IP is on, using
  203. // the per-node attribution table. Local IPs (and any IP without attribution)
  204. // carry an empty Node.
  205. func (s *InboundService) GetClientIpsWithNodes(email string) ([]ClientIpInfo, error) {
  206. raw, err := s.GetInboundClientIps(email)
  207. if err != nil || raw == "" {
  208. // Record-not-found (or empty) is "no IPs", not an error for the UI.
  209. return []ClientIpInfo{}, nil
  210. }
  211. var entries []model.ClientIpEntry
  212. if jerr := json.Unmarshal([]byte(raw), &entries); jerr != nil || len(entries) == 0 {
  213. // Legacy shape: a plain JSON array of IP strings.
  214. var oldIps []string
  215. if json.Unmarshal([]byte(raw), &oldIps) == nil {
  216. entries = entries[:0]
  217. for _, ip := range oldIps {
  218. entries = append(entries, model.ClientIpEntry{IP: ip})
  219. }
  220. }
  221. }
  222. if len(entries) == 0 {
  223. return []ClientIpInfo{}, nil
  224. }
  225. attr, _ := s.GetClientIpNodeAttribution(email)
  226. guidName := s.nodeGuidNameMap()
  227. localGuid, _ := (&SettingService{}).GetPanelGuid()
  228. out := make([]ClientIpInfo, 0, len(entries))
  229. for _, e := range entries {
  230. if e.IP == "" {
  231. continue
  232. }
  233. info := ClientIpInfo{IP: e.IP}
  234. if e.Timestamp > 0 {
  235. info.Time = time.Unix(e.Timestamp, 0).Local().Format("2006-01-02 15:04:05")
  236. }
  237. if guid, ok := attr[e.IP]; ok && guid != "" && guid != localGuid {
  238. info.Node = guidName[guid]
  239. }
  240. out = append(out, info)
  241. }
  242. return out, nil
  243. }
  244. // nodeGuidNameMap maps each known node's attribution key to its display name,
  245. // keyed by effectiveNodeGuid so a cloned node's IPs (stored under its node-unique
  246. // key) still resolve to the right name instead of colliding under a shared GUID.
  247. func (s *InboundService) nodeGuidNameMap() map[string]string {
  248. db := database.GetDB()
  249. var nodes []model.Node
  250. if err := db.Model(&model.Node{}).Find(&nodes).Error; err != nil {
  251. return map[string]string{}
  252. }
  253. ptrs := make([]*model.Node, len(nodes))
  254. for i := range nodes {
  255. ptrs[i] = &nodes[i]
  256. }
  257. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  258. ambiguous := ambiguousNodeGuids(ptrs, selfGuid)
  259. m := make(map[string]string, len(nodes))
  260. for i := range nodes {
  261. m[effectiveNodeGuid(&nodes[i], ambiguous)] = nodes[i].Name
  262. }
  263. return m
  264. }
  265. // DeleteNodeClientIpsByGuid removes all attribution rows for a guid (e.g. when a
  266. // node is deleted) so its IPs stop being reported and counted.
  267. func (s *InboundService) DeleteNodeClientIpsByGuid(guid string) error {
  268. if guid == "" {
  269. return nil
  270. }
  271. db := database.GetDB()
  272. return db.Where("node_guid = ?", guid).Delete(&model.NodeClientIp{}).Error
  273. }