1
0

inbound_disable.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  11. "gorm.io/gorm"
  12. )
  13. func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
  14. now := time.Now().Unix() * 1000
  15. needRestart := false
  16. if p != nil {
  17. var tags []string
  18. err := tx.Table("inbounds").
  19. Select("inbounds.tag").
  20. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  21. Scan(&tags).Error
  22. if err != nil {
  23. return false, 0, err
  24. }
  25. s.xrayApi.Init(p.GetAPIPort())
  26. for _, tag := range tags {
  27. err1 := s.xrayApi.DelInbound(tag)
  28. if err1 == nil {
  29. logger.Debug("Inbound disabled by api:", tag)
  30. } else {
  31. logger.Debug("Error in disabling inbound by api:", err1)
  32. needRestart = true
  33. }
  34. }
  35. s.xrayApi.Close()
  36. }
  37. result := tx.Model(model.Inbound{}).
  38. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  39. Update("enable", false)
  40. err := result.Error
  41. count := result.RowsAffected
  42. return needRestart, count, err
  43. }
// depletedClientsCond is a SQL predicate over client_traffics that matches
// clients that exhausted their quota or expired. Besides the local counters it
// also trips on the cross-panel usage a master pushed into
// client_global_traffics — that is what lets a node cut a client whose
// combined usage exceeds the quota even though the local share does not.
//
// The column names are unqualified and the subquery correlates on
// client_traffics.email, so the predicate must be applied to a query whose
// outer table is client_traffics. It carries exactly one placeholder: the
// current time, compared against expiry_time.
const depletedClientsCond = `((total > 0 AND up + down >= total)
	OR (expiry_time > 0 AND expiry_time <= ?)
	OR (total > 0 AND EXISTS (
		SELECT 1 FROM client_global_traffics g
		WHERE g.email = client_traffics.email AND g.up + g.down >= client_traffics.total
	)))`
// depletedClientsCondLocal is depletedClientsCond without the cross-panel
// client_global_traffics check. The EXISTS branch is a correlated subquery that
// turns every traffic poll into a full client_traffics scan; on a panel no
// master pushes to (the common case) client_global_traffics is empty, so the
// branch can never match and is pure CPU cost (#5392).
//
// Like depletedClientsCond it carries exactly one placeholder: the current
// time, compared against expiry_time.
const depletedClientsCondLocal = `((total > 0 AND up + down >= total)
	OR (expiry_time > 0 AND expiry_time <= ?))`
  62. // depletedCond returns the local-only predicate unless this panel actually
  63. // holds global-traffic rows, in which case the cross-panel EXISTS check is
  64. // needed to enforce combined quota. Both variants take the same single
  65. // expiry_time placeholder, so callers pass identical args either way.
  66. func depletedCond(tx *gorm.DB) string {
  67. var probe int64
  68. if err := tx.Model(&model.ClientGlobalTraffic{}).Limit(1).Count(&probe).Error; err == nil && probe > 0 {
  69. return depletedClientsCond
  70. }
  71. return depletedClientsCondLocal
  72. }
  73. func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) {
  74. now := time.Now().Unix() * 1000
  75. needRestart := false
  76. cond := depletedCond(tx)
  77. var depletedRows []xray.ClientTraffic
  78. err := tx.Model(xray.ClientTraffic{}).
  79. Where(cond+" AND enable = ?", now, true).
  80. Find(&depletedRows).Error
  81. if err != nil {
  82. return false, 0, nil, err
  83. }
  84. if len(depletedRows) == 0 {
  85. return false, 0, nil, nil
  86. }
  87. depletedEmails := make([]string, 0, len(depletedRows))
  88. for i := range depletedRows {
  89. if depletedRows[i].Email == "" {
  90. continue
  91. }
  92. depletedEmails = append(depletedEmails, depletedRows[i].Email)
  93. }
  94. type target struct {
  95. InboundID int `gorm:"column:inbound_id"`
  96. NodeID *int `gorm:"column:node_id"`
  97. Tag string
  98. Email string
  99. }
  100. var targets []target
  101. if len(depletedEmails) > 0 {
  102. err = tx.Raw(`
  103. SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id,
  104. inbounds.tag AS tag, clients.email AS email
  105. FROM clients
  106. JOIN client_inbounds ON client_inbounds.client_id = clients.id
  107. JOIN inbounds ON inbounds.id = client_inbounds.inbound_id
  108. WHERE clients.email IN ?
  109. `, depletedEmails).Scan(&targets).Error
  110. if err != nil {
  111. return false, 0, nil, err
  112. }
  113. }
  114. var localTargets []target
  115. localByInbound := make(map[int]map[string]struct{})
  116. remoteByInbound := make(map[int][]target)
  117. for _, t := range targets {
  118. if t.NodeID == nil {
  119. localTargets = append(localTargets, t)
  120. if localByInbound[t.InboundID] == nil {
  121. localByInbound[t.InboundID] = make(map[string]struct{})
  122. }
  123. localByInbound[t.InboundID][t.Email] = struct{}{}
  124. } else {
  125. remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t)
  126. }
  127. }
  128. if p != nil && len(localTargets) > 0 {
  129. s.xrayApi.Init(p.GetAPIPort())
  130. for _, t := range localTargets {
  131. err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
  132. if err1 == nil {
  133. logger.Debug("Client disabled by api:", t.Email)
  134. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
  135. logger.Debug("User is already disabled. Nothing to do more...")
  136. } else {
  137. logger.Debug("Error in disabling client by api:", err1)
  138. needRestart = true
  139. }
  140. }
  141. s.xrayApi.Close()
  142. }
  143. for inboundID, emails := range localByInbound {
  144. if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil {
  145. logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr)
  146. }
  147. }
  148. result := tx.Model(xray.ClientTraffic{}).
  149. Where(cond+" AND enable = ?", now, true).
  150. Update("enable", false)
  151. err = result.Error
  152. count := result.RowsAffected
  153. if err != nil {
  154. return needRestart, count, nil, err
  155. }
  156. if len(depletedEmails) > 0 {
  157. if err := tx.Model(&model.ClientRecord{}).
  158. Where("email IN ?", depletedEmails).
  159. Updates(map[string]any{"enable": false, "updated_at": now}).Error; err != nil {
  160. logger.Warning("disableInvalidClients update clients.enable:", err)
  161. }
  162. }
  163. disabledNodeIDs := make(map[int]struct{})
  164. for inboundID, group := range remoteByInbound {
  165. emails := make(map[string]struct{}, len(group))
  166. for _, t := range group {
  167. emails[t.Email] = struct{}{}
  168. }
  169. if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
  170. logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
  171. needRestart = true
  172. } else {
  173. for _, t := range group {
  174. if t.NodeID != nil {
  175. disabledNodeIDs[*t.NodeID] = struct{}{}
  176. }
  177. }
  178. }
  179. }
  180. nodeIDs := make([]int, 0, len(disabledNodeIDs))
  181. for nodeID := range disabledNodeIDs {
  182. nodeIDs = append(nodeIDs, nodeID)
  183. }
  184. return needRestart, count, nodeIDs, nil
  185. }
  186. // markClientsDisabledInSettings flips client.enable=false in the inbound's
  187. // stored settings JSON for the given emails and returns both the pre and
  188. // post snapshots so a caller pushing to a remote node has the diff to hand.
  189. func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) {
  190. var ib model.Inbound
  191. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil {
  192. return nil, nil, err
  193. }
  194. snapshot := ib
  195. settings := map[string]any{}
  196. if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
  197. return nil, nil, err
  198. }
  199. clients, _ := settings["clients"].([]any)
  200. now := time.Now().Unix() * 1000
  201. mutated := false
  202. for i := range clients {
  203. entry, ok := clients[i].(map[string]any)
  204. if !ok {
  205. continue
  206. }
  207. email, _ := entry["email"].(string)
  208. if _, hit := emails[email]; !hit {
  209. continue
  210. }
  211. if cur, _ := entry["enable"].(bool); cur == false {
  212. continue
  213. }
  214. entry["enable"] = false
  215. entry["updated_at"] = now
  216. clients[i] = entry
  217. mutated = true
  218. }
  219. if !mutated {
  220. return &snapshot, &ib, nil
  221. }
  222. settings["clients"] = clients
  223. bs, marshalErr := json.MarshalIndent(settings, "", " ")
  224. if marshalErr != nil {
  225. return nil, nil, marshalErr
  226. }
  227. ib.Settings = string(bs)
  228. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).
  229. Update("settings", ib.Settings).Error; err != nil {
  230. return nil, nil, err
  231. }
  232. return &snapshot, &ib, nil
  233. }
  234. func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
  235. oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
  236. if err != nil {
  237. return err
  238. }
  239. rt, err := s.runtimeFor(ib)
  240. if err != nil {
  241. return err
  242. }
  243. if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil {
  244. return err
  245. }
  246. return nil
  247. }