1
0

client_bulk.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. )
  // BulkAttachResult summarises a BulkAttach run. Entries are recorded per
  // (client, inbound) pair, so the same email can appear more than once when
  // several target inbounds are processed.
  type BulkAttachResult struct {
  	// Attached holds the email of every client added to a target inbound.
  	Attached []string `json:"attached"`
  	// Skipped holds the email of every client that was already present on a
  	// target inbound.
  	Skipped []string `json:"skipped"`
  	// Errors holds one human-readable message per failure.
  	Errors []string `json:"errors"`
  }
  22. // BulkAttach attaches the given existing clients (by email) to each target inbound,
  23. // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds
  24. // all clients to a target in a single AddInboundClient call, and reports clients already
  25. // present on a target as skipped.
  26. func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) {
  27. result := &BulkAttachResult{}
  28. if len(emails) == 0 || len(inboundIds) == 0 {
  29. return result, false, nil
  30. }
  31. recordErr := func(format string, args ...any) {
  32. msg := fmt.Sprintf(format, args...)
  33. result.Errors = append(result.Errors, msg)
  34. logger.Warningf("[BulkAttach] %s", msg)
  35. }
  36. records := make([]*model.ClientRecord, 0, len(emails))
  37. seenEmail := make(map[string]struct{}, len(emails))
  38. for _, email := range emails {
  39. if email == "" {
  40. continue
  41. }
  42. key := strings.ToLower(email)
  43. if _, ok := seenEmail[key]; ok {
  44. continue
  45. }
  46. seenEmail[key] = struct{}{}
  47. rec, err := s.GetRecordByEmail(nil, email)
  48. if err != nil {
  49. recordErr("%s: %v", email, err)
  50. continue
  51. }
  52. records = append(records, rec)
  53. }
  54. emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs()
  55. if sidErr != nil {
  56. emailSubIDs = nil
  57. logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr)
  58. }
  59. needRestart := false
  60. for _, ibId := range inboundIds {
  61. inbound, err := inboundSvc.GetInbound(ibId)
  62. if err != nil {
  63. recordErr("inbound %d: %v", ibId, err)
  64. continue
  65. }
  66. existingClients, err := inboundSvc.GetClients(inbound)
  67. if err != nil {
  68. recordErr("inbound %d: %v", ibId, err)
  69. continue
  70. }
  71. have := make(map[string]struct{}, len(existingClients))
  72. for _, c := range existingClients {
  73. have[strings.ToLower(c.Email)] = struct{}{}
  74. }
  75. clientsToAdd := make([]model.Client, 0, len(records))
  76. for _, rec := range records {
  77. if _, attached := have[strings.ToLower(rec.Email)]; attached {
  78. result.Skipped = append(result.Skipped, rec.Email)
  79. continue
  80. }
  81. client := *rec.ToClient()
  82. client.UpdatedAt = time.Now().UnixMilli()
  83. if err := s.fillProtocolDefaults(&client, inbound); err != nil {
  84. recordErr("%s -> inbound %d: %v", rec.Email, ibId, err)
  85. continue
  86. }
  87. clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound))
  88. }
  89. if len(clientsToAdd) == 0 {
  90. continue
  91. }
  92. payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd})
  93. if err != nil {
  94. recordErr("inbound %d: %v", ibId, err)
  95. continue
  96. }
  97. nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  98. if err != nil {
  99. recordErr("inbound %d: %v", ibId, err)
  100. continue
  101. }
  102. if nr {
  103. needRestart = true
  104. }
  105. for _, c := range clientsToAdd {
  106. result.Attached = append(result.Attached, c.Email)
  107. }
  108. }
  109. return result, needRestart, nil
  110. }
  // BulkDetachResult summarises a BulkDetach run.
  type BulkDetachResult struct {
  	// Detached holds the emails that matched at least one requested inbound
  	// and for which no inbound-level removal failed.
  	Detached []string `json:"detached"`
  	// Skipped holds the emails that were not attached to any requested inbound.
  	Skipped []string `json:"skipped"`
  	// Errors holds one human-readable message per failure.
  	Errors []string `json:"errors"`
  }
  117. // BulkDetach detaches the given existing clients (by email) from each target inbound.
  118. // (email, inbound) pairs where the client is not currently attached are silently skipped
  119. // at the inbound level; emails that aren't attached to any of the requested inbounds
  120. // are reported under skipped. ClientRecord rows are kept even when they become orphaned
  121. // (matches single-client detach semantics); callers should use bulkDelete for full removal.
  122. func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) {
  123. result := &BulkDetachResult{}
  124. if len(emails) == 0 || len(inboundIds) == 0 {
  125. return result, false, nil
  126. }
  127. recordErr := func(format string, args ...any) {
  128. msg := fmt.Sprintf(format, args...)
  129. result.Errors = append(result.Errors, msg)
  130. logger.Warningf("[BulkDetach] %s", msg)
  131. }
  132. requested := make(map[int]struct{}, len(inboundIds))
  133. for _, id := range inboundIds {
  134. requested[id] = struct{}{}
  135. }
  136. recsByInbound := make(map[int][]*model.ClientRecord)
  137. emailOrder := make([]string, 0, len(emails))
  138. emailRepr := make(map[string]string, len(emails))
  139. emailFailed := make(map[string]bool, len(emails))
  140. seenEmail := make(map[string]struct{}, len(emails))
  141. for _, email := range emails {
  142. if email == "" {
  143. continue
  144. }
  145. key := strings.ToLower(email)
  146. if _, ok := seenEmail[key]; ok {
  147. continue
  148. }
  149. seenEmail[key] = struct{}{}
  150. rec, err := s.GetRecordByEmail(nil, email)
  151. if err != nil {
  152. recordErr("%s: %v", email, err)
  153. continue
  154. }
  155. currentIds, err := s.GetInboundIdsForRecord(rec.Id)
  156. if err != nil {
  157. recordErr("%s: %v", email, err)
  158. continue
  159. }
  160. matched := false
  161. for _, id := range currentIds {
  162. if _, ok := requested[id]; ok {
  163. recsByInbound[id] = append(recsByInbound[id], rec)
  164. matched = true
  165. }
  166. }
  167. if !matched {
  168. result.Skipped = append(result.Skipped, rec.Email)
  169. continue
  170. }
  171. emailOrder = append(emailOrder, key)
  172. emailRepr[key] = rec.Email
  173. }
  174. needRestart := false
  175. for _, ibId := range inboundIds {
  176. recs, ok := recsByInbound[ibId]
  177. if !ok {
  178. continue
  179. }
  180. delete(recsByInbound, ibId)
  181. nr, err := s.delInboundClients(inboundSvc, ibId, recs, true)
  182. if err != nil {
  183. recordErr("inbound %d: %v", ibId, err)
  184. for _, rec := range recs {
  185. emailFailed[strings.ToLower(rec.Email)] = true
  186. }
  187. continue
  188. }
  189. if nr {
  190. needRestart = true
  191. }
  192. }
  193. for _, key := range emailOrder {
  194. if emailFailed[key] {
  195. continue
  196. }
  197. result.Detached = append(result.Detached, emailRepr[key])
  198. }
  199. return result, needRestart, nil
  200. }
  type (
  	// BulkAdjustResult is returned by BulkAdjust. It reports how many clients
  	// were updated successfully and which ones were skipped (typically
  	// because the field being adjusted was unlimited for that client) or
  	// failed.
  	BulkAdjustResult struct {
  		// Adjusted is the number of clients updated successfully.
  		Adjusted int `json:"adjusted"`
  		// Skipped lists the clients that were not adjusted, each with a
  		// reason. It is omitted from the JSON output when empty.
  		Skipped []BulkAdjustReport `json:"skipped,omitempty"`
  	}

  	// BulkAdjustReport pairs a client email with the reason it was skipped.
  	BulkAdjustReport struct {
  		Email  string `json:"email"`
  		Reason string `json:"reason"`
  	}
  )
  212. type bulkAdjustEntry struct {
  213. record *model.ClientRecord
  214. applyExpiry bool
  215. newExpiry int64
  216. applyTotal bool
  217. newTotal int64
  218. }
  219. // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
  220. // for every email in the list. Clients whose corresponding field is
  221. // unlimited (0) are skipped — bulk extend should not accidentally
  222. // limit an unlimited client. addDays and addBytes may be negative.
  223. //
  224. // Like BulkDelete, the work is grouped by inbound so each inbound's
  225. // settings JSON is parsed and written exactly once regardless of how
  226. // many target emails it contains.
  227. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) {
  228. result := BulkAdjustResult{}
  229. if len(emails) == 0 {
  230. return result, false, nil
  231. }
  232. if addDays == 0 && addBytes == 0 {
  233. return result, false, common.NewError("no adjustment specified")
  234. }
  235. addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
  236. seen := map[string]struct{}{}
  237. cleanEmails := make([]string, 0, len(emails))
  238. for _, e := range emails {
  239. e = strings.TrimSpace(e)
  240. if e == "" {
  241. continue
  242. }
  243. if _, ok := seen[e]; ok {
  244. continue
  245. }
  246. seen[e] = struct{}{}
  247. cleanEmails = append(cleanEmails, e)
  248. }
  249. if len(cleanEmails) == 0 {
  250. return result, false, nil
  251. }
  252. db := database.GetDB()
  253. var records []model.ClientRecord
  254. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  255. var rows []model.ClientRecord
  256. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  257. return result, false, err
  258. }
  259. records = append(records, rows...)
  260. }
  261. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  262. for i := range records {
  263. recordsByEmail[records[i].Email] = &records[i]
  264. }
  265. skippedReasons := map[string]string{}
  266. for _, email := range cleanEmails {
  267. if _, ok := recordsByEmail[email]; !ok {
  268. skippedReasons[email] = "client not found"
  269. }
  270. }
  271. plan := map[string]*bulkAdjustEntry{}
  272. for email, rec := range recordsByEmail {
  273. entry := &bulkAdjustEntry{record: rec}
  274. if addDays != 0 {
  275. switch {
  276. case rec.ExpiryTime == 0:
  277. if _, exists := skippedReasons[email]; !exists {
  278. skippedReasons[email] = "unlimited expiry"
  279. }
  280. case rec.ExpiryTime > 0:
  281. next := rec.ExpiryTime + addExpiryMs
  282. if next <= 0 {
  283. if _, exists := skippedReasons[email]; !exists {
  284. skippedReasons[email] = "reduction exceeds remaining time"
  285. }
  286. } else {
  287. entry.applyExpiry = true
  288. entry.newExpiry = next
  289. }
  290. default:
  291. next := rec.ExpiryTime - addExpiryMs
  292. if next >= 0 {
  293. if _, exists := skippedReasons[email]; !exists {
  294. skippedReasons[email] = "reduction exceeds delay window"
  295. }
  296. } else {
  297. entry.applyExpiry = true
  298. entry.newExpiry = next
  299. }
  300. }
  301. }
  302. if addBytes != 0 {
  303. if rec.TotalGB == 0 {
  304. if _, exists := skippedReasons[email]; !exists {
  305. skippedReasons[email] = "unlimited traffic"
  306. }
  307. } else {
  308. next := max(rec.TotalGB+addBytes, 0)
  309. entry.applyTotal = true
  310. entry.newTotal = next
  311. }
  312. }
  313. if entry.applyExpiry || entry.applyTotal {
  314. plan[email] = entry
  315. }
  316. }
  317. if len(plan) == 0 {
  318. for email, reason := range skippedReasons {
  319. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  320. }
  321. return result, false, nil
  322. }
  323. plannedIds := make([]int, 0, len(plan))
  324. recordIdToEmail := make(map[int]string, len(plan))
  325. for email, entry := range plan {
  326. plannedIds = append(plannedIds, entry.record.Id)
  327. recordIdToEmail[entry.record.Id] = email
  328. }
  329. var mappings []model.ClientInbound
  330. for _, batch := range chunkInts(plannedIds, sqlInChunk) {
  331. var rows []model.ClientInbound
  332. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  333. return result, false, err
  334. }
  335. mappings = append(mappings, rows...)
  336. }
  337. emailsByInbound := map[int][]string{}
  338. for _, m := range mappings {
  339. email, ok := recordIdToEmail[m.ClientId]
  340. if !ok {
  341. continue
  342. }
  343. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  344. }
  345. needRestart := false
  346. for inboundId, ibEmails := range emailsByInbound {
  347. ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan)
  348. if ibRes.needRestart {
  349. needRestart = true
  350. }
  351. for email, reason := range ibRes.perEmailSkipped {
  352. if _, already := skippedReasons[email]; !already {
  353. skippedReasons[email] = reason
  354. }
  355. }
  356. }
  357. for email, entry := range plan {
  358. if _, skipped := skippedReasons[email]; skipped {
  359. continue
  360. }
  361. updates := map[string]any{}
  362. if entry.applyExpiry {
  363. updates["expiry_time"] = entry.newExpiry
  364. }
  365. if entry.applyTotal {
  366. updates["total"] = entry.newTotal
  367. }
  368. if len(updates) == 0 {
  369. continue
  370. }
  371. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
  372. if _, already := skippedReasons[email]; !already {
  373. skippedReasons[email] = err.Error()
  374. }
  375. continue
  376. }
  377. result.Adjusted++
  378. }
  379. for email, reason := range skippedReasons {
  380. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  381. }
  382. return result, needRestart, nil
  383. }
  // bulkInboundAdjustResult is the per-inbound outcome reported by
  // bulkAdjustInboundClients.
  type bulkInboundAdjustResult struct {
  	// perEmailSkipped maps each email that could not be adjusted on the
  	// inbound to the reason.
  	perEmailSkipped map[string]string
  	// needRestart reports whether the caller should request a restart.
  	needRestart bool
  }
  388. // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
  389. // inside a single inbound's settings JSON. The xray runtime is updated
  390. // only for remote-node inbounds; local nodes do not need a notification
  391. // because the AddUser payload does not include totalGB/expiryTime —
  392. // changing those fields is identity-preserving and the panel's traffic
  393. // enforcement loop picks up the new limits from ClientTraffic directly.
  394. func (s *ClientService) bulkAdjustInboundClients(
  395. inboundSvc *InboundService,
  396. inboundId int,
  397. emails []string,
  398. plan map[string]*bulkAdjustEntry,
  399. ) bulkInboundAdjustResult {
  400. res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}}
  401. defer lockInbound(inboundId).Unlock()
  402. oldInbound, err := inboundSvc.GetInbound(inboundId)
  403. if err != nil {
  404. logger.Error("Load Old Data Error")
  405. for _, e := range emails {
  406. res.perEmailSkipped[e] = err.Error()
  407. }
  408. return res
  409. }
  410. var settings map[string]any
  411. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  412. for _, e := range emails {
  413. res.perEmailSkipped[e] = err.Error()
  414. }
  415. return res
  416. }
  417. // Match by email — the client's stable identity (see Delete). Credentials
  418. // can drift from the inbound JSON, so they are never used for matching.
  419. wantedEmails := make(map[string]struct{}, len(emails))
  420. for _, email := range emails {
  421. if plan[email] == nil {
  422. res.perEmailSkipped[email] = "client not found"
  423. continue
  424. }
  425. wantedEmails[email] = struct{}{}
  426. }
  427. interfaceClients, _ := settings["clients"].([]any)
  428. foundEmails := map[string]bool{}
  429. nowMs := time.Now().Unix() * 1000
  430. for i, client := range interfaceClients {
  431. c, ok := client.(map[string]any)
  432. if !ok {
  433. continue
  434. }
  435. targetEmail, _ := c["email"].(string)
  436. if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" {
  437. continue
  438. }
  439. entry := plan[targetEmail]
  440. if entry.applyExpiry {
  441. c["expiryTime"] = entry.newExpiry
  442. }
  443. if entry.applyTotal {
  444. c["totalGB"] = entry.newTotal
  445. }
  446. c["updated_at"] = nowMs
  447. interfaceClients[i] = c
  448. foundEmails[targetEmail] = true
  449. }
  450. for email := range wantedEmails {
  451. if !foundEmails[email] {
  452. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  453. }
  454. }
  455. if len(foundEmails) == 0 {
  456. return res
  457. }
  458. settings["clients"] = interfaceClients
  459. newSettings, err := json.MarshalIndent(settings, "", " ")
  460. if err != nil {
  461. for email := range foundEmails {
  462. res.perEmailSkipped[email] = err.Error()
  463. }
  464. return res
  465. }
  466. oldInbound.Settings = string(newSettings)
  467. markDirty := false
  468. if oldInbound.NodeID != nil {
  469. rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
  470. if perr != nil {
  471. for email := range foundEmails {
  472. res.perEmailSkipped[email] = perr.Error()
  473. delete(foundEmails, email)
  474. }
  475. } else {
  476. if dirty {
  477. markDirty = true
  478. }
  479. if push {
  480. for email := range foundEmails {
  481. entry := plan[email]
  482. updated := *entry.record.ToClient()
  483. if entry.applyExpiry {
  484. updated.ExpiryTime = entry.newExpiry
  485. }
  486. if entry.applyTotal {
  487. updated.TotalGB = entry.newTotal
  488. }
  489. updated.UpdatedAt = nowMs
  490. if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
  491. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  492. markDirty = true
  493. }
  494. }
  495. }
  496. }
  497. }
  498. db := database.GetDB()
  499. txErr := db.Transaction(func(tx *gorm.DB) error {
  500. if err := tx.Save(oldInbound).Error; err != nil {
  501. return err
  502. }
  503. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  504. if gcErr != nil {
  505. return gcErr
  506. }
  507. return s.SyncInbound(tx, inboundId, finalClients)
  508. })
  509. if txErr != nil {
  510. for email := range foundEmails {
  511. if _, skip := res.perEmailSkipped[email]; !skip {
  512. res.perEmailSkipped[email] = txErr.Error()
  513. }
  514. }
  515. } else if markDirty && oldInbound.NodeID != nil {
  516. if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
  517. logger.Warning("mark node dirty failed:", dErr)
  518. }
  519. }
  520. return res
  521. }
  type (
  	// BulkDeleteResult is returned by BulkDelete. It mirrors
  	// BulkAdjustResult: the number of clients deleted plus a per-email skip
  	// reason for every email that could not be processed.
  	BulkDeleteResult struct {
  		// Deleted is the number of clients removed.
  		Deleted int `json:"deleted"`
  		// Skipped lists the emails that were not deleted, each with a
  		// reason. It is omitted from the JSON output when empty.
  		Skipped []BulkDeleteReport `json:"skipped,omitempty"`
  	}

  	// BulkDeleteReport pairs a client email with the reason it was skipped.
  	BulkDeleteReport struct {
  		Email  string `json:"email"`
  		Reason string `json:"reason"`
  	}
  )
  532. // BulkDelete removes every client in the list in one optimized pass.
  533. // Instead of running the full single-delete pipeline N times (which would
  534. // re-read, re-parse, and re-write each inbound's settings JSON for every
  535. // email), it groups emails by inbound and performs a single
  536. // read-modify-write per inbound. Per-row DB cleanups are also batched with
  537. // IN-clause queries at the end. Errors on a particular email are recorded
  538. // in the Skipped list and processing continues for the rest.
  539. func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
  540. result := BulkDeleteResult{}
  541. seen := map[string]struct{}{}
  542. cleanEmails := make([]string, 0, len(emails))
  543. for _, e := range emails {
  544. e = strings.TrimSpace(e)
  545. if e == "" {
  546. continue
  547. }
  548. if _, ok := seen[e]; ok {
  549. continue
  550. }
  551. seen[e] = struct{}{}
  552. cleanEmails = append(cleanEmails, e)
  553. }
  554. if len(cleanEmails) == 0 {
  555. return result, false, nil
  556. }
  557. db := database.GetDB()
  558. var records []model.ClientRecord
  559. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  560. var rows []model.ClientRecord
  561. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  562. return result, false, err
  563. }
  564. records = append(records, rows...)
  565. }
  566. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  567. tombstoneEmails := make([]string, 0, len(records))
  568. for i := range records {
  569. recordsByEmail[records[i].Email] = &records[i]
  570. tombstoneEmails = append(tombstoneEmails, records[i].Email)
  571. }
  572. tombstoneClientEmails(tombstoneEmails)
  573. skippedReasons := map[string]string{}
  574. for _, email := range cleanEmails {
  575. if _, ok := recordsByEmail[email]; !ok {
  576. skippedReasons[email] = "client not found"
  577. }
  578. }
  579. clientIds := make([]int, 0, len(recordsByEmail))
  580. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  581. for _, r := range recordsByEmail {
  582. clientIds = append(clientIds, r.Id)
  583. recordIdToEmail[r.Id] = r.Email
  584. }
  585. emailsByInbound := map[int][]string{}
  586. if len(clientIds) > 0 {
  587. var mappings []model.ClientInbound
  588. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  589. var rows []model.ClientInbound
  590. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  591. return result, false, err
  592. }
  593. mappings = append(mappings, rows...)
  594. }
  595. for _, m := range mappings {
  596. email, ok := recordIdToEmail[m.ClientId]
  597. if !ok {
  598. continue
  599. }
  600. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  601. }
  602. }
  603. needRestart := false
  604. for inboundId, ibEmails := range emailsByInbound {
  605. ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false)
  606. if ibResult.needRestart {
  607. needRestart = true
  608. }
  609. for email, reason := range ibResult.perEmailSkipped {
  610. if _, already := skippedReasons[email]; !already {
  611. skippedReasons[email] = reason
  612. }
  613. }
  614. }
  615. successEmails := make([]string, 0, len(recordsByEmail))
  616. successIds := make([]int, 0, len(recordsByEmail))
  617. for email, rec := range recordsByEmail {
  618. if _, skipped := skippedReasons[email]; skipped {
  619. continue
  620. }
  621. successEmails = append(successEmails, email)
  622. successIds = append(successIds, rec.Id)
  623. }
  624. if len(successIds) > 0 {
  625. for _, batch := range chunkInts(successIds, sqlInChunk) {
  626. if err := db.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; err != nil {
  627. return result, needRestart, err
  628. }
  629. }
  630. if !keepTraffic && len(successEmails) > 0 {
  631. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  632. if err := db.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; err != nil {
  633. return result, needRestart, err
  634. }
  635. if err := db.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; err != nil {
  636. return result, needRestart, err
  637. }
  638. }
  639. }
  640. for _, batch := range chunkInts(successIds, sqlInChunk) {
  641. if err := db.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; err != nil {
  642. return result, needRestart, err
  643. }
  644. }
  645. }
  646. result.Deleted = len(successEmails)
  647. for email, reason := range skippedReasons {
  648. result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
  649. }
  650. return result, needRestart, nil
  651. }
  // bulkInboundDeleteResult is the per-inbound outcome reported by
  // bulkDelInboundClients.
  type bulkInboundDeleteResult struct {
  	// perEmailSkipped maps each email that could not be removed from the
  	// inbound to the reason; emails absent from the map succeeded.
  	perEmailSkipped map[string]string
  	// needRestart reports whether the caller should request a restart.
  	needRestart bool
  }
  656. // bulkDelInboundClients removes multiple clients from a single inbound's
  657. // settings JSON in one read-modify-write cycle, runs the xray runtime
  658. // RemoveUser/DeleteUser calls, and persists the inbound. The returned map
  659. // holds per-email failure reasons; emails not present in the map are
  660. // considered successful for this inbound.
  661. func (s *ClientService) bulkDelInboundClients(
  662. inboundSvc *InboundService,
  663. inboundId int,
  664. emails []string,
  665. records map[string]*model.ClientRecord,
  666. keepTraffic bool,
  667. ) bulkInboundDeleteResult {
  668. res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
  669. defer lockInbound(inboundId).Unlock()
  670. oldInbound, err := inboundSvc.GetInbound(inboundId)
  671. if err != nil {
  672. logger.Error("Load Old Data Error")
  673. for _, e := range emails {
  674. res.perEmailSkipped[e] = err.Error()
  675. }
  676. return res
  677. }
  678. var settings map[string]any
  679. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  680. for _, e := range emails {
  681. res.perEmailSkipped[e] = err.Error()
  682. }
  683. return res
  684. }
  685. // Match by email — the client's stable identity (see Delete). Removes every
  686. // entry carrying a wanted email, independent of credential drift.
  687. wantedEmails := make(map[string]struct{}, len(emails))
  688. for _, email := range emails {
  689. if records[email] == nil {
  690. res.perEmailSkipped[email] = "client not found"
  691. continue
  692. }
  693. wantedEmails[email] = struct{}{}
  694. }
  695. interfaceClients, _ := settings["clients"].([]any)
  696. newClients := make([]any, 0, len(interfaceClients))
  697. foundEmails := map[string]bool{}
  698. enableByEmail := map[string]bool{}
  699. for _, client := range interfaceClients {
  700. c, ok := client.(map[string]any)
  701. if !ok {
  702. newClients = append(newClients, client)
  703. continue
  704. }
  705. em, _ := c["email"].(string)
  706. if _, found := wantedEmails[em]; found && em != "" {
  707. foundEmails[em] = true
  708. en, _ := c["enable"].(bool)
  709. enableByEmail[em] = en
  710. continue
  711. }
  712. newClients = append(newClients, client)
  713. }
  714. for email := range wantedEmails {
  715. if !foundEmails[email] {
  716. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  717. }
  718. }
  719. db := database.GetDB()
  720. newClients = compactOrphans(db, newClients)
  721. if newClients == nil {
  722. newClients = []any{}
  723. }
  724. settings["clients"] = newClients
  725. newSettings, err := json.MarshalIndent(settings, "", " ")
  726. if err != nil {
  727. for email := range foundEmails {
  728. if _, skip := res.perEmailSkipped[email]; !skip {
  729. res.perEmailSkipped[email] = err.Error()
  730. }
  731. }
  732. return res
  733. }
  734. oldInbound.Settings = string(newSettings)
  735. foundList := make([]string, 0, len(foundEmails))
  736. for email := range foundEmails {
  737. foundList = append(foundList, email)
  738. }
  739. notDepletedByEmail := map[string]bool{}
  740. if len(foundList) > 0 {
  741. type trafficRow struct {
  742. Email string
  743. Enable bool
  744. }
  745. for _, batch := range chunkStrings(foundList, sqlInChunk) {
  746. var rows []trafficRow
  747. if err := db.Model(xray.ClientTraffic{}).
  748. Where("email IN ?", batch).
  749. Select("email, enable").
  750. Scan(&rows).Error; err == nil {
  751. for _, r := range rows {
  752. notDepletedByEmail[r.Email] = r.Enable
  753. }
  754. }
  755. }
  756. }
  757. var sharedSet map[string]bool
  758. if !keepTraffic {
  759. var sharedErr error
  760. sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
  761. if sharedErr != nil {
  762. for email := range foundEmails {
  763. res.perEmailSkipped[email] = sharedErr.Error()
  764. delete(foundEmails, email)
  765. }
  766. return res
  767. }
  768. }
  769. if !keepTraffic {
  770. purge := make([]string, 0, len(foundEmails))
  771. for email := range foundEmails {
  772. if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
  773. purge = append(purge, email)
  774. }
  775. }
  776. if len(purge) > 0 {
  777. if delErr := inboundSvc.delClientIPsByEmails(db, purge); delErr != nil {
  778. logger.Error("Error in delete client IPs")
  779. for _, email := range purge {
  780. res.perEmailSkipped[email] = delErr.Error()
  781. delete(foundEmails, email)
  782. }
  783. } else if delErr := inboundSvc.delClientStatsByEmails(db, purge); delErr != nil {
  784. logger.Error("Delete stats Data Error")
  785. for _, email := range purge {
  786. res.perEmailSkipped[email] = delErr.Error()
  787. delete(foundEmails, email)
  788. }
  789. }
  790. }
  791. }
  792. markDirty := false
  793. if oldInbound.NodeID == nil {
  794. rt, rterr := inboundSvc.runtimeFor(oldInbound)
  795. if rterr != nil {
  796. res.needRestart = true
  797. } else {
  798. for email := range foundEmails {
  799. if !enableByEmail[email] || !notDepletedByEmail[email] {
  800. continue
  801. }
  802. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  803. if err1 == nil {
  804. logger.Debug("Client deleted on", rt.Name(), ":", email)
  805. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  806. logger.Debug("User is already deleted. Nothing to do more...")
  807. } else {
  808. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  809. res.needRestart = true
  810. }
  811. }
  812. }
  813. } else {
  814. rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
  815. if perr != nil {
  816. for email := range foundEmails {
  817. res.perEmailSkipped[email] = perr.Error()
  818. delete(foundEmails, email)
  819. }
  820. } else {
  821. if dirty {
  822. markDirty = true
  823. }
  824. if push {
  825. for email := range foundEmails {
  826. if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
  827. logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
  828. markDirty = true
  829. }
  830. }
  831. }
  832. }
  833. }
  834. txErr := db.Transaction(func(tx *gorm.DB) error {
  835. if err := tx.Save(oldInbound).Error; err != nil {
  836. return err
  837. }
  838. finalClients, err := inboundSvc.GetClients(oldInbound)
  839. if err != nil {
  840. return err
  841. }
  842. return s.SyncInbound(tx, inboundId, finalClients)
  843. })
  844. if txErr != nil {
  845. for email := range foundEmails {
  846. if _, skip := res.perEmailSkipped[email]; !skip {
  847. res.perEmailSkipped[email] = txErr.Error()
  848. }
  849. }
  850. } else if markDirty && oldInbound.NodeID != nil {
  851. if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
  852. logger.Warning("mark node dirty failed:", dErr)
  853. }
  854. }
  855. return res
  856. }
  857. // BulkCreateResult mirrors BulkAdjustResult for the create flow.
  858. type BulkCreateResult struct {
  859. Created int `json:"created"`
  860. Skipped []BulkCreateReport `json:"skipped,omitempty"`
  861. }
  862. type BulkCreateReport struct {
  863. Email string `json:"email"`
  864. Reason string `json:"reason"`
  865. }
  866. func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
  867. result := BulkCreateResult{}
  868. if len(payloads) == 0 {
  869. return result, false, nil
  870. }
  871. skip := func(email, reason string) {
  872. if strings.TrimSpace(email) == "" {
  873. email = "(missing email)"
  874. }
  875. result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
  876. }
  877. emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
  878. if err != nil {
  879. emailSubIDs = nil
  880. }
  881. type prepared struct {
  882. client model.Client
  883. inboundIds []int
  884. }
  885. prep := make([]prepared, 0, len(payloads))
  886. emails := make([]string, 0, len(payloads))
  887. subIDs := make([]string, 0, len(payloads))
  888. seenEmail := make(map[string]struct{}, len(payloads))
  889. seenSubID := make(map[string]string, len(payloads))
  890. for i := range payloads {
  891. client := payloads[i].Client
  892. email := strings.TrimSpace(client.Email)
  893. if email == "" {
  894. skip("", "client email is required")
  895. continue
  896. }
  897. if verr := validateClientEmail(email); verr != nil {
  898. skip(email, verr.Error())
  899. continue
  900. }
  901. if verr := validateClientSubID(client.SubID); verr != nil {
  902. skip(email, verr.Error())
  903. continue
  904. }
  905. if len(payloads[i].InboundIds) == 0 {
  906. skip(email, "at least one inbound is required")
  907. continue
  908. }
  909. client.Email = email
  910. if client.SubID == "" {
  911. client.SubID = uuid.NewString()
  912. }
  913. if !client.Enable {
  914. client.Enable = true
  915. }
  916. now := time.Now().UnixMilli()
  917. if client.CreatedAt == 0 {
  918. client.CreatedAt = now
  919. }
  920. client.UpdatedAt = now
  921. le := strings.ToLower(email)
  922. if _, dup := seenEmail[le]; dup {
  923. skip(email, "email already in use: "+email)
  924. continue
  925. }
  926. if owner, ok := seenSubID[client.SubID]; ok && owner != le {
  927. skip(email, "subId already in use: "+client.SubID)
  928. continue
  929. }
  930. seenEmail[le] = struct{}{}
  931. seenSubID[client.SubID] = le
  932. prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
  933. emails = append(emails, email)
  934. subIDs = append(subIDs, client.SubID)
  935. }
  936. if len(prep) == 0 {
  937. return result, false, nil
  938. }
  939. db := database.GetDB()
  940. const lookupChunk = 400
  941. existingEmailSub := make(map[string]string, len(emails))
  942. for start := 0; start < len(emails); start += lookupChunk {
  943. end := min(start+lookupChunk, len(emails))
  944. var rows []model.ClientRecord
  945. if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
  946. return result, false, e
  947. }
  948. for i := range rows {
  949. existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
  950. }
  951. }
  952. existingSubOwner := make(map[string]string, len(subIDs))
  953. for start := 0; start < len(subIDs); start += lookupChunk {
  954. end := min(start+lookupChunk, len(subIDs))
  955. var rows []model.ClientRecord
  956. if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
  957. return result, false, e
  958. }
  959. for i := range rows {
  960. existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
  961. }
  962. }
  963. inboundCache := make(map[int]*model.Inbound)
  964. getIb := func(id int) (*model.Inbound, error) {
  965. if ib, ok := inboundCache[id]; ok {
  966. return ib, nil
  967. }
  968. ib, e := inboundSvc.GetInbound(id)
  969. if e != nil {
  970. return nil, e
  971. }
  972. inboundCache[id] = ib
  973. return ib, nil
  974. }
  975. byInbound := make(map[int][]model.Client)
  976. idxByInbound := make(map[int][]int)
  977. inboundOrder := make([]int, 0)
  978. failed := make([]bool, len(prep))
  979. reason := make([]string, len(prep))
  980. for idx := range prep {
  981. le := strings.ToLower(prep[idx].client.Email)
  982. if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
  983. failed[idx] = true
  984. reason[idx] = "email already in use: " + prep[idx].client.Email
  985. continue
  986. }
  987. if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
  988. failed[idx] = true
  989. reason[idx] = "subId already in use: " + prep[idx].client.SubID
  990. continue
  991. }
  992. ok := true
  993. for _, ibId := range prep[idx].inboundIds {
  994. ib, e := getIb(ibId)
  995. if e != nil {
  996. failed[idx] = true
  997. reason[idx] = e.Error()
  998. ok = false
  999. break
  1000. }
  1001. if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
  1002. failed[idx] = true
  1003. reason[idx] = e.Error()
  1004. ok = false
  1005. break
  1006. }
  1007. }
  1008. if !ok {
  1009. continue
  1010. }
  1011. for _, ibId := range prep[idx].inboundIds {
  1012. ib, _ := getIb(ibId)
  1013. if _, seen := byInbound[ibId]; !seen {
  1014. inboundOrder = append(inboundOrder, ibId)
  1015. }
  1016. byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
  1017. idxByInbound[ibId] = append(idxByInbound[ibId], idx)
  1018. }
  1019. }
  1020. needRestart := false
  1021. for _, ibId := range inboundOrder {
  1022. payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
  1023. if e == nil {
  1024. var nr bool
  1025. nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  1026. if e == nil && nr {
  1027. needRestart = true
  1028. }
  1029. }
  1030. if e != nil {
  1031. for _, idx := range idxByInbound[ibId] {
  1032. failed[idx] = true
  1033. if reason[idx] == "" {
  1034. reason[idx] = e.Error()
  1035. }
  1036. }
  1037. }
  1038. }
  1039. for idx := range prep {
  1040. if failed[idx] {
  1041. skip(prep[idx].client.Email, reason[idx])
  1042. } else {
  1043. result.Created++
  1044. }
  1045. }
  1046. return result, needRestart, nil
  1047. }
  1048. func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
  1049. db := database.GetDB()
  1050. now := time.Now().UnixMilli()
  1051. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  1052. var rows []xray.ClientTraffic
  1053. if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil {
  1054. return 0, false, err
  1055. }
  1056. if len(rows) == 0 {
  1057. return 0, false, nil
  1058. }
  1059. seen := make(map[string]struct{}, len(rows))
  1060. emails := make([]string, 0, len(rows))
  1061. for _, r := range rows {
  1062. if r.Email == "" {
  1063. continue
  1064. }
  1065. if _, ok := seen[r.Email]; ok {
  1066. continue
  1067. }
  1068. seen[r.Email] = struct{}{}
  1069. emails = append(emails, r.Email)
  1070. }
  1071. if len(emails) == 0 {
  1072. return 0, false, nil
  1073. }
  1074. res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
  1075. if err != nil {
  1076. return res.Deleted, needRestart, err
  1077. }
  1078. return res.Deleted, needRestart, nil
  1079. }