inbound_node_ips.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package service
  2. import (
  3. "encoding/json"
  4. "sort"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "gorm.io/gorm/clause"
  9. )
  10. // node_client_ips.go implements per-node client-IP attribution. The flat
  11. // inbound_client_ips table is a cluster-wide union (used for IP-limit counting
  12. // and pushed back to every node), so it cannot tell which node a given IP is
  13. // on. NodeClientIp keeps that attribution: each panel records its own Xray
  14. // observations under its panelGuid, and the master merges every node's
  15. // guid-keyed report — never mixing in IPs a parent pushed down.
  16. // mergeModelClientIpEntries unions old and incoming observations, drops anything
  17. // older than cutoff, keeps the newest timestamp per IP, and sorts newest-first.
  18. // It mirrors mergeClientIpEntries but operates on the exported wire type.
  19. func mergeModelClientIpEntries(old, incoming []model.ClientIpEntry, cutoff int64) []model.ClientIpEntry {
  20. ipMap := make(map[string]int64, len(old)+len(incoming))
  21. for _, e := range old {
  22. if e.IP == "" || e.Timestamp < cutoff {
  23. continue
  24. }
  25. ipMap[e.IP] = e.Timestamp
  26. }
  27. for _, e := range incoming {
  28. if e.IP == "" || e.Timestamp < cutoff {
  29. continue
  30. }
  31. if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
  32. ipMap[e.IP] = e.Timestamp
  33. }
  34. }
  35. out := make([]model.ClientIpEntry, 0, len(ipMap))
  36. for ip, ts := range ipMap {
  37. out = append(out, model.ClientIpEntry{IP: ip, Timestamp: ts})
  38. }
  39. sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
  40. return out
  41. }
  42. // upsertNodeClientIps folds a guid's per-email observations into NodeClientIp,
  43. // merging with whatever is already stored for that (guid, email) and dropping
  44. // stale entries. Empty merged results delete the row so the table stays bounded.
  45. func upsertNodeClientIps(guid string, perEmail map[string][]model.ClientIpEntry) error {
  46. if guid == "" || len(perEmail) == 0 {
  47. return nil
  48. }
  49. db := database.GetDB()
  50. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  51. var existing []model.NodeClientIp
  52. if err := db.Where("node_guid = ?", guid).Find(&existing).Error; err != nil {
  53. return err
  54. }
  55. existingByEmail := make(map[string]*model.NodeClientIp, len(existing))
  56. for i := range existing {
  57. existingByEmail[existing[i].Email] = &existing[i]
  58. }
  59. // Deterministic row order keeps concurrent guid merges from deadlocking on
  60. // Postgres (40P01) — same discipline as MergeInboundClientIps.
  61. emails := make([]string, 0, len(perEmail))
  62. for email := range perEmail {
  63. if email != "" {
  64. emails = append(emails, email)
  65. }
  66. }
  67. sort.Strings(emails)
  68. tx := db.Begin()
  69. defer func() {
  70. if r := recover(); r != nil {
  71. tx.Rollback()
  72. }
  73. }()
  74. for _, email := range emails {
  75. incoming := perEmail[email]
  76. var old []model.ClientIpEntry
  77. if cur, ok := existingByEmail[email]; ok && cur.Ips != "" {
  78. _ = json.Unmarshal([]byte(cur.Ips), &old)
  79. }
  80. merged := mergeModelClientIpEntries(old, incoming, cutoff)
  81. if len(merged) == 0 {
  82. // Nothing fresh: drop any stale row so attribution doesn't linger.
  83. if _, ok := existingByEmail[email]; ok {
  84. if err := tx.Where("node_guid = ? AND email = ?", guid, email).
  85. Delete(&model.NodeClientIp{}).Error; err != nil {
  86. tx.Rollback()
  87. return err
  88. }
  89. }
  90. continue
  91. }
  92. b, _ := json.Marshal(merged)
  93. row := model.NodeClientIp{NodeGuid: guid, Email: email, Ips: string(b)}
  94. if err := tx.Clauses(clause.OnConflict{
  95. Columns: []clause.Column{{Name: "node_guid"}, {Name: "email"}},
  96. DoUpdates: clause.AssignmentColumns([]string{"ips"}),
  97. }).Create(&row).Error; err != nil {
  98. tx.Rollback()
  99. return err
  100. }
  101. }
  102. return tx.Commit().Error
  103. }
  104. // RecordLocalClientIps stores this panel's own Xray observations under its
  105. // panelGuid. Called by check_client_ip_job each scan with the live per-email IPs
  106. // the local core reported.
  107. func (s *InboundService) RecordLocalClientIps(panelGuid string, observed map[string][]model.ClientIpEntry) error {
  108. return upsertNodeClientIps(panelGuid, observed)
  109. }
  110. // MergeClientIpsByGuid folds a node's guid-keyed attribution report (its own
  111. // panelGuid subtree plus any descendants) into the local table, preserving which
  112. // physical node each IP is on across a chain. When node is non-nil and its own
  113. // panelGuid is ambiguous (shared with another node or the master — a cloned
  114. // server), the node's own subtree is remapped to its node-unique key so two
  115. // clones don't collapse into one attribution row; descendant subtrees keep their
  116. // distinct GUIDs. A nil node merges the report verbatim.
  117. func (s *InboundService) MergeClientIpsByGuid(node *model.Node, trees map[string]map[string][]model.ClientIpEntry) error {
  118. if node != nil && node.Guid != "" {
  119. if eff := effectiveNodeKey(node); eff != node.Guid {
  120. if sub, ok := trees[node.Guid]; ok {
  121. delete(trees, node.Guid)
  122. if existing, ok := trees[eff]; ok {
  123. for email, ips := range sub {
  124. existing[email] = append(existing[email], ips...)
  125. }
  126. } else {
  127. trees[eff] = sub
  128. }
  129. }
  130. }
  131. }
  132. for guid, perEmail := range trees {
  133. if err := upsertNodeClientIps(guid, perEmail); err != nil {
  134. return err
  135. }
  136. }
  137. return nil
  138. }
  139. // GetClientIpsByGuid returns this panel's full attribution subtree (guid -> email
  140. // -> fresh IPs), dropping stale entries. It is what the clientIpsByGuid endpoint
  141. // serves to a parent panel.
  142. func (s *InboundService) GetClientIpsByGuid() (map[string]map[string][]model.ClientIpEntry, error) {
  143. db := database.GetDB()
  144. var rows []model.NodeClientIp
  145. if err := db.Find(&rows).Error; err != nil {
  146. return nil, err
  147. }
  148. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  149. out := make(map[string]map[string][]model.ClientIpEntry)
  150. for _, row := range rows {
  151. if row.NodeGuid == "" || row.Email == "" || row.Ips == "" {
  152. continue
  153. }
  154. var entries []model.ClientIpEntry
  155. if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
  156. continue
  157. }
  158. fresh := mergeModelClientIpEntries(nil, entries, cutoff)
  159. if len(fresh) == 0 {
  160. continue
  161. }
  162. if out[row.NodeGuid] == nil {
  163. out[row.NodeGuid] = make(map[string][]model.ClientIpEntry)
  164. }
  165. out[row.NodeGuid][row.Email] = fresh
  166. }
  167. return out, nil
  168. }
  169. // GetClientIpNodeAttribution returns, for one client email, a map of IP -> the
  170. // guid that most recently observed it (within the stale window). Used to label
  171. // each IP in the panel with the node it is connecting to.
  172. func (s *InboundService) GetClientIpNodeAttribution(email string) (map[string]string, error) {
  173. db := database.GetDB()
  174. var rows []model.NodeClientIp
  175. if err := db.Where("email = ?", email).Find(&rows).Error; err != nil {
  176. return nil, err
  177. }
  178. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  179. ipGuid := make(map[string]string)
  180. ipTs := make(map[string]int64)
  181. for _, row := range rows {
  182. if row.NodeGuid == "" || row.Ips == "" {
  183. continue
  184. }
  185. var entries []model.ClientIpEntry
  186. if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
  187. continue
  188. }
  189. for _, e := range entries {
  190. if e.IP == "" || e.Timestamp < cutoff {
  191. continue
  192. }
  193. if cur, ok := ipTs[e.IP]; !ok || e.Timestamp > cur {
  194. ipTs[e.IP] = e.Timestamp
  195. ipGuid[e.IP] = row.NodeGuid
  196. }
  197. }
  198. }
  199. return ipGuid, nil
  200. }
  201. // ClientIpInfo is one IP shown in the panel's per-client IP log, labelled with
  202. // the node it is connecting through ("" = this local panel).
  203. type ClientIpInfo struct {
  204. IP string `json:"ip"`
  205. Time string `json:"time"`
  206. Node string `json:"node"`
  207. }
  208. // GetClientIpsWithNodes returns a client's recorded IPs (from the flat
  209. // inbound_client_ips display set) annotated with the node each IP is on, using
  210. // the per-node attribution table. Local IPs (and any IP without attribution)
  211. // carry an empty Node.
  212. func (s *InboundService) GetClientIpsWithNodes(email string) ([]ClientIpInfo, error) {
  213. raw, err := s.GetInboundClientIps(email)
  214. if err != nil || raw == "" {
  215. // Record-not-found (or empty) is "no IPs", not an error for the UI.
  216. return []ClientIpInfo{}, nil
  217. }
  218. var entries []model.ClientIpEntry
  219. if jerr := json.Unmarshal([]byte(raw), &entries); jerr != nil || len(entries) == 0 {
  220. // Legacy shape: a plain JSON array of IP strings.
  221. var oldIps []string
  222. if json.Unmarshal([]byte(raw), &oldIps) == nil {
  223. entries = entries[:0]
  224. for _, ip := range oldIps {
  225. entries = append(entries, model.ClientIpEntry{IP: ip})
  226. }
  227. }
  228. }
  229. if len(entries) == 0 {
  230. return []ClientIpInfo{}, nil
  231. }
  232. attr, _ := s.GetClientIpNodeAttribution(email)
  233. guidName := s.nodeGuidNameMap()
  234. localGuid, _ := (&SettingService{}).GetPanelGuid()
  235. out := make([]ClientIpInfo, 0, len(entries))
  236. for _, e := range entries {
  237. if e.IP == "" {
  238. continue
  239. }
  240. info := ClientIpInfo{IP: e.IP}
  241. if e.Timestamp > 0 {
  242. info.Time = time.Unix(e.Timestamp, 0).Local().Format("2006-01-02 15:04:05")
  243. }
  244. if guid, ok := attr[e.IP]; ok && guid != "" && guid != localGuid {
  245. info.Node = guidName[guid]
  246. }
  247. out = append(out, info)
  248. }
  249. return out, nil
  250. }
  251. // nodeGuidNameMap maps each known node's attribution key to its display name,
  252. // keyed by effectiveNodeGuid so a cloned node's IPs (stored under its node-unique
  253. // key) still resolve to the right name instead of colliding under a shared GUID.
  254. func (s *InboundService) nodeGuidNameMap() map[string]string {
  255. db := database.GetDB()
  256. var nodes []model.Node
  257. if err := db.Model(&model.Node{}).Find(&nodes).Error; err != nil {
  258. return map[string]string{}
  259. }
  260. ptrs := make([]*model.Node, len(nodes))
  261. for i := range nodes {
  262. ptrs[i] = &nodes[i]
  263. }
  264. selfGuid, _ := (&SettingService{}).GetPanelGuid()
  265. ambiguous := ambiguousNodeGuids(ptrs, selfGuid)
  266. m := make(map[string]string, len(nodes))
  267. for i := range nodes {
  268. m[effectiveNodeGuid(&nodes[i], ambiguous)] = nodes[i].Name
  269. }
  270. return m
  271. }
  272. // DeleteNodeClientIpsByGuid removes all attribution rows for a guid (e.g. when a
  273. // node is deleted) so its IPs stop being reported and counted.
  274. func (s *InboundService) DeleteNodeClientIpsByGuid(guid string) error {
  275. if guid == "" {
  276. return nil
  277. }
  278. db := database.GetDB()
  279. return db.Where("node_guid = ?", guid).Delete(&model.NodeClientIp{}).Error
  280. }