inbound_node_ips.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package service
  2. import (
  3. "encoding/json"
  4. "sort"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "gorm.io/gorm/clause"
  9. )
// This file implements per-node client-IP attribution. The flat
// inbound_client_ips table is a cluster-wide union (used for IP-limit counting
// and pushed back to every node), so it cannot tell which node a given IP is
// on. NodeClientIp keeps that attribution: each panel records its own Xray
// observations under its panelGuid, and the master merges every node's
// guid-keyed report — never mixing in IPs a parent pushed down.

  16. // mergeModelClientIpEntries unions old and incoming observations, drops anything
  17. // older than cutoff, keeps the newest timestamp per IP, and sorts newest-first.
  18. // It mirrors mergeClientIpEntries but operates on the exported wire type.
  19. func mergeModelClientIpEntries(old, incoming []model.ClientIpEntry, cutoff int64) []model.ClientIpEntry {
  20. ipMap := make(map[string]int64, len(old)+len(incoming))
  21. for _, e := range old {
  22. if e.IP == "" || e.Timestamp < cutoff {
  23. continue
  24. }
  25. ipMap[e.IP] = e.Timestamp
  26. }
  27. for _, e := range incoming {
  28. if e.IP == "" || e.Timestamp < cutoff {
  29. continue
  30. }
  31. if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
  32. ipMap[e.IP] = e.Timestamp
  33. }
  34. }
  35. out := make([]model.ClientIpEntry, 0, len(ipMap))
  36. for ip, ts := range ipMap {
  37. out = append(out, model.ClientIpEntry{IP: ip, Timestamp: ts})
  38. }
  39. sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
  40. return out
  41. }
  42. // upsertNodeClientIps folds a guid's per-email observations into NodeClientIp,
  43. // merging with whatever is already stored for that (guid, email) and dropping
  44. // stale entries. Empty merged results delete the row so the table stays bounded.
  45. func upsertNodeClientIps(guid string, perEmail map[string][]model.ClientIpEntry) error {
  46. if guid == "" || len(perEmail) == 0 {
  47. return nil
  48. }
  49. db := database.GetDB()
  50. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  51. var existing []model.NodeClientIp
  52. if err := db.Where("node_guid = ?", guid).Find(&existing).Error; err != nil {
  53. return err
  54. }
  55. existingByEmail := make(map[string]*model.NodeClientIp, len(existing))
  56. for i := range existing {
  57. existingByEmail[existing[i].Email] = &existing[i]
  58. }
  59. tx := db.Begin()
  60. defer func() {
  61. if r := recover(); r != nil {
  62. tx.Rollback()
  63. }
  64. }()
  65. for email, incoming := range perEmail {
  66. if email == "" {
  67. continue
  68. }
  69. var old []model.ClientIpEntry
  70. if cur, ok := existingByEmail[email]; ok && cur.Ips != "" {
  71. _ = json.Unmarshal([]byte(cur.Ips), &old)
  72. }
  73. merged := mergeModelClientIpEntries(old, incoming, cutoff)
  74. if len(merged) == 0 {
  75. // Nothing fresh: drop any stale row so attribution doesn't linger.
  76. if _, ok := existingByEmail[email]; ok {
  77. if err := tx.Where("node_guid = ? AND email = ?", guid, email).
  78. Delete(&model.NodeClientIp{}).Error; err != nil {
  79. tx.Rollback()
  80. return err
  81. }
  82. }
  83. continue
  84. }
  85. b, _ := json.Marshal(merged)
  86. row := model.NodeClientIp{NodeGuid: guid, Email: email, Ips: string(b)}
  87. if err := tx.Clauses(clause.OnConflict{
  88. Columns: []clause.Column{{Name: "node_guid"}, {Name: "email"}},
  89. DoUpdates: clause.AssignmentColumns([]string{"ips"}),
  90. }).Create(&row).Error; err != nil {
  91. tx.Rollback()
  92. return err
  93. }
  94. }
  95. return tx.Commit().Error
  96. }
  97. // RecordLocalClientIps stores this panel's own Xray observations under its
  98. // panelGuid. Called by check_client_ip_job each scan with the live per-email IPs
  99. // the local core reported.
  100. func (s *InboundService) RecordLocalClientIps(panelGuid string, observed map[string][]model.ClientIpEntry) error {
  101. return upsertNodeClientIps(panelGuid, observed)
  102. }
  103. // MergeClientIpsByGuid folds a node's guid-keyed attribution report (its own
  104. // panelGuid subtree plus any descendants) into the local table, preserving which
  105. // physical node each IP is on across a chain.
  106. func (s *InboundService) MergeClientIpsByGuid(trees map[string]map[string][]model.ClientIpEntry) error {
  107. for guid, perEmail := range trees {
  108. if err := upsertNodeClientIps(guid, perEmail); err != nil {
  109. return err
  110. }
  111. }
  112. return nil
  113. }
  114. // GetClientIpsByGuid returns this panel's full attribution subtree (guid -> email
  115. // -> fresh IPs), dropping stale entries. It is what the clientIpsByGuid endpoint
  116. // serves to a parent panel.
  117. func (s *InboundService) GetClientIpsByGuid() (map[string]map[string][]model.ClientIpEntry, error) {
  118. db := database.GetDB()
  119. var rows []model.NodeClientIp
  120. if err := db.Find(&rows).Error; err != nil {
  121. return nil, err
  122. }
  123. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  124. out := make(map[string]map[string][]model.ClientIpEntry)
  125. for _, row := range rows {
  126. if row.NodeGuid == "" || row.Email == "" || row.Ips == "" {
  127. continue
  128. }
  129. var entries []model.ClientIpEntry
  130. if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
  131. continue
  132. }
  133. fresh := mergeModelClientIpEntries(nil, entries, cutoff)
  134. if len(fresh) == 0 {
  135. continue
  136. }
  137. if out[row.NodeGuid] == nil {
  138. out[row.NodeGuid] = make(map[string][]model.ClientIpEntry)
  139. }
  140. out[row.NodeGuid][row.Email] = fresh
  141. }
  142. return out, nil
  143. }
  144. // GetClientIpNodeAttribution returns, for one client email, a map of IP -> the
  145. // guid that most recently observed it (within the stale window). Used to label
  146. // each IP in the panel with the node it is connecting to.
  147. func (s *InboundService) GetClientIpNodeAttribution(email string) (map[string]string, error) {
  148. db := database.GetDB()
  149. var rows []model.NodeClientIp
  150. if err := db.Where("email = ?", email).Find(&rows).Error; err != nil {
  151. return nil, err
  152. }
  153. cutoff := time.Now().Unix() - clientIpStaleAfterSeconds
  154. ipGuid := make(map[string]string)
  155. ipTs := make(map[string]int64)
  156. for _, row := range rows {
  157. if row.NodeGuid == "" || row.Ips == "" {
  158. continue
  159. }
  160. var entries []model.ClientIpEntry
  161. if err := json.Unmarshal([]byte(row.Ips), &entries); err != nil {
  162. continue
  163. }
  164. for _, e := range entries {
  165. if e.IP == "" || e.Timestamp < cutoff {
  166. continue
  167. }
  168. if cur, ok := ipTs[e.IP]; !ok || e.Timestamp > cur {
  169. ipTs[e.IP] = e.Timestamp
  170. ipGuid[e.IP] = row.NodeGuid
  171. }
  172. }
  173. }
  174. return ipGuid, nil
  175. }
  176. // ClientIpInfo is one IP shown in the panel's per-client IP log, labelled with
  177. // the node it is connecting through ("" = this local panel).
  178. type ClientIpInfo struct {
  179. IP string `json:"ip"`
  180. Time string `json:"time"`
  181. Node string `json:"node"`
  182. }
  183. // GetClientIpsWithNodes returns a client's recorded IPs (from the flat
  184. // inbound_client_ips display set) annotated with the node each IP is on, using
  185. // the per-node attribution table. Local IPs (and any IP without attribution)
  186. // carry an empty Node.
  187. func (s *InboundService) GetClientIpsWithNodes(email string) ([]ClientIpInfo, error) {
  188. raw, err := s.GetInboundClientIps(email)
  189. if err != nil || raw == "" {
  190. // Record-not-found (or empty) is "no IPs", not an error for the UI.
  191. return []ClientIpInfo{}, nil
  192. }
  193. var entries []model.ClientIpEntry
  194. if jerr := json.Unmarshal([]byte(raw), &entries); jerr != nil || len(entries) == 0 {
  195. // Legacy shape: a plain JSON array of IP strings.
  196. var oldIps []string
  197. if json.Unmarshal([]byte(raw), &oldIps) == nil {
  198. entries = entries[:0]
  199. for _, ip := range oldIps {
  200. entries = append(entries, model.ClientIpEntry{IP: ip})
  201. }
  202. }
  203. }
  204. if len(entries) == 0 {
  205. return []ClientIpInfo{}, nil
  206. }
  207. attr, _ := s.GetClientIpNodeAttribution(email)
  208. guidName := s.nodeGuidNameMap()
  209. localGuid, _ := (&SettingService{}).GetPanelGuid()
  210. out := make([]ClientIpInfo, 0, len(entries))
  211. for _, e := range entries {
  212. if e.IP == "" {
  213. continue
  214. }
  215. info := ClientIpInfo{IP: e.IP}
  216. if e.Timestamp > 0 {
  217. info.Time = time.Unix(e.Timestamp, 0).Local().Format("2006-01-02 15:04:05")
  218. }
  219. if guid, ok := attr[e.IP]; ok && guid != "" && guid != localGuid {
  220. info.Node = guidName[guid]
  221. }
  222. out = append(out, info)
  223. }
  224. return out, nil
  225. }
  226. // nodeGuidNameMap maps each known node's stable guid to its display name.
  227. func (s *InboundService) nodeGuidNameMap() map[string]string {
  228. db := database.GetDB()
  229. var nodes []model.Node
  230. if err := db.Model(&model.Node{}).Find(&nodes).Error; err != nil {
  231. return map[string]string{}
  232. }
  233. m := make(map[string]string, len(nodes))
  234. for _, n := range nodes {
  235. if n.Guid != "" {
  236. m[n.Guid] = n.Name
  237. }
  238. }
  239. return m
  240. }
  241. // DeleteNodeClientIpsByGuid removes all attribution rows for a guid (e.g. when a
  242. // node is deleted) so its IPs stop being reported and counted.
  243. func (s *InboundService) DeleteNodeClientIpsByGuid(guid string) error {
  244. if guid == "" {
  245. return nil
  246. }
  247. db := database.GetDB()
  248. return db.Where("node_guid = ?", guid).Delete(&model.NodeClientIp{}).Error
  249. }