client_bulk.go 44 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. )
  16. // BulkAttachResult reports the outcome of a bulk attach across target inbounds.
  17. type BulkAttachResult struct {
  18. Attached []string `json:"attached"`
  19. Skipped []string `json:"skipped"`
  20. Errors []string `json:"errors"`
  21. }
  22. // BulkAttach attaches the given existing clients (by email) to each target inbound,
  23. // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds
  24. // all clients to a target in a single AddInboundClient call, and reports clients already
  25. // present on a target as skipped.
  26. func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) {
  27. result := &BulkAttachResult{}
  28. if len(emails) == 0 || len(inboundIds) == 0 {
  29. return result, false, nil
  30. }
  31. recordErr := func(format string, args ...any) {
  32. msg := fmt.Sprintf(format, args...)
  33. result.Errors = append(result.Errors, msg)
  34. logger.Warningf("[BulkAttach] %s", msg)
  35. }
  36. records := make([]*model.ClientRecord, 0, len(emails))
  37. seenEmail := make(map[string]struct{}, len(emails))
  38. for _, email := range emails {
  39. if email == "" {
  40. continue
  41. }
  42. key := strings.ToLower(email)
  43. if _, ok := seenEmail[key]; ok {
  44. continue
  45. }
  46. seenEmail[key] = struct{}{}
  47. rec, err := s.GetRecordByEmail(nil, email)
  48. if err != nil {
  49. recordErr("%s: %v", email, err)
  50. continue
  51. }
  52. records = append(records, rec)
  53. }
  54. emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs()
  55. if sidErr != nil {
  56. emailSubIDs = nil
  57. logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr)
  58. }
  59. needRestart := false
  60. for _, ibId := range inboundIds {
  61. inbound, err := inboundSvc.GetInbound(ibId)
  62. if err != nil {
  63. recordErr("inbound %d: %v", ibId, err)
  64. continue
  65. }
  66. existingClients, err := inboundSvc.GetClients(inbound)
  67. if err != nil {
  68. recordErr("inbound %d: %v", ibId, err)
  69. continue
  70. }
  71. have := make(map[string]struct{}, len(existingClients))
  72. for _, c := range existingClients {
  73. have[strings.ToLower(c.Email)] = struct{}{}
  74. }
  75. clientsToAdd := make([]model.Client, 0, len(records))
  76. for _, rec := range records {
  77. if _, attached := have[strings.ToLower(rec.Email)]; attached {
  78. result.Skipped = append(result.Skipped, rec.Email)
  79. continue
  80. }
  81. client := *rec.ToClient()
  82. client.UpdatedAt = time.Now().UnixMilli()
  83. if err := s.fillProtocolDefaults(&client, inbound); err != nil {
  84. recordErr("%s -> inbound %d: %v", rec.Email, ibId, err)
  85. continue
  86. }
  87. clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound))
  88. }
  89. if len(clientsToAdd) == 0 {
  90. continue
  91. }
  92. payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd})
  93. if err != nil {
  94. recordErr("inbound %d: %v", ibId, err)
  95. continue
  96. }
  97. nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  98. if err != nil {
  99. recordErr("inbound %d: %v", ibId, err)
  100. continue
  101. }
  102. if nr {
  103. needRestart = true
  104. }
  105. for _, c := range clientsToAdd {
  106. result.Attached = append(result.Attached, c.Email)
  107. }
  108. }
  109. return result, needRestart, nil
  110. }
  111. // BulkDetachResult reports the outcome of a bulk detach across target inbounds.
  112. type BulkDetachResult struct {
  113. Detached []string `json:"detached"`
  114. Skipped []string `json:"skipped"`
  115. Errors []string `json:"errors"`
  116. }
  117. // BulkDetach detaches the given existing clients (by email) from each target inbound.
  118. // (email, inbound) pairs where the client is not currently attached are silently skipped
  119. // at the inbound level; emails that aren't attached to any of the requested inbounds
  120. // are reported under skipped. ClientRecord rows are kept even when they become orphaned
  121. // (matches single-client detach semantics); callers should use bulkDelete for full removal.
  122. func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) {
  123. result := &BulkDetachResult{}
  124. if len(emails) == 0 || len(inboundIds) == 0 {
  125. return result, false, nil
  126. }
  127. recordErr := func(format string, args ...any) {
  128. msg := fmt.Sprintf(format, args...)
  129. result.Errors = append(result.Errors, msg)
  130. logger.Warningf("[BulkDetach] %s", msg)
  131. }
  132. requested := make(map[int]struct{}, len(inboundIds))
  133. for _, id := range inboundIds {
  134. requested[id] = struct{}{}
  135. }
  136. recsByInbound := make(map[int][]*model.ClientRecord)
  137. emailOrder := make([]string, 0, len(emails))
  138. emailRepr := make(map[string]string, len(emails))
  139. emailFailed := make(map[string]bool, len(emails))
  140. seenEmail := make(map[string]struct{}, len(emails))
  141. for _, email := range emails {
  142. if email == "" {
  143. continue
  144. }
  145. key := strings.ToLower(email)
  146. if _, ok := seenEmail[key]; ok {
  147. continue
  148. }
  149. seenEmail[key] = struct{}{}
  150. rec, err := s.GetRecordByEmail(nil, email)
  151. if err != nil {
  152. recordErr("%s: %v", email, err)
  153. continue
  154. }
  155. currentIds, err := s.GetInboundIdsForRecord(rec.Id)
  156. if err != nil {
  157. recordErr("%s: %v", email, err)
  158. continue
  159. }
  160. matched := false
  161. for _, id := range currentIds {
  162. if _, ok := requested[id]; ok {
  163. recsByInbound[id] = append(recsByInbound[id], rec)
  164. matched = true
  165. }
  166. }
  167. if !matched {
  168. result.Skipped = append(result.Skipped, rec.Email)
  169. continue
  170. }
  171. emailOrder = append(emailOrder, key)
  172. emailRepr[key] = rec.Email
  173. }
  174. needRestart := false
  175. for _, ibId := range inboundIds {
  176. recs, ok := recsByInbound[ibId]
  177. if !ok {
  178. continue
  179. }
  180. delete(recsByInbound, ibId)
  181. nr, err := s.delInboundClients(inboundSvc, ibId, recs, true)
  182. if err != nil {
  183. recordErr("inbound %d: %v", ibId, err)
  184. for _, rec := range recs {
  185. emailFailed[strings.ToLower(rec.Email)] = true
  186. }
  187. continue
  188. }
  189. if nr {
  190. needRestart = true
  191. }
  192. }
  193. for _, key := range emailOrder {
  194. if emailFailed[key] {
  195. continue
  196. }
  197. result.Detached = append(result.Detached, emailRepr[key])
  198. }
  199. return result, needRestart, nil
  200. }
  201. // BulkAdjustResult is returned by BulkAdjust to report how many clients were
  202. // successfully updated and which were skipped (typically because the field
  203. // being adjusted was unlimited for that client) or failed.
  204. type BulkAdjustResult struct {
  205. Adjusted int `json:"adjusted"`
  206. Skipped []BulkAdjustReport `json:"skipped,omitempty"`
  207. }
  208. type BulkAdjustReport struct {
  209. Email string `json:"email"`
  210. Reason string `json:"reason"`
  211. }
  212. type bulkAdjustEntry struct {
  213. record *model.ClientRecord
  214. applyExpiry bool
  215. newExpiry int64
  216. applyTotal bool
  217. newTotal int64
  218. }
  219. // bulkFlowClear is the directive that strips the XTLS flow from every selected
  220. // client. The vision values are the only positive flows xray accepts.
  221. const bulkFlowClear = "none"
  222. // bulkFlowAllowed whitelists the flow directives BulkAdjust accepts. Anything
  223. // outside this set is treated as "" (leave flow untouched) so a malformed or
  224. // hostile value can never be injected into a client's settings. The dropdown in
  225. // ClientBulkAdjustModal.tsx offers the same set ("" / "none" / TLS_FLOW_CONTROL);
  226. // keep the two in sync.
  227. var bulkFlowAllowed = map[string]struct{}{
  228. "": {},
  229. bulkFlowClear: {},
  230. "xtls-rprx-vision": {},
  231. "xtls-rprx-vision-udp443": {},
  232. }
  233. // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
  234. // for every email in the list. Clients whose corresponding field is
  235. // unlimited (0) are skipped — bulk extend should not accidentally
  236. // limit an unlimited client. addDays and addBytes may be negative.
  237. //
  238. // Like BulkDelete, the work is grouped by inbound so each inbound's
  239. // settings JSON is parsed and written exactly once regardless of how
  240. // many target emails it contains.
  241. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64, flow string) (BulkAdjustResult, bool, error) {
  242. result := BulkAdjustResult{}
  243. if len(emails) == 0 {
  244. return result, false, nil
  245. }
  246. flow = strings.TrimSpace(flow)
  247. if _, ok := bulkFlowAllowed[flow]; !ok {
  248. flow = "" // ignore unknown directives — "" means "leave flow untouched"
  249. }
  250. adjustFlow := flow != ""
  251. if addDays == 0 && addBytes == 0 && !adjustFlow {
  252. return result, false, common.NewError("no adjustment specified")
  253. }
  254. addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
  255. seen := map[string]struct{}{}
  256. cleanEmails := make([]string, 0, len(emails))
  257. for _, e := range emails {
  258. e = strings.TrimSpace(e)
  259. if e == "" {
  260. continue
  261. }
  262. if _, ok := seen[e]; ok {
  263. continue
  264. }
  265. seen[e] = struct{}{}
  266. cleanEmails = append(cleanEmails, e)
  267. }
  268. if len(cleanEmails) == 0 {
  269. return result, false, nil
  270. }
  271. db := database.GetDB()
  272. var records []model.ClientRecord
  273. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  274. var rows []model.ClientRecord
  275. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  276. return result, false, err
  277. }
  278. records = append(records, rows...)
  279. }
  280. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  281. for i := range records {
  282. recordsByEmail[records[i].Email] = &records[i]
  283. }
  284. skippedReasons := map[string]string{}
  285. for _, email := range cleanEmails {
  286. if _, ok := recordsByEmail[email]; !ok {
  287. skippedReasons[email] = "client not found"
  288. }
  289. }
  290. plan := map[string]*bulkAdjustEntry{}
  291. for email, rec := range recordsByEmail {
  292. entry := &bulkAdjustEntry{record: rec}
  293. if addDays != 0 {
  294. switch {
  295. case rec.ExpiryTime == 0:
  296. if _, exists := skippedReasons[email]; !exists {
  297. skippedReasons[email] = "unlimited expiry"
  298. }
  299. case rec.ExpiryTime > 0:
  300. next := rec.ExpiryTime + addExpiryMs
  301. if next <= 0 {
  302. if _, exists := skippedReasons[email]; !exists {
  303. skippedReasons[email] = "reduction exceeds remaining time"
  304. }
  305. } else {
  306. entry.applyExpiry = true
  307. entry.newExpiry = next
  308. }
  309. default:
  310. next := rec.ExpiryTime - addExpiryMs
  311. if next >= 0 {
  312. if _, exists := skippedReasons[email]; !exists {
  313. skippedReasons[email] = "reduction exceeds delay window"
  314. }
  315. } else {
  316. entry.applyExpiry = true
  317. entry.newExpiry = next
  318. }
  319. }
  320. }
  321. if addBytes != 0 {
  322. if rec.TotalGB == 0 {
  323. if _, exists := skippedReasons[email]; !exists {
  324. skippedReasons[email] = "unlimited traffic"
  325. }
  326. } else {
  327. next := max(rec.TotalGB+addBytes, 0)
  328. entry.applyTotal = true
  329. entry.newTotal = next
  330. }
  331. }
  332. if entry.applyExpiry || entry.applyTotal || adjustFlow {
  333. plan[email] = entry
  334. }
  335. }
  336. if len(plan) == 0 {
  337. for email, reason := range skippedReasons {
  338. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  339. }
  340. return result, false, nil
  341. }
  342. plannedIds := make([]int, 0, len(plan))
  343. recordIdToEmail := make(map[int]string, len(plan))
  344. for email, entry := range plan {
  345. plannedIds = append(plannedIds, entry.record.Id)
  346. recordIdToEmail[entry.record.Id] = email
  347. }
  348. var mappings []model.ClientInbound
  349. for _, batch := range chunkInts(plannedIds, sqlInChunk) {
  350. var rows []model.ClientInbound
  351. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  352. return result, false, err
  353. }
  354. mappings = append(mappings, rows...)
  355. }
  356. emailsByInbound := map[int][]string{}
  357. for _, m := range mappings {
  358. email, ok := recordIdToEmail[m.ClientId]
  359. if !ok {
  360. continue
  361. }
  362. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  363. }
  364. needRestart := false
  365. flowHonored := map[string]bool{}
  366. flowIneligible := map[string]bool{}
  367. for inboundId, ibEmails := range emailsByInbound {
  368. ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan, flow)
  369. if ibRes.needRestart {
  370. needRestart = true
  371. }
  372. for email := range ibRes.flowHonored {
  373. flowHonored[email] = true
  374. }
  375. for email := range ibRes.flowIneligible {
  376. flowIneligible[email] = true
  377. }
  378. for email, reason := range ibRes.perEmailSkipped {
  379. if _, already := skippedReasons[email]; !already {
  380. skippedReasons[email] = reason
  381. }
  382. }
  383. }
  384. adjusted := map[string]struct{}{}
  385. for email, entry := range plan {
  386. if _, skipped := skippedReasons[email]; skipped {
  387. continue
  388. }
  389. updates := map[string]any{}
  390. if entry.applyExpiry {
  391. updates["expiry_time"] = entry.newExpiry
  392. }
  393. if entry.applyTotal {
  394. updates["total"] = entry.newTotal
  395. }
  396. if len(updates) > 0 {
  397. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
  398. if _, already := skippedReasons[email]; !already {
  399. skippedReasons[email] = err.Error()
  400. }
  401. continue
  402. }
  403. }
  404. // Counted when expiry/total changed, or a flow directive was honored
  405. // for this client (flow lives in the inbound JSON, not ClientTraffic).
  406. if len(updates) > 0 || flowHonored[email] {
  407. adjusted[email] = struct{}{}
  408. }
  409. }
  410. result.Adjusted = len(adjusted)
  411. for email, reason := range skippedReasons {
  412. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  413. }
  414. // Report a flow directive that no inbound could carry — only when it was not
  415. // honored anywhere and the client has no other (expiry/total) skip reason.
  416. // The expiry/total part, if any, has already been applied and counted above.
  417. for email := range flowIneligible {
  418. if flowHonored[email] {
  419. continue
  420. }
  421. if _, already := skippedReasons[email]; already {
  422. continue
  423. }
  424. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "flow not supported on inbound"})
  425. }
  426. return result, needRestart, nil
  427. }
  428. type bulkInboundAdjustResult struct {
  429. perEmailSkipped map[string]string
  430. flowHonored map[string]bool
  431. // flowIneligible is tracked apart from perEmailSkipped: a flow directive
  432. // that an inbound cannot carry must not suppress the expiry/total write for
  433. // the same client (which would diverge the inbound JSON / ClientRecord from
  434. // ClientTraffic). It only feeds the final Skipped report.
  435. flowIneligible map[string]bool
  436. needRestart bool
  437. }
  438. // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
  439. // inside a single inbound's settings JSON. The xray runtime is updated
  440. // only for remote-node inbounds; local nodes do not need a notification
  441. // because the AddUser payload does not include totalGB/expiryTime —
  442. // changing those fields is identity-preserving and the panel's traffic
  443. // enforcement loop picks up the new limits from ClientTraffic directly.
  444. func (s *ClientService) bulkAdjustInboundClients(
  445. inboundSvc *InboundService,
  446. inboundId int,
  447. emails []string,
  448. plan map[string]*bulkAdjustEntry,
  449. flow string,
  450. ) bulkInboundAdjustResult {
  451. res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}, flowHonored: map[string]bool{}, flowIneligible: map[string]bool{}}
  452. defer lockInbound(inboundId).Unlock()
  453. oldInbound, err := inboundSvc.GetInbound(inboundId)
  454. if err != nil {
  455. logger.Error("Load Old Data Error")
  456. for _, e := range emails {
  457. res.perEmailSkipped[e] = err.Error()
  458. }
  459. return res
  460. }
  461. var settings map[string]any
  462. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  463. for _, e := range emails {
  464. res.perEmailSkipped[e] = err.Error()
  465. }
  466. return res
  467. }
  468. // Match by email — the client's stable identity (see Delete). Credentials
  469. // can drift from the inbound JSON, so they are never used for matching.
  470. wantedEmails := make(map[string]struct{}, len(emails))
  471. for _, email := range emails {
  472. if plan[email] == nil {
  473. res.perEmailSkipped[email] = "client not found"
  474. continue
  475. }
  476. wantedEmails[email] = struct{}{}
  477. }
  478. // Flow eligibility is a property of the inbound (protocol + transport), so
  479. // resolve it once. Clearing flow is always allowed; setting a vision flow
  480. // is only honored on an inbound that can carry it.
  481. flowEligible := flow == bulkFlowClear ||
  482. inboundCanEnableTlsFlow(string(oldInbound.Protocol), oldInbound.StreamSettings, oldInbound.Settings)
  483. interfaceClients, _ := settings["clients"].([]any)
  484. foundEmails := map[string]bool{}
  485. flowChanged := false
  486. nowMs := time.Now().Unix() * 1000
  487. for i, client := range interfaceClients {
  488. c, ok := client.(map[string]any)
  489. if !ok {
  490. continue
  491. }
  492. targetEmail, _ := c["email"].(string)
  493. if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" {
  494. continue
  495. }
  496. entry := plan[targetEmail]
  497. if entry.applyExpiry {
  498. c["expiryTime"] = entry.newExpiry
  499. }
  500. if entry.applyTotal {
  501. c["totalGB"] = entry.newTotal
  502. }
  503. if flow != "" {
  504. if flowEligible {
  505. want := ""
  506. if flow != bulkFlowClear {
  507. want = flow
  508. }
  509. if cur, _ := c["flow"].(string); cur != want {
  510. c["flow"] = want
  511. flowChanged = true
  512. }
  513. res.flowHonored[targetEmail] = true
  514. } else {
  515. // Record separately so this never suppresses the expiry/total
  516. // write for the same client (see flowIneligible doc).
  517. res.flowIneligible[targetEmail] = true
  518. }
  519. }
  520. c["updated_at"] = nowMs
  521. interfaceClients[i] = c
  522. foundEmails[targetEmail] = true
  523. }
  524. for email := range wantedEmails {
  525. if !foundEmails[email] {
  526. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  527. }
  528. }
  529. if len(foundEmails) == 0 {
  530. return res
  531. }
  532. settings["clients"] = interfaceClients
  533. newSettings, err := json.MarshalIndent(settings, "", " ")
  534. if err != nil {
  535. for email := range foundEmails {
  536. res.perEmailSkipped[email] = err.Error()
  537. }
  538. return res
  539. }
  540. oldInbound.Settings = string(newSettings)
  541. // A flow change rewrites the user's xray config, which the lightweight
  542. // UpdateUser push below does not carry. Local nodes reload via restart;
  543. // remote nodes get a full reconcile (MarkNodeDirty) instead of a per-user push.
  544. if flowChanged && oldInbound.NodeID == nil {
  545. res.needRestart = true
  546. }
  547. if oldInbound.NodeID != nil {
  548. rt, push, _, perr := inboundSvc.nodePushPlan(oldInbound)
  549. if perr != nil {
  550. for email := range foundEmails {
  551. res.perEmailSkipped[email] = perr.Error()
  552. delete(foundEmails, email)
  553. }
  554. } else {
  555. if flowChanged {
  556. push = false
  557. }
  558. // Large batches collapse into one reconcile push rather than M updates.
  559. if push && len(foundEmails) > nodeBulkPushThreshold {
  560. push = false
  561. }
  562. if push {
  563. for email := range foundEmails {
  564. entry := plan[email]
  565. updated := *entry.record.ToClient()
  566. if entry.applyExpiry {
  567. updated.ExpiryTime = entry.newExpiry
  568. }
  569. if entry.applyTotal {
  570. updated.TotalGB = entry.newTotal
  571. }
  572. updated.UpdatedAt = nowMs
  573. if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
  574. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  575. }
  576. }
  577. }
  578. }
  579. }
  580. // Serialize against the traffic poll to avoid the cross-transaction
  581. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  582. txErr := runSerializedTx(func(tx *gorm.DB) error {
  583. if err := tx.Save(oldInbound).Error; err != nil {
  584. return err
  585. }
  586. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  587. if gcErr != nil {
  588. return gcErr
  589. }
  590. if err := s.SyncInbound(tx, inboundId, finalClients); err != nil {
  591. return err
  592. }
  593. if oldInbound.NodeID != nil {
  594. return (&NodeService{}).MarkNodeDirtyTx(tx, *oldInbound.NodeID)
  595. }
  596. return nil
  597. })
  598. if txErr != nil {
  599. for email := range foundEmails {
  600. if _, skip := res.perEmailSkipped[email]; !skip {
  601. res.perEmailSkipped[email] = txErr.Error()
  602. }
  603. }
  604. }
  605. return res
  606. }
  607. // BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email
  608. // skip reasons when an email could not be processed.
  609. type BulkDeleteResult struct {
  610. Deleted int `json:"deleted"`
  611. Skipped []BulkDeleteReport `json:"skipped,omitempty"`
  612. }
  613. type BulkDeleteReport struct {
  614. Email string `json:"email"`
  615. Reason string `json:"reason"`
  616. }
  617. // BulkDelete removes every client in the list in one optimized pass.
  618. // Instead of running the full single-delete pipeline N times (which would
  619. // re-read, re-parse, and re-write each inbound's settings JSON for every
  620. // email), it groups emails by inbound and performs a single
  621. // read-modify-write per inbound. Per-row DB cleanups are also batched with
  622. // IN-clause queries at the end. Errors on a particular email are recorded
  623. // in the Skipped list and processing continues for the rest.
  624. func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
  625. result := BulkDeleteResult{}
  626. seen := map[string]struct{}{}
  627. cleanEmails := make([]string, 0, len(emails))
  628. for _, e := range emails {
  629. e = strings.TrimSpace(e)
  630. if e == "" {
  631. continue
  632. }
  633. if _, ok := seen[e]; ok {
  634. continue
  635. }
  636. seen[e] = struct{}{}
  637. cleanEmails = append(cleanEmails, e)
  638. }
  639. if len(cleanEmails) == 0 {
  640. return result, false, nil
  641. }
  642. db := database.GetDB()
  643. var records []model.ClientRecord
  644. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  645. var rows []model.ClientRecord
  646. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  647. return result, false, err
  648. }
  649. records = append(records, rows...)
  650. }
  651. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  652. tombstoneEmails := make([]string, 0, len(records))
  653. for i := range records {
  654. recordsByEmail[records[i].Email] = &records[i]
  655. tombstoneEmails = append(tombstoneEmails, records[i].Email)
  656. }
  657. tombstoneClientEmails(tombstoneEmails)
  658. skippedReasons := map[string]string{}
  659. for _, email := range cleanEmails {
  660. if _, ok := recordsByEmail[email]; !ok {
  661. skippedReasons[email] = "client not found"
  662. }
  663. }
  664. clientIds := make([]int, 0, len(recordsByEmail))
  665. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  666. for _, r := range recordsByEmail {
  667. clientIds = append(clientIds, r.Id)
  668. recordIdToEmail[r.Id] = r.Email
  669. }
  670. emailsByInbound := map[int][]string{}
  671. if len(clientIds) > 0 {
  672. var mappings []model.ClientInbound
  673. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  674. var rows []model.ClientInbound
  675. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  676. return result, false, err
  677. }
  678. mappings = append(mappings, rows...)
  679. }
  680. for _, m := range mappings {
  681. email, ok := recordIdToEmail[m.ClientId]
  682. if !ok {
  683. continue
  684. }
  685. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  686. }
  687. }
  688. needRestart := false
  689. for inboundId, ibEmails := range emailsByInbound {
  690. ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false)
  691. if ibResult.needRestart {
  692. needRestart = true
  693. }
  694. for email, reason := range ibResult.perEmailSkipped {
  695. if _, already := skippedReasons[email]; !already {
  696. skippedReasons[email] = reason
  697. }
  698. }
  699. }
  700. successEmails := make([]string, 0, len(recordsByEmail))
  701. successIds := make([]int, 0, len(recordsByEmail))
  702. for email, rec := range recordsByEmail {
  703. if _, skipped := skippedReasons[email]; skipped {
  704. continue
  705. }
  706. successEmails = append(successEmails, email)
  707. successIds = append(successIds, rec.Id)
  708. }
  709. if len(successIds) > 0 {
  710. // Serialize the row cleanup against the traffic poll to avoid the
  711. // cross-transaction lock-order deadlock on client_traffics/inbounds.
  712. if err := runSerializedTx(func(tx *gorm.DB) error {
  713. for _, batch := range chunkInts(successIds, sqlInChunk) {
  714. if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil {
  715. return e
  716. }
  717. }
  718. if !keepTraffic && len(successEmails) > 0 {
  719. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  720. if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil {
  721. return e
  722. }
  723. if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil {
  724. return e
  725. }
  726. }
  727. }
  728. for _, batch := range chunkInts(successIds, sqlInChunk) {
  729. if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil {
  730. return e
  731. }
  732. }
  733. return nil
  734. }); err != nil {
  735. return result, needRestart, err
  736. }
  737. }
  738. result.Deleted = len(successEmails)
  739. for email, reason := range skippedReasons {
  740. result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
  741. }
  742. return result, needRestart, nil
  743. }
  744. type bulkInboundDeleteResult struct {
  745. perEmailSkipped map[string]string
  746. needRestart bool
  747. }
  748. // bulkDelInboundClients removes multiple clients from a single inbound's
  749. // settings JSON in one read-modify-write cycle, runs the xray runtime
  750. // RemoveUser/DeleteUser calls, and persists the inbound. The returned map
  751. // holds per-email failure reasons; emails not present in the map are
  752. // considered successful for this inbound.
  753. func (s *ClientService) bulkDelInboundClients(
  754. inboundSvc *InboundService,
  755. inboundId int,
  756. emails []string,
  757. records map[string]*model.ClientRecord,
  758. keepTraffic bool,
  759. ) bulkInboundDeleteResult {
  760. res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
  761. defer lockInbound(inboundId).Unlock()
  762. oldInbound, err := inboundSvc.GetInbound(inboundId)
  763. if err != nil {
  764. logger.Error("Load Old Data Error")
  765. for _, e := range emails {
  766. res.perEmailSkipped[e] = err.Error()
  767. }
  768. return res
  769. }
  770. var settings map[string]any
  771. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  772. for _, e := range emails {
  773. res.perEmailSkipped[e] = err.Error()
  774. }
  775. return res
  776. }
  777. // Match by email — the client's stable identity (see Delete). Removes every
  778. // entry carrying a wanted email, independent of credential drift.
  779. wantedEmails := make(map[string]struct{}, len(emails))
  780. for _, email := range emails {
  781. if records[email] == nil {
  782. res.perEmailSkipped[email] = "client not found"
  783. continue
  784. }
  785. wantedEmails[email] = struct{}{}
  786. }
  787. interfaceClients, _ := settings["clients"].([]any)
  788. newClients := make([]any, 0, len(interfaceClients))
  789. foundEmails := map[string]bool{}
  790. enableByEmail := map[string]bool{}
  791. for _, client := range interfaceClients {
  792. c, ok := client.(map[string]any)
  793. if !ok {
  794. newClients = append(newClients, client)
  795. continue
  796. }
  797. em, _ := c["email"].(string)
  798. if _, found := wantedEmails[em]; found && em != "" {
  799. foundEmails[em] = true
  800. en, _ := c["enable"].(bool)
  801. enableByEmail[em] = en
  802. continue
  803. }
  804. newClients = append(newClients, client)
  805. }
  806. for email := range wantedEmails {
  807. if !foundEmails[email] {
  808. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  809. }
  810. }
  811. db := database.GetDB()
  812. newClients = compactOrphans(db, newClients)
  813. if newClients == nil {
  814. newClients = []any{}
  815. }
  816. settings["clients"] = newClients
  817. newSettings, err := json.MarshalIndent(settings, "", " ")
  818. if err != nil {
  819. for email := range foundEmails {
  820. if _, skip := res.perEmailSkipped[email]; !skip {
  821. res.perEmailSkipped[email] = err.Error()
  822. }
  823. }
  824. return res
  825. }
  826. oldInbound.Settings = string(newSettings)
  827. foundList := make([]string, 0, len(foundEmails))
  828. for email := range foundEmails {
  829. foundList = append(foundList, email)
  830. }
  831. notDepletedByEmail := map[string]bool{}
  832. if len(foundList) > 0 {
  833. type trafficRow struct {
  834. Email string
  835. Enable bool
  836. }
  837. for _, batch := range chunkStrings(foundList, sqlInChunk) {
  838. var rows []trafficRow
  839. if err := db.Model(xray.ClientTraffic{}).
  840. Where("email IN ?", batch).
  841. Select("email, enable").
  842. Scan(&rows).Error; err == nil {
  843. for _, r := range rows {
  844. notDepletedByEmail[r.Email] = r.Enable
  845. }
  846. }
  847. }
  848. }
  849. var sharedSet map[string]bool
  850. if !keepTraffic {
  851. var sharedErr error
  852. sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
  853. if sharedErr != nil {
  854. for email := range foundEmails {
  855. res.perEmailSkipped[email] = sharedErr.Error()
  856. delete(foundEmails, email)
  857. }
  858. return res
  859. }
  860. }
  861. if !keepTraffic {
  862. purge := make([]string, 0, len(foundEmails))
  863. for email := range foundEmails {
  864. if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
  865. purge = append(purge, email)
  866. }
  867. }
  868. if len(purge) > 0 {
  869. // Serialize the IP/stat purge against the traffic poll to avoid the
  870. // cross-transaction lock-order deadlock on client_traffics.
  871. if delErr := runSerializedTx(func(tx *gorm.DB) error {
  872. if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil {
  873. logger.Error("Error in delete client IPs")
  874. return e
  875. }
  876. if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil {
  877. logger.Error("Delete stats Data Error")
  878. return e
  879. }
  880. return nil
  881. }); delErr != nil {
  882. for _, email := range purge {
  883. res.perEmailSkipped[email] = delErr.Error()
  884. delete(foundEmails, email)
  885. }
  886. }
  887. }
  888. }
  889. if oldInbound.NodeID == nil {
  890. rt, rterr := inboundSvc.runtimeFor(oldInbound)
  891. if rterr != nil {
  892. res.needRestart = true
  893. } else {
  894. for email := range foundEmails {
  895. if !enableByEmail[email] || !notDepletedByEmail[email] {
  896. continue
  897. }
  898. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  899. if err1 == nil {
  900. logger.Debug("Client deleted on", rt.Name(), ":", email)
  901. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  902. logger.Debug("User is already deleted. Nothing to do more...")
  903. } else {
  904. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  905. res.needRestart = true
  906. }
  907. }
  908. }
  909. } else {
  910. rt, push, _, perr := inboundSvc.nodePushPlan(oldInbound)
  911. if perr != nil {
  912. for email := range foundEmails {
  913. res.perEmailSkipped[email] = perr.Error()
  914. delete(foundEmails, email)
  915. }
  916. } else {
  917. // Large batches collapse into one reconcile push rather than M deletes.
  918. if push && len(foundEmails) > nodeBulkPushThreshold {
  919. push = false
  920. }
  921. if push {
  922. for email := range foundEmails {
  923. if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
  924. logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
  925. }
  926. }
  927. }
  928. }
  929. }
  930. // Serialize against the traffic poll to avoid the cross-transaction
  931. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  932. txErr := runSerializedTx(func(tx *gorm.DB) error {
  933. if err := tx.Save(oldInbound).Error; err != nil {
  934. return err
  935. }
  936. finalClients, err := inboundSvc.GetClients(oldInbound)
  937. if err != nil {
  938. return err
  939. }
  940. if err := s.SyncInbound(tx, inboundId, finalClients); err != nil {
  941. return err
  942. }
  943. if oldInbound.NodeID != nil {
  944. return (&NodeService{}).MarkNodeDirtyTx(tx, *oldInbound.NodeID)
  945. }
  946. return nil
  947. })
  948. if txErr != nil {
  949. for email := range foundEmails {
  950. if _, skip := res.perEmailSkipped[email]; !skip {
  951. res.perEmailSkipped[email] = txErr.Error()
  952. }
  953. }
  954. }
  955. return res
  956. }
  957. // BulkCreateResult mirrors BulkAdjustResult for the create flow.
  958. type BulkCreateResult struct {
  959. Created int `json:"created"`
  960. Skipped []BulkCreateReport `json:"skipped,omitempty"`
  961. }
  962. type BulkCreateReport struct {
  963. Email string `json:"email"`
  964. Reason string `json:"reason"`
  965. }
  966. func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
  967. result := BulkCreateResult{}
  968. if len(payloads) == 0 {
  969. return result, false, nil
  970. }
  971. skip := func(email, reason string) {
  972. if strings.TrimSpace(email) == "" {
  973. email = "(missing email)"
  974. }
  975. result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
  976. }
  977. emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
  978. if err != nil {
  979. emailSubIDs = nil
  980. }
  981. type prepared struct {
  982. client model.Client
  983. inboundIds []int
  984. }
  985. prep := make([]prepared, 0, len(payloads))
  986. emails := make([]string, 0, len(payloads))
  987. subIDs := make([]string, 0, len(payloads))
  988. seenEmail := make(map[string]struct{}, len(payloads))
  989. seenSubID := make(map[string]string, len(payloads))
  990. for i := range payloads {
  991. client := payloads[i].Client
  992. email := strings.TrimSpace(client.Email)
  993. if email == "" {
  994. skip("", "client email is required")
  995. continue
  996. }
  997. if verr := validateClientEmail(email); verr != nil {
  998. skip(email, verr.Error())
  999. continue
  1000. }
  1001. if verr := validateClientSubID(client.SubID); verr != nil {
  1002. skip(email, verr.Error())
  1003. continue
  1004. }
  1005. if len(payloads[i].InboundIds) == 0 {
  1006. skip(email, "at least one inbound is required")
  1007. continue
  1008. }
  1009. client.Email = email
  1010. if client.SubID == "" {
  1011. client.SubID = uuid.NewString()
  1012. }
  1013. if !client.Enable {
  1014. client.Enable = true
  1015. }
  1016. now := time.Now().UnixMilli()
  1017. if client.CreatedAt == 0 {
  1018. client.CreatedAt = now
  1019. }
  1020. client.UpdatedAt = now
  1021. le := strings.ToLower(email)
  1022. if _, dup := seenEmail[le]; dup {
  1023. skip(email, "email already in use: "+email)
  1024. continue
  1025. }
  1026. if owner, ok := seenSubID[client.SubID]; ok && owner != le {
  1027. skip(email, "subId already in use: "+client.SubID)
  1028. continue
  1029. }
  1030. seenEmail[le] = struct{}{}
  1031. seenSubID[client.SubID] = le
  1032. prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
  1033. emails = append(emails, email)
  1034. subIDs = append(subIDs, client.SubID)
  1035. }
  1036. if len(prep) == 0 {
  1037. return result, false, nil
  1038. }
  1039. db := database.GetDB()
  1040. const lookupChunk = 400
  1041. existingEmailSub := make(map[string]string, len(emails))
  1042. for start := 0; start < len(emails); start += lookupChunk {
  1043. end := min(start+lookupChunk, len(emails))
  1044. var rows []model.ClientRecord
  1045. if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
  1046. return result, false, e
  1047. }
  1048. for i := range rows {
  1049. existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
  1050. }
  1051. }
  1052. existingSubOwner := make(map[string]string, len(subIDs))
  1053. for start := 0; start < len(subIDs); start += lookupChunk {
  1054. end := min(start+lookupChunk, len(subIDs))
  1055. var rows []model.ClientRecord
  1056. if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
  1057. return result, false, e
  1058. }
  1059. for i := range rows {
  1060. existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
  1061. }
  1062. }
  1063. inboundCache := make(map[int]*model.Inbound)
  1064. getIb := func(id int) (*model.Inbound, error) {
  1065. if ib, ok := inboundCache[id]; ok {
  1066. return ib, nil
  1067. }
  1068. ib, e := inboundSvc.GetInbound(id)
  1069. if e != nil {
  1070. return nil, e
  1071. }
  1072. inboundCache[id] = ib
  1073. return ib, nil
  1074. }
  1075. byInbound := make(map[int][]model.Client)
  1076. idxByInbound := make(map[int][]int)
  1077. inboundOrder := make([]int, 0)
  1078. failed := make([]bool, len(prep))
  1079. reason := make([]string, len(prep))
  1080. for idx := range prep {
  1081. le := strings.ToLower(prep[idx].client.Email)
  1082. if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
  1083. failed[idx] = true
  1084. reason[idx] = "email already in use: " + prep[idx].client.Email
  1085. continue
  1086. }
  1087. if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
  1088. failed[idx] = true
  1089. reason[idx] = "subId already in use: " + prep[idx].client.SubID
  1090. continue
  1091. }
  1092. ok := true
  1093. for _, ibId := range prep[idx].inboundIds {
  1094. ib, e := getIb(ibId)
  1095. if e != nil {
  1096. failed[idx] = true
  1097. reason[idx] = e.Error()
  1098. ok = false
  1099. break
  1100. }
  1101. if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
  1102. failed[idx] = true
  1103. reason[idx] = e.Error()
  1104. ok = false
  1105. break
  1106. }
  1107. }
  1108. if !ok {
  1109. continue
  1110. }
  1111. for _, ibId := range prep[idx].inboundIds {
  1112. ib, _ := getIb(ibId)
  1113. if _, seen := byInbound[ibId]; !seen {
  1114. inboundOrder = append(inboundOrder, ibId)
  1115. }
  1116. byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
  1117. idxByInbound[ibId] = append(idxByInbound[ibId], idx)
  1118. }
  1119. }
  1120. needRestart := false
  1121. for _, ibId := range inboundOrder {
  1122. payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
  1123. if e == nil {
  1124. var nr bool
  1125. nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  1126. if e == nil && nr {
  1127. needRestart = true
  1128. }
  1129. }
  1130. if e != nil {
  1131. for _, idx := range idxByInbound[ibId] {
  1132. failed[idx] = true
  1133. if reason[idx] == "" {
  1134. reason[idx] = e.Error()
  1135. }
  1136. }
  1137. }
  1138. }
  1139. for idx := range prep {
  1140. if failed[idx] {
  1141. skip(prep[idx].client.Email, reason[idx])
  1142. } else {
  1143. result.Created++
  1144. }
  1145. }
  1146. return result, needRestart, nil
  1147. }
  1148. func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
  1149. db := database.GetDB()
  1150. now := time.Now().UnixMilli()
  1151. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  1152. var rows []xray.ClientTraffic
  1153. if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil {
  1154. return 0, false, err
  1155. }
  1156. if len(rows) == 0 {
  1157. return 0, false, nil
  1158. }
  1159. seen := make(map[string]struct{}, len(rows))
  1160. emails := make([]string, 0, len(rows))
  1161. for _, r := range rows {
  1162. if r.Email == "" {
  1163. continue
  1164. }
  1165. if _, ok := seen[r.Email]; ok {
  1166. continue
  1167. }
  1168. seen[r.Email] = struct{}{}
  1169. emails = append(emails, r.Email)
  1170. }
  1171. if len(emails) == 0 {
  1172. return 0, false, nil
  1173. }
  1174. res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
  1175. if err != nil {
  1176. return res.Deleted, needRestart, err
  1177. }
  1178. return res.Deleted, needRestart, nil
  1179. }
  1180. type BulkSetEnableResult struct {
  1181. Changed int `json:"changed"`
  1182. Skipped []BulkSetEnableReport `json:"skipped,omitempty"`
  1183. }
  1184. type BulkSetEnableReport struct {
  1185. Email string `json:"email"`
  1186. Reason string `json:"reason"`
  1187. }
  1188. func (s *ClientService) BulkSetEnable(inboundSvc *InboundService, emails []string, enable bool) (BulkSetEnableResult, bool, error) {
  1189. result := BulkSetEnableResult{}
  1190. seen := map[string]struct{}{}
  1191. cleanEmails := make([]string, 0, len(emails))
  1192. for _, e := range emails {
  1193. e = strings.TrimSpace(e)
  1194. if e == "" {
  1195. continue
  1196. }
  1197. if _, ok := seen[e]; ok {
  1198. continue
  1199. }
  1200. seen[e] = struct{}{}
  1201. cleanEmails = append(cleanEmails, e)
  1202. }
  1203. if len(cleanEmails) == 0 {
  1204. return result, false, nil
  1205. }
  1206. db := database.GetDB()
  1207. var records []model.ClientRecord
  1208. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  1209. var rows []model.ClientRecord
  1210. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  1211. return result, false, err
  1212. }
  1213. records = append(records, rows...)
  1214. }
  1215. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  1216. for i := range records {
  1217. recordsByEmail[records[i].Email] = &records[i]
  1218. }
  1219. skippedReasons := map[string]string{}
  1220. for _, email := range cleanEmails {
  1221. if _, ok := recordsByEmail[email]; !ok {
  1222. skippedReasons[email] = "client not found"
  1223. }
  1224. }
  1225. clientIds := make([]int, 0, len(recordsByEmail))
  1226. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  1227. for _, r := range recordsByEmail {
  1228. clientIds = append(clientIds, r.Id)
  1229. recordIdToEmail[r.Id] = r.Email
  1230. }
  1231. emailsByInbound := map[int][]string{}
  1232. if len(clientIds) > 0 {
  1233. var mappings []model.ClientInbound
  1234. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  1235. var rows []model.ClientInbound
  1236. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  1237. return result, false, err
  1238. }
  1239. mappings = append(mappings, rows...)
  1240. }
  1241. for _, m := range mappings {
  1242. email, ok := recordIdToEmail[m.ClientId]
  1243. if !ok {
  1244. continue
  1245. }
  1246. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  1247. }
  1248. }
  1249. needRestart := false
  1250. for inboundId, ibEmails := range emailsByInbound {
  1251. ibRes := s.bulkSetEnableInboundClients(inboundSvc, inboundId, ibEmails, enable)
  1252. if ibRes.needRestart {
  1253. needRestart = true
  1254. }
  1255. for email, reason := range ibRes.perEmailSkipped {
  1256. if _, already := skippedReasons[email]; !already {
  1257. skippedReasons[email] = reason
  1258. }
  1259. }
  1260. }
  1261. successEmails := make([]string, 0, len(recordsByEmail))
  1262. for email := range recordsByEmail {
  1263. if _, skipped := skippedReasons[email]; skipped {
  1264. continue
  1265. }
  1266. successEmails = append(successEmails, email)
  1267. }
  1268. if len(successEmails) > 0 {
  1269. now := time.Now().UnixMilli()
  1270. if err := runSerializedTx(func(tx *gorm.DB) error {
  1271. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  1272. if e := tx.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Update("enable", enable).Error; e != nil {
  1273. return e
  1274. }
  1275. if e := tx.Model(&model.ClientRecord{}).Where("email IN ?", batch).
  1276. Updates(map[string]any{"enable": enable, "updated_at": now}).Error; e != nil {
  1277. return e
  1278. }
  1279. }
  1280. return nil
  1281. }); err != nil {
  1282. return result, needRestart, err
  1283. }
  1284. }
  1285. result.Changed = len(successEmails)
  1286. for email, reason := range skippedReasons {
  1287. result.Skipped = append(result.Skipped, BulkSetEnableReport{Email: email, Reason: reason})
  1288. }
  1289. return result, needRestart, nil
  1290. }
  1291. type bulkSetEnableInboundResult struct {
  1292. perEmailSkipped map[string]string
  1293. needRestart bool
  1294. }
  1295. func (s *ClientService) bulkSetEnableInboundClients(inboundSvc *InboundService, inboundId int, emails []string, enable bool) bulkSetEnableInboundResult {
  1296. res := bulkSetEnableInboundResult{perEmailSkipped: map[string]string{}}
  1297. defer lockInbound(inboundId).Unlock()
  1298. oldInbound, err := inboundSvc.GetInbound(inboundId)
  1299. if err != nil {
  1300. for _, e := range emails {
  1301. res.perEmailSkipped[e] = err.Error()
  1302. }
  1303. return res
  1304. }
  1305. var settings map[string]any
  1306. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  1307. for _, e := range emails {
  1308. res.perEmailSkipped[e] = err.Error()
  1309. }
  1310. return res
  1311. }
  1312. wanted := make(map[string]struct{}, len(emails))
  1313. for _, email := range emails {
  1314. wanted[email] = struct{}{}
  1315. }
  1316. cipher := ""
  1317. if oldInbound.Protocol == model.Shadowsocks {
  1318. cipher, _ = settings["method"].(string)
  1319. }
  1320. type changedClient struct {
  1321. email string
  1322. wasEnable bool
  1323. client model.Client
  1324. }
  1325. var changed []changedClient
  1326. found := map[string]bool{}
  1327. nowMs := time.Now().UnixMilli()
  1328. interfaceClients, _ := settings["clients"].([]any)
  1329. for i, c := range interfaceClients {
  1330. entry, ok := c.(map[string]any)
  1331. if !ok {
  1332. continue
  1333. }
  1334. email, _ := entry["email"].(string)
  1335. if _, want := wanted[email]; !want || email == "" {
  1336. continue
  1337. }
  1338. found[email] = true
  1339. prev, _ := entry["enable"].(bool)
  1340. if prev == enable {
  1341. continue
  1342. }
  1343. entry["enable"] = enable
  1344. entry["updated_at"] = nowMs
  1345. interfaceClients[i] = entry
  1346. // Build the pushed client from the inbound JSON (the per-inbound source of
  1347. // truth), so a remote UpdateUser carries every field and never zeroes
  1348. // subId/totalGB/expiry from drifting ClientRecord columns (#4628/#4792).
  1349. var parsed model.Client
  1350. if b, mErr := json.Marshal(entry); mErr == nil {
  1351. _ = json.Unmarshal(b, &parsed)
  1352. }
  1353. parsed.Email = email
  1354. parsed.Enable = enable
  1355. changed = append(changed, changedClient{email: email, wasEnable: prev, client: parsed})
  1356. }
  1357. for email := range wanted {
  1358. if !found[email] {
  1359. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  1360. }
  1361. }
  1362. if len(changed) == 0 {
  1363. return res
  1364. }
  1365. settings["clients"] = interfaceClients
  1366. newSettings, err := json.MarshalIndent(settings, "", " ")
  1367. if err != nil {
  1368. for _, ch := range changed {
  1369. res.perEmailSkipped[ch.email] = err.Error()
  1370. }
  1371. return res
  1372. }
  1373. oldInbound.Settings = string(newSettings)
  1374. rt, push, _, perr := inboundSvc.nodePushPlan(oldInbound)
  1375. if perr != nil {
  1376. for _, ch := range changed {
  1377. res.perEmailSkipped[ch.email] = perr.Error()
  1378. }
  1379. return res
  1380. }
  1381. if oldInbound.NodeID != nil && push && len(changed) > nodeBulkPushThreshold {
  1382. push = false
  1383. }
  1384. txErr := runSerializedTx(func(tx *gorm.DB) error {
  1385. if e := tx.Save(oldInbound).Error; e != nil {
  1386. return e
  1387. }
  1388. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  1389. if gcErr != nil {
  1390. return gcErr
  1391. }
  1392. if err := s.SyncInbound(tx, inboundId, finalClients); err != nil {
  1393. return err
  1394. }
  1395. if oldInbound.NodeID != nil {
  1396. return (&NodeService{}).MarkNodeDirtyTx(tx, *oldInbound.NodeID)
  1397. }
  1398. return nil
  1399. })
  1400. if txErr != nil {
  1401. for _, ch := range changed {
  1402. res.perEmailSkipped[ch.email] = txErr.Error()
  1403. }
  1404. return res
  1405. }
  1406. if oldInbound.NodeID == nil {
  1407. if !push {
  1408. res.needRestart = true
  1409. } else {
  1410. for _, ch := range changed {
  1411. if enable {
  1412. err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
  1413. "email": ch.client.Email,
  1414. "id": ch.client.ID,
  1415. "security": ch.client.Security,
  1416. "flow": ch.client.Flow,
  1417. "auth": ch.client.Auth,
  1418. "password": ch.client.Password,
  1419. "cipher": cipher,
  1420. })
  1421. if err1 != nil {
  1422. logger.Debug("Error in adding client on", rt.Name(), ":", err1)
  1423. res.needRestart = true
  1424. }
  1425. } else if ch.wasEnable {
  1426. err1 := rt.RemoveUser(context.Background(), oldInbound, ch.email)
  1427. if err1 != nil && !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", ch.email)) {
  1428. logger.Debug("Error in removing client on", rt.Name(), ":", err1)
  1429. res.needRestart = true
  1430. }
  1431. }
  1432. }
  1433. }
  1434. } else if push {
  1435. for _, ch := range changed {
  1436. updated := ch.client
  1437. updated.UpdatedAt = nowMs
  1438. if err1 := rt.UpdateUser(context.Background(), oldInbound, ch.email, updated); err1 != nil {
  1439. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  1440. }
  1441. }
  1442. }
  1443. return res
  1444. }