1
0

client_locks.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package service
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  6. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  7. "gorm.io/gorm"
  8. )
  9. // Short-lived tombstone of just-deleted client emails so that a node snapshot
  10. // arriving between delete and node-side processing doesn't resurrect them.
  11. var (
  12. recentlyDeletedMu sync.Mutex
  13. recentlyDeleted = map[string]time.Time{}
  14. )
  15. const deleteTombstoneTTL = 90 * time.Second
  16. var (
  17. inboundMutationLocksMu sync.Mutex
  18. inboundMutationLocks = map[int]*sync.Mutex{}
  19. )
  20. func lockInbound(inboundId int) *sync.Mutex {
  21. inboundMutationLocksMu.Lock()
  22. defer inboundMutationLocksMu.Unlock()
  23. m, ok := inboundMutationLocks[inboundId]
  24. if !ok {
  25. m = &sync.Mutex{}
  26. inboundMutationLocks[inboundId] = m
  27. }
  28. m.Lock()
  29. return m
  30. }
  31. func compactOrphans(db *gorm.DB, clients []any) []any {
  32. if len(clients) == 0 {
  33. return clients
  34. }
  35. emails := make([]string, 0, len(clients))
  36. for _, c := range clients {
  37. cm, ok := c.(map[string]any)
  38. if !ok {
  39. continue
  40. }
  41. if e, _ := cm["email"].(string); e != "" {
  42. emails = append(emails, e)
  43. }
  44. }
  45. if len(emails) == 0 {
  46. return clients
  47. }
  48. existing := make(map[string]struct{}, len(emails))
  49. const orphanChunk = 400
  50. for start := 0; start < len(emails); start += orphanChunk {
  51. end := min(start+orphanChunk, len(emails))
  52. var found []string
  53. if err := db.Model(&model.ClientRecord{}).Where("email IN ?", emails[start:end]).Pluck("email", &found).Error; err != nil {
  54. logger.Warning("compactOrphans pluck:", err)
  55. return clients
  56. }
  57. for _, e := range found {
  58. existing[e] = struct{}{}
  59. }
  60. }
  61. if len(existing) == len(emails) {
  62. return clients
  63. }
  64. out := make([]any, 0, len(existing))
  65. for _, c := range clients {
  66. cm, ok := c.(map[string]any)
  67. if !ok {
  68. out = append(out, c)
  69. continue
  70. }
  71. e, _ := cm["email"].(string)
  72. if e == "" {
  73. out = append(out, c)
  74. continue
  75. }
  76. if _, ok := existing[e]; ok {
  77. out = append(out, c)
  78. }
  79. }
  80. return out
  81. }
  82. func tombstoneClientEmail(email string) {
  83. if email == "" {
  84. return
  85. }
  86. recentlyDeletedMu.Lock()
  87. defer recentlyDeletedMu.Unlock()
  88. recentlyDeleted[email] = time.Now()
  89. cutoff := time.Now().Add(-deleteTombstoneTTL)
  90. for e, ts := range recentlyDeleted {
  91. if ts.Before(cutoff) {
  92. delete(recentlyDeleted, e)
  93. }
  94. }
  95. }
  96. func tombstoneClientEmails(emails []string) {
  97. if len(emails) == 0 {
  98. return
  99. }
  100. now := time.Now()
  101. cutoff := now.Add(-deleteTombstoneTTL)
  102. recentlyDeletedMu.Lock()
  103. defer recentlyDeletedMu.Unlock()
  104. for _, email := range emails {
  105. if email != "" {
  106. recentlyDeleted[email] = now
  107. }
  108. }
  109. for e, ts := range recentlyDeleted {
  110. if ts.Before(cutoff) {
  111. delete(recentlyDeleted, e)
  112. }
  113. }
  114. }
  115. func isClientEmailTombstoned(email string) bool {
  116. if email == "" {
  117. return false
  118. }
  119. recentlyDeletedMu.Lock()
  120. defer recentlyDeletedMu.Unlock()
  121. ts, ok := recentlyDeleted[email]
  122. if !ok {
  123. return false
  124. }
  125. if time.Since(ts) > deleteTombstoneTTL {
  126. delete(recentlyDeleted, email)
  127. return false
  128. }
  129. return true
  130. }