inbound_client_ips.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package service
  2. import (
  3. "encoding/json"
  4. "sort"
  5. "time"
  6. "github.com/mhsanaei/3x-ui/v3/internal/database"
  7. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  8. "gorm.io/gorm"
  9. "gorm.io/gorm/clause"
  10. )
  11. func (s *InboundService) GetAllInboundClientIps() ([]model.InboundClientIps, error) {
  12. db := database.GetDB()
  13. var ips []model.InboundClientIps
  14. err := db.Model(&model.InboundClientIps{}).Find(&ips).Error
  15. return ips, err
  16. }
  17. // clientIpStaleAfterSeconds mirrors job.ipStaleAfterSeconds: client IPs older than
  18. // 30 minutes are evicted. Applying the same cutoff inside the cross-node merge keeps
  19. // the synced blob bounded and stops the master's push-back from resurrecting IPs that
  20. // a node has already pruned (otherwise the merge defeats the eviction cluster-wide).
  21. const clientIpStaleAfterSeconds = int64(30 * 60)
  22. // clientIpEntry is the on-disk shape of each element of InboundClientIps.Ips. Tags
  23. // match job.IPWithTimestamp so the blob round-trips with the access.log scanner.
  24. type clientIpEntry struct {
  25. IP string `json:"ip"`
  26. Timestamp int64 `json:"timestamp"`
  27. }
  28. // mergeClientIpEntries unions old and incoming IP observations, dropping anything
  29. // older than cutoff, keeping the most recent timestamp per IP, and returning the
  30. // result sorted newest-first.
  31. func mergeClientIpEntries(old, incoming []clientIpEntry, cutoff int64) []clientIpEntry {
  32. ipMap := make(map[string]int64, len(old)+len(incoming))
  33. for _, e := range old {
  34. if e.Timestamp < cutoff {
  35. continue
  36. }
  37. ipMap[e.IP] = e.Timestamp
  38. }
  39. for _, e := range incoming {
  40. if e.Timestamp < cutoff {
  41. continue
  42. }
  43. if cur, ok := ipMap[e.IP]; !ok || e.Timestamp > cur {
  44. ipMap[e.IP] = e.Timestamp
  45. }
  46. }
  47. out := make([]clientIpEntry, 0, len(ipMap))
  48. for ip, ts := range ipMap {
  49. out = append(out, clientIpEntry{IP: ip, Timestamp: ts})
  50. }
  51. sort.Slice(out, func(i, j int) bool { return out[i].Timestamp > out[j].Timestamp })
  52. return out
  53. }
  54. // MergeInboundClientIps folds client IPs synced from another node into the local
  55. // inbound_client_ips table without double-counting an IP seen on multiple nodes and
  56. // without resurrecting stale entries. Existing rows are updated in place; brand-new
  57. // clients (typically node-only clients with no local row) are created with a fresh
  58. // local id.
  59. func (s *InboundService) MergeInboundClientIps(incomingIps []model.InboundClientIps) error {
  60. db := database.GetDB()
  61. var currentIps []model.InboundClientIps
  62. if err := db.Model(&model.InboundClientIps{}).Find(&currentIps).Error; err != nil {
  63. return err
  64. }
  65. currentMap := make(map[string]*model.InboundClientIps, len(currentIps))
  66. for i := range currentIps {
  67. currentMap[currentIps[i].ClientEmail] = &currentIps[i]
  68. }
  69. now := time.Now().Unix()
  70. cutoff := now - clientIpStaleAfterSeconds
  71. tx := db.Begin()
  72. defer func() {
  73. if r := recover(); r != nil {
  74. tx.Rollback()
  75. }
  76. }()
  77. for _, incoming := range incomingIps {
  78. if incoming.ClientEmail == "" || incoming.Ips == "" {
  79. continue
  80. }
  81. var incomingEntries []clientIpEntry
  82. _ = json.Unmarshal([]byte(incoming.Ips), &incomingEntries)
  83. current, exists := currentMap[incoming.ClientEmail]
  84. if !exists {
  85. // New client we've never seen locally. Drop stale entries up front and
  86. // skip the row entirely if nothing is fresh, so we don't persist a row
  87. // that is dead on arrival.
  88. fresh := mergeClientIpEntries(nil, incomingEntries, cutoff)
  89. if len(fresh) == 0 {
  90. continue
  91. }
  92. b, _ := json.Marshal(fresh)
  93. incoming.Ips = string(b)
  94. // Never carry the remote node's primary key into the local table: id
  95. // spaces are independent across nodes and the remote id would collide
  96. // with an unrelated local row. OnConflict guards the race where
  97. // check_client_ip_job creates the same brand-new email between the
  98. // snapshot above and this insert.
  99. incoming.Id = 0
  100. if err := tx.Clauses(clause.OnConflict{
  101. Columns: []clause.Column{{Name: "client_email"}},
  102. DoNothing: true,
  103. }).Create(&incoming).Error; err != nil {
  104. tx.Rollback()
  105. return err
  106. }
  107. continue
  108. }
  109. var oldEntries []clientIpEntry
  110. if current.Ips != "" {
  111. _ = json.Unmarshal([]byte(current.Ips), &oldEntries)
  112. }
  113. merged := mergeClientIpEntries(oldEntries, incomingEntries, cutoff)
  114. b, _ := json.Marshal(merged)
  115. mergedStr := string(b)
  116. // A concurrent check_client_ip_job db.Save on the same row can interleave
  117. // with this update (benign last-writer-wins; any dropped IP reappears on the
  118. // next scan/sync), so only write when the blob actually changed.
  119. if current.Ips != mergedStr {
  120. if err := tx.Model(&model.InboundClientIps{}).Where("id = ?", current.Id).Update("ips", mergedStr).Error; err != nil {
  121. tx.Rollback()
  122. return err
  123. }
  124. }
  125. }
  126. return tx.Commit().Error
  127. }
  128. func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail string) error {
  129. return tx.Model(model.InboundClientIps{}).Where("client_email = ?", oldEmail).Update("client_email", newEmail).Error
  130. }
  131. func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {
  132. return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error
  133. }
  134. func (s *InboundService) delClientIPsByEmails(tx *gorm.DB, emails []string) error {
  135. const chunk = 400
  136. for start := 0; start < len(emails); start += chunk {
  137. end := min(start+chunk, len(emails))
  138. if err := tx.Where("client_email IN ?", emails[start:end]).Delete(model.InboundClientIps{}).Error; err != nil {
  139. return err
  140. }
  141. }
  142. return nil
  143. }
  144. func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error) {
  145. db := database.GetDB()
  146. InboundClientIps := &model.InboundClientIps{}
  147. err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
  148. if err != nil {
  149. return "", err
  150. }
  151. if InboundClientIps.Ips == "" {
  152. return "", nil
  153. }
  154. // Try to parse as new format (with timestamps)
  155. type IPWithTimestamp struct {
  156. IP string `json:"ip"`
  157. Timestamp int64 `json:"timestamp"`
  158. }
  159. var ipsWithTime []IPWithTimestamp
  160. err = json.Unmarshal([]byte(InboundClientIps.Ips), &ipsWithTime)
  161. // If successfully parsed as new format, return with timestamps
  162. if err == nil && len(ipsWithTime) > 0 {
  163. return InboundClientIps.Ips, nil
  164. }
  165. // Otherwise, assume it's old format (simple string array)
  166. // Try to parse as simple array and convert to new format
  167. var oldIps []string
  168. err = json.Unmarshal([]byte(InboundClientIps.Ips), &oldIps)
  169. if err == nil && len(oldIps) > 0 {
  170. // Convert old format to new format with current timestamp
  171. newIpsWithTime := make([]IPWithTimestamp, len(oldIps))
  172. for i, ip := range oldIps {
  173. newIpsWithTime[i] = IPWithTimestamp{
  174. IP: ip,
  175. Timestamp: time.Now().Unix(),
  176. }
  177. }
  178. result, _ := json.Marshal(newIpsWithTime)
  179. return string(result), nil
  180. }
  181. // Return as-is if parsing fails
  182. return InboundClientIps.Ips, nil
  183. }
  184. func (s *InboundService) ClearClientIps(clientEmail string) error {
  185. db := database.GetDB()
  186. result := db.Model(model.InboundClientIps{}).
  187. Where("client_email = ?", clientEmail).
  188. Update("ips", "")
  189. err := result.Error
  190. if err != nil {
  191. return err
  192. }
  193. return nil
  194. }