client_bulk.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/google/uuid"
  9. "github.com/mhsanaei/3x-ui/v3/internal/database"
  10. "github.com/mhsanaei/3x-ui/v3/internal/database/model"
  11. "github.com/mhsanaei/3x-ui/v3/internal/logger"
  12. "github.com/mhsanaei/3x-ui/v3/internal/util/common"
  13. "github.com/mhsanaei/3x-ui/v3/internal/xray"
  14. "gorm.io/gorm"
  15. )
  16. // BulkAttachResult reports the outcome of a bulk attach across target inbounds.
  17. type BulkAttachResult struct {
  18. Attached []string `json:"attached"`
  19. Skipped []string `json:"skipped"`
  20. Errors []string `json:"errors"`
  21. }
  22. // BulkAttach attaches the given existing clients (by email) to each target inbound,
  23. // reusing their identity (email/UUID/password/subId) and a shared traffic row. It adds
  24. // all clients to a target in a single AddInboundClient call, and reports clients already
  25. // present on a target as skipped.
  26. func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkAttachResult, bool, error) {
  27. result := &BulkAttachResult{}
  28. if len(emails) == 0 || len(inboundIds) == 0 {
  29. return result, false, nil
  30. }
  31. recordErr := func(format string, args ...any) {
  32. msg := fmt.Sprintf(format, args...)
  33. result.Errors = append(result.Errors, msg)
  34. logger.Warningf("[BulkAttach] %s", msg)
  35. }
  36. records := make([]*model.ClientRecord, 0, len(emails))
  37. seenEmail := make(map[string]struct{}, len(emails))
  38. for _, email := range emails {
  39. if email == "" {
  40. continue
  41. }
  42. key := strings.ToLower(email)
  43. if _, ok := seenEmail[key]; ok {
  44. continue
  45. }
  46. seenEmail[key] = struct{}{}
  47. rec, err := s.GetRecordByEmail(nil, email)
  48. if err != nil {
  49. recordErr("%s: %v", email, err)
  50. continue
  51. }
  52. records = append(records, rec)
  53. }
  54. emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs()
  55. if sidErr != nil {
  56. emailSubIDs = nil
  57. logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr)
  58. }
  59. needRestart := false
  60. for _, ibId := range inboundIds {
  61. inbound, err := inboundSvc.GetInbound(ibId)
  62. if err != nil {
  63. recordErr("inbound %d: %v", ibId, err)
  64. continue
  65. }
  66. existingClients, err := inboundSvc.GetClients(inbound)
  67. if err != nil {
  68. recordErr("inbound %d: %v", ibId, err)
  69. continue
  70. }
  71. have := make(map[string]struct{}, len(existingClients))
  72. for _, c := range existingClients {
  73. have[strings.ToLower(c.Email)] = struct{}{}
  74. }
  75. clientsToAdd := make([]model.Client, 0, len(records))
  76. for _, rec := range records {
  77. if _, attached := have[strings.ToLower(rec.Email)]; attached {
  78. result.Skipped = append(result.Skipped, rec.Email)
  79. continue
  80. }
  81. client := *rec.ToClient()
  82. client.UpdatedAt = time.Now().UnixMilli()
  83. if err := s.fillProtocolDefaults(&client, inbound); err != nil {
  84. recordErr("%s -> inbound %d: %v", rec.Email, ibId, err)
  85. continue
  86. }
  87. clientsToAdd = append(clientsToAdd, clientWithInboundFlow(client, inbound))
  88. }
  89. if len(clientsToAdd) == 0 {
  90. continue
  91. }
  92. payload, err := json.Marshal(map[string][]model.Client{"clients": clientsToAdd})
  93. if err != nil {
  94. recordErr("inbound %d: %v", ibId, err)
  95. continue
  96. }
  97. nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  98. if err != nil {
  99. recordErr("inbound %d: %v", ibId, err)
  100. continue
  101. }
  102. if nr {
  103. needRestart = true
  104. }
  105. for _, c := range clientsToAdd {
  106. result.Attached = append(result.Attached, c.Email)
  107. }
  108. }
  109. return result, needRestart, nil
  110. }
  111. // BulkDetachResult reports the outcome of a bulk detach across target inbounds.
  112. type BulkDetachResult struct {
  113. Detached []string `json:"detached"`
  114. Skipped []string `json:"skipped"`
  115. Errors []string `json:"errors"`
  116. }
  117. // BulkDetach detaches the given existing clients (by email) from each target inbound.
  118. // (email, inbound) pairs where the client is not currently attached are silently skipped
  119. // at the inbound level; emails that aren't attached to any of the requested inbounds
  120. // are reported under skipped. ClientRecord rows are kept even when they become orphaned
  121. // (matches single-client detach semantics); callers should use bulkDelete for full removal.
  122. func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, inboundIds []int) (*BulkDetachResult, bool, error) {
  123. result := &BulkDetachResult{}
  124. if len(emails) == 0 || len(inboundIds) == 0 {
  125. return result, false, nil
  126. }
  127. recordErr := func(format string, args ...any) {
  128. msg := fmt.Sprintf(format, args...)
  129. result.Errors = append(result.Errors, msg)
  130. logger.Warningf("[BulkDetach] %s", msg)
  131. }
  132. requested := make(map[int]struct{}, len(inboundIds))
  133. for _, id := range inboundIds {
  134. requested[id] = struct{}{}
  135. }
  136. recsByInbound := make(map[int][]*model.ClientRecord)
  137. emailOrder := make([]string, 0, len(emails))
  138. emailRepr := make(map[string]string, len(emails))
  139. emailFailed := make(map[string]bool, len(emails))
  140. seenEmail := make(map[string]struct{}, len(emails))
  141. for _, email := range emails {
  142. if email == "" {
  143. continue
  144. }
  145. key := strings.ToLower(email)
  146. if _, ok := seenEmail[key]; ok {
  147. continue
  148. }
  149. seenEmail[key] = struct{}{}
  150. rec, err := s.GetRecordByEmail(nil, email)
  151. if err != nil {
  152. recordErr("%s: %v", email, err)
  153. continue
  154. }
  155. currentIds, err := s.GetInboundIdsForRecord(rec.Id)
  156. if err != nil {
  157. recordErr("%s: %v", email, err)
  158. continue
  159. }
  160. matched := false
  161. for _, id := range currentIds {
  162. if _, ok := requested[id]; ok {
  163. recsByInbound[id] = append(recsByInbound[id], rec)
  164. matched = true
  165. }
  166. }
  167. if !matched {
  168. result.Skipped = append(result.Skipped, rec.Email)
  169. continue
  170. }
  171. emailOrder = append(emailOrder, key)
  172. emailRepr[key] = rec.Email
  173. }
  174. needRestart := false
  175. for _, ibId := range inboundIds {
  176. recs, ok := recsByInbound[ibId]
  177. if !ok {
  178. continue
  179. }
  180. delete(recsByInbound, ibId)
  181. nr, err := s.delInboundClients(inboundSvc, ibId, recs, true)
  182. if err != nil {
  183. recordErr("inbound %d: %v", ibId, err)
  184. for _, rec := range recs {
  185. emailFailed[strings.ToLower(rec.Email)] = true
  186. }
  187. continue
  188. }
  189. if nr {
  190. needRestart = true
  191. }
  192. }
  193. for _, key := range emailOrder {
  194. if emailFailed[key] {
  195. continue
  196. }
  197. result.Detached = append(result.Detached, emailRepr[key])
  198. }
  199. return result, needRestart, nil
  200. }
  201. // BulkAdjustResult is returned by BulkAdjust to report how many clients were
  202. // successfully updated and which were skipped (typically because the field
  203. // being adjusted was unlimited for that client) or failed.
  204. type BulkAdjustResult struct {
  205. Adjusted int `json:"adjusted"`
  206. Skipped []BulkAdjustReport `json:"skipped,omitempty"`
  207. }
  208. type BulkAdjustReport struct {
  209. Email string `json:"email"`
  210. Reason string `json:"reason"`
  211. }
  212. type bulkAdjustEntry struct {
  213. record *model.ClientRecord
  214. applyExpiry bool
  215. newExpiry int64
  216. applyTotal bool
  217. newTotal int64
  218. }
  219. // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
  220. // for every email in the list. Clients whose corresponding field is
  221. // unlimited (0) are skipped — bulk extend should not accidentally
  222. // limit an unlimited client. addDays and addBytes may be negative.
  223. //
  224. // Like BulkDelete, the work is grouped by inbound so each inbound's
  225. // settings JSON is parsed and written exactly once regardless of how
  226. // many target emails it contains.
  227. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) {
  228. result := BulkAdjustResult{}
  229. if len(emails) == 0 {
  230. return result, false, nil
  231. }
  232. if addDays == 0 && addBytes == 0 {
  233. return result, false, common.NewError("no adjustment specified")
  234. }
  235. addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
  236. seen := map[string]struct{}{}
  237. cleanEmails := make([]string, 0, len(emails))
  238. for _, e := range emails {
  239. e = strings.TrimSpace(e)
  240. if e == "" {
  241. continue
  242. }
  243. if _, ok := seen[e]; ok {
  244. continue
  245. }
  246. seen[e] = struct{}{}
  247. cleanEmails = append(cleanEmails, e)
  248. }
  249. if len(cleanEmails) == 0 {
  250. return result, false, nil
  251. }
  252. db := database.GetDB()
  253. var records []model.ClientRecord
  254. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  255. var rows []model.ClientRecord
  256. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  257. return result, false, err
  258. }
  259. records = append(records, rows...)
  260. }
  261. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  262. for i := range records {
  263. recordsByEmail[records[i].Email] = &records[i]
  264. }
  265. skippedReasons := map[string]string{}
  266. for _, email := range cleanEmails {
  267. if _, ok := recordsByEmail[email]; !ok {
  268. skippedReasons[email] = "client not found"
  269. }
  270. }
  271. plan := map[string]*bulkAdjustEntry{}
  272. for email, rec := range recordsByEmail {
  273. entry := &bulkAdjustEntry{record: rec}
  274. if addDays != 0 {
  275. switch {
  276. case rec.ExpiryTime == 0:
  277. if _, exists := skippedReasons[email]; !exists {
  278. skippedReasons[email] = "unlimited expiry"
  279. }
  280. case rec.ExpiryTime > 0:
  281. next := rec.ExpiryTime + addExpiryMs
  282. if next <= 0 {
  283. if _, exists := skippedReasons[email]; !exists {
  284. skippedReasons[email] = "reduction exceeds remaining time"
  285. }
  286. } else {
  287. entry.applyExpiry = true
  288. entry.newExpiry = next
  289. }
  290. default:
  291. next := rec.ExpiryTime - addExpiryMs
  292. if next >= 0 {
  293. if _, exists := skippedReasons[email]; !exists {
  294. skippedReasons[email] = "reduction exceeds delay window"
  295. }
  296. } else {
  297. entry.applyExpiry = true
  298. entry.newExpiry = next
  299. }
  300. }
  301. }
  302. if addBytes != 0 {
  303. if rec.TotalGB == 0 {
  304. if _, exists := skippedReasons[email]; !exists {
  305. skippedReasons[email] = "unlimited traffic"
  306. }
  307. } else {
  308. next := max(rec.TotalGB+addBytes, 0)
  309. entry.applyTotal = true
  310. entry.newTotal = next
  311. }
  312. }
  313. if entry.applyExpiry || entry.applyTotal {
  314. plan[email] = entry
  315. }
  316. }
  317. if len(plan) == 0 {
  318. for email, reason := range skippedReasons {
  319. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  320. }
  321. return result, false, nil
  322. }
  323. plannedIds := make([]int, 0, len(plan))
  324. recordIdToEmail := make(map[int]string, len(plan))
  325. for email, entry := range plan {
  326. plannedIds = append(plannedIds, entry.record.Id)
  327. recordIdToEmail[entry.record.Id] = email
  328. }
  329. var mappings []model.ClientInbound
  330. for _, batch := range chunkInts(plannedIds, sqlInChunk) {
  331. var rows []model.ClientInbound
  332. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  333. return result, false, err
  334. }
  335. mappings = append(mappings, rows...)
  336. }
  337. emailsByInbound := map[int][]string{}
  338. for _, m := range mappings {
  339. email, ok := recordIdToEmail[m.ClientId]
  340. if !ok {
  341. continue
  342. }
  343. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  344. }
  345. needRestart := false
  346. for inboundId, ibEmails := range emailsByInbound {
  347. ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan)
  348. if ibRes.needRestart {
  349. needRestart = true
  350. }
  351. for email, reason := range ibRes.perEmailSkipped {
  352. if _, already := skippedReasons[email]; !already {
  353. skippedReasons[email] = reason
  354. }
  355. }
  356. }
  357. for email, entry := range plan {
  358. if _, skipped := skippedReasons[email]; skipped {
  359. continue
  360. }
  361. updates := map[string]any{}
  362. if entry.applyExpiry {
  363. updates["expiry_time"] = entry.newExpiry
  364. }
  365. if entry.applyTotal {
  366. updates["total"] = entry.newTotal
  367. }
  368. if len(updates) == 0 {
  369. continue
  370. }
  371. if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
  372. if _, already := skippedReasons[email]; !already {
  373. skippedReasons[email] = err.Error()
  374. }
  375. continue
  376. }
  377. result.Adjusted++
  378. }
  379. for email, reason := range skippedReasons {
  380. result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
  381. }
  382. return result, needRestart, nil
  383. }
  384. type bulkInboundAdjustResult struct {
  385. perEmailSkipped map[string]string
  386. needRestart bool
  387. }
  388. // bulkAdjustInboundClients applies expiry/total deltas to multiple clients
  389. // inside a single inbound's settings JSON. The xray runtime is updated
  390. // only for remote-node inbounds; local nodes do not need a notification
  391. // because the AddUser payload does not include totalGB/expiryTime —
  392. // changing those fields is identity-preserving and the panel's traffic
  393. // enforcement loop picks up the new limits from ClientTraffic directly.
  394. func (s *ClientService) bulkAdjustInboundClients(
  395. inboundSvc *InboundService,
  396. inboundId int,
  397. emails []string,
  398. plan map[string]*bulkAdjustEntry,
  399. ) bulkInboundAdjustResult {
  400. res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}}
  401. defer lockInbound(inboundId).Unlock()
  402. oldInbound, err := inboundSvc.GetInbound(inboundId)
  403. if err != nil {
  404. logger.Error("Load Old Data Error")
  405. for _, e := range emails {
  406. res.perEmailSkipped[e] = err.Error()
  407. }
  408. return res
  409. }
  410. var settings map[string]any
  411. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  412. for _, e := range emails {
  413. res.perEmailSkipped[e] = err.Error()
  414. }
  415. return res
  416. }
  417. // Match by email — the client's stable identity (see Delete). Credentials
  418. // can drift from the inbound JSON, so they are never used for matching.
  419. wantedEmails := make(map[string]struct{}, len(emails))
  420. for _, email := range emails {
  421. if plan[email] == nil {
  422. res.perEmailSkipped[email] = "client not found"
  423. continue
  424. }
  425. wantedEmails[email] = struct{}{}
  426. }
  427. interfaceClients, _ := settings["clients"].([]any)
  428. foundEmails := map[string]bool{}
  429. nowMs := time.Now().Unix() * 1000
  430. for i, client := range interfaceClients {
  431. c, ok := client.(map[string]any)
  432. if !ok {
  433. continue
  434. }
  435. targetEmail, _ := c["email"].(string)
  436. if _, want := wantedEmails[targetEmail]; !want || targetEmail == "" {
  437. continue
  438. }
  439. entry := plan[targetEmail]
  440. if entry.applyExpiry {
  441. c["expiryTime"] = entry.newExpiry
  442. }
  443. if entry.applyTotal {
  444. c["totalGB"] = entry.newTotal
  445. }
  446. c["updated_at"] = nowMs
  447. interfaceClients[i] = c
  448. foundEmails[targetEmail] = true
  449. }
  450. for email := range wantedEmails {
  451. if !foundEmails[email] {
  452. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  453. }
  454. }
  455. if len(foundEmails) == 0 {
  456. return res
  457. }
  458. settings["clients"] = interfaceClients
  459. newSettings, err := json.MarshalIndent(settings, "", " ")
  460. if err != nil {
  461. for email := range foundEmails {
  462. res.perEmailSkipped[email] = err.Error()
  463. }
  464. return res
  465. }
  466. oldInbound.Settings = string(newSettings)
  467. markDirty := false
  468. if oldInbound.NodeID != nil {
  469. rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
  470. if perr != nil {
  471. for email := range foundEmails {
  472. res.perEmailSkipped[email] = perr.Error()
  473. delete(foundEmails, email)
  474. }
  475. } else {
  476. if dirty {
  477. markDirty = true
  478. }
  479. // Large batches collapse into one reconcile push rather than M updates.
  480. if push && len(foundEmails) > nodeBulkPushThreshold {
  481. markDirty = true
  482. push = false
  483. }
  484. if push {
  485. for email := range foundEmails {
  486. entry := plan[email]
  487. updated := *entry.record.ToClient()
  488. if entry.applyExpiry {
  489. updated.ExpiryTime = entry.newExpiry
  490. }
  491. if entry.applyTotal {
  492. updated.TotalGB = entry.newTotal
  493. }
  494. updated.UpdatedAt = nowMs
  495. if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
  496. logger.Warning("Error in updating client on", rt.Name(), ":", err1)
  497. markDirty = true
  498. }
  499. }
  500. }
  501. }
  502. }
  503. // Serialize against the traffic poll to avoid the cross-transaction
  504. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  505. txErr := runSerializedTx(func(tx *gorm.DB) error {
  506. if err := tx.Save(oldInbound).Error; err != nil {
  507. return err
  508. }
  509. finalClients, gcErr := inboundSvc.GetClients(oldInbound)
  510. if gcErr != nil {
  511. return gcErr
  512. }
  513. return s.SyncInbound(tx, inboundId, finalClients)
  514. })
  515. if txErr != nil {
  516. for email := range foundEmails {
  517. if _, skip := res.perEmailSkipped[email]; !skip {
  518. res.perEmailSkipped[email] = txErr.Error()
  519. }
  520. }
  521. } else if markDirty && oldInbound.NodeID != nil {
  522. if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
  523. logger.Warning("mark node dirty failed:", dErr)
  524. }
  525. }
  526. return res
  527. }
  528. // BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email
  529. // skip reasons when an email could not be processed.
  530. type BulkDeleteResult struct {
  531. Deleted int `json:"deleted"`
  532. Skipped []BulkDeleteReport `json:"skipped,omitempty"`
  533. }
  534. type BulkDeleteReport struct {
  535. Email string `json:"email"`
  536. Reason string `json:"reason"`
  537. }
  538. // BulkDelete removes every client in the list in one optimized pass.
  539. // Instead of running the full single-delete pipeline N times (which would
  540. // re-read, re-parse, and re-write each inbound's settings JSON for every
  541. // email), it groups emails by inbound and performs a single
  542. // read-modify-write per inbound. Per-row DB cleanups are also batched with
  543. // IN-clause queries at the end. Errors on a particular email are recorded
  544. // in the Skipped list and processing continues for the rest.
  545. func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
  546. result := BulkDeleteResult{}
  547. seen := map[string]struct{}{}
  548. cleanEmails := make([]string, 0, len(emails))
  549. for _, e := range emails {
  550. e = strings.TrimSpace(e)
  551. if e == "" {
  552. continue
  553. }
  554. if _, ok := seen[e]; ok {
  555. continue
  556. }
  557. seen[e] = struct{}{}
  558. cleanEmails = append(cleanEmails, e)
  559. }
  560. if len(cleanEmails) == 0 {
  561. return result, false, nil
  562. }
  563. db := database.GetDB()
  564. var records []model.ClientRecord
  565. for _, batch := range chunkStrings(cleanEmails, sqlInChunk) {
  566. var rows []model.ClientRecord
  567. if err := db.Where("email IN ?", batch).Find(&rows).Error; err != nil {
  568. return result, false, err
  569. }
  570. records = append(records, rows...)
  571. }
  572. recordsByEmail := make(map[string]*model.ClientRecord, len(records))
  573. tombstoneEmails := make([]string, 0, len(records))
  574. for i := range records {
  575. recordsByEmail[records[i].Email] = &records[i]
  576. tombstoneEmails = append(tombstoneEmails, records[i].Email)
  577. }
  578. tombstoneClientEmails(tombstoneEmails)
  579. skippedReasons := map[string]string{}
  580. for _, email := range cleanEmails {
  581. if _, ok := recordsByEmail[email]; !ok {
  582. skippedReasons[email] = "client not found"
  583. }
  584. }
  585. clientIds := make([]int, 0, len(recordsByEmail))
  586. recordIdToEmail := make(map[int]string, len(recordsByEmail))
  587. for _, r := range recordsByEmail {
  588. clientIds = append(clientIds, r.Id)
  589. recordIdToEmail[r.Id] = r.Email
  590. }
  591. emailsByInbound := map[int][]string{}
  592. if len(clientIds) > 0 {
  593. var mappings []model.ClientInbound
  594. for _, batch := range chunkInts(clientIds, sqlInChunk) {
  595. var rows []model.ClientInbound
  596. if err := db.Where("client_id IN ?", batch).Find(&rows).Error; err != nil {
  597. return result, false, err
  598. }
  599. mappings = append(mappings, rows...)
  600. }
  601. for _, m := range mappings {
  602. email, ok := recordIdToEmail[m.ClientId]
  603. if !ok {
  604. continue
  605. }
  606. emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
  607. }
  608. }
  609. needRestart := false
  610. for inboundId, ibEmails := range emailsByInbound {
  611. ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail, false)
  612. if ibResult.needRestart {
  613. needRestart = true
  614. }
  615. for email, reason := range ibResult.perEmailSkipped {
  616. if _, already := skippedReasons[email]; !already {
  617. skippedReasons[email] = reason
  618. }
  619. }
  620. }
  621. successEmails := make([]string, 0, len(recordsByEmail))
  622. successIds := make([]int, 0, len(recordsByEmail))
  623. for email, rec := range recordsByEmail {
  624. if _, skipped := skippedReasons[email]; skipped {
  625. continue
  626. }
  627. successEmails = append(successEmails, email)
  628. successIds = append(successIds, rec.Id)
  629. }
  630. if len(successIds) > 0 {
  631. // Serialize the row cleanup against the traffic poll to avoid the
  632. // cross-transaction lock-order deadlock on client_traffics/inbounds.
  633. if err := runSerializedTx(func(tx *gorm.DB) error {
  634. for _, batch := range chunkInts(successIds, sqlInChunk) {
  635. if e := tx.Where("client_id IN ?", batch).Delete(&model.ClientInbound{}).Error; e != nil {
  636. return e
  637. }
  638. }
  639. if !keepTraffic && len(successEmails) > 0 {
  640. for _, batch := range chunkStrings(successEmails, sqlInChunk) {
  641. if e := tx.Where("email IN ?", batch).Delete(&xray.ClientTraffic{}).Error; e != nil {
  642. return e
  643. }
  644. if e := tx.Where("client_email IN ?", batch).Delete(&model.InboundClientIps{}).Error; e != nil {
  645. return e
  646. }
  647. }
  648. }
  649. for _, batch := range chunkInts(successIds, sqlInChunk) {
  650. if e := tx.Where("id IN ?", batch).Delete(&model.ClientRecord{}).Error; e != nil {
  651. return e
  652. }
  653. }
  654. return nil
  655. }); err != nil {
  656. return result, needRestart, err
  657. }
  658. }
  659. result.Deleted = len(successEmails)
  660. for email, reason := range skippedReasons {
  661. result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
  662. }
  663. return result, needRestart, nil
  664. }
  665. type bulkInboundDeleteResult struct {
  666. perEmailSkipped map[string]string
  667. needRestart bool
  668. }
  669. // bulkDelInboundClients removes multiple clients from a single inbound's
  670. // settings JSON in one read-modify-write cycle, runs the xray runtime
  671. // RemoveUser/DeleteUser calls, and persists the inbound. The returned map
  672. // holds per-email failure reasons; emails not present in the map are
  673. // considered successful for this inbound.
  674. func (s *ClientService) bulkDelInboundClients(
  675. inboundSvc *InboundService,
  676. inboundId int,
  677. emails []string,
  678. records map[string]*model.ClientRecord,
  679. keepTraffic bool,
  680. ) bulkInboundDeleteResult {
  681. res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
  682. defer lockInbound(inboundId).Unlock()
  683. oldInbound, err := inboundSvc.GetInbound(inboundId)
  684. if err != nil {
  685. logger.Error("Load Old Data Error")
  686. for _, e := range emails {
  687. res.perEmailSkipped[e] = err.Error()
  688. }
  689. return res
  690. }
  691. var settings map[string]any
  692. if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
  693. for _, e := range emails {
  694. res.perEmailSkipped[e] = err.Error()
  695. }
  696. return res
  697. }
  698. // Match by email — the client's stable identity (see Delete). Removes every
  699. // entry carrying a wanted email, independent of credential drift.
  700. wantedEmails := make(map[string]struct{}, len(emails))
  701. for _, email := range emails {
  702. if records[email] == nil {
  703. res.perEmailSkipped[email] = "client not found"
  704. continue
  705. }
  706. wantedEmails[email] = struct{}{}
  707. }
  708. interfaceClients, _ := settings["clients"].([]any)
  709. newClients := make([]any, 0, len(interfaceClients))
  710. foundEmails := map[string]bool{}
  711. enableByEmail := map[string]bool{}
  712. for _, client := range interfaceClients {
  713. c, ok := client.(map[string]any)
  714. if !ok {
  715. newClients = append(newClients, client)
  716. continue
  717. }
  718. em, _ := c["email"].(string)
  719. if _, found := wantedEmails[em]; found && em != "" {
  720. foundEmails[em] = true
  721. en, _ := c["enable"].(bool)
  722. enableByEmail[em] = en
  723. continue
  724. }
  725. newClients = append(newClients, client)
  726. }
  727. for email := range wantedEmails {
  728. if !foundEmails[email] {
  729. res.perEmailSkipped[email] = "Client Not Found In Inbound"
  730. }
  731. }
  732. db := database.GetDB()
  733. newClients = compactOrphans(db, newClients)
  734. if newClients == nil {
  735. newClients = []any{}
  736. }
  737. settings["clients"] = newClients
  738. newSettings, err := json.MarshalIndent(settings, "", " ")
  739. if err != nil {
  740. for email := range foundEmails {
  741. if _, skip := res.perEmailSkipped[email]; !skip {
  742. res.perEmailSkipped[email] = err.Error()
  743. }
  744. }
  745. return res
  746. }
  747. oldInbound.Settings = string(newSettings)
  748. foundList := make([]string, 0, len(foundEmails))
  749. for email := range foundEmails {
  750. foundList = append(foundList, email)
  751. }
  752. notDepletedByEmail := map[string]bool{}
  753. if len(foundList) > 0 {
  754. type trafficRow struct {
  755. Email string
  756. Enable bool
  757. }
  758. for _, batch := range chunkStrings(foundList, sqlInChunk) {
  759. var rows []trafficRow
  760. if err := db.Model(xray.ClientTraffic{}).
  761. Where("email IN ?", batch).
  762. Select("email, enable").
  763. Scan(&rows).Error; err == nil {
  764. for _, r := range rows {
  765. notDepletedByEmail[r.Email] = r.Enable
  766. }
  767. }
  768. }
  769. }
  770. var sharedSet map[string]bool
  771. if !keepTraffic {
  772. var sharedErr error
  773. sharedSet, sharedErr = inboundSvc.emailsUsedByOtherInbounds(foundList, inboundId)
  774. if sharedErr != nil {
  775. for email := range foundEmails {
  776. res.perEmailSkipped[email] = sharedErr.Error()
  777. delete(foundEmails, email)
  778. }
  779. return res
  780. }
  781. }
  782. if !keepTraffic {
  783. purge := make([]string, 0, len(foundEmails))
  784. for email := range foundEmails {
  785. if !sharedSet[strings.ToLower(strings.TrimSpace(email))] {
  786. purge = append(purge, email)
  787. }
  788. }
  789. if len(purge) > 0 {
  790. // Serialize the IP/stat purge against the traffic poll to avoid the
  791. // cross-transaction lock-order deadlock on client_traffics.
  792. if delErr := runSerializedTx(func(tx *gorm.DB) error {
  793. if e := inboundSvc.delClientIPsByEmails(tx, purge); e != nil {
  794. logger.Error("Error in delete client IPs")
  795. return e
  796. }
  797. if e := inboundSvc.delClientStatsByEmails(tx, purge); e != nil {
  798. logger.Error("Delete stats Data Error")
  799. return e
  800. }
  801. return nil
  802. }); delErr != nil {
  803. for _, email := range purge {
  804. res.perEmailSkipped[email] = delErr.Error()
  805. delete(foundEmails, email)
  806. }
  807. }
  808. }
  809. }
  810. markDirty := false
  811. if oldInbound.NodeID == nil {
  812. rt, rterr := inboundSvc.runtimeFor(oldInbound)
  813. if rterr != nil {
  814. res.needRestart = true
  815. } else {
  816. for email := range foundEmails {
  817. if !enableByEmail[email] || !notDepletedByEmail[email] {
  818. continue
  819. }
  820. err1 := rt.RemoveUser(context.Background(), oldInbound, email)
  821. if err1 == nil {
  822. logger.Debug("Client deleted on", rt.Name(), ":", email)
  823. } else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
  824. logger.Debug("User is already deleted. Nothing to do more...")
  825. } else {
  826. logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
  827. res.needRestart = true
  828. }
  829. }
  830. }
  831. } else {
  832. rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
  833. if perr != nil {
  834. for email := range foundEmails {
  835. res.perEmailSkipped[email] = perr.Error()
  836. delete(foundEmails, email)
  837. }
  838. } else {
  839. if dirty {
  840. markDirty = true
  841. }
  842. // Large batches collapse into one reconcile push rather than M deletes.
  843. if push && len(foundEmails) > nodeBulkPushThreshold {
  844. markDirty = true
  845. push = false
  846. }
  847. if push {
  848. for email := range foundEmails {
  849. if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
  850. logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
  851. markDirty = true
  852. }
  853. }
  854. }
  855. }
  856. }
  857. // Serialize against the traffic poll to avoid the cross-transaction
  858. // lock-order deadlock on inbounds/client_records (runSerializedTx).
  859. txErr := runSerializedTx(func(tx *gorm.DB) error {
  860. if err := tx.Save(oldInbound).Error; err != nil {
  861. return err
  862. }
  863. finalClients, err := inboundSvc.GetClients(oldInbound)
  864. if err != nil {
  865. return err
  866. }
  867. return s.SyncInbound(tx, inboundId, finalClients)
  868. })
  869. if txErr != nil {
  870. for email := range foundEmails {
  871. if _, skip := res.perEmailSkipped[email]; !skip {
  872. res.perEmailSkipped[email] = txErr.Error()
  873. }
  874. }
  875. } else if markDirty && oldInbound.NodeID != nil {
  876. if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
  877. logger.Warning("mark node dirty failed:", dErr)
  878. }
  879. }
  880. return res
  881. }
  882. // BulkCreateResult mirrors BulkAdjustResult for the create flow.
  883. type BulkCreateResult struct {
  884. Created int `json:"created"`
  885. Skipped []BulkCreateReport `json:"skipped,omitempty"`
  886. }
  887. type BulkCreateReport struct {
  888. Email string `json:"email"`
  889. Reason string `json:"reason"`
  890. }
  891. func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
  892. result := BulkCreateResult{}
  893. if len(payloads) == 0 {
  894. return result, false, nil
  895. }
  896. skip := func(email, reason string) {
  897. if strings.TrimSpace(email) == "" {
  898. email = "(missing email)"
  899. }
  900. result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: reason})
  901. }
  902. emailSubIDs, err := inboundSvc.getAllEmailSubIDs()
  903. if err != nil {
  904. emailSubIDs = nil
  905. }
  906. type prepared struct {
  907. client model.Client
  908. inboundIds []int
  909. }
  910. prep := make([]prepared, 0, len(payloads))
  911. emails := make([]string, 0, len(payloads))
  912. subIDs := make([]string, 0, len(payloads))
  913. seenEmail := make(map[string]struct{}, len(payloads))
  914. seenSubID := make(map[string]string, len(payloads))
  915. for i := range payloads {
  916. client := payloads[i].Client
  917. email := strings.TrimSpace(client.Email)
  918. if email == "" {
  919. skip("", "client email is required")
  920. continue
  921. }
  922. if verr := validateClientEmail(email); verr != nil {
  923. skip(email, verr.Error())
  924. continue
  925. }
  926. if verr := validateClientSubID(client.SubID); verr != nil {
  927. skip(email, verr.Error())
  928. continue
  929. }
  930. if len(payloads[i].InboundIds) == 0 {
  931. skip(email, "at least one inbound is required")
  932. continue
  933. }
  934. client.Email = email
  935. if client.SubID == "" {
  936. client.SubID = uuid.NewString()
  937. }
  938. if !client.Enable {
  939. client.Enable = true
  940. }
  941. now := time.Now().UnixMilli()
  942. if client.CreatedAt == 0 {
  943. client.CreatedAt = now
  944. }
  945. client.UpdatedAt = now
  946. le := strings.ToLower(email)
  947. if _, dup := seenEmail[le]; dup {
  948. skip(email, "email already in use: "+email)
  949. continue
  950. }
  951. if owner, ok := seenSubID[client.SubID]; ok && owner != le {
  952. skip(email, "subId already in use: "+client.SubID)
  953. continue
  954. }
  955. seenEmail[le] = struct{}{}
  956. seenSubID[client.SubID] = le
  957. prep = append(prep, prepared{client: client, inboundIds: payloads[i].InboundIds})
  958. emails = append(emails, email)
  959. subIDs = append(subIDs, client.SubID)
  960. }
  961. if len(prep) == 0 {
  962. return result, false, nil
  963. }
  964. db := database.GetDB()
  965. const lookupChunk = 400
  966. existingEmailSub := make(map[string]string, len(emails))
  967. for start := 0; start < len(emails); start += lookupChunk {
  968. end := min(start+lookupChunk, len(emails))
  969. var rows []model.ClientRecord
  970. if e := db.Where("email IN ?", emails[start:end]).Find(&rows).Error; e != nil {
  971. return result, false, e
  972. }
  973. for i := range rows {
  974. existingEmailSub[strings.ToLower(rows[i].Email)] = rows[i].SubID
  975. }
  976. }
  977. existingSubOwner := make(map[string]string, len(subIDs))
  978. for start := 0; start < len(subIDs); start += lookupChunk {
  979. end := min(start+lookupChunk, len(subIDs))
  980. var rows []model.ClientRecord
  981. if e := db.Where("sub_id IN ?", subIDs[start:end]).Find(&rows).Error; e != nil {
  982. return result, false, e
  983. }
  984. for i := range rows {
  985. existingSubOwner[rows[i].SubID] = strings.ToLower(rows[i].Email)
  986. }
  987. }
  988. inboundCache := make(map[int]*model.Inbound)
  989. getIb := func(id int) (*model.Inbound, error) {
  990. if ib, ok := inboundCache[id]; ok {
  991. return ib, nil
  992. }
  993. ib, e := inboundSvc.GetInbound(id)
  994. if e != nil {
  995. return nil, e
  996. }
  997. inboundCache[id] = ib
  998. return ib, nil
  999. }
  1000. byInbound := make(map[int][]model.Client)
  1001. idxByInbound := make(map[int][]int)
  1002. inboundOrder := make([]int, 0)
  1003. failed := make([]bool, len(prep))
  1004. reason := make([]string, len(prep))
  1005. for idx := range prep {
  1006. le := strings.ToLower(prep[idx].client.Email)
  1007. if existSub, ok := existingEmailSub[le]; ok && existSub != prep[idx].client.SubID {
  1008. failed[idx] = true
  1009. reason[idx] = "email already in use: " + prep[idx].client.Email
  1010. continue
  1011. }
  1012. if owner, ok := existingSubOwner[prep[idx].client.SubID]; ok && owner != le {
  1013. failed[idx] = true
  1014. reason[idx] = "subId already in use: " + prep[idx].client.SubID
  1015. continue
  1016. }
  1017. ok := true
  1018. for _, ibId := range prep[idx].inboundIds {
  1019. ib, e := getIb(ibId)
  1020. if e != nil {
  1021. failed[idx] = true
  1022. reason[idx] = e.Error()
  1023. ok = false
  1024. break
  1025. }
  1026. if e := s.fillProtocolDefaults(&prep[idx].client, ib); e != nil {
  1027. failed[idx] = true
  1028. reason[idx] = e.Error()
  1029. ok = false
  1030. break
  1031. }
  1032. }
  1033. if !ok {
  1034. continue
  1035. }
  1036. for _, ibId := range prep[idx].inboundIds {
  1037. ib, _ := getIb(ibId)
  1038. if _, seen := byInbound[ibId]; !seen {
  1039. inboundOrder = append(inboundOrder, ibId)
  1040. }
  1041. byInbound[ibId] = append(byInbound[ibId], clientWithInboundFlow(prep[idx].client, ib))
  1042. idxByInbound[ibId] = append(idxByInbound[ibId], idx)
  1043. }
  1044. }
  1045. needRestart := false
  1046. for _, ibId := range inboundOrder {
  1047. payload, e := json.Marshal(map[string][]model.Client{"clients": byInbound[ibId]})
  1048. if e == nil {
  1049. var nr bool
  1050. nr, e = s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs)
  1051. if e == nil && nr {
  1052. needRestart = true
  1053. }
  1054. }
  1055. if e != nil {
  1056. for _, idx := range idxByInbound[ibId] {
  1057. failed[idx] = true
  1058. if reason[idx] == "" {
  1059. reason[idx] = e.Error()
  1060. }
  1061. }
  1062. }
  1063. }
  1064. for idx := range prep {
  1065. if failed[idx] {
  1066. skip(prep[idx].client.Email, reason[idx])
  1067. } else {
  1068. result.Created++
  1069. }
  1070. }
  1071. return result, needRestart, nil
  1072. }
  1073. func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
  1074. db := database.GetDB()
  1075. now := time.Now().UnixMilli()
  1076. depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
  1077. var rows []xray.ClientTraffic
  1078. if err := db.Where(depletedClause, now).Find(&rows).Error; err != nil {
  1079. return 0, false, err
  1080. }
  1081. if len(rows) == 0 {
  1082. return 0, false, nil
  1083. }
  1084. seen := make(map[string]struct{}, len(rows))
  1085. emails := make([]string, 0, len(rows))
  1086. for _, r := range rows {
  1087. if r.Email == "" {
  1088. continue
  1089. }
  1090. if _, ok := seen[r.Email]; ok {
  1091. continue
  1092. }
  1093. seen[r.Email] = struct{}{}
  1094. emails = append(emails, r.Email)
  1095. }
  1096. if len(emails) == 0 {
  1097. return 0, false, nil
  1098. }
  1099. res, needRestart, err := s.BulkDelete(inboundSvc, emails, false)
  1100. if err != nil {
  1101. return res.Deleted, needRestart, err
  1102. }
  1103. return res.Deleted, needRestart, nil
  1104. }