1
0

client_bulk.go 46 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. )
  16. // BulkAttachResult reports the outcome of a bulk attach across target inbounds.
  17. type BulkAttachResult struct {
  18. Attached []string `json:"attached"`
  19. Skipped []string `json:"skipped"`
  20. Errors []string `json:"errors"`
  21. }
  22. // BulkAttach attaches the given existing clients (by email) to each target inbound,
  23. // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds
  24. // all clients to a target in a single AddInboundClient call, and reports clients already
  25. // present on a target as skipped.
  26. func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) {
  27. result := &BulkAttachResult{}
  28. if len(emails) == 0 || len(inboundIds) == 0 {
  29. return result, false, nil
  30. }
  31. recordErr := func(format string, args ...any) {
  32. msg := fmt.Sprintf(format, args...)
  33. result.Errors = append(result.Errors, msg)
  34. logger.Warningf("[BulkAttach] %s", msg)
  35. }
  36. records := make([]*model.ClientRecord, 0, len(emails))
  37. seenEmail := make(map[string]struct{}, len(emails))
  38. for _, email := range emails {
  39. if email == "" {
  40. continue
  41. }
  42. key := strings.ToLower(email)
  43. if _, ok := seenEmail[key]; ok {
  44. continue
  45. }
  46. seenEmail[key] = struct{}{}
  47. rec, err := s.GetRecordByEmail(nil, email)
  48. if err != nil {
  49. recordErr("%s: %v", email, err)
  50. continue
  51. }
  52. records = append(records, rec)
  53. }
  54. emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs()
  55. if sidErr != nil {
  56. emailSubIDs = nil
  57. logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr)
  58. }
  59. needRestart := false
  60. for _, ibId := range inboundIds {
  61. inbound, err := inboundSvc.GetInbound(ibId)
  62. if err != nil {
  63. recordErr("inbound %d: %v", ibId, err)
  64. continue
  65. }
  66. existingClients, err := inboundSvc.GetClients(inbound)
  67. if err != nil {
  68. recordErr("inbound %d: %v", ibId, err)
  69. continue
  70. }
  71. have := make(map[string]struct{}, len(existingClients))
  72. for _, c := range existingClients {
  73. have[strings.ToLower(c.Email)] = struct{}{}
  74. }
  75. clientsToAdd := make([]model.Client, 0, len(records))
  76. for _, rec := range records {
  77. if _, attached := have[strings.ToLower(rec.Email)]; attached {
  78. result.Skipped = append(result.Skipped, rec.Email)
  79. continue
  80. }
  81. client := *rec.ToClient()
  82. client.UpdatedAt = time.Now().UnixMilli()
  83. if err := s.fillProtocolDefaults(&client, inbound); err != nil {
  84. recordErr("%s -> inbound %d: %v", rec.Email, ibId, err)
  85. continue
  86. }
  87. clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound))
  88. }
  89. if len(clientsToAdd) == 0 {
  90. continue
  91. }
  92. payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd})
  93. if err != nil {
  94. recordErr("inbound %d: %v", ibId, err)
  95. continue
  96. }
  97. nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  98. if err != nil {
  99. recordErr("inbound %d: %v", ibId, err)
  100. continue
  101. }
  102. if nr {
  103. needRestart = true
  104. }
  105. for _, c := range clientsToAdd {
  106. result.Attached = append(result.Attached, c.Email)
  107. }
  108. }
  109. return result, needRestart, nil
  110. }
  111. // BulkDetachResult reports the outcome of a bulk detach across target inbounds.
  112. type BulkDetachResult struct {
  113. Detached []string `json:"detached"`
  114. Skipped []string `json:"skipped"`
  115. Errors []string `json:"errors"`
  116. }
  117. // BulkDetach detaches the given existing clients (by email) from each target inbound.
  118. // (email, inbound) pairs where the client is not currently attached are silently skipped
  119. // at the inbound level; emails that aren't attached to any of the requested inbounds
  120. // are reported under skipped. ClientRecord rows are kept even when they become orphaned
  121. // (matches single-client detach semantics); callers should use bulkDelete for full removal.
  122. func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) {
  123. result := &BulkDetachResult{}
  124. if len(emails) == 0 || len(inboundIds) == 0 {
  125. return result, false, nil
  126. }
  127. recordErr := func(format string, args ...any) {
  128. msg := fmt.Sprintf(format, args...)
  129. result.Errors = append(result.Errors, msg)
  130. logger.Warningf("[BulkDetach] %s", msg)
  131. }
  132. requested := make(map[int]struct{}, len(inboundIds))
  133. for _, id := range inboundIds {
  134. requested[id] = struct{}{}
  135. }
  136. recsByInbound := make(map[int][]*model.ClientRecord)
  137. emailOrder := make([]string, 0, len(emails))
  138. emailRepr := make(map[string]string, len(emails))
  139. emailFailed := make(map[string]bool, len(emails))
  140. seenEmail := make(map[string]struct{}, len(emails))
  141. for _, email := range emails {
  142. if email == "" {
  143. continue
  144. }
  145. key := strings.ToLower(email)
  146. if _, ok := seenEmail[key]; ok {
  147. continue
  148. }
  149. seenEmail[key] = struct{}{}
  150. rec, err := s.GetRecordByEmail(nil, email)
  151. if err != nil {
  152. recordErr("%s: %v", email, err)
  153. continue
  154. }
  155. currentIds, err := s.GetInboundIdsForRecord(rec.Id)
  156. if err != nil {
  157. recordErr("%s: %v", email, err)
  158. continue
  159. }
  160. matched := false
  161. for _, id := range currentIds {
  162. if _, ok := requested[id]; ok {
  163. recsByInbound[id] = append(recsByInbound[id], rec)
  164. matched = true
  165. }
  166. }
  167. if !matched {
  168. result.Skipped = append(result.Skipped, rec.Email)
  169. continue
  170. }
  171. emailOrder = append(emailOrder, key)
  172. emailRepr[key] = rec.Email
  173. }
  174. needRestart := false
  175. for _, ibId := range inboundIds {
  176. recs, ok := recsByInbound[ibId]
  177. if !ok {
  178. continue
  179. }
  180. delete(recsByInbound, ibId)
  181. nr, err := s.delInboundClients(inboundSvc, ibId, recs, true)
  182. if err != nil {
  183. recordErr("inbound %d: %v", ibId, err)
  184. for _, rec := range recs {
  185. emailFailed[strings.ToLower(rec.Email)] = true
  186. }
  187. continue
  188. }
  189. if nr {
  190. needRestart = true
  191. }
  192. }
  193. for _, key := range emailOrder {
  194. if emailFailed[key] {
  195. continue
  196. }
  197. result.Detached = append(result.Detached, emailRepr[key])
  198. }
  199. return result, needRestart, nil
  200. }
  201. // BulkAdjustResult is returned by BulkAdjust to report how many clients were
  202. // successfully updated and which were skipped (typically because the field
  203. // being adjusted was unlimited for that client) or failed.
  204. type BulkAdjustResult struct {
  205. Adjusted int `json:"adjusted"`
  206. Skipped []BulkAdjustReport `json:"skipped,omitempty"`
  207. }
  208. type BulkAdjustReport struct {
  209. Email string `json:"email"`
  210. Reason string `json:"reason"`
  211. }
  212. type bulkAdjustEntry struct {
  213. record *model.ClientRecord
  214. applyExpiry bool
  215. newExpiry int64
  216. applyTotal bool
  217. newTotal int64
  218. }
  219. // bulkFlowClear is the directive that strips the XTLS flow from every selected
  220. // client. The vision values are the only positive flows xray accepts.
  221. const bulkFlowClear = "none"
  222. // bulkFlowAllowed whitelists the flow directives BulkAdjust accepts. Anything
  223. // outside this set is treated as "" (leave flow untouched) so a malformed or
  224. // hostile value can never be injected into a client's settings. The dropdown in
  225. // ClientBulkAdjustModal.tsx offers the same set ("" / "none" / TLS_FLOW_CONTROL);
  226. // keep the two in sync.
  227. var bulkFlowAllowed = map[string]struct{}{
  228. "": {},
  229. bulkFlowClear: {},
  230. "xtls-rprx-vision": {},
  231. "xtls-rprx-vision-udp443": {},
  232. }
  233. // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
  234. // for every email in the list. Clients whose corresponding field is
  235. // unlimited (0) are skipped — bulk extend should not accidentally
  236. // limit an unlimited client. addDays and addBytes may be negative.
  237. //
  238. // Like BulkDelete, the work is grouped by inbound so each inbound's
  239. // settings JSON is parsed and written exactly once regardless of how
  240. // many target emails it contains.
  241. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64, flow string) (BulkAdjustResult, bool, error) {
  242. result := BulkAdjustResult{}
  243. if len(emails) == 0 {
  244. return result, false, nil
  245. }
  246. flow = strings.TrimSpace(flow)
  247. if _, ok := bulkFlowAllowed[flow]; !ok {
  248. flow = "" // ignore unknown directives — "" means "leave flow untouched"
  249. }
  250. adjustFlow := flow != ""
  251. if addDays == 0 && addBytes == 0 && !adjustFlow {
  252. return result, false, common.NewError("no adjustment specified")
  253. }
  254. addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
  255. seen := map[string]struct{}{}
  256. cleanEmails := make([]string, 0, len(emails))
  257. for _, e := range emails {
  258. e = strings.TrimSpace(e)
  259. if e == "" {
  260. continue
  261. }
  262. if _, ok := seen[e]; ok {
  263. continue
  264. }
  265. seen[e] = struct{}{}
  266. cleanEmails = append(cleanEmails, e)
  267. }
  268. if len(cleanEmails) == 0 {
  269. return result, false, nil
  270. }
  271. db := database.GetDB()
  272. var records []model.ClientRecord
  273. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  274. var rows []model.ClientRecord
  275. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  276. return result, false, err
  277. }
  278. records = append(records, rows...)
  279. }
  280. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  281. for i := range records {
  282. recordsByEmail[records[i].Email] = &records[i]
  283. }
  284. skippedReasons := map[string]string{}
  285. for _, email := range cleanEmails {
  286. if _, ok := recordsByEmail[email]; !ok {
  287. skippedReasons[email] = "client not found"
  288. }
  289. }
  290. plan := map[string]*bulkAdjustEntry{}
  291. for email, rec := range recordsByEmail {
  292. entry := &bulkAdjustEntry{record: rec}
  293. if addDays != 0 {
  294. switch {
  295. case rec.ExpiryTime == 0:
  296. if _, exists := skippedReasons[email]; !exists {
  297. skippedReasons[email] = "unlimited expiry"
  298. }
  299. case rec.ExpiryTime > 0:
  300. next := rec.ExpiryTime + addExpiryMs
  301. if next <= 0 {
  302. if _, exists := skippedReasons[email]; !exists {
  303. skippedReasons[email] = "reduction exceeds remaining time"
  304. }
  305. } else {
  306. entry.applyExpiry = true
  307. entry.newExpiry = next
  308. }
  309. default:
  310. next := rec.ExpiryTime - addExpiryMs
  311. if next >= 0 {
  312. if _, exists := skippedReasons[email]; !exists {
  313. skippedReasons[email] = "reduction exceeds delay window"
  314. }
  315. } else {
  316. entry.applyExpiry = true
  317. entry.newExpiry = next
  318. }
  319. }
  320. }
  321. if addBytes != 0 {
  322. if rec.TotalGB == 0 {
  323. if _, exists := skippedReasons[email]; !exists {
  324. skippedReasons[email] = "unlimited traffic"
  325. }
  326. } else {
  327. next := max(rec.TotalGB+addBytes, 0)
  328. entry.applyTotal = true
  329. entry.newTotal = next
  330. }
  331. }
  332. if entry.applyExpiry || entry.applyTotal || adjustFlow {
  333. plan[email] = entry
  334. }
  335. }
  336. if len(plan) == 0 {
  337. for email, reason := range skippedReasons {
  338. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  339. }
  340. return result, false, nil
  341. }
  342. plannedIds := make([]int, 0, len(plan))
  343. recordIdToEmail := make(map[int]string, len(plan))
  344. for email, entry := range plan {
  345. plannedIds = append(plannedIds, entry.record.Id)
  346. recordIdToEmail[entry.record.Id] = email
  347. }
  348. var mappings []model.ClientInbound
  349. for _, batch := range chunkInts(plannedIds, sqlInChunk) {
  350. var rows []model.ClientInbound
  351. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  352. return result, false, err
  353. }
  354. mappings = append(mappings, rows...)
  355. }
  356. emailsByInbound := map[int][]string{}
  357. for _, m := range mappings {
  358. email, ok := recordIdToEmail[m.ClientId]
  359. if !ok {
  360. continue
  361. }
  362. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  363. }
  364. needRestart := false
  365. flowHonored := map[string]bool{}
  366. flowIneligible := map[string]bool{}
  367. for inboundId, ibEmails := range emailsByInbound {
  368. ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan, flow)
  369. if ibRes.needRestart {
  370. needRestart = true
  371. }
  372. for email := range ibRes.flowHonored {
  373. flowHonored[email] = true
  374. }
  375. for email := range ibRes.flowIneligible {
  376. flowIneligible[email] = true
  377. }
  378. for email, reason := range ibRes.perEmailSkipped {
  379. if _, already := skippedReasons[email]; !already {
  380. skippedReasons[email] = reason
  381. }
  382. }
  383. }
  384. now := time.Now().Unix() * 1000
  385. cond := depletedCond(db)
  386. candidateEmails := make([]string, 0, len(plan))
  387. for email, entry := range plan {
  388. if entry.applyExpiry || entry.applyTotal {
  389. candidateEmails = append(candidateEmails, email)
  390. }
  391. }
  392. wasDisabledDepleted := map[string]struct{}{}
  393. for _, batch := range chunkStrings(candidateEmails, sqlInChunk) {
  394. var rows []string
  395. if err := db.Model(xray.ClientTraffic{}).
  396. Where(cond+" AND enable = ? AND email IN ?", now, false, batch).
  397. Pluck("email", &rows).Error; err != nil {
  398. return result, needRestart, err
  399. }
  400. for _, e := range rows {
  401. wasDisabledDepleted[e] = struct{}{}
  402. }
  403. }
  404. adjusted := map[string]struct{}{}
  405. for email, entry := range plan {
  406. if _, skipped := skippedReasons[email]; skipped {
  407. continue
  408. }
  409. updates := map[string]any{}
  410. if entry.applyExpiry {
  411. updates["expiry_time"] = entry.newExpiry
  412. }
  413. if entry.applyTotal {
  414. updates["total"] = entry.newTotal
  415. }
  416. if len(updates) > 0 {
  417. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
  418. if _, already := skippedReasons[email]; !already {
  419. skippedReasons[email] = err.Error()
  420. }
  421. continue
  422. }
  423. }
  424. // Counted when expiry/total changed, or a flow directive was honored
  425. // for this client (flow lives in the inbound JSON, not ClientTraffic).
  426. if len(updates) > 0 || flowHonored[email] {
  427. adjusted[email] = struct{}{}
  428. }
  429. }
  430. result.Adjusted = len(adjusted)
  431. for email, reason := range skippedReasons {
  432. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  433. }
  434. // Report a flow directive that no inbound could carry — only when it was not
  435. // honored anywhere and the client has no other (expiry/total) skip reason.
  436. // The expiry/total part, if any, has already been applied and counted above.
  437. for email := range flowIneligible {
  438. if flowHonored[email] {
  439. continue
  440. }
  441. if _, already := skippedReasons[email]; already {
  442. continue
  443. }
  444. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "flow not supported on inbound"})
  445. }
  446. if len(wasDisabledDepleted) > 0 {
  447. stillDepleted := map[string]struct{}{}
  448. wasList := make([]string, 0, len(wasDisabledDepleted))
  449. for e := range wasDisabledDepleted {
  450. wasList = append(wasList, e)
  451. }
  452. for _, batch := range chunkStrings(wasList, sqlInChunk) {
  453. var rows []string
  454. if err := db.Model(xray.ClientTraffic{}).
  455. Where(cond+" AND email IN ?", now, batch).
  456. Pluck("email", &rows).Error; err != nil {
  457. return result, needRestart, err
  458. }
  459. for _, e := range rows {
  460. stillDepleted[e] = struct{}{}
  461. }
  462. }
  463. reEnable := make([]string, 0, len(wasDisabledDepleted))
  464. for e := range wasDisabledDepleted {
  465. if _, still := stillDepleted[e]; !still {
  466. reEnable = append(reEnable, e)
  467. }
  468. }
  469. if len(reEnable) > 0 {
  470. _, nr, err := s.BulkSetEnable(inboundSvc, reEnable, true)
  471. if err != nil {
  472. return result, needRestart, err
  473. }
  474. if nr {
  475. needRestart = true
  476. }
  477. }
  478. }
  479. return result, needRestart, nil
  480. }
  481. type bulkInboundAdjustResult struct {
  482. perEmailSkipped map[string]string
  483. flowHonored map[string]bool
  484. // flowIneligible is tracked apart from perEmailSkipped: a flow directive
  485. // that an inbound cannot carry must not suppress the expiry/total write for
  486. // the same client (which would diverge the inbound JSON / ClientRecord from
  487. // ClientTraffic). It only feeds the final Skipped report.
  488. flowIneligible map[string]bool
  489. needRestart bool
  490. }
  491. // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
  492. // inside a single inbound's settings JSON. The xray runtime is updated
  493. // only for remote-node inbounds; local nodes do not need a notification
  494. // because the AddUser payload does not include totalGB/expiryTime —
  495. // changing those fields is identity-preserving and the panel's traffic
  496. // enforcement loop picks up the new limits from ClientTraffic directly.
  497. func (s *ClientService) bulkAdjustInboundClients(
  498. inboundSvc *InboundService,
  499. inboundId int,
  500. emails []string,
  501. plan map[string]*bulkAdjustEntry,
  502. flow string,
  503. ) bulkInboundAdjustResult {
  504. res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}, flowHonored: map[string]bool{}, flowIneligible: map[string]bool{}}
  505. defer lockInbound(inboundId).Unlock()
  506. oldInbound, err := inboundSvc.GetInbound(inboundId)
  507. if err != nil {
  508. logger.Error("Load Old Data Error")
  509. for _, e := range emails {
  510. res.perEmailSkipped[e] = err.Error()
  511. }
  512. return res
  513. }
  514. var settings map[string]any
  515. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  516. for _, e := range emails {
  517. res.perEmailSkipped[e] = err.Error()
  518. }
  519. return res
  520. }
  521. // Match by email — the client's stable identity (see Delete). Credentials
  522. // can drift from the inbound JSON, so they are never used for matching.
  523. wantedEmails := make(map[string]struct{}, len(emails))
  524. for _, email := range emails {
  525. if plan[email] == nil {
  526. res.perEmailSkipped[email] = "client not found"
  527. continue
  528. }
  529. wantedEmails[email] = struct{}{}
  530. }
  531. // Flow eligibility is a property of the inbound (protocol + transport), so
  532. // resolve it once. Clearing flow is always allowed; setting a vision flow
  533. // is only honored on an inbound that can carry it.
  534. flowEligible := flow == bulkFlowClear ||
  535. inboundCanEnableTlsFlow(string(oldInbound.Protocol), oldInbound.StreamSettings, oldInbound.Settings)
  536. interfaceClients, _ := settings["clients"].([]any)
  537. foundEmails := map[string]bool{}
  538. flowChanged := false
  539. nowMs := time.Now().Unix() * 1000
  540. for i, client := range interfaceClients {
  541. c, ok := client.(map[string]any)
  542. if !ok {
  543. continue
  544. }
  545. targetEmail, _ := c["email"].(string)
  546. if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" {
  547. continue
  548. }
  549. entry := plan[targetEmail]
  550. if entry.applyExpiry {
  551. c["expiryTime"] = entry.newExpiry
  552. }
  553. if entry.applyTotal {
  554. c["totalGB"] = entry.newTotal
  555. }
  556. if flow != "" {
  557. if flowEligible {
  558. want := ""
  559. if flow != bulkFlowClear {
  560. want = flow
  561. }
  562. if cur, _ := c["flow"].(string); cur != want {
  563. c["flow"] = want
  564. flowChanged = true
  565. }
  566. res.flowHonored[targetEmail] = true
  567. } else {
  568. // Record separately so this never suppresses the expiry/total
  569. // write for the same client (see flowIneligible doc).
  570. res.flowIneligible[targetEmail] = true
  571. }
  572. }
  573. c["updated_at"] = nowMs
  574. interfaceClients[i] = c
  575. foundEmails[targetEmail] = true
  576. }
  577. for email := range wantedEmails {
  578. if !foundEmails[email] {
  579. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  580. }
  581. }
  582. if len(foundEmails) == 0 {
  583. return res
  584. }
  585. settings["clients"] = interfaceClients
  586. newSettings, err := json.MarshalIndent(settings, "", " ")
  587. if err != nil {
  588. for email := range foundEmails {
  589. res.perEmailSkipped[email] = err.Error()
  590. }
  591. return res
  592. }
  593. oldInbound.Settings = string(newSettings)
  594. // A flow change rewrites the user's xray config, which the lightweight
  595. // UpdateUser push below does not carry. Local nodes reload via restart;
  596. // remote nodes get a full reconcile (MarkNodeDirty) instead of a per-user push.
  597. if flowChanged && oldInbound.NodeID == nil {
  598. res.needRestart = true
  599. }
  600. if oldInbound.NodeID != nil {
  601. rt, push, _, perr := inboundSvc.nodePushPlan(oldInbound)
  602. if perr != nil {
  603. for email := range foundEmails {
  604. res.perEmailSkipped[email] = perr.Error()
  605. delete(foundEmails, email)
  606. }
  607. } else {
  608. if flowChanged {
  609. push = false
  610. }
  611. // Large batches collapse into one reconcile push rather than M updates.
  612. if push && len(foundEmails) > nodeBulkPushThreshold {
  613. push = false
  614. }
  615. if push {
  616. for email := range foundEmails {
  617. entry := plan[email]
  618. updated := *entry.record.ToClient()
  619. if entry.applyExpiry {
  620. updated.ExpiryTime = entry.newExpiry
  621. }
  622. if entry.applyTotal {
  623. updated.TotalGB = entry.newTotal
  624. }
  625. updated.UpdatedAt = nowMs
  626. if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
  627. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  628. }
  629. }
  630. }
  631. }
  632. }
  633. // Serialize against the traffic poll to avoid the cross-transaction
  634. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  635. txErr := runSerializedTx(func(tx *gorm.DB) error {
  636. if err := tx.Save(oldInbound).Error; err != nil {
  637. return err
  638. }
  639. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  640. if gcErr != nil {
  641. return gcErr
  642. }
  643. if err := s.SyncInbound(tx, inboundId, finalClients); err != nil {
  644. return err
  645. }
  646. if oldInbound.NodeID != nil {
  647. return (&NodeService{}).MarkNodeDirtyTx(tx, *oldInbound.NodeID)
  648. }
  649. return nil
  650. })
  651. if txErr != nil {
  652. for email := range foundEmails {
  653. if _, skip := res.perEmailSkipped[email]; !skip {
  654. res.perEmailSkipped[email] = txErr.Error()
  655. }
  656. }
  657. }
  658. return res
  659. }
  660. // BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email
  661. // skip reasons when an email could not be processed.
  662. type BulkDeleteResult struct {
  663. Deleted int `json:"deleted"`
  664. Skipped []BulkDeleteReport `json:"skipped,omitempty"`
  665. }
  666. type BulkDeleteReport struct {
  667. Email string `json:"email"`
  668. Reason string `json:"reason"`
  669. }
  670. // BulkDelete removes every client in the list in one optimized pass.
  671. // Instead of running the full single-delete pipeline N times (which would
  672. // re-read, re-parse, and re-write each inbound's settings JSON for every
  673. // email), it groups emails by inbound and performs a single
  674. // read-modify-write per inbound. Per-row DB cleanups are also batched with
  675. // IN-clause queries at the end. Errors on a particular email are recorded
  676. // in the Skipped list and processing continues for the rest.
  677. func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
  678. result := BulkDeleteResult{}
  679. seen := map[string]struct{}{}
  680. cleanEmails := make([]string, 0, len(emails))
  681. for _, e := range emails {
  682. e = strings.TrimSpace(e)
  683. if e == "" {
  684. continue
  685. }
  686. if _, ok := seen[e]; ok {
  687. continue
  688. }
  689. seen[e] = struct{}{}
  690. cleanEmails = append(cleanEmails, e)
  691. }
  692. if len(cleanEmails) == 0 {
  693. return result, false, nil
  694. }
  695. db := database.GetDB()
  696. var records []model.ClientRecord
  697. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  698. var rows []model.ClientRecord
  699. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  700. return result, false, err
  701. }
  702. records = append(records, rows...)
  703. }
  704. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  705. tombstoneEmails := make([]string, 0, len(records))
  706. for i := range records {
  707. recordsByEmail[records[i].Email] = &records[i]
  708. tombstoneEmails = append(tombstoneEmails, records[i].Email)
  709. }
  710. tombstoneClientEmails(tombstoneEmails)
  711. skippedReasons := map[string]string{}
  712. for _, email := range cleanEmails {
  713. if _, ok := recordsByEmail[email]; !ok {
  714. skippedReasons[email] = "client not found"
  715. }
  716. }
  717. clientIds := make([]int, 0, len(recordsByEmail))
  718. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  719. for _, r := range recordsByEmail {
  720. clientIds = append(clientIds, r.Id)
  721. recordIdToEmail[r.Id] = r.Email
  722. }
  723. emailsByInbound := map[int][]string{}
  724. if len(clientIds) > 0 {
  725. var mappings []model.ClientInbound
  726. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  727. var rows []model.ClientInbound
  728. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  729. return result, false, err
  730. }
  731. mappings = append(mappings, rows...)
  732. }
  733. for _, m := range mappings {
  734. email, ok := recordIdToEmail[m.ClientId]
  735. if !ok {
  736. continue
  737. }
  738. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  739. }
  740. }
  741. needRestart := false
  742. for inboundId, ibEmails := range emailsByInbound {
  743. ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false)
  744. if ibResult.needRestart {
  745. needRestart = true
  746. }
  747. for email, reason := range ibResult.perEmailSkipped {
  748. if _, already := skippedReasons[email]; !already {
  749. skippedReasons[email] = reason
  750. }
  751. }
  752. }
  753. successEmails := make([]string, 0, len(recordsByEmail))
  754. successIds := make([]int, 0, len(recordsByEmail))
  755. for email, rec := range recordsByEmail {
  756. if _, skipped := skippedReasons[email]; skipped {
  757. continue
  758. }
  759. successEmails = append(successEmails, email)
  760. successIds = append(successIds, rec.Id)
  761. }
  762. if len(successIds) > 0 {
  763. // Serialize the row cleanup against the traffic poll to avoid the
  764. // cross-transaction lock-order deadlock on client_traffics/inbounds.
  765. if err := runSerializedTx(func(tx *gorm.DB) error {
  766. for _, batch := range chunkInts(successIds, sqlInChunk) {
  767. if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil {
  768. return e
  769. }
  770. }
  771. if !keepTraffic && len(successEmails) > 0 {
  772. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  773. if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil {
  774. return e
  775. }
  776. if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil {
  777. return e
  778. }
  779. }
  780. }
  781. for _, batch := range chunkInts(successIds, sqlInChunk) {
  782. if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil {
  783. return e
  784. }
  785. }
  786. return nil
  787. }); err != nil {
  788. return result, needRestart, err
  789. }
  790. }
  791. result.Deleted = len(successEmails)
  792. for email, reason := range skippedReasons {
  793. result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
  794. }
  795. return result, needRestart, nil
  796. }
  797. type bulkInboundDeleteResult struct {
  798. perEmailSkipped map[string]string
  799. needRestart bool
  800. }
  801. // bulkDelInboundClients removes multiple clients from a single inbound's
  802. // settings JSON in one read-modify-write cycle, runs the xray runtime
  803. // RemoveUser/DeleteUser calls, and persists the inbound. The returned map
  804. // holds per-email failure reasons; emails not present in the map are
  805. // considered successful for this inbound.
  806. func (s *ClientService) bulkDelInboundClients(
  807. inboundSvc *InboundService,
  808. inboundId int,
  809. emails []string,
  810. records map[string]*model.ClientRecord,
  811. keepTraffic bool,
  812. ) bulkInboundDeleteResult {
  813. res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
  814. defer lockInbound(inboundId).Unlock()
  815. oldInbound, err := inboundSvc.GetInbound(inboundId)
  816. if err != nil {
  817. logger.Error("Load Old Data Error")
  818. for _, e := range emails {
  819. res.perEmailSkipped[e] = err.Error()
  820. }
  821. return res
  822. }
  823. var settings map[string]any
  824. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  825. for _, e := range emails {
  826. res.perEmailSkipped[e] = err.Error()
  827. }
  828. return res
  829. }
  830. // Match by email — the client's stable identity (see Delete). Removes every
  831. // entry carrying a wanted email, independent of credential drift.
  832. wantedEmails := make(map[string]struct{}, len(emails))
  833. for _, email := range emails {
  834. if records[email] == nil {
  835. res.perEmailSkipped[email] = "client not found"
  836. continue
  837. }
  838. wantedEmails[email] = struct{}{}
  839. }
  840. interfaceClients, _ := settings["clients"].([]any)
  841. newClients := make([]any, 0, len(interfaceClients))
  842. foundEmails := map[string]bool{}
  843. enableByEmail := map[string]bool{}
  844. for _, client := range interfaceClients {
  845. c, ok := client.(map[string]any)
  846. if !ok {
  847. newClients = append(newClients, client)
  848. continue
  849. }
  850. em, _ := c["email"].(string)
  851. if _, found := wantedEmails[em]; found && em != "" {
  852. foundEmails[em] = true
  853. en, _ := c["enable"].(bool)
  854. enableByEmail[em] = en
  855. continue
  856. }
  857. newClients = append(newClients, client)
  858. }
  859. for email := range wantedEmails {
  860. if !foundEmails[email] {
  861. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  862. }
  863. }
  864. db := database.GetDB()
  865. newClients = compactOrphans(db, newClients)
  866. if newClients == nil {
  867. newClients = []any{}
  868. }
  869. settings["clients"] = newClients
  870. newSettings, err := json.MarshalIndent(settings, "", " ")
  871. if err != nil {
  872. for email := range foundEmails {
  873. if _, skip := res.perEmailSkipped[email]; !skip {
  874. res.perEmailSkipped[email] = err.Error()
  875. }
  876. }
  877. return res
  878. }
  879. oldInbound.Settings = string(newSettings)
  880. foundList := make([]string, 0, len(foundEmails))
  881. for email := range foundEmails {
  882. foundList = append(foundList, email)
  883. }
  884. notDepletedByEmail := map[string]bool{}
  885. if len(foundList) > 0 {
  886. type trafficRow struct {
  887. Email string
  888. Enable bool
  889. }
  890. for _, batch := range chunkStrings(foundList, sqlInChunk) {
  891. var rows []trafficRow
  892. if err := db.Model(xray.ClientTraffic{}).
  893. Where("email IN ?", batch).
  894. Select("email, enable").
  895. Scan(&rows).Error; err == nil {
  896. for _, r := range rows {
  897. notDepletedByEmail[r.Email] = r.Enable
  898. }
  899. }
  900. }
  901. }
  902. var sharedSet map[string]bool
  903. if !keepTraffic {
  904. var sharedErr error
  905. sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
  906. if sharedErr != nil {
  907. for email := range foundEmails {
  908. res.perEmailSkipped[email] = sharedErr.Error()
  909. delete(foundEmails, email)
  910. }
  911. return res
  912. }
  913. }
  914. if !keepTraffic {
  915. purge := make([]string, 0, len(foundEmails))
  916. for email := range foundEmails {
  917. if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
  918. purge = append(purge, email)
  919. }
  920. }
  921. if len(purge) > 0 {
  922. // Serialize the IP/stat purge against the traffic poll to avoid the
  923. // cross-transaction lock-order deadlock on client_traffics.
  924. if delErr := runSerializedTx(func(tx *gorm.DB) error {
  925. if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil {
  926. logger.Error("Error in delete client IPs")
  927. return e
  928. }
  929. if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil {
  930. logger.Error("Delete stats Data Error")
  931. return e
  932. }
  933. return nil
  934. }); delErr != nil {
  935. for _, email := range purge {
  936. res.perEmailSkipped[email] = delErr.Error()
  937. delete(foundEmails, email)
  938. }
  939. }
  940. }
  941. }
  942. if oldInbound.NodeID == nil {
  943. rt, rterr := inboundSvc.runtimeFor(oldInbound)
  944. if rterr != nil {
  945. res.needRestart = true
  946. } else {
  947. for email := range foundEmails {
  948. if !enableByEmail[email] || !notDepletedByEmail[email] {
  949. continue
  950. }
  951. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  952. if err1 == nil {
  953. logger.Debug("Client deleted on", rt.Name(), ":", email)
  954. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  955. logger.Debug("User is already deleted. Nothing to do more...")
  956. } else {
  957. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  958. res.needRestart = true
  959. }
  960. }
  961. }
  962. } else {
  963. rt, push, _, perr := inboundSvc.nodePushPlan(oldInbound)
  964. if perr != nil {
  965. for email := range foundEmails {
  966. res.perEmailSkipped[email] = perr.Error()
  967. delete(foundEmails, email)
  968. }
  969. } else {
  970. // Large batches collapse into one reconcile push rather than M deletes.
  971. if push && len(foundEmails) > nodeBulkPushThreshold {
  972. push = false
  973. }
  974. if push {
  975. for email := range foundEmails {
  976. if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
  977. logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
  978. }
  979. }
  980. }
  981. }
  982. }
  983. // Serialize against the traffic poll to avoid the cross-transaction
  984. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  985. txErr := runSerializedTx(func(tx *gorm.DB) error {
  986. if err := tx.Save(oldInbound).Error; err != nil {
  987. return err
  988. }
  989. finalClients, err := inboundSvc.GetClients(oldInbound)
  990. if err != nil {
  991. return err
  992. }
  993. if err := s.SyncInbound(tx, inboundId, finalClients); err != nil {
  994. return err
  995. }
  996. if oldInbound.NodeID != nil {
  997. return (&NodeService{}).MarkNodeDirtyTx(tx, *oldInbound.NodeID)
  998. }
  999. return nil
  1000. })
  1001. if txErr != nil {
  1002. for email := range foundEmails {
  1003. if _, skip := res.perEmailSkipped[email]; !skip {
  1004. res.perEmailSkipped[email] = txErr.Error()
  1005. }
  1006. }
  1007. }
  1008. return res
  1009. }
  1010. // BulkCreateResult mirrors BulkAdjustResult for the create flow.
  1011. type BulkCreateResult struct {
  1012. Created int `json:"created"`
  1013. Skipped []BulkCreateReport `json:"skipped,omitempty"`
  1014. }
  1015. type BulkCreateReport struct {
  1016. Email string `json:"email"`
  1017. Reason string `json:"reason"`
  1018. }
  1019. func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
  1020. result := BulkCreateResult{}
  1021. if len(payloads) == 0 {
  1022. return result, false, nil
  1023. }
  1024. skip := func(email, reason string) {
  1025. if strings.TrimSpace(email) == "" {
  1026. email = "(missing email)"
  1027. }
  1028. result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
  1029. }
  1030. emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
  1031. if err != nil {
  1032. emailSubIDs = nil
  1033. }
  1034. type prepared struct {
  1035. client model.Client
  1036. inboundIds []int
  1037. }
  1038. prep := make([]prepared, 0, len(payloads))
  1039. emails := make([]string, 0, len(payloads))
  1040. subIDs := make([]string, 0, len(payloads))
  1041. seenEmail := make(map[string]struct{}, len(payloads))
  1042. seenSubID := make(map[string]string, len(payloads))
  1043. for i := range payloads {
  1044. client := payloads[i].Client
  1045. email := strings.TrimSpace(client.Email)
  1046. if email == "" {
  1047. skip("", "client email is required")
  1048. continue
  1049. }
  1050. if verr := validateClientEmail(email); verr != nil {
  1051. skip(email, verr.Error())
  1052. continue
  1053. }
  1054. if verr := validateClientSubID(client.SubID); verr != nil {
  1055. skip(email, verr.Error())
  1056. continue
  1057. }
  1058. if len(payloads[i].InboundIds) == 0 {
  1059. skip(email, "at least one inbound is required")
  1060. continue
  1061. }
  1062. client.Email = email
  1063. if client.SubID == "" {
  1064. client.SubID = uuid.NewString()
  1065. }
  1066. if !client.Enable {
  1067. client.Enable = true
  1068. }
  1069. now := time.Now().UnixMilli()
  1070. if client.CreatedAt == 0 {
  1071. client.CreatedAt = now
  1072. }
  1073. client.UpdatedAt = now
  1074. le := strings.ToLower(email)
  1075. if _, dup := seenEmail[le]; dup {
  1076. skip(email, "email already in use: "+email)
  1077. continue
  1078. }
  1079. if owner, ok := seenSubID[client.SubID]; ok && owner != le {
  1080. skip(email, "subId already in use: "+client.SubID)
  1081. continue
  1082. }
  1083. seenEmail[le] = struct{}{}
  1084. seenSubID[client.SubID] = le
  1085. prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
  1086. emails = append(emails, email)
  1087. subIDs = append(subIDs, client.SubID)
  1088. }
  1089. if len(prep) == 0 {
  1090. return result, false, nil
  1091. }
  1092. db := database.GetDB()
  1093. const lookupChunk = 400
  1094. existingEmailSub := make(map[string]string, len(emails))
  1095. for start := 0; start < len(emails); start += lookupChunk {
  1096. end := min(start+lookupChunk, len(emails))
  1097. var rows []model.ClientRecord
  1098. if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
  1099. return result, false, e
  1100. }
  1101. for i := range rows {
  1102. existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
  1103. }
  1104. }
  1105. existingSubOwner := make(map[string]string, len(subIDs))
  1106. for start := 0; start < len(subIDs); start += lookupChunk {
  1107. end := min(start+lookupChunk, len(subIDs))
  1108. var rows []model.ClientRecord
  1109. if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
  1110. return result, false, e
  1111. }
  1112. for i := range rows {
  1113. existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
  1114. }
  1115. }
  1116. inboundCache := make(map[int]*model.Inbound)
  1117. getIb := func(id int) (*model.Inbound, error) {
  1118. if ib, ok := inboundCache[id]; ok {
  1119. return ib, nil
  1120. }
  1121. ib, e := inboundSvc.GetInbound(id)
  1122. if e != nil {
  1123. return nil, e
  1124. }
  1125. inboundCache[id] = ib
  1126. return ib, nil
  1127. }
  1128. byInbound := make(map[int][]model.Client)
  1129. idxByInbound := make(map[int][]int)
  1130. inboundOrder := make([]int, 0)
  1131. failed := make([]bool, len(prep))
  1132. reason := make([]string, len(prep))
  1133. for idx := range prep {
  1134. le := strings.ToLower(prep[idx].client.Email)
  1135. if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
  1136. failed[idx] = true
  1137. reason[idx] = "email already in use: " + prep[idx].client.Email
  1138. continue
  1139. }
  1140. if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
  1141. failed[idx] = true
  1142. reason[idx] = "subId already in use: " + prep[idx].client.SubID
  1143. continue
  1144. }
  1145. ok := true
  1146. for _, ibId := range prep[idx].inboundIds {
  1147. ib, e := getIb(ibId)
  1148. if e != nil {
  1149. failed[idx] = true
  1150. reason[idx] = e.Error()
  1151. ok = false
  1152. break
  1153. }
  1154. if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
  1155. failed[idx] = true
  1156. reason[idx] = e.Error()
  1157. ok = false
  1158. break
  1159. }
  1160. }
  1161. if !ok {
  1162. continue
  1163. }
  1164. for _, ibId := range prep[idx].inboundIds {
  1165. ib, _ := getIb(ibId)
  1166. if _, seen := byInbound[ibId]; !seen {
  1167. inboundOrder = append(inboundOrder, ibId)
  1168. }
  1169. byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
  1170. idxByInbound[ibId] = append(idxByInbound[ibId], idx)
  1171. }
  1172. }
  1173. needRestart := false
  1174. for _, ibId := range inboundOrder {
  1175. payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
  1176. if e == nil {
  1177. var nr bool
  1178. nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  1179. if e == nil && nr {
  1180. needRestart = true
  1181. }
  1182. }
  1183. if e != nil {
  1184. for _, idx := range idxByInbound[ibId] {
  1185. failed[idx] = true
  1186. if reason[idx] == "" {
  1187. reason[idx] = e.Error()
  1188. }
  1189. }
  1190. }
  1191. }
  1192. for idx := range prep {
  1193. if failed[idx] {
  1194. skip(prep[idx].client.Email, reason[idx])
  1195. } else {
  1196. result.Created++
  1197. }
  1198. }
  1199. return result, needRestart, nil
  1200. }
  1201. func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
  1202. db := database.GetDB()
  1203. now := time.Now().UnixMilli()
  1204. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  1205. var rows []xray.ClientTraffic
  1206. if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil {
  1207. return 0, false, err
  1208. }
  1209. if len(rows) == 0 {
  1210. return 0, false, nil
  1211. }
  1212. seen := make(map[string]struct{}, len(rows))
  1213. emails := make([]string, 0, len(rows))
  1214. for _, r := range rows {
  1215. if r.Email == "" {
  1216. continue
  1217. }
  1218. if _, ok := seen[r.Email]; ok {
  1219. continue
  1220. }
  1221. seen[r.Email] = struct{}{}
  1222. emails = append(emails, r.Email)
  1223. }
  1224. if len(emails) == 0 {
  1225. return 0, false, nil
  1226. }
  1227. res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
  1228. if err != nil {
  1229. return res.Deleted, needRestart, err
  1230. }
  1231. return res.Deleted, needRestart, nil
  1232. }
  1233. type BulkSetEnableResult struct {
  1234. Changed int `json:"changed"`
  1235. Skipped []BulkSetEnableReport `json:"skipped,omitempty"`
  1236. }
  1237. type BulkSetEnableReport struct {
  1238. Email string `json:"email"`
  1239. Reason string `json:"reason"`
  1240. }
  1241. func (s *ClientService) BulkSetEnable(inboundSvc *InboundService, emails []string, enable bool) (BulkSetEnableResult, bool, error) {
  1242. result := BulkSetEnableResult{}
  1243. seen := map[string]struct{}{}
  1244. cleanEmails := make([]string, 0, len(emails))
  1245. for _, e := range emails {
  1246. e = strings.TrimSpace(e)
  1247. if e == "" {
  1248. continue
  1249. }
  1250. if _, ok := seen[e]; ok {
  1251. continue
  1252. }
  1253. seen[e] = struct{}{}
  1254. cleanEmails = append(cleanEmails, e)
  1255. }
  1256. if len(cleanEmails) == 0 {
  1257. return result, false, nil
  1258. }
  1259. db := database.GetDB()
  1260. var records []model.ClientRecord
  1261. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  1262. var rows []model.ClientRecord
  1263. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  1264. return result, false, err
  1265. }
  1266. records = append(records, rows...)
  1267. }
  1268. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  1269. for i := range records {
  1270. recordsByEmail[records[i].Email] = &records[i]
  1271. }
  1272. skippedReasons := map[string]string{}
  1273. for _, email := range cleanEmails {
  1274. if _, ok := recordsByEmail[email]; !ok {
  1275. skippedReasons[email] = "client not found"
  1276. }
  1277. }
  1278. clientIds := make([]int, 0, len(recordsByEmail))
  1279. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  1280. for _, r := range recordsByEmail {
  1281. clientIds = append(clientIds, r.Id)
  1282. recordIdToEmail[r.Id] = r.Email
  1283. }
  1284. emailsByInbound := map[int][]string{}
  1285. if len(clientIds) > 0 {
  1286. var mappings []model.ClientInbound
  1287. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  1288. var rows []model.ClientInbound
  1289. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  1290. return result, false, err
  1291. }
  1292. mappings = append(mappings, rows...)
  1293. }
  1294. for _, m := range mappings {
  1295. email, ok := recordIdToEmail[m.ClientId]
  1296. if !ok {
  1297. continue
  1298. }
  1299. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  1300. }
  1301. }
  1302. needRestart := false
  1303. for inboundId, ibEmails := range emailsByInbound {
  1304. ibRes := s.bulkSetEnableInboundClients(inboundSvc, inboundId, ibEmails, enable)
  1305. if ibRes.needRestart {
  1306. needRestart = true
  1307. }
  1308. for email, reason := range ibRes.perEmailSkipped {
  1309. if _, already := skippedReasons[email]; !already {
  1310. skippedReasons[email] = reason
  1311. }
  1312. }
  1313. }
  1314. successEmails := make([]string, 0, len(recordsByEmail))
  1315. for email := range recordsByEmail {
  1316. if _, skipped := skippedReasons[email]; skipped {
  1317. continue
  1318. }
  1319. successEmails = append(successEmails, email)
  1320. }
  1321. if len(successEmails) > 0 {
  1322. now := time.Now().UnixMilli()
  1323. if err := runSerializedTx(func(tx *gorm.DB) error {
  1324. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  1325. if e := tx.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("enable", enable).Error; e != nil {
  1326. return e
  1327. }
  1328. if e := tx.Model(&model.ClientRecord{}).Where("email IN ?", batch).
  1329. Updates(map[string]any{"enable": enable, "updated_at": now}).Error; e != nil {
  1330. return e
  1331. }
  1332. }
  1333. return nil
  1334. }); err != nil {
  1335. return result, needRestart, err
  1336. }
  1337. }
  1338. result.Changed = len(successEmails)
  1339. for email, reason := range skippedReasons {
  1340. result.Skipped = append(result.Skipped, BulkSetEnableReport{Email: email, Reason: reason})
  1341. }
  1342. return result, needRestart, nil
  1343. }
  1344. type bulkSetEnableInboundResult struct {
  1345. perEmailSkipped map[string]string
  1346. needRestart bool
  1347. }
  1348. func (s *ClientService) bulkSetEnableInboundClients(inboundSvc *InboundService, inboundId int, emails []string, enable bool) bulkSetEnableInboundResult {
  1349. res := bulkSetEnableInboundResult{perEmailSkipped: map[string]string{}}
  1350. defer lockInbound(inboundId).Unlock()
  1351. oldInbound, err := inboundSvc.GetInbound(inboundId)
  1352. if err != nil {
  1353. for _, e := range emails {
  1354. res.perEmailSkipped[e] = err.Error()
  1355. }
  1356. return res
  1357. }
  1358. var settings map[string]any
  1359. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  1360. for _, e := range emails {
  1361. res.perEmailSkipped[e] = err.Error()
  1362. }
  1363. return res
  1364. }
  1365. wanted := make(map[string]struct{}, len(emails))
  1366. for _, email := range emails {
  1367. wanted[email] = struct{}{}
  1368. }
  1369. cipher := ""
  1370. if oldInbound.Protocol == model.Shadowsocks {
  1371. cipher, _ = settings["method"].(string)
  1372. }
  1373. type changedClient struct {
  1374. email string
  1375. wasEnable bool
  1376. client model.Client
  1377. }
  1378. var changed []changedClient
  1379. found := map[string]bool{}
  1380. nowMs := time.Now().UnixMilli()
  1381. interfaceClients, _ := settings["clients"].([]any)
  1382. for i, c := range interfaceClients {
  1383. entry, ok := c.(map[string]any)
  1384. if !ok {
  1385. continue
  1386. }
  1387. email, _ := entry["email"].(string)
  1388. if _, want := wanted[email]; !want || email == "" {
  1389. continue
  1390. }
  1391. found[email] = true
  1392. prev, _ := entry["enable"].(bool)
  1393. if prev == enable {
  1394. continue
  1395. }
  1396. entry["enable"] = enable
  1397. entry["updated_at"] = nowMs
  1398. interfaceClients[i] = entry
  1399. // Build the pushed client from the inbound JSON (the per-inbound source of
  1400. // truth), so a remote UpdateUser carries every field and never zeroes
  1401. // subId/totalGB/expiry from drifting ClientRecord columns (#4628/#4792).
  1402. var parsed model.Client
  1403. if b, mErr := json.Marshal(entry); mErr == nil {
  1404. _ = json.Unmarshal(b, &parsed)
  1405. }
  1406. parsed.Email = email
  1407. parsed.Enable = enable
  1408. changed = append(changed, changedClient{email: email, wasEnable: prev, client: parsed})
  1409. }
  1410. for email := range wanted {
  1411. if !found[email] {
  1412. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  1413. }
  1414. }
  1415. if len(changed) == 0 {
  1416. return res
  1417. }
  1418. settings["clients"] = interfaceClients
  1419. newSettings, err := json.MarshalIndent(settings, "", " ")
  1420. if err != nil {
  1421. for _, ch := range changed {
  1422. res.perEmailSkipped[ch.email] = err.Error()
  1423. }
  1424. return res
  1425. }
  1426. oldInbound.Settings = string(newSettings)
  1427. rt, push, _, perr := inboundSvc.nodePushPlan(oldInbound)
  1428. if perr != nil {
  1429. for _, ch := range changed {
  1430. res.perEmailSkipped[ch.email] = perr.Error()
  1431. }
  1432. return res
  1433. }
  1434. if oldInbound.NodeID != nil && push && len(changed) > nodeBulkPushThreshold {
  1435. push = false
  1436. }
  1437. txErr := runSerializedTx(func(tx *gorm.DB) error {
  1438. if e := tx.Save(oldInbound).Error; e != nil {
  1439. return e
  1440. }
  1441. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  1442. if gcErr != nil {
  1443. return gcErr
  1444. }
  1445. if err := s.SyncInbound(tx, inboundId, finalClients); err != nil {
  1446. return err
  1447. }
  1448. if oldInbound.NodeID != nil {
  1449. return (&NodeService{}).MarkNodeDirtyTx(tx, *oldInbound.NodeID)
  1450. }
  1451. return nil
  1452. })
  1453. if txErr != nil {
  1454. for _, ch := range changed {
  1455. res.perEmailSkipped[ch.email] = txErr.Error()
  1456. }
  1457. return res
  1458. }
  1459. if oldInbound.NodeID == nil {
  1460. if !push {
  1461. res.needRestart = true
  1462. } else {
  1463. for _, ch := range changed {
  1464. if enable {
  1465. err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
  1466. "email": ch.client.Email,
  1467. "id": ch.client.ID,
  1468. "security": ch.client.Security,
  1469. "flow": ch.client.Flow,
  1470. "auth": ch.client.Auth,
  1471. "password": ch.client.Password,
  1472. "cipher": cipher,
  1473. })
  1474. if err1 != nil {
  1475. logger.Debug("Error in adding client on", rt.Name(), ":", err1)
  1476. res.needRestart = true
  1477. }
  1478. } else if ch.wasEnable {
  1479. err1 := rt.RemoveUser(context.Background(), oldInbound, ch.email)
  1480. if err1 != nil && !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", ch.email)) {
  1481. logger.Debug("Error in removing client on", rt.Name(), ":", err1)
  1482. res.needRestart = true
  1483. }
  1484. }
  1485. }
  1486. }
  1487. } else if push {
  1488. for _, ch := range changed {
  1489. updated := ch.client
  1490. updated.UpdatedAt = nowMs
  1491. if err1 := rt.UpdateUser(context.Background(), oldInbound, ch.email, updated); err1 != nil {
  1492. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  1493. }
  1494. }
  1495. }
  1496. return res
  1497. }