1
0

inbound_disable.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "slices"
  7. "strings"
  8. "time"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  10. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  11. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  12. "gorm.io/gorm"
  13. )
  14. func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
  15. now := time.Now().Unix() * 1000
  16. needRestart := false
  17. if p != nil {
  18. var tags []string
  19. err := tx.Table("inbounds").
  20. Select("inbounds.tag").
  21. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  22. Scan(&tags).Error
  23. if err != nil {
  24. return false, 0, err
  25. }
  26. _ = s.xrayApi.Init(p.GetAPIPort())
  27. for _, tag := range tags {
  28. err1 := s.xrayApi.DelInbound(tag)
  29. if err1 == nil {
  30. logger.Debug("Inbound disabled by api:", tag)
  31. } else {
  32. logger.Debug("Error in disabling inbound by api:", err1)
  33. needRestart = true
  34. }
  35. }
  36. s.xrayApi.Close()
  37. }
  38. result := tx.Model(model.Inbound{}).
  39. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  40. Update("enable", false)
  41. err := result.Error
  42. count := result.RowsAffected
  43. return needRestart, count, err
  44. }
  45. // depletedClientsCond matches clients that exhausted their quota or expired.
  46. // Besides the local counters it also trips on the cross-panel usage a master
  47. // pushed into client_global_traffics — that's what lets a node cut a client
  48. // whose combined usage exceeds the quota even though the local share doesn't
  49. // (placeholders: now).
  50. const depletedClientsCond = `((total > 0 AND up + down >= total)
  51. OR (expiry_time > 0 AND expiry_time <= ?)
  52. OR (total > 0 AND EXISTS (
  53. SELECT 1 FROM client_global_traffics g
  54. WHERE g.email = client_traffics.email AND g.up + g.down >= client_traffics.total
  55. )))`
  56. // depletedClientsCondLocal is depletedClientsCond without the cross-panel
  57. // client_global_traffics check. The EXISTS branch is a correlated subquery that
  58. // turns every traffic poll into a full client_traffics scan; on a panel no
  59. // master pushes to (the common case) client_global_traffics is empty, so the
  60. // branch can never match and is pure CPU cost (#5392).
  61. const depletedClientsCondLocal = `((total > 0 AND up + down >= total)
  62. OR (expiry_time > 0 AND expiry_time <= ?))`
  63. // depletedCond returns the local-only predicate unless this panel actually
  64. // holds global-traffic rows, in which case the cross-panel EXISTS check is
  65. // needed to enforce combined quota. Both variants take the same single
  66. // expiry_time placeholder, so callers pass identical args either way.
  67. func depletedCond(tx *gorm.DB) string {
  68. var probe int64
  69. if err := tx.Model(&model.ClientGlobalTraffic{}).Limit(1).Count(&probe).Error; err == nil && probe > 0 {
  70. return depletedClientsCond
  71. }
  72. return depletedClientsCondLocal
  73. }
  74. func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) {
  75. now := time.Now().Unix() * 1000
  76. needRestart := false
  77. cond := depletedCond(tx)
  78. var depletedRows []xray.ClientTraffic
  79. err := tx.Model(xray.ClientTraffic{}).
  80. Where(cond+" AND enable = ?", now, true).
  81. Find(&depletedRows).Error
  82. if err != nil {
  83. return false, 0, err
  84. }
  85. if len(depletedRows) == 0 {
  86. return false, 0, nil
  87. }
  88. depletedEmails := make([]string, 0, len(depletedRows))
  89. for i := range depletedRows {
  90. if depletedRows[i].Email == "" {
  91. continue
  92. }
  93. depletedEmails = append(depletedEmails, depletedRows[i].Email)
  94. }
  95. type target struct {
  96. InboundID int `gorm:"column:inbound_id"`
  97. NodeID *int `gorm:"column:node_id"`
  98. Tag string
  99. Email string
  100. }
  101. var targets []target
  102. if len(depletedEmails) > 0 {
  103. err = tx.Raw(`
  104. SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id,
  105. inbounds.tag AS tag, clients.email AS email
  106. FROM clients
  107. JOIN client_inbounds ON client_inbounds.client_id = clients.id
  108. JOIN inbounds ON inbounds.id = client_inbounds.inbound_id
  109. WHERE clients.email IN ?
  110. `, depletedEmails).Scan(&targets).Error
  111. if err != nil {
  112. return false, 0, err
  113. }
  114. }
  115. var localTargets []target
  116. localByInbound := make(map[int]map[string]struct{})
  117. remoteByInbound := make(map[int][]target)
  118. for _, t := range targets {
  119. if t.NodeID == nil {
  120. localTargets = append(localTargets, t)
  121. if localByInbound[t.InboundID] == nil {
  122. localByInbound[t.InboundID] = make(map[string]struct{})
  123. }
  124. localByInbound[t.InboundID][t.Email] = struct{}{}
  125. } else {
  126. remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t)
  127. }
  128. }
  129. if p != nil && len(localTargets) > 0 {
  130. _ = s.xrayApi.Init(p.GetAPIPort())
  131. for _, t := range localTargets {
  132. err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
  133. if err1 == nil {
  134. logger.Debug("Client disabled by api:", t.Email)
  135. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
  136. logger.Debug("User is already disabled. Nothing to do more...")
  137. } else {
  138. logger.Debug("Error in disabling client by api:", err1)
  139. needRestart = true
  140. }
  141. }
  142. s.xrayApi.Close()
  143. }
  144. for inboundID, emails := range localByInbound {
  145. if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil {
  146. logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr)
  147. }
  148. }
  149. // Flip the rows already collected above by primary key instead of
  150. // re-evaluating the depleted predicate, which was a second full scan of
  151. // client_traffics on every poll. Sorted ids keep the lock order stable.
  152. ids := make([]int, 0, len(depletedRows))
  153. for i := range depletedRows {
  154. ids = append(ids, depletedRows[i].Id)
  155. }
  156. slices.Sort(ids)
  157. var count int64
  158. for _, batch := range chunkInts(ids, sqlInChunk) {
  159. result := tx.Model(xray.ClientTraffic{}).
  160. Where("id IN ? AND enable = ?", batch, true).
  161. Update("enable", false)
  162. if result.Error != nil {
  163. return needRestart, count, result.Error
  164. }
  165. count += result.RowsAffected
  166. }
  167. if len(depletedEmails) > 0 {
  168. if err := tx.Model(&model.ClientRecord{}).
  169. Where("email IN ?", depletedEmails).
  170. Updates(map[string]any{"enable": false, "updated_at": now}).Error; err != nil {
  171. logger.Warning("disableInvalidClients update clients.enable:", err)
  172. }
  173. }
  174. for inboundID, group := range remoteByInbound {
  175. emails := make(map[string]struct{}, len(group))
  176. for _, t := range group {
  177. emails[t.Email] = struct{}{}
  178. }
  179. if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
  180. logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
  181. needRestart = true
  182. }
  183. }
  184. return needRestart, count, nil
  185. }
  186. // markClientsDisabledInSettings flips client.enable=false in the inbound's
  187. // stored settings JSON for the given emails and returns both the pre and
  188. // post snapshots so a caller pushing to a remote node has the diff to hand.
  189. func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) {
  190. var ib model.Inbound
  191. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil {
  192. return nil, nil, err
  193. }
  194. snapshot := ib
  195. settings := map[string]any{}
  196. if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
  197. return nil, nil, err
  198. }
  199. clients, _ := settings["clients"].([]any)
  200. now := time.Now().Unix() * 1000
  201. mutated := false
  202. for i := range clients {
  203. entry, ok := clients[i].(map[string]any)
  204. if !ok {
  205. continue
  206. }
  207. email, _ := entry["email"].(string)
  208. if _, hit := emails[email]; !hit {
  209. continue
  210. }
  211. if cur, _ := entry["enable"].(bool); !cur {
  212. continue
  213. }
  214. entry["enable"] = false
  215. entry["updated_at"] = now
  216. clients[i] = entry
  217. mutated = true
  218. }
  219. if !mutated {
  220. return &snapshot, &ib, nil
  221. }
  222. settings["clients"] = clients
  223. bs, marshalErr := json.MarshalIndent(settings, "", " ")
  224. if marshalErr != nil {
  225. return nil, nil, marshalErr
  226. }
  227. ib.Settings = string(bs)
  228. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).
  229. Update("settings", ib.Settings).Error; err != nil {
  230. return nil, nil, err
  231. }
  232. return &snapshot, &ib, nil
  233. }
  234. // disableRemoteClients flips the clients off in the inbound's stored settings
  235. // and pushes the updated inbound to its node, which applies it to its own
  236. // running Xray. That push is the whole reconcile — restarting the node's Xray
  237. // afterwards would drop every live connection on the node for nothing (#5740).
  238. func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
  239. oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
  240. if err != nil {
  241. return err
  242. }
  243. rt, err := s.runtimeFor(ib)
  244. if err != nil {
  245. return err
  246. }
  247. if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil {
  248. return err
  249. }
  250. return nil
  251. }