inbound_disable.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  11. "gorm.io/gorm"
  12. )
  13. func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
  14. now := time.Now().Unix() * 1000
  15. needRestart := false
  16. if p != nil {
  17. var tags []string
  18. err := tx.Table("inbounds").
  19. Select("inbounds.tag").
  20. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  21. Scan(&tags).Error
  22. if err != nil {
  23. return false, 0, err
  24. }
  25. s.xrayApi.Init(p.GetAPIPort())
  26. for _, tag := range tags {
  27. err1 := s.xrayApi.DelInbound(tag)
  28. if err1 == nil {
  29. logger.Debug("Inbound disabled by api:", tag)
  30. } else {
  31. logger.Debug("Error in disabling inbound by api:", err1)
  32. needRestart = true
  33. }
  34. }
  35. s.xrayApi.Close()
  36. }
  37. result := tx.Model(model.Inbound{}).
  38. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  39. Update("enable", false)
  40. err := result.Error
  41. count := result.RowsAffected
  42. return needRestart, count, err
  43. }
// depletedClientsCond is a SQL WHERE fragment over client_traffics that
// matches clients that exhausted their quota or expired. It takes a single
// placeholder: "now", the timestamp expiry_time is compared against (callers
// in this file pass Unix time scaled to milliseconds).
//
// Besides the local counters it also trips on the cross-panel usage a master
// pushed into client_global_traffics — that's what lets a node cut a client
// whose combined usage exceeds the quota even though the local share doesn't.
const depletedClientsCond = `((total > 0 AND up + down >= total)
OR (expiry_time > 0 AND expiry_time <= ?)
OR (total > 0 AND EXISTS (
SELECT 1 FROM client_global_traffics g
WHERE g.email = client_traffics.email AND g.up + g.down >= client_traffics.total
)))`
  55. func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) {
  56. now := time.Now().Unix() * 1000
  57. needRestart := false
  58. var depletedRows []xray.ClientTraffic
  59. err := tx.Model(xray.ClientTraffic{}).
  60. Where(depletedClientsCond+" AND enable = ?", now, true).
  61. Find(&depletedRows).Error
  62. if err != nil {
  63. return false, 0, nil, err
  64. }
  65. if len(depletedRows) == 0 {
  66. return false, 0, nil, nil
  67. }
  68. depletedEmails := make([]string, 0, len(depletedRows))
  69. for i := range depletedRows {
  70. if depletedRows[i].Email == "" {
  71. continue
  72. }
  73. depletedEmails = append(depletedEmails, depletedRows[i].Email)
  74. }
  75. type target struct {
  76. InboundID int `gorm:"column:inbound_id"`
  77. NodeID *int `gorm:"column:node_id"`
  78. Tag string
  79. Email string
  80. }
  81. var targets []target
  82. if len(depletedEmails) > 0 {
  83. err = tx.Raw(`
  84. SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id,
  85. inbounds.tag AS tag, clients.email AS email
  86. FROM clients
  87. JOIN client_inbounds ON client_inbounds.client_id = clients.id
  88. JOIN inbounds ON inbounds.id = client_inbounds.inbound_id
  89. WHERE clients.email IN ?
  90. `, depletedEmails).Scan(&targets).Error
  91. if err != nil {
  92. return false, 0, nil, err
  93. }
  94. }
  95. var localTargets []target
  96. localByInbound := make(map[int]map[string]struct{})
  97. remoteByInbound := make(map[int][]target)
  98. for _, t := range targets {
  99. if t.NodeID == nil {
  100. localTargets = append(localTargets, t)
  101. if localByInbound[t.InboundID] == nil {
  102. localByInbound[t.InboundID] = make(map[string]struct{})
  103. }
  104. localByInbound[t.InboundID][t.Email] = struct{}{}
  105. } else {
  106. remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t)
  107. }
  108. }
  109. if p != nil && len(localTargets) > 0 {
  110. s.xrayApi.Init(p.GetAPIPort())
  111. for _, t := range localTargets {
  112. err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
  113. if err1 == nil {
  114. logger.Debug("Client disabled by api:", t.Email)
  115. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
  116. logger.Debug("User is already disabled. Nothing to do more...")
  117. } else {
  118. logger.Debug("Error in disabling client by api:", err1)
  119. needRestart = true
  120. }
  121. }
  122. s.xrayApi.Close()
  123. }
  124. for inboundID, emails := range localByInbound {
  125. if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil {
  126. logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr)
  127. }
  128. }
  129. result := tx.Model(xray.ClientTraffic{}).
  130. Where(depletedClientsCond+" AND enable = ?", now, true).
  131. Update("enable", false)
  132. err = result.Error
  133. count := result.RowsAffected
  134. if err != nil {
  135. return needRestart, count, nil, err
  136. }
  137. if len(depletedEmails) > 0 {
  138. if err := tx.Model(&model.ClientRecord{}).
  139. Where("email IN ?", depletedEmails).
  140. Updates(map[string]any{"enable": false, "updated_at": now}).Error; err != nil {
  141. logger.Warning("disableInvalidClients update clients.enable:", err)
  142. }
  143. }
  144. disabledNodeIDs := make(map[int]struct{})
  145. for inboundID, group := range remoteByInbound {
  146. emails := make(map[string]struct{}, len(group))
  147. for _, t := range group {
  148. emails[t.Email] = struct{}{}
  149. }
  150. if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
  151. logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
  152. needRestart = true
  153. } else {
  154. for _, t := range group {
  155. if t.NodeID != nil {
  156. disabledNodeIDs[*t.NodeID] = struct{}{}
  157. }
  158. }
  159. }
  160. }
  161. nodeIDs := make([]int, 0, len(disabledNodeIDs))
  162. for nodeID := range disabledNodeIDs {
  163. nodeIDs = append(nodeIDs, nodeID)
  164. }
  165. return needRestart, count, nodeIDs, nil
  166. }
  167. // markClientsDisabledInSettings flips client.enable=false in the inbound's
  168. // stored settings JSON for the given emails and returns both the pre and
  169. // post snapshots so a caller pushing to a remote node has the diff to hand.
  170. func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) {
  171. var ib model.Inbound
  172. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil {
  173. return nil, nil, err
  174. }
  175. snapshot := ib
  176. settings := map[string]any{}
  177. if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
  178. return nil, nil, err
  179. }
  180. clients, _ := settings["clients"].([]any)
  181. now := time.Now().Unix() * 1000
  182. mutated := false
  183. for i := range clients {
  184. entry, ok := clients[i].(map[string]any)
  185. if !ok {
  186. continue
  187. }
  188. email, _ := entry["email"].(string)
  189. if _, hit := emails[email]; !hit {
  190. continue
  191. }
  192. if cur, _ := entry["enable"].(bool); cur == false {
  193. continue
  194. }
  195. entry["enable"] = false
  196. entry["updated_at"] = now
  197. clients[i] = entry
  198. mutated = true
  199. }
  200. if !mutated {
  201. return &snapshot, &ib, nil
  202. }
  203. settings["clients"] = clients
  204. bs, marshalErr := json.MarshalIndent(settings, "", " ")
  205. if marshalErr != nil {
  206. return nil, nil, marshalErr
  207. }
  208. ib.Settings = string(bs)
  209. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).
  210. Update("settings", ib.Settings).Error; err != nil {
  211. return nil, nil, err
  212. }
  213. return &snapshot, &ib, nil
  214. }
  215. func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
  216. oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
  217. if err != nil {
  218. return err
  219. }
  220. rt, err := s.runtimeFor(ib)
  221. if err != nil {
  222. return err
  223. }
  224. if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil {
  225. return err
  226. }
  227. return nil
  228. }