inbound_disable.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  9. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  10. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  11. "gorm.io/gorm"
  12. )
  13. func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
  14. now := time.Now().Unix() * 1000
  15. needRestart := false
  16. if p != nil {
  17. var tags []string
  18. err := tx.Table("inbounds").
  19. Select("inbounds.tag").
  20. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  21. Scan(&tags).Error
  22. if err != nil {
  23. return false, 0, err
  24. }
  25. s.xrayApi.Init(p.GetAPIPort())
  26. for _, tag := range tags {
  27. err1 := s.xrayApi.DelInbound(tag)
  28. if err1 == nil {
  29. logger.Debug("Inbound disabled by api:", tag)
  30. } else {
  31. logger.Debug("Error in disabling inbound by api:", err1)
  32. needRestart = true
  33. }
  34. }
  35. s.xrayApi.Close()
  36. }
  37. result := tx.Model(model.Inbound{}).
  38. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
  39. Update("enable", false)
  40. err := result.Error
  41. count := result.RowsAffected
  42. return needRestart, count, err
  43. }
  44. func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, []int, error) {
  45. now := time.Now().Unix() * 1000
  46. needRestart := false
  47. var depletedRows []xray.ClientTraffic
  48. err := tx.Model(xray.ClientTraffic{}).
  49. Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
  50. Find(&depletedRows).Error
  51. if err != nil {
  52. return false, 0, nil, err
  53. }
  54. if len(depletedRows) == 0 {
  55. return false, 0, nil, nil
  56. }
  57. depletedEmails := make([]string, 0, len(depletedRows))
  58. for i := range depletedRows {
  59. if depletedRows[i].Email == "" {
  60. continue
  61. }
  62. depletedEmails = append(depletedEmails, depletedRows[i].Email)
  63. }
  64. type target struct {
  65. InboundID int `gorm:"column:inbound_id"`
  66. NodeID *int `gorm:"column:node_id"`
  67. Tag string
  68. Email string
  69. }
  70. var targets []target
  71. if len(depletedEmails) > 0 {
  72. err = tx.Raw(`
  73. SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id,
  74. inbounds.tag AS tag, clients.email AS email
  75. FROM clients
  76. JOIN client_inbounds ON client_inbounds.client_id = clients.id
  77. JOIN inbounds ON inbounds.id = client_inbounds.inbound_id
  78. WHERE clients.email IN ?
  79. `, depletedEmails).Scan(&targets).Error
  80. if err != nil {
  81. return false, 0, nil, err
  82. }
  83. }
  84. var localTargets []target
  85. localByInbound := make(map[int]map[string]struct{})
  86. remoteByInbound := make(map[int][]target)
  87. for _, t := range targets {
  88. if t.NodeID == nil {
  89. localTargets = append(localTargets, t)
  90. if localByInbound[t.InboundID] == nil {
  91. localByInbound[t.InboundID] = make(map[string]struct{})
  92. }
  93. localByInbound[t.InboundID][t.Email] = struct{}{}
  94. } else {
  95. remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t)
  96. }
  97. }
  98. if p != nil && len(localTargets) > 0 {
  99. s.xrayApi.Init(p.GetAPIPort())
  100. for _, t := range localTargets {
  101. err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
  102. if err1 == nil {
  103. logger.Debug("Client disabled by api:", t.Email)
  104. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
  105. logger.Debug("User is already disabled. Nothing to do more...")
  106. } else {
  107. logger.Debug("Error in disabling client by api:", err1)
  108. needRestart = true
  109. }
  110. }
  111. s.xrayApi.Close()
  112. }
  113. for inboundID, emails := range localByInbound {
  114. if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil {
  115. logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr)
  116. }
  117. }
  118. result := tx.Model(xray.ClientTraffic{}).
  119. Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
  120. Update("enable", false)
  121. err = result.Error
  122. count := result.RowsAffected
  123. if err != nil {
  124. return needRestart, count, nil, err
  125. }
  126. if len(depletedEmails) > 0 {
  127. if err := tx.Model(&model.ClientRecord{}).
  128. Where("email IN ?", depletedEmails).
  129. Updates(map[string]any{"enable": false, "updated_at": now}).Error; err != nil {
  130. logger.Warning("disableInvalidClients update clients.enable:", err)
  131. }
  132. }
  133. disabledNodeIDs := make(map[int]struct{})
  134. for inboundID, group := range remoteByInbound {
  135. emails := make(map[string]struct{}, len(group))
  136. for _, t := range group {
  137. emails[t.Email] = struct{}{}
  138. }
  139. if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
  140. logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
  141. needRestart = true
  142. } else {
  143. for _, t := range group {
  144. if t.NodeID != nil {
  145. disabledNodeIDs[*t.NodeID] = struct{}{}
  146. }
  147. }
  148. }
  149. }
  150. nodeIDs := make([]int, 0, len(disabledNodeIDs))
  151. for nodeID := range disabledNodeIDs {
  152. nodeIDs = append(nodeIDs, nodeID)
  153. }
  154. return needRestart, count, nodeIDs, nil
  155. }
  156. // markClientsDisabledInSettings flips client.enable=false in the inbound's
  157. // stored settings JSON for the given emails and returns both the pre and
  158. // post snapshots so a caller pushing to a remote node has the diff to hand.
  159. func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) {
  160. var ib model.Inbound
  161. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil {
  162. return nil, nil, err
  163. }
  164. snapshot := ib
  165. settings := map[string]any{}
  166. if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
  167. return nil, nil, err
  168. }
  169. clients, _ := settings["clients"].([]any)
  170. now := time.Now().Unix() * 1000
  171. mutated := false
  172. for i := range clients {
  173. entry, ok := clients[i].(map[string]any)
  174. if !ok {
  175. continue
  176. }
  177. email, _ := entry["email"].(string)
  178. if _, hit := emails[email]; !hit {
  179. continue
  180. }
  181. if cur, _ := entry["enable"].(bool); cur == false {
  182. continue
  183. }
  184. entry["enable"] = false
  185. entry["updated_at"] = now
  186. clients[i] = entry
  187. mutated = true
  188. }
  189. if !mutated {
  190. return &snapshot, &ib, nil
  191. }
  192. settings["clients"] = clients
  193. bs, marshalErr := json.MarshalIndent(settings, "", " ")
  194. if marshalErr != nil {
  195. return nil, nil, marshalErr
  196. }
  197. ib.Settings = string(bs)
  198. if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).
  199. Update("settings", ib.Settings).Error; err != nil {
  200. return nil, nil, err
  201. }
  202. return &snapshot, &ib, nil
  203. }
  204. func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
  205. oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
  206. if err != nil {
  207. return err
  208. }
  209. rt, err := s.runtimeFor(ib)
  210. if err != nil {
  211. return err
  212. }
  213. if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil {
  214. return err
  215. }
  216. return nil
  217. }